asyncflows
Table of Contents
- Introduction
- Installation
  2.1 With pip
  2.2 Local development
- Examples
  3.1 Text Style Transfer
  3.2 De Bono's Six Thinking Hats
  3.3 Retrieval Augmented Generation (RAG)
  3.4 SQL Retrieval
  3.5 Chatbot
- Guides
  4.1 Custom Actions
  4.2 Writing Flows with Autocomplete
  4.3 Caching with Redis
  4.4 Setting up Ollama for Local Inference
  4.5 Using Any Language Model
  4.6 Prompting in-depth
- License
Introduction
asyncflows is a framework for designing and running AI pipelines using simple YAML configuration.
Here is a simple flow that prompts the LLM to say "hello world", and prints the result.
YAML file that defines the flow:
# hello_world.yaml
default_model:
  model: ollama/llama3
flow:
  hello_world:
    action: prompt
    prompt:
      - text: Can you say hello world for me?
default_output: hello_world.result
Python code that runs the flow:
import asyncio
from asyncflows import AsyncFlows

async def main():
    flow = AsyncFlows.from_file("hello_world.yaml")
    result = await flow.run()
    print(result)

asyncio.run(main())
Output of the Python script:
Hello, world!
Run the example yourself with:
python -m asyncflows.examples.hello_world
Installation
With pip
Get started with:
pip install asyncflows
Depending on what you need, consider installing extra dependencies:
Model Providers
OpenAI
pip install asyncflows[openai]
Anthropic
pip install asyncflows[anthropic]
Google Cloud (Vertex AI)
pip install asyncflows[gcloud]
SQL Databases
Postgres
pip install asyncflows[pg]
SQLite
pip install asyncflows[sqlite]
Any SQL database supported by SQLAlchemy should work, though you may need to install additional dependencies not shown here. Please open an issue if you run into this; we will happily add an extra category for your database.
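For reference, SQLAlchemy-style connection URLs look roughly like the examples below. Treat the credentials and paths as hypothetical, and note that the exact driver prefix may depend on which extra you installed:
DATABASE_URL=postgresql://user:password@localhost:5432/mydb
DATABASE_URL=sqlite:///local.db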
Miscellaneous
Retrieve and Rerank
pip install asyncflows[transformers]
PDF Extraction
pip install asyncflows[pdf]
Local development
Create and activate a Python 3.11 virtual environment, for example:
python3.11 -m venv .venv
source .venv/bin/activate
If not already installed, install poetry. Then install dependencies with:
poetry install
To install all extra dependencies, run:
poetry install --all-extras
Examples
The examples default to Llama 3, and assume Ollama is running locally.
See Setting up Ollama for Local Inference to set up Ollama.
See Using Any Language Model to use a different model or provider.
Text Style Transfer
This example takes a writing sample, and writes about a topic in the style of the sample.
Running the example (will prompt you for a topic):
python -m asyncflows.examples.text_style_transfer
YAML file that defines the flow:
# text_style_transfer.yaml
default_model:
  model: ollama/llama3
flow:
  # The `prompt` action asks the LLM to write about the topic in the style of the sample
  text_style_transfer:
    action: prompt
    prompt:
      # We include a system message asking the LLM to respond in the style of the example
      - role: system
        text: You're a helpful assistant. Respond only in the style of the example.
      # And a user message asking the LLM to write about the topic
      # This is a jinja template; the variables `writing_sample` and `topic` will be replaced with
      # values provided upon running the flow
      - role: user
        text: |
          Here is a writing example:
          ```
          {{ writing_sample }}
          ```
          In the style of the example, write about {{ topic }}.
default_output: text_style_transfer.result
Running the flow (Python and stdout)
Python script that runs the flow:
import asyncio
from asyncflows import AsyncFlows

writing_sample = """
Hullo mai neyms Kruubi Duubi, aI'm hear to rite yu a fanfic abowt enyting yu want.
"""

async def main():
    # Load the flow
    flow = AsyncFlows.from_file("text_style_transfer.yaml").set_vars(
        writing_sample=writing_sample,
    )
    # Run the flow
    while True:
        # Get the user's topic via CLI interface (swap out with whatever input method you use)
        try:
            message = input("Write about: ")
        except EOFError:
            break
        # Set the topic (`set_vars` returns a new flow with the variable bound)
        topic_flow = flow.set_vars(
            topic=message,
        )
        # Run the flow and get the result
        result = await topic_flow.run()
        print(result)

asyncio.run(main())
Output of the Python script:
pizza in space
Heya spacie peeps! Its me, Zlorg, yer favret pizza enthusiast from Andromeda! Im here to rite a tale abowt da most tubular pizzas eva sent into da cosmos!
In da year 3050, da Pizza Federation (ya, thats what we call dem) launched its first-ever space-bound pizzaria, aptly named "Crust-ial Velocity." Captain Zara and her trusty crew of pizza artisans, including da infamous topping master, Rizzo, set off on a quest to deliver da most cosmic pies to da galaxy's most distant planets.
First stop: da planet Zorvath, where da alien inhabitants were famished for some good ol' Earth-style pepperoni. Captain Zara and crew whipped up a "Galactic Garage Band" pizza, topped with spicy peppers from da swamps of Saturn, mozzarella from da moon of Ganymede, and a drizzle of da finest olive oil from da vineyards of Mars. Da Zorvathians went wild, chanting "Pizza-za-zee!" in their native tongue.
De Bono's Six Thinking Hats
Edward De Bono's Six Thinking Hats is a technique for creative problem-solving and decision-making. It is based on the idea of using different colored hats to represent different modes of thinking, each focusing on a specific aspect of the problem or decision at hand.
This flow parallelizes the thinking under five hats, and synthesizes them under the blue hat.
Running the example (will prompt you for something to think about):
python -m asyncflows.examples.debono
YAML file that defines the flow:
# debono.yaml

# Set the default model for the flow (can be overridden in individual actions)
default_model:
  model: ollama/llama3

# De Bono's Six Thinking Hats is a powerful technique for creative problem-solving and decision-making.
flow:

  # The white hat focuses on the available information and facts about the problem.
  white_hat:
    action: prompt
    prompt:
      # This is a jinja template;
      # the variable `query` will be replaced with the value provided upon running the flow
      - text: |
          Problem:
          ```
          {{ query }}
          ```
          List all the factual information you know about the problem.
          What data and numbers are available?
          Identify any gaps in your knowledge and consider how you might obtain this missing information.

  # The red hat explores emotions, feelings, and intuitions about the problem.
  red_hat:
    action: prompt
    prompt:
      # This is syntactic sugar for referencing a variable,
      # equivalent to the white hat's jinja template
      - heading: Problem
        var: query
      - text: |
          Express your feelings and intuitions about the problem without any need to justify them.
          What are your initial reactions?
          How do you and others feel about the situation?

  # The black hat considers the risks, obstacles, and potential downsides of the problem.
  black_hat:
    action: prompt
    prompt:
      - heading: Problem
        var: query
      - text: |
          Consider the risks and challenges associated with the problem.
          What are the potential downsides?
          Try to think critically about the obstacles, and the worst-case scenarios.

  # The yellow hat focuses on the positive aspects, benefits, and opportunities of the problem.
  yellow_hat:
    action: prompt
    prompt:
      - heading: Problem
        var: query
      - text: |
          Focus on the positives and the potential benefits of solving the problem.
          What are the best possible outcomes?
          How can this situation be an opportunity for growth or improvement?

  # The green hat generates creative ideas, alternatives, and innovative solutions to the problem.
  green_hat:
    action: prompt
    prompt:
      - heading: Problem
        var: query
      - text: |
          Think creatively about the problem.
          Brainstorm new ideas and alternative solutions.
          How can you overcome the identified challenges in an innovative way?

  # The blue hat manages the thinking process, synthesizes insights, and outlines a plan of action.
  blue_hat:
    action: prompt
    prompt:
      # Var references an input variable provided when running the flow
      - heading: Problem
        var: query
      # Link references another action's output
      - heading: White Hat
        link: white_hat.result
      - heading: Red Hat
        link: red_hat.result
      - heading: Black Hat
        link: black_hat.result
      - heading: Yellow Hat
        link: yellow_hat.result
      - heading: Green Hat
        link: green_hat.result
      - text: |
          Review and synthesize the information and ideas generated from the other hats.
          Assess which ideas are most feasible and effective based on the facts (White Hat), emotions (Red Hat), risks (Black Hat), benefits (Yellow Hat), and creative solutions (Green Hat).
          How can these insights be integrated into a coherent strategy?
          Outline a plan with clear steps or actions, indicating responsibilities, deadlines, and milestones.
          Consider how you will monitor progress and what criteria you will use to evaluate success.

default_output: blue_hat.result
Running the flow (Python and stdout)
Python script that runs the flow:
import asyncio
from asyncflows import AsyncFlows

async def main():
    query = input("Provide a problem to think about: ")
    flow = AsyncFlows.from_file("examples/debono.yaml").set_vars(
        query=query,
    )
    # Run the flow and return the default output (result of the blue hat)
    result = await flow.run()
    print(result)

asyncio.run(main())
Output of the Python script:
What should I plan for my mom's 60th birthday? She likes black and white, lives to work out, and loves piano music.
- Host a black/white themed party with piano music and fitness elements.
- Hire a pianist and plan a group workout or dance class beforehand.
- Create a memory video and fitness-themed gift (e.g. workout outfit).
- Incorporate black/white desserts and compile a personalized music playlist.
- Assign tasks, set deadlines. Monitor progress against mom's interests/feedback.
- Success criteria: Mom feels celebrated in a personalized, meaningful way.
Retrieval Augmented Generation (RAG)
This flow facilitates asking questions over a set of documents.
Given a list of texts, it uses retrieval augmented generation (RAG) to find the ones relevant to the question, and generates an answer.
The form of RAG we're using is retrieval followed by reranking. Retrieval is great for searching through a large dataset, while reranking is slower but better at matching against the query.
Running the example over examples/recipes/ (a folder with text files), with the transformers models running locally:
python -m asyncflows.examples.rag
YAML file that defines the flow:
# rag.yaml
default_model:
  model: ollama/llama3
flow:

  # `retrieve` performs a vector search, fast for large datasets
  retrieval:
    action: retrieve
    k: 5
    documents:
      var: texts
    query:
      var: question

  # `rerank` picks the most appropriate documents; it's slower than retrieve, but better at matching against the query
  reranking:
    action: rerank
    k: 2
    documents:
      link: retrieval.result
    query:
      var: question

  # `chat` prompts the LLM to summarize the top recipes
  chat:
    action: prompt
    prompt:
      - heading: Related documents
        text: |
          {% for doc in reranking.result %}
          ---
          {{ doc }}
          ---
          {% endfor %}
      - heading: Question
        var: question
      - text: |
          Based on the top documents, what is the most relevant information to the query?
          Summarize the key points of the documents in a few sentences.

default_output: chat.result
Running the flow (Python and stdout)
Python script that runs the flow:
import asyncio
import glob
from asyncflows import AsyncFlows

async def main():
    # Load text files from the `recipes` folder
    document_paths = glob.glob("recipes/*.md")
    texts = []
    for document_path in document_paths:
        with open(document_path, "r") as f:
            texts.append(f.read())

    # Load the RAG flow
    flow = AsyncFlows.from_file("rag.yaml").set_vars(
        texts=texts,
    )

    # Run the flow
    while True:
        # Get the user's question via CLI interface (swap out with whatever input method you use)
        try:
            question = input("Ask me anything: ")
        except EOFError:
            break
        # Set the question
        question_flow = flow.set_vars(
            question=question,
        )
        # Run the flow and get the result
        result = await question_flow.run()
        print(result)

asyncio.run(main())
Output of the Python script:
What's something healthy I could make?
The two provided recipes, Mexican Guacamole and Lebanese Hummus, offer healthy and flavorful options to prepare. The key points are:
- Guacamole is made by mashing ripe avocados and mixing in fresh vegetables like onions, tomatoes, cilantro, and jalapeños, along with lime juice and salt. It can be served with tortilla chips or as a topping for tacos.
- Hummus is a blend of chickpeas, tahini (sesame seed paste), lemon juice, garlic, and olive oil, seasoned with salt and cumin. It is typically served as a dip with warm pita bread and garnished with paprika and parsley.
Both recipes are healthy, vegetarian options that incorporate fresh ingredients and can be easily prepared at home.
SQL Retrieval
This flow facilitates asking questions over a SQL database.
To use it with your database, install the corresponding extra packages and set the DATABASE_URL environment variable.
Running the example with a database available at DATABASE_URL, passed as an environment variable:
DATABASE_URL=... python -m asyncflows.examples.sql_rag
YAML file that defines the flow:
# sql_rag.yaml
default_model:
  model: ollama/llama3
  temperature: 1
  max_output_tokens: 2000
flow:

  # Get the database schema as CREATE TABLE statements
  get_db_schema:
    action: get_db_schema
    database_url:
      env: DATABASE_URL

  # Generate a SQL statement to get data from the database
  generate_sql_statement:
    action: prompt
    quote_style: xml
    prompt:
      - heading: Database schema
        link: get_db_schema.schema_text
      - heading: User query
        var: query
      - text: |
          Can you write a SQL statement to get data from the database, to help us answer the user query?
          Wrap the statement in <sql> tags.

  # Extract the SQL statement from the generated response
  extract_sql_statement:
    action: extract_xml_tag
    text:
      link: generate_sql_statement.result
    tag: sql

  # Execute the SQL statement
  exec:
    action: execute_db_statement
    database_url:
      env: DATABASE_URL
    statement:
      link: extract_sql_statement.result

  # Answer the user query based on the result of the SQL statement
  answer_user_query:
    action: prompt
    prompt:
      - heading: SQL statement
        link: extract_sql_statement.result
      - text: |
          Here is the result of executing the SQL statement:
          ```
          {{ exec.text }}
          ```
          Can you answer the user query based on this result?
      - heading: User query
        var: query

default_output: answer_user_query.result
Running the flow (Python and stdout)
Python script that runs the flow:
import asyncio
from asyncflows import AsyncFlows

async def main():
    # Load the SQL flow
    flow = AsyncFlows.from_file("sql_rag.yaml")

    # Show the database schema
    schema = await flow.run("get_db_schema")
    print(schema.schema_text)

    # Run the question answering flow
    while True:
        # Get the user's query via CLI interface (swap out with whatever input method you use)
        try:
            query = input("Ask me anything: ")
        except EOFError:
            break
        # Set the query
        question_flow = flow.set_vars(
            query=query,
        )
        # Run the flow and get the result
        result = await question_flow.run()
        print(result)

asyncio.run(main())
Output of the Python script:
What are the top 5 most expensive products in the database?
Given the result of the SQL statement, the top 5 most expensive products in the database are:
- Product A: $100
- Product B: $90
- Product C: $80
- Product D: $70
- Product E: $60
Chatbot
This flow facilitates a chatbot over a set of PDF documents.
Given a list of filepaths, it extracts their text, uses retrieval augmented generation (RAG) to find the ones relevant to the question, and generates an answer.
The form of RAG we're using is retrieval followed by reranking. Retrieval is great for searching through a large dataset, while reranking is slower but better at matching against the query.
YAML file that defines the flow:
# chatbot.yaml
default_model:
  model: ollama/llama3
flow:

  # Iterate over the PDF filepaths
  extract_pdf_texts:
    for: filepath
    in:
      var: pdf_filepaths
    flow:
      # For each filepath, `extract_pdf_text` extracts text from the PDF file
      extractor:
        action: extract_pdf_text
        file:
          var: filepath

  # Analyze the user's query and generate a question for the retrieval system
  generate_query:
    action: prompt
    quote_style: xml
    prompt:
      - heading: User's Message
        var: message
      - text: |
          Carefully analyze the user's message and generate a clear, focused query that captures the key information needed to answer the message.
          The query should be suitable for a vector search through relevant books.
          Put the query between <query> and </query> tags.

  # Extract the <query>...</query> from the generated response
  extract_query:
    action: extract_xml_tag
    tag: query
    text:
      link: generate_query.result

  # `retrieve` performs a vector search, fast for large datasets
  retrieval:
    action: retrieve
    k: 20
    documents:
      lambda: |
        [page
         for flow in extract_pdf_texts
         for page in flow.extractor.pages]
    texts:
      lambda: |
        [page.title + "\n\n" + page.text  # Include the title in the embeddings
         for flow in extract_pdf_texts
         for page in flow.extractor.pages]
    query:
      link: extract_query.result

  # `rerank` picks the most appropriate documents; it's slower than retrieve, but better at matching against the query
  reranking:
    action: rerank
    k: 5
    documents:
      link: retrieval.result
    texts:
      lambda: |
        [page.text
         for page in retrieval.result]
    query:
      link: extract_query.result

  # `chatbot` prompts the LLM to respond based on the top pages
  chatbot:
    action: prompt
    quote_style: xml
    prompt:
      - role: system
      - text: |
          You are an expert literary theorist and critic analyzing several Relevant Pages with regards to a New Message.
          Remember what your Conversation History is as you write your response.
      - role: user
      - heading: Relevant Pages
        text: |
          {% for page in reranking.result -%}
          {{ page.title }}, page number {{ page.page_number }}
          ---
          {{ page.text }}
          ---
          {% endfor %}
      - heading: Conversation History
        text: |
          {% for message in conversation_history -%}
          {{ message }}
          {% endfor %}
      - heading: New Message
        var: message
      - text: |
          Clearly and concisely respond to the New Message, keeping in mind the Relevant Pages and Conversation History, if any.
          Provide your response to the New Message between <response> and </response> tags.

  # Extract the response from between the <response> tags
  extract_chatbot:
    action: extract_xml_tag
    tag: response
    text:
      link: chatbot.result

default_output: extract_chatbot.result
Running the flow (Python and stdout)
Python script that runs the flow:
import asyncio
import glob
from asyncflows import AsyncFlows

async def main():
    # Load PDFs from the `books` folder
    document_paths = glob.glob("books/*.pdf")

    # Load the chatbot flow
    flow = AsyncFlows.from_file("chatbot.yaml").set_vars(
        pdf_filepaths=document_paths,
    )

    # Keep track of the conversation history
    conversation_history = []

    # Run the flow
    while True:
        # Get the user's message via CLI interface (swap out with whatever input method you use)
        try:
            message = input("Ask me anything: ")
        except EOFError:
            break
        # Set the message and conversation history
        query_flow = flow.set_vars(
            message=message,
            conversation_history=conversation_history,
        )
        # Run the flow and get the result
        result = await query_flow.run()
        print(result)

        # Update the conversation history
        conversation_history.extend(
            [
                f"User: {message}",
                f"Assistant: {result}",
            ]
        )

asyncio.run(main())
Output of the Python script:
How does the Red Queen's view of punishment shape the story?
The Red Queen's view of punishment is a significant theme that shapes the story of Wonderland. In the Relevant Pages, we see her ordering the beheading of Alice, three gardeners, and others without hesitation or remorse. Her fury is intense, and she screams "Off with her head!" with an air of absolute authority.
The Red Queen's perspective on punishment reveals a stark contrast between her cruel nature and the more benevolent attitudes of other characters, such as the King, who intervenes to save Alice from execution. The Queen's actions also create a sense of danger and chaos, making Wonderland an unpredictable and potentially deadly place for its inhabitants.
Guides
Custom Actions
You can create custom actions by subclassing Action, and defining the input and output models using Pydantic.
Here is an example action that visits a webpage and returns its text content:
from asyncflows import Action, BaseModel, Field
import aiohttp


class Inputs(BaseModel):
    url: str = Field(
        description="URL of the webpage to GET",
    )


class Outputs(BaseModel):
    result: str = Field(
        description="Text content of the webpage",
    )


class GetURL(Action[Inputs, Outputs]):
    name = "get_url"

    async def run(self, inputs: Inputs) -> Outputs:
        async with aiohttp.ClientSession() as session:
            async with session.get(inputs.url) as response:
                return Outputs(result=await response.text())
YAML file of an example flow using this action:
# get_page_title.yaml
default_model:
  model: ollama/llama3
flow:
  get_website:
    action: get_url
    url:
      var: url
  extract_title:
    action: prompt
    prompt:
      - heading: Website content
        link: get_website.result
      - text: |
          What is the title of the webpage?
default_output: extract_title.result
Running the flow (Python and stdout)
Python script that runs the flow:
import asyncio
from asyncflows import AsyncFlows

async def main():
    flow = AsyncFlows.from_file("get_page_title.yaml")
    # Run the flow and return the default output (result of the extract_title action)
    result = await flow.set_vars(
        url="https://en.wikipedia.org/wiki/Python_(programming_language)",
    ).run()
    print(result)

asyncio.run(main())
Output of the Python script:
The title of the webpage is "Python (programming language) - Wikipedia".
As long as your custom actions are imported before instantiating the flow, they will be available for use.
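For example, a minimal sketch, assuming the GetURL action above lives in a hypothetical local module named my_actions.py:
import my_actions  # noqa: F401  (hypothetical module; importing it registers the GetURL action)
from asyncflows import AsyncFlows

flow = AsyncFlows.from_file("get_page_title.yaml")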
Alternatively, to ensure the action is always available, and for it to show up when using the language server for YAML autocomplete, register an entrypoint for the module that contains your actions. For example, using poetry, with your actions located in my_package.actions, include the following at the end of your pyproject.toml:
[tool.poetry.plugins."asyncflows"]
actions = "my_package.actions"
See the API Call Example for a custom actions template repository.
Writing Flows with Autocomplete
For an easier time writing flows, use YAML Language Server in your editor with our JsonSchema.
Put the following at the top of your YAML flow config file:
# yaml-language-server: $schema=https://raw.githubusercontent.com/asynchronous-flows/asyncflows/main/schemas/asyncflows_schema.json
The JsonSchema will only catch some errors; stay tuned for a domain-specific language server that will provide more advanced features.
Caching with Redis
By default, asyncflows caches action outputs with a shelve file in a temporary directory.
To cache between runs, we support Redis as a backend:
- Run Redis locally or use a cloud provider.
- Set the following environment variables:
  - REDIS_HOST (required)
  - REDIS_PASSWORD (required)
  - REDIS_PORT (optional, defaults to 6379)
  - REDIS_USERNAME (optional, defaults to empty string)
- Override the default cache with:
from asyncflows import AsyncFlows, RedisCacheRepo

flow = AsyncFlows.from_file(
    "flow.yaml",
    cache_repo=RedisCacheRepo,
)
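For example, with a local Redis instance (hypothetical credentials and script name):
REDIS_HOST=localhost REDIS_PASSWORD=changeme python my_flow_script.py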
Setting up Ollama for Local Inference
To run the examples, you need to have Ollama running locally.
- Download and Install Ollama
- On some platforms, like macOS, Ollama runs in the background automatically. If not, start it with:
ollama serve
- Pull the llama3 model (or the model you plan to use); it's referenced as ollama/llama3 in the flow configs:
ollama pull llama3
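To confirm the model is available, you can optionally run a one-off prompt through the Ollama CLI:
ollama run llama3 "Say hello"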
That's it! You're ready to run the examples.
Using Any Language Model
You may set api_base under default_model to change the Ollama API endpoint:
default_model:
  model: ollama/llama3
  api_base: ...
Alternatively, you can change the model in the corresponding YAML file to, e.g., gpt-3.5-turbo (an OpenAI model) or claude-3-haiku-20240307 (an Anthropic model). If you do, run the example with the corresponding API key environment variable.
You may also use any other model available via litellm. Please note that most litellm models do not have async support, and will block the event loop during inference.
OpenAI Example
default_model:
  model: gpt-3.5-turbo
Running the example with an OpenAI API key:
OPENAI_API_KEY=... python -m asyncflows.examples.hello_world
Anthropic Example
default_model:
  model: claude-3-haiku-20240307
Running the example with an Anthropic API key:
ANTHROPIC_API_KEY=... python -m asyncflows.examples.hello_world
Prompting in-depth
The prompt action constructs a prompt from a list of text and variables, and sends it to the LLM.
The simplest prompt contains a single string:
my_prompt:
  action: prompt
  prompt:
    - text: "Can you say hello world for me?"
Each string in the prompt is a jinja template. A more complicated prompt includes a variable:
my_prompt:
  action: prompt
  prompt:
    - text: "Can you say hello to {{ name }}?"
It's also possible to reference other actions' results in the template:
name_prompt:
  action: prompt
  prompt:
    - text: "What's your name?"

my_prompt:
  action: prompt
  prompt:
    - text: "Can you say hello to {{ name_prompt.result }}?"
Oftentimes the prompt is more complex, and includes multiple variables in a multi-line string. The following two prompts are equivalent, but the second one uses syntactic sugar for referring to the sample_text variable:
my_prompt:
  action: prompt
  prompt:
    - text: |
        A writing sample:
        ```
        {{ sample_text }}
        ```
        Write a story about {{ subject }} in the style of the sample.

my_prompt:
  action: prompt
  prompt:
    - heading: A writing sample
      var: sample_text
    - text: Write a story about {{ subject }} in the style of the sample.
For generating well-formatted output, it is often useful to have the language model wrap its response in XML tags, and prompting with XML-tagged context often makes such responses better.
The prompt action accepts a quote_style parameter that specifies how to format variables in a prompt. Specifically, xml wraps each variable in XML tags instead of triple backticks. The two prompts below are equivalent:
my_prompt:
  action: prompt
  prompt:
    - text: |
        <writing sample>
        {{ sample_text }}
        </writing sample>
        Write a story about {{ subject }} in the style of the sample, placing it between <story> and </story> tags.

my_prompt:
  action: prompt
  quote_style: xml
  prompt:
    - heading: writing sample
      var: sample_text
    - text: |
        Write a story about {{ subject }} in the style of the sample, placing it between <story> and </story> tags.
From this prompt's response, extract the story with the extract_xml_tag action:
extract_story:
  action: extract_xml_tag
  tag: story
  text:
    link: my_prompt.result
Lastly, using roles (system and user messages) is easy. Simply append role: system or role: user to a text element, or use it as a standalone element.
The following two prompts are equivalent:
my_prompt:
  action: prompt
  prompt:
    - role: system
      text: You are a detective investigating a crime scene.
    - role: user
      text: What do you see?

my_prompt:
  action: prompt
  prompt:
    - role: system
    - text: You are a detective investigating a crime scene.
    - role: user
    - text: What do you see?
License
asyncflows is licensed under the Business Source License 1.1 (BSL-1.1).
As we evolve our licensing, we will only ever become more permissive.
We do not intend to charge for production use of asyncflows, and are in the process of drafting an additional use grant.
If you wish to use asyncflows in a way that is not covered by the BSL-1.1, please reach out to us at legal@asyncflows.com. We will be happy to work with you to find a solution that makes your lawyers happy.