The fastest way to deploy a production LangChain API.

Steamship Python Client Library For LangChain (🦜️🔗)

Steamship is the fastest way to build, ship, and use full-lifecycle language AI.

This repository contains LangChain adapters for Steamship, enabling LangChain developers to rapidly deploy their apps on Steamship to automatically get:

  • Production-ready API endpoint(s)
  • Horizontal scaling across dependencies / backends
  • Persistent storage of app state (including caches)
  • Built-in support for Authn/z
  • Multi-tenancy support
  • Seamless integration with other Steamship skills (ex: audio transcription)
  • Usage Metrics and Logging
  • And much more...

Installing

Install via pip:

pip install steamship-langchain

Examples

Here are a few examples of using LangChain on Steamship.

The examples use temporary workspaces, which are fully cleaned up after experimentation. Workspaces provide a unit of tenant isolation within Steamship. For production use, create or retrieve a persistent workspace via Steamship(workspace_handle="my_workspace").

NOTE These examples omit import blocks. Please see the examples/ directory for complete code.

NOTE Client examples assume that the user has a Steamship API key and that it is exposed to the environment (see: API Keys).

Basic Prompting

Example of a basic prompt using a Steamship LLM integration.

Server Snippet

@post("greet")
def greet(self, user: str) -> str:
    prompt = PromptTemplate(
      input_variables=["user"],
      template=
      "Create a welcome message for user {user}. Thank them for running their LangChain app on Steamship. "
      "Encourage them to deploy their app via `ship deploy` when ready.",
    )
    llm = SteamshipGPT(client=self.client, temperature=0.8)
    return llm(prompt.format(user=user))

Client Snippet

with Steamship.temporary_workspace() as client:
    api = client.use("my-langchain-app")
    while True:
        name = input("Name: ")
        print(f'{api.invoke("/greet", user=name).strip()}\n')
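
To make the prompt step concrete, the sketch below reproduces roughly what prompt.format(user=user) does in the server snippet, using plain str.format as a stand-in for LangChain's PromptTemplate. The render_greeting_prompt helper is hypothetical and is not part of LangChain or Steamship.

```python
# Stand-in for the PromptTemplate in the server snippet; uses only str.format.
GREETING_TEMPLATE = (
    "Create a welcome message for user {user}. "
    "Thank them for running their LangChain app on Steamship. "
    "Encourage them to deploy their app via `ship deploy` when ready."
)


def render_greeting_prompt(user: str) -> str:
    """Fill the {user} placeholder (hypothetical helper, for illustration only)."""
    return GREETING_TEMPLATE.format(user=user)


if __name__ == "__main__":
    # Show the text that would be sent to the LLM for a sample user.
    print(render_greeting_prompt("Ada"))
```

The rendered string is what gets passed to the LLM; the model call itself is not simulated here.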

Self Ask With Search

Executes the LangChain self-ask-with-search agent using production Steamship GPT LLM and SERP Tool.

Server Snippet

@post("/self_ask_with_search")
def self_ask_with_search(self, query: str) -> str:
    llm = SteamshipGPT(client=self.client, temperature=0.0, cache=True)
    serp_tool = SteamshipSERP(client=self.client, cache=True)
    tools = [Tool(name="Intermediate Answer", func=serp_tool.search)]
    self_ask_with_search = initialize_agent(tools, llm, agent="self-ask-with-search", verbose=False)
    return self_ask_with_search.run(query)

Client Snippet

with Steamship.temporary_workspace() as client:
    api = client.use("my-langchain-app")
    query = "Who was president the last time the Twins won the World Series?"
    print(f"Query: {query}")
    print(f"Answer: {api.invoke('/self_ask_with_search', query=query)}")

ChatBot

Implements a basic Chatbot (similar to ChatGPT) in Steamship with LangChain.

Server Snippet

@post("/send_message")
def send_message(self, message: str, chat_history_handle: str) -> str:
    mem = SteamshipPersistentConversationWindowMemory(client=self.client,
                                                      file_handle=chat_history_handle,
                                                      k=2)
    chatgpt = LLMChain(
        llm=SteamshipGPT(client=self.client, temperature=0), 
        prompt=CHATBOT_PROMPT, 
        memory=mem,
    )
    
    return chatgpt.predict(human_input=message)

Client Snippet

with Steamship.temporary_workspace() as client:
    api = client.use("my-langchain-app")
    session_handle = "foo-user-session-1234"
    while True:
        msg = input("> ")
        print(f"{api.invoke('/send_message', message=msg, chat_history_handle=session_handle)}")
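
The k=2 argument in the server snippet suggests that the memory keeps only the most recent exchanges. The sketch below illustrates that windowing idea with a small in-memory class. WindowMemory is hypothetical, it does not persist anything, and it is not how SteamshipPersistentConversationWindowMemory is implemented.

```python
from collections import deque
from typing import Deque, List, Tuple


class WindowMemory:
    """Hypothetical in-memory stand-in that keeps only the last k exchanges."""

    def __init__(self, k: int = 2) -> None:
        # deque(maxlen=k) discards the oldest exchange once k are stored.
        self._turns: Deque[Tuple[str, str]] = deque(maxlen=k)

    def save(self, human: str, ai: str) -> None:
        """Record one human/AI exchange."""
        self._turns.append((human, ai))

    def history(self) -> str:
        """Render the retained exchanges as a prompt-ready transcript."""
        lines: List[str] = []
        for human, ai in self._turns:
            lines.append(f"Human: {human}")
            lines.append(f"AI: {ai}")
        return "\n".join(lines)


memory = WindowMemory(k=2)
memory.save("message 1", "reply 1")
memory.save("message 2", "reply 2")
memory.save("message 3", "reply 3")
# Only the two most recent exchanges remain.
print(memory.history())
```

A bounded window keeps the prompt size constant no matter how long the session runs, at the cost of forgetting older turns.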

Summarize Audio (Async Chaining)

This provides an example of using LangChain to process audio transcriptions obtained via Steamship's speech-to-text plugins (here, we use Whisper).

This example also gives a brief introduction to the Task system, including Task dependencies for chaining. Here, we poll with task.wait(); time-based polling with task.refresh() is also available.
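
As a rough illustration of time-based polling, the sketch below refreshes a task on a fixed interval until it finishes or a timeout expires. FakeTask and poll_until_done are hypothetical stand-ins written for this example; only the refresh() method name is taken from the text above, and the real Steamship Task may behave differently.

```python
import time


class FakeTask:
    """Hypothetical stand-in for a remote task; succeeds after N refreshes."""

    def __init__(self, refreshes_needed: int) -> None:
        self._remaining = refreshes_needed
        self.state = "running"

    def refresh(self) -> None:
        # A real task would re-fetch its state from the server here.
        self._remaining -= 1
        if self._remaining <= 0:
            self.state = "succeeded"


def poll_until_done(task: FakeTask, interval_s: float, timeout_s: float) -> bool:
    """Refresh on a fixed interval; return True if the task succeeded in time."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        task.refresh()
        if task.state in ("succeeded", "failed"):
            return task.state == "succeeded"
        time.sleep(interval_s)
    return False


if __name__ == "__main__":
    print(poll_until_done(FakeTask(refreshes_needed=3), interval_s=0.01, timeout_s=1.0))
```

An explicit loop like this leaves room to do other work between refreshes, which a single blocking wait does not.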

Server Snippet

@post("summarize_file")
def summarize_file(self, file_handle: str) -> str:
    file = File.get(self.client, handle=file_handle)
    text_splitter = CharacterTextSplitter()
    texts = []
    for block in file.blocks:
        texts.extend(text_splitter.split_text(block.text))
    docs = [Document(page_content=t) for t in texts]
    llm = SteamshipGPT(client=self.client, cache=True)
    chain = load_summarize_chain(llm, chain_type="map_reduce")
    return chain.run(docs)

@post("summarize_audio_file")
def summarize_audio_file(self, audio_file_handle: str) -> Task[str]:
    transcriber = self.client.use_plugin("whisper-s2t-blockifier")
    audio_file = File.get(self.client, handle=audio_file_handle)
    transcribe_task = audio_file.blockify(plugin_instance=transcriber.handle)
    return self.invoke_later("summarize_file", wait_on_tasks=[transcribe_task], arguments={"file_handle": audio_file.handle})

Client Snippet

churchill_yt_url = "https://www.youtube.com/watch?v=MkTw3_PmKtc"

with Steamship.temporary_workspace() as client:
    api = client.use("my-langchain-app")
    yt_importer = client.use_plugin("youtube-file-importer")
    import_task = File.create_with_plugin(client=client,
                                         plugin_instance=yt_importer.handle, 
                                         url=churchill_yt_url)
    import_task.wait()
    audio_file = import_task.output
    
    summarize_task_response = api.invoke("/summarize_audio_file", audio_file_handle=audio_file.handle)
    summarize_task = Task(client=client, **summarize_task_response)
    summarize_task.wait()
    
    if summarize_task.state == TaskState.succeeded:
      summary = base64.b64decode(summarize_task.output).decode("utf-8")
      print(f"Summary: {summary.strip()}")
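
The client snippet above decodes the task output with base64 before printing it. The sketch below isolates that step, assuming (as the snippet implies) that string output arrives as base64-encoded UTF-8. The decode_task_output helper and the sample payload are hypothetical.

```python
import base64


def decode_task_output(encoded: str) -> str:
    """Decode a base64-encoded UTF-8 payload into text (hypothetical helper)."""
    return base64.b64decode(encoded).decode("utf-8")


# Build a sample payload locally instead of calling a server.
sample = base64.b64encode("A short summary.".encode("utf-8")).decode("ascii")
print(decode_task_output(sample))
```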

Question Answering with Sources (Embeddings)

Server Snippet

def __init__(self, **kwargs):
    super().__init__(**kwargs)
    # create a persistent embedding store
    self.index = self.client.use_plugin(
        "embedding-index",
        config={
            "embedder": {
                "plugin_handle": "openai-embedder",
                "fetch_if_exists": True,
                "config": {
                    "model": "text-similarity-curie-001",
                    "dimensionality": 4096,
                }
            }
        },
        fetch_if_exists=True,
    )
    
@post("embed_file")
def embed_file(self, file_handle: str) -> bool:
    text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=0)
    texts = []
    file = File.get(self.client, handle=file_handle)
    for block in file.blocks:
        texts.extend(text_splitter.split_text(block.text))

    items = [Tag(client=self.client, text=t, value={"source": f"{file.handle}-offset-{i*500}"})
             for i, t in enumerate(texts)]

    self.index.insert(items)
    return True

@post("search_embeddings")
def search_embeddings(self, query: str, k: int) -> List[SearchResult]:
    """Return the `k` closest items in the embedding index."""
    search_results = self.index.search(query, k=k)
    search_results.wait()
    items = search_results.output.items
    return items

@post("/qa_with_sources")
def qa_with_sources(self, query: str) -> Dict[str, Any]:
    llm = SteamshipGPT(client=self.client, temperature=0, cache=True)
    chain = load_qa_with_sources_chain(llm, chain_type="stuff")
    search_results = self.search_embeddings(query, k=4)
    docs = [Document(page_content=result.tag.text, metadata={"source": result.tag.value.get("source", "unknown")})
            for result in search_results]
    return chain({"input_documents": docs, "question": query})
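
The embed_file endpoint above labels each chunk as "{file.handle}-offset-{i*500}". That label equals the true character offset only when every chunk is exactly 500 characters long, and a separator-based splitter such as CharacterTextSplitter may produce shorter chunks. The sketch below swaps in a plain fixed-width chunker, for which the offsets are exact. Both helper names and the "my-file" handle are hypothetical.

```python
from typing import List, Tuple


def chunk_with_offsets(text: str, chunk_size: int = 500) -> List[Tuple[int, str]]:
    """Split text into fixed-width chunks, pairing each with its start offset."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [(start, text[start:start + chunk_size])
            for start in range(0, len(text), chunk_size)]


def source_label(file_handle: str, offset: int) -> str:
    """Build a source label in the same shape as the server snippet uses."""
    return f"{file_handle}-offset-{offset}"


chunks = chunk_with_offsets("x" * 1200)
labels = [source_label("my-file", offset) for offset, _ in chunks]
print(labels)
```

Fixed-width chunks can cut sentences in half, so this trades retrieval quality for exact offsets; it is shown only to make the labeling scheme explicit.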

Client Snippet

with Steamship.temporary_workspace() as client:
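    api = client.use("my-langchain-app")
    
    # Embed the State of the Union address
    with open("state-of-the-union.txt") as f:
        # Read the whole file as a single string for the block text
        sotu_file = File.create(client, blocks=[Block(text=f.read())])
    
    # Pass the file's handle (a string), not the File object itself
    api.invoke("/embed_file", file_handle=sotu_file.handle)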

    # Issue Query
    query = "What did the president say about Justice Breyer?"
    print(f"------\nQuery: {query}")
    response = api.invoke('/qa_with_sources', query=query)
    print(f"Answer: {response['output_text']}")

    # Print source
    # NB: assumes a single source is used in response
    last_line = response['output_text'].splitlines()[-1]
    source = last_line[len("SOURCES: "):]
    print(f"------\nSource text ({source}):")
    for input_doc in response['input_documents']:
        metadata = input_doc['metadata']
        src = metadata['source']
        if source == src:
            print(input_doc['page_content'])
            break
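
The client snippet above assumes that the response cites a single source. A minimal sketch for handling several is shown below, assuming the final line has the form "SOURCES: a, b" with comma-separated labels. That format is an assumption about the chain's output, and parse_sources is a hypothetical helper.

```python
from typing import List


def parse_sources(output_text: str, prefix: str = "SOURCES:") -> List[str]:
    """Return every label on the last line starting with `prefix`, else []."""
    for line in reversed(output_text.splitlines()):
        stripped = line.strip()
        if stripped.startswith(prefix):
            listed = stripped[len(prefix):]
            # Assumes labels are comma-separated; empty entries are dropped.
            return [s.strip() for s in listed.split(",") if s.strip()]
    return []


if __name__ == "__main__":
    example = "An answer.\nSOURCES: my-file-offset-0, my-file-offset-500"
    print(parse_sources(example))
```

Each returned label can then be matched against the metadata of the input documents in the same way the snippet matches a single source.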

API Keys

Steamship API Keys provide access to our SDK for AI models, including OpenAI, GPT, Cohere, Whisper, and more.

Get your free API key here: https://steamship.com/account/api.

Once you have an API Key, you can:

  • Set the env var STEAMSHIP_API_KEY for your client
  • Pass it directly via Steamship(api_key=) or Steamship.temporary_workspace(api_key=).

Alternatively, you can run ship login, which will guide you through setting up your environment.
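
The options above can be sketched as a small lookup that prefers an explicitly passed key and falls back to the STEAMSHIP_API_KEY environment variable. The resolve_api_key helper and its precedence order are assumptions made for illustration; they do not describe how the Steamship client resolves keys internally.

```python
import os
from typing import Optional


def resolve_api_key(explicit: Optional[str] = None) -> str:
    """Prefer an explicit key, then fall back to STEAMSHIP_API_KEY (hypothetical)."""
    key = explicit or os.environ.get("STEAMSHIP_API_KEY")
    if not key:
        raise RuntimeError(
            "No API key found: set STEAMSHIP_API_KEY, pass api_key=, or run `ship login`."
        )
    return key
```

Failing early with a clear message tends to be easier to debug than an authentication error from the first remote call.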
