Framework for building LLM-based apps in Boehringer Ingelheim
tangbao
This is a Python package for building LLM-based applications in Boehringer. It provides utilities for document processing, RAG (Retrieval-Augmented Generation), and LLM integration.
"tangbao" is Chinese for "soup dumpling", a type of dim sum popular in Shanghai. It is made by filling a dumpling with meat and a mixture of ice and lard. When the dumpling steams, the frozen part melts and imparts the "tang" to the "bao". The word "bao" is also the name for a package or library in coding.
Contact Steven Brooks or Pietro Mascheroni for feedback/support.
For Users
Installation
Everything below assumes you have already created a Python virtual environment. If you haven't, run
python -m venv .venv
source .venv/bin/activate
To install the package run pip install tangbao.
Other dependencies
You might need to install additional dependencies based on the types of documents you plan to parse.
See the unstructured documentation here: https://pypi.org/project/unstructured/
In this package, we only install the minimal dependencies necessary.
Configuration
This project requires certain environment variables to be set. These variables are used for connecting to external APIs and services.
- Create a .env file in the root directory of the project.
- Add the following content to the .env file, replacing placeholder values with your actual credentials:
Apollo
Apollo is the focus of the remainder of this guide.
APOLLO_CLIENT_ID=your_client_id
APOLLO_CLIENT_SECRET=your_client_secret
INDEX_NAME="" # Set this to your index name, see below for more details
To obtain the client ID and client secret, please refer to the guide on the Apollo website. We recommend starting from the "Experimentation Use Case" route for new projects.
Azure
To use the Azure endpoints, you'll need the following in your .env file:
AZURE_BASE_URL=https://azure.example.com
AZURE_API_KEY=your_azure_api_key
AZURE_DEPLOYMENT_VERSION=v1
AZURE_DEPLOYMENT_NAME=model_name
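Before running the later steps, it can help to confirm that the variables above are actually set. The following is a minimal sketch using only the standard library. The helper name missing_env_vars is hypothetical and not part of tangbao, and the sketch assumes the variables have already been loaded into the process environment (for example by your shell or by a .env loader).

```python
import os

# Variable names taken from the Apollo section above.
APOLLO_VARS = ["APOLLO_CLIENT_ID", "APOLLO_CLIENT_SECRET", "INDEX_NAME"]


def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in the given environment mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars(APOLLO_VARS)
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All Apollo variables are set.")
```

The same helper can be called with the Azure variable names if you use the Azure endpoints instead.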
RAG Workflow
Step 1: Parse Documents
Note: This guide and all following guides assume you've set up your environment properly. See above for instructions.
Before we can build the RAG, we need to parse the documents. This package provides functions to make that easier.
IMPORTANT: Images in PDFs are not parsed in the current release.
We provide a basic chunking strategy, i.e., unstructured chunking. This means that structural metadata, such as the chapter or section level, is lost when the documents are chunked.
Two parameters control the chunking structure:
- CHUNK_SIZE: controls the maximum number of characters in one text chunk
- CHUNK_OVERLAP: controls the number of characters shared between consecutive chunks.
The chunk size controls the granularity at which the text is divided: small chunks provide very specific, almost keyword-based matches to the query, while larger chunks capture more context and subtler meaning.
To start, we suggest CHUNK_SIZE = 500 and CHUNK_OVERLAP = 0. In our experiments, these values were a good starting point for many situations.
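To make the interaction between the two parameters concrete, here is a minimal sketch of character-based chunking with overlap. It is an illustration only: chunk_text is a hypothetical helper, and the way tangbao and unstructured split text internally may differ (for example, by respecting word or element boundaries).

```python
def chunk_text(text, chunk_size, chunk_overlap):
    """Split text into chunks of at most chunk_size characters.

    Consecutive chunks share chunk_overlap characters.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in the range [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this chunk already reaches the end of the text
    return chunks


print(chunk_text("abcdefghij", chunk_size=4, chunk_overlap=0))
# ['abcd', 'efgh', 'ij']
print(chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1))
# ['abcd', 'defg', 'ghij']
```

With an overlap of 0 every character appears in exactly one chunk. A positive overlap repeats the end of each chunk at the start of the next one, which can help when a relevant sentence would otherwise be cut at a chunk boundary.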
The following is a simple example of how to set up a parsing strategy. Please follow these steps:
- Store PDF documents in a folder named ./documents
- Create a script like the following:
from tangbao import parse_docs
CHUNK_SIZE = 500
CHUNK_OVERLAP = 0
filenames_df = parse_docs.get_filenames("./documents")
processed_docs = parse_docs.process_documents(filenames_df, CHUNK_SIZE, CHUNK_OVERLAP)
processed_docs["Metadata"] = processed_docs["Metadata"].apply(parse_docs.parse_metadata)
# Save file for the next step
processed_docs.to_parquet(f'my_docs_cs_{CHUNK_SIZE}_co_{CHUNK_OVERLAP}.parquet')
Step 2: Index the RAG Database
After we've parsed the documents in Step 1, we can index the RAG's vector database with the document chunks and metadata.
- Make sure to use the same CHUNK_SIZE and CHUNK_OVERLAP values from the previous step.
- Make sure you have the .parquet file in your working directory.
- The INDEX_NAME should have the following format: app-id_your-index-name. The app-id for your use case can be retrieved with the code snippet below. The index name may contain only lower-case letters, numbers, underscores, and dashes. Note: The index name scheme for Apollo is set to change soon, so this code will need to be updated.
IMPORTANT: Keep your index name secret so that others cannot overwrite it with their documents. Consider storing it in an environment variable, similar to how we treat an API key. For further assurance that no one overwrites your index, you can generate a unique index name, e.g., with
import uuid
from tangbao.apollo import Apollo
your_index_name = str(uuid.uuid4()) # can only include lower case alpha-numeric, underscores, and dashes
apollo = Apollo()
iam = apollo.iam()
app_id = iam["id"]
INDEX_NAME=f'{app_id}_{your_index_name}'
Remember to record this index name in your .env file for later use. If you call it INDEX_NAME, you can read it with, e.g., os.getenv("INDEX_NAME").
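The character rule for index names can be checked before indexing. The following is a minimal sketch: is_valid_index_name is a hypothetical helper, not part of tangbao, and it only checks the your-index-name part, since the format of the app-id is not described here.

```python
import re
import uuid

# Allowed characters per the note above: lower-case letters, digits, underscores, dashes.
_INDEX_NAME_PATTERN = re.compile(r"[a-z0-9_-]+")


def is_valid_index_name(name):
    """Return True if name is non-empty and uses only the allowed characters."""
    return _INDEX_NAME_PATTERN.fullmatch(name) is not None


candidate = str(uuid.uuid4())  # lower-case hex digits separated by dashes
print(candidate, is_valid_index_name(candidate))
print("My Index", is_valid_index_name("My Index"))
```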
- Index the RAG DB. This can be done following a similar script:
from tangbao.apollo import Apollo
from tangbao.parse_docs import separate_for_indexing
from tangbao import config
import pandas as pd
import os
# use the same values from Step 1
CHUNK_SIZE = 500
CHUNK_OVERLAP = 0
PARQUET_FILE = f'my_docs_cs_{CHUNK_SIZE}_co_{CHUNK_OVERLAP}.parquet'
INDEX_NAME = os.getenv("INDEX_NAME")
EMBEDDING_MODEL = "openai-text-embedding-3-large" # you can see other embedding models with apollo.get_model_info()
# create the Apollo client used for the indexing calls below
apollo = Apollo()
processed_docs = pd.read_parquet(PARQUET_FILE, engine='pyarrow')
texts, ids, metadatas = separate_for_indexing(processed_docs)
# this can take a long time to run, depending on how many documents you have
apollo.index_multi_threaded(
    texts=texts,
    ids=ids,
    metadatas=metadatas,
    index_name=INDEX_NAME,
    embedding_model=EMBEDDING_MODEL,
    max_workers=8
)
If there are any failures in indexing doc chunks, they will be written to a log file. You can resubmit those chunks using this method:
if os.path.exists(config.LOG_FILE):
    apollo.resubmit_failed_chunks(
        log_file=config.LOG_FILE,
        texts=texts,
        ids=ids,
        metadatas=metadatas,
        index_name=INDEX_NAME,
        embedding_model=EMBEDDING_MODEL,
        max_workers=8
    )
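The index-then-resubmit pattern above can be pictured as fanning work out to a thread pool and remembering which items failed. The sketch below is a generic illustration with the standard library, not tangbao's implementation: run_in_threads and demo_worker are hypothetical names, and the real methods write failures to a log file rather than returning them.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_threads(items, worker, max_workers=8):
    """Call worker(payload) for each (item_id, payload) pair in a thread pool.

    Returns the sorted ids of the items whose call raised an exception,
    so that only those items need to be submitted again.
    """
    failed_ids = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, payload): item_id for item_id, payload in items}
        for future in as_completed(futures):
            if future.exception() is not None:
                failed_ids.append(futures[future])
    return sorted(failed_ids)


def demo_worker(payload):
    """Stand-in for an indexing call; fails for one specific payload."""
    if payload == "bad chunk":
        raise RuntimeError("simulated indexing failure")
    return payload


chunks = [("id-1", "first chunk"), ("id-2", "bad chunk"), ("id-3", "third chunk")]
print(run_in_threads(chunks, demo_worker, max_workers=2))
# ['id-2']
```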
After the indexing is completed, it is possible to query the RAG dataset with a test question. This can be accomplished using the following script:
apollo.query_index(
    user_query="YOUR QUERY HERE",
    num_chunks=5,
    index_name=INDEX_NAME,
    embedding_model=EMBEDDING_MODEL
)
Alternative Retrievers
It is possible to also use alternative retrievers. To use a BM25 retriever (keyword based), follow this code. The vector store will be created in memory, so it will be necessary to index the documents inside the app.
from tangbao.parse_docs import separate_for_BM25
from tangbao.retrievers import BM25Retriever
# parse the documents according to the structure needed for BM25
documents = separate_for_BM25(processed_docs)
# create an instance of the BM25Retriever class
bm25_retriever = BM25Retriever(documents)
# index the corpus
bm25_retriever.index_corpus()
# query the corpus
output = bm25_retriever.query_corpus("YOUR QUERY HERE")
print(output)
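For intuition about what a keyword-based retriever computes, here is a minimal sketch of the standard Okapi BM25 scoring formula in plain Python. It is not the code behind BM25Retriever: the function name bm25_scores, the whitespace tokenizer, and the parameter values k1=1.5 and b=0.75 are assumptions chosen for illustration.

```python
import math
from collections import Counter


def bm25_scores(query, docs, k1=1.5, b=0.75):
    """Return one BM25 score per document for the given query."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    if n_docs == 0:
        return []
    avg_len = sum(len(tokens) for tokens in tokenized) / n_docs

    # document frequency: in how many documents each term occurs
    doc_freq = Counter()
    for tokens in tokenized:
        doc_freq.update(set(tokens))

    scores = []
    for tokens in tokenized:
        term_freq = Counter(tokens)
        score = 0.0
        for term in query.lower().split():
            tf = term_freq.get(term, 0)
            if tf == 0:
                continue  # terms absent from the document contribute nothing
            df = doc_freq[term]
            idf = math.log((n_docs - df + 0.5) / (df + 0.5) + 1.0)
            length_norm = 1.0 - b + b * len(tokens) / avg_len
            score += idf * tf * (k1 + 1.0) / (tf + k1 * length_norm)
        scores.append(score)
    return scores


corpus = ["the cat sat on the mat", "dogs bark loudly", "a cat chased another cat"]
print(bm25_scores("cat", corpus))
```

Documents that do not contain any query term score exactly zero, which is why keyword search works best when the user knows the specific terms used in the documents.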
It is also possible to call a hybrid retrieval that combines keyword-based and semantic searches. This is supported by the class HybridRetrieverApollo, which combines a BM25 retriever with the vector database from Apollo. Note that this requires the documents to be already indexed in the Apollo vector store before performing the search (see the indexing script in Step 2 above).
import os
from tangbao.retrievers import HybridRetrieverApollo
INDEX_NAME = os.getenv("INDEX_NAME") # the index name used during indexing
EMBEDDING_MODEL = "openai-text-embedding-3-large" # the embedding model used during indexing
# define a sample query
sample_query = "YOUR QUERY HERE"
# initialize the retriever with the pandas dataframe that was used during indexing
hybrid_retriever = HybridRetrieverApollo(processed_docs, INDEX_NAME, EMBEDDING_MODEL)
# query the BM25 and apollo indices
combined_scores = hybrid_retriever.query_indices(sample_query, num_chunks=2)
# print the sorted documents and their scores
for doc in combined_scores:
    print(f'{doc["text"]}: {doc["rank"]:.4f}, metadata: {doc["metadata"]}')
Note that it is also possible to assign a weight to the importance of keyword-based retrieval (relative to embedding-based).
# example usage with keyword weight
combined_scores = hybrid_retriever.query_indices(sample_query, num_chunks=2, keyword_weight=0.1)
# print the sorted documents and their scores
for doc in combined_scores:
    print(f'{doc["text"]}: {doc["rank"]:.4f}, metadata: {doc["metadata"]}')
The weight controls the importance given to keyword-based retrieval when ranking the documents in the hybrid search. If the user is looking for very specific terms, then it makes sense to give more weight to the keyword-based retriever. On the other hand, if the user is unsure about the content and wording of the documents, a lower weight allows more freedom in the search. By default, equal weight is assigned to the keyword- and embedding-based retrievers.
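One simple way to picture the effect of the weight is a weighted sum of the two scores per document. The sketch below is an assumption for illustration only: the source does not state how query_indices combines the rankings, and combine_scores is a hypothetical helper that expects both score sets to be normalized to the range 0 to 1.

```python
def combine_scores(keyword_scores, embedding_scores, keyword_weight=0.5):
    """Blend two {doc_id: score} mappings and return (doc_id, score) pairs, best first."""
    if not 0.0 <= keyword_weight <= 1.0:
        raise ValueError("keyword_weight must be between 0 and 1")
    doc_ids = set(keyword_scores) | set(embedding_scores)
    combined = {
        doc_id: keyword_weight * keyword_scores.get(doc_id, 0.0)
        + (1.0 - keyword_weight) * embedding_scores.get(doc_id, 0.0)
        for doc_id in doc_ids
    }
    return sorted(combined.items(), key=lambda item: item[1], reverse=True)


keyword = {"doc_a": 1.0, "doc_b": 0.0}    # doc_a matches the exact terms
embedding = {"doc_a": 0.0, "doc_b": 1.0}  # doc_b is semantically closest

print(combine_scores(keyword, embedding, keyword_weight=0.1)[0][0])  # doc_b
print(combine_scores(keyword, embedding, keyword_weight=0.9)[0][0])  # doc_a
```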
We also support an in-memory retriever from LangChain. This is convenient for relatively small document sets that don't have to be stored between sessions (the in-memory retriever is erased when the application or notebook is closed). It performs a similarity search in the embedding space using cosine similarity (see the LangChain documentation for full details). Here is a code snippet that shows how to initialize the retriever and use it for a simple query:
from tangbao.parse_docs import separate_for_indexing
from tangbao.retrievers import ApolloEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
# pre-process the documents in the format required for indexing
texts, ids, metadatas = separate_for_indexing(processed_docs)
# initialize the embedding model to be used with LangChain
embeddings = ApolloEmbeddings(model="openai-text-embedding-3-large", dimensions=3072)
# index the text chunks and their metadata in the vector store
vectorstore = InMemoryVectorStore.from_texts(
    texts=texts,
    embedding=embeddings,
    ids=ids,
    metadatas=metadatas,
)
# use the vectorstore as a retriever
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 5})
# retrieve the most similar chunks
retrieved_documents = retriever.invoke("YOUR QUERY HERE")
# show the retrieved documents
for doc in retrieved_documents:
    print(doc.page_content)
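The cosine similarity used for the search above measures the angle between two embedding vectors, independent of their length. As a minimal sketch in plain Python (the function name cosine_similarity and the toy two-dimensional vectors are illustrative; real embeddings have thousands of dimensions):

```python
import math


def cosine_similarity(a, b):
    """Return the cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (norm_a * norm_b)


print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0 (same direction)
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0 (orthogonal)
```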
Note that it is also possible to use a hybrid retriever that combines the in-memory embedding-based retrieval with keyword-based search. This is implemented through the class HybridRetrieverInMemory. See the following code snippet:
from tangbao.retrievers import HybridRetrieverInMemory
# define a sample query
sample_query = "YOUR QUERY HERE"
# initialize the retriever with the pandas dataframe that was used during indexing
hybrid_retriever = HybridRetrieverInMemory(processed_docs, emb_model="openai-text-embedding-3-large", emb_dimensions=3072)
# query the BM25 and in-memory indices
combined_scores = hybrid_retriever.query_indices(sample_query, num_chunks=2, keyword_weight=0.2)
# print the sorted documents and their scores
for doc in combined_scores:
    print(f'{doc["text"]}: RANK {doc["rank"]:.4f}, metadata: {doc["metadata"]}')
Reranking of retrieved documents
To improve the outcome of retrieval, it is possible to add a reranking step that improves the match between the user query and the retrieved chunks. This is supported through the ReRanker class and works for all the supported retrieval frameworks (keyword-based, embedding-based, and hybrid). Here is a code snippet showing how to set up reranking for documents retrieved using the Apollo vector database:
pip install tangbao[rerank] # installs torch and sentence-transformers for reranking
from tangbao.retrievers import extract_docs_from_apollo_vs
from tangbao.rerankers import ReRanker
# define a sample user query
example_user_query = "YOUR QUERY HERE"
# query the apollo vector database
response_apollo_query_index = apollo.query_index(
    user_query=example_user_query,
    num_chunks=5,
    index_name=INDEX_NAME,
    embedding_model=EMBEDDING_MODEL
)
# reformat the retrieved chunks for further processing
example_docs_retrieved = extract_docs_from_apollo_vs(response_apollo_query_index)
# initialize the reranker with the default options
reranker = ReRanker()
# rerank the documents with the default options
reranker.rerank(example_user_query, example_docs_retrieved)
NOTE: the re-formatting of the retrieved chunks is not needed for BM25 and hybrid retrieval, since the output of these methods is already compatible with the ReRanker class. For documents retrieved using the in-memory retrieval strategy, the function extract_docs_from_langchain_re can be used to pre-process the documents before reranking. Check the docs of ReRanker by typing help(ReRanker) for additional reranking options.
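Conceptually, reranking rescores each retrieved chunk against the query with a second scoring function and sorts by that score. The sketch below shows only this control flow. It is not how ReRanker works internally: the toy token_overlap_score stands in for a learned relevance model, both function names are hypothetical, and the documents are assumed to be dictionaries with a "text" key.

```python
def token_overlap_score(query, text):
    """Toy relevance score: the fraction of query tokens that appear in the text."""
    query_tokens = set(query.lower().split())
    text_tokens = set(text.lower().split())
    if not query_tokens:
        return 0.0
    return len(query_tokens & text_tokens) / len(query_tokens)


def rerank(query, docs, score_fn=token_overlap_score, top_k=None):
    """Sort docs (dicts with a "text" key) by score_fn(query, text), best first."""
    ranked = sorted(docs, key=lambda doc: score_fn(query, doc["text"]), reverse=True)
    return ranked if top_k is None else ranked[:top_k]


retrieved = [
    {"text": "soup dumpling recipes"},
    {"text": "how to chunk documents for retrieval"},
]
print(rerank("chunk documents", retrieved, top_k=1))
# [{'text': 'how to chunk documents for retrieval'}]
```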
Step 3: Build a Streamlit App (Optional Example)
This is an example of how to build a chat interface using Streamlit. First, install Streamlit:
pip install streamlit==1.31.0
Now that we have indexed our documents in the RAG database, we can build a Streamlit app to let users 'chat' with the document store.
To create the app, follow these steps:
- Make sure you have the INDEX_NAME from the previous step
- Create a file called app.py and use the following template. Make sure to change the custom prompt below if needed! Adapting the prompt is a crucial step to ensure that the generation phase of the RAG conforms to your specific use case. Invest some time in prompt engineering to get the best out of the LLM that generates the answers to user queries.
import streamlit as st
import pandas as pd
from tangbao import utils
from tangbao.apollo import Apollo
import os

INDEX_NAME = os.getenv("INDEX_NAME")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL") # make sure it's the same one you used for indexing above!

st.title("Chat with Docs")

# Define Session State
if "messages" not in st.session_state.keys():
    st.session_state.messages = [{"role": "assistant", "content": "How may I help you?"}]
if "used_tokens" not in st.session_state:
    st.session_state.used_tokens = 0

# Display chat messages
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

apollo = Apollo()

@st.cache_data
def cached_model_info(_apollo_client):
    return _apollo_client.get_model_info()

model_info = cached_model_info(apollo)
chat_models = [model["model_name"] for model in model_info if model["model_info"]["mode"] == "chat"]

# Define Sidebar
with st.sidebar:
    selected_model = st.selectbox("Select LLM:", chat_models)
    CONTEXT_WINDOW = [model['model_info']['max_input_tokens'] for model in model_info if model['model_name'] == selected_model][0]
    token_display = st.empty()
    with token_display.container():
        st.progress(st.session_state.used_tokens/CONTEXT_WINDOW, text = f"Context window used ({st.session_state.used_tokens} out of {CONTEXT_WINDOW})")
    temperature = st.slider("Select model creativity (temperature)", min_value=0.0, max_value=1.0, value = 0.0)
    chunk_num = st.slider("Select number of chunks", min_value=1, max_value=8, value=4)

# User Input
if user_query := st.chat_input("Ask a question"):
    st.session_state.messages.append({"role": "user", "content": user_query})
    with st.chat_message("user"):
        st.markdown(user_query)

if st.session_state.messages[-1]["role"] != "assistant":
    # RAG Output
    with st.chat_message("assistant"):
        with st.spinner("Thinking..."):
            context = apollo.query_index(user_query, chunk_num, INDEX_NAME)
            #### ADAPT THE FOLLOWING PROMPT TO YOUR SPECIFIC NEEDS ####
            prompt = f"""\
Use the following CONTEXT delimited by triple backticks to answer the QUESTION at the end.
If you don't know the answer, just say that you don't know.
Use three to five sentences and keep the answer as concise as possible.
You are also a language expert, and so can translate your responses to other languages upon request.
CONTEXT: ```
{context['docs']}
```
QUESTION: ```
{user_query}
```
Helpful Answer:"""
            response_full = apollo.chat_completion(
                messages=[{'role': 'user', 'content': prompt}] +
                    [{'role': m['role'], 'content': m['content']} for m in st.session_state.messages],
                model=selected_model,
                temperature=temperature,
                seed=42,
                is_stream=False
            )
            response = apollo.get_content(response_full)
            st.session_state.used_tokens = apollo.get_token_usage(response_full)
        st.write(response)
        with st.sidebar:
            with token_display.container():
                st.progress(st.session_state.used_tokens/CONTEXT_WINDOW, text = f"Context window used ({st.session_state.used_tokens} out of {CONTEXT_WINDOW})")
            sources, titles = utils.extract_source(context)
            st.header("Sources:")
            st.table(pd.DataFrame({"Documents referenced": titles}))
            st.markdown(sources, unsafe_allow_html=True)
    st.session_state.messages.append({"role": "assistant", "content": response})
Then run streamlit run app.py to see if it works!
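One detail worth guarding in the template: st.progress accepts a float only in the range 0.0 to 1.0, so a token count that exceeds the context window would raise an error. A minimal sketch of a clamping helper follows. The name context_fraction is hypothetical, and whether the reported token usage can actually exceed the window depends on what apollo.get_token_usage returns, which is not specified here.

```python
def context_fraction(used_tokens, context_window):
    """Return used_tokens / context_window, clamped to the range [0.0, 1.0]."""
    if context_window <= 0:
        return 0.0  # avoid dividing by zero when the window size is unknown
    return min(max(used_tokens / context_window, 0.0), 1.0)


print(context_fraction(50, 100))   # 0.5
print(context_fraction(150, 100))  # 1.0
```

In app.py, the value passed to st.progress could then be context_fraction(st.session_state.used_tokens, CONTEXT_WINDOW) instead of the raw division.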
For Developers
Testing
pip install -e .
The -e flag in pip install -e . installs the package in "editable" mode, which means:
- Changes you make to the source code will be reflected immediately without reinstalling
- The package will be available in your Python environment just like a normal installed package
- You can import it with import tangbao in your scripts
For unit testing, we'll use the pytest framework.
source .venv/bin/activate
python -m pytest tests/
Build
source .venv/bin/activate
pip install -e .
pip install --upgrade build wheel bumpversion
bumpversion patch # or major or minor
rm -rf dist
python -m build
Upload to PyPI
Requires a PyPI API Token. Get one at https://pypi.org
Set the token in your environment as TWINE_PASSWORD
source .venv/bin/activate
pip install --upgrade twine
twine upload --repository pypi dist/*