Skip to main content

No project description provided

Project description

WorkflowAI Python

PyPI version

A library to use WorkflowAI with Python.

Context

WorkflowAI is a platform for designing, building, and deploying agents.

Installation

workflowai requires a python >= 3.9.

pip install workflowai

Usage

Usage examples are available in the examples directory or end to end test directory.

Getting a workflowai api key

Create an account on workflowai.com, generate an API key and set it as an environment variable.

WORKFLOWAI_API_KEY=...

You can also set the WORKFLOWAI_API_URL environment variable to point to your own WorkflowAI instance.

The current UI does not allow to generate an API key without creating a task. Take the opportunity to play around with the UI. When the task is created, you can generate an API key from the Code section

Set up the workflowai client

If you have defined the api key using an environment variable, the shared workflowai client will be correctly configured.

You can override the shared client by calling the init function.

import workflowai

workflowai.init(
    url=..., # defaults to WORKFLOWAI_API_URL env var or https://api.workflowai.com
    api_key=..., # defaults to WORKFLOWAI_API_KEY env var
)

Using multiple clients

You might want to avoid using the shared client, for example if you are using multiple API keys or accounts. It is possible to achieve this by manually creating client instances

from workflowai import WorkflowAI

client = WorkflowAI(
    url=...,
    api_key=...,
)

# Use the client to create and run agents
@client.agent()
def my_agent(agent_input: Input) -> Output:
    ...

Build agents

An agent is in essence an async function with the added constraints that:

  • it has a single argument that is a pydantic model
  • it has a single return value that is a pydantic model
  • it is decorated with the @client.agent() decorator

Pydantic is a very popular and powerful library for data validation and parsing. It allows us to extract the input and output schema in a simple way

Below is an agent that analyzes customer feedback from call transcripts:

import workflowai
from pydantic import BaseModel, Field
from typing import List
from datetime import date

# Input model for the call feedback analysis
class CallFeedbackInput(BaseModel):
    """Input for analyzing a customer feedback call."""
    transcript: str = Field(description="The full transcript of the customer feedback call.")
    call_date: date = Field(description="The date when the call took place.")

# Model representing a single feedback point with supporting evidence
class FeedbackPoint(BaseModel):
    """A specific feedback point with its supporting quote."""
    point: str = Field(description="The main point or insight from the feedback.")
    quote: str = Field(description="The exact quote from the transcript supporting this point.")
    timestamp: str = Field(description="The timestamp or context of when this was mentioned in the call.")

# Model representing the structured analysis of the customer feedback call
class CallFeedbackOutput(BaseModel):
    """Structured analysis of the customer feedback call."""
    positive_points: List[FeedbackPoint] = Field(
        default_factory=list,
        description="List of positive feedback points, each with a supporting quote."
    )
    negative_points: List[FeedbackPoint] = Field(
        default_factory=list,
        description="List of negative feedback points, each with a supporting quote."
    )

@workflowai.agent(id="analyze-call-feedback", model=Model.GPT_4O_LATEST)
async def analyze_call_feedback(input: CallFeedbackInput) -> CallFeedbackOutput:
    """
    Analyze a customer feedback call transcript to extract key insights:
    1. Identify positive feedback points with supporting quotes
    2. Identify negative feedback points with supporting quotes
    3. Include timestamp/context for each point

    Be specific and objective in the analysis. Use exact quotes from the transcript.
    Maintain the customer's original wording in quotes.
    """
    ...

When you call that function, the associated agent will be created on workflowai.com if it does not exist yet and a run will be created. By default:

  • the docstring will be used as instructions for the agent
  • the default model (workflowai.DEFAULT_MODEL) is used to run the agent
  • the agent id will be a slugified version of the function name unless specified explicitly

Example usage:

# Example transcript
transcript = '''
[00:01:15] Customer: I've been using your software for about 3 months now, and I have to say the new dashboard feature is really impressive. It's saving me at least an hour each day on reporting.

[00:02:30] Customer: However, I'm really frustrated with the export functionality. It crashed twice this week when I tried to export large reports, and I lost all my work.

[00:03:45] Customer: On a positive note, your support team, especially Sarah, was very responsive when I reported the issue. She got back to me within minutes.

[00:04:30] Customer: But I think the pricing for additional users is a bit steep compared to other solutions we looked at.
'''

# Create input
feedback_input = CallFeedbackInput(
    transcript=transcript,
    call_date=date(2024, 1, 15)
)

# Analyze the feedback
run = await analyze_call_feedback(feedback_input)

# Print the analysis
print("\nPositive Points:")
for point in run.positive_points:
    print(f"\n{point.point}")
    print(f"  Quote [{point.timestamp}]: \"{point.quote}\"")

print("\nNegative Points:")
for point in run.negative_points:
    print(f"\n{point.point}")
    print(f"  Quote [{point.timestamp}]: \"{point.quote}\"")

What is "..." ?

The ... is the ellipsis value in python. It is usually used as a placeholder. You could use "pass" here as well or anything really, the implementation of the function is handled by the decorator @workflowai.agent() and so the function body is not executed. ... is usually the right choice because it signals type checkers that they should ignore the function body.

Having the agent id determined at runtime can lead to unexpected changes, since changing the function name will change the agent id. A good practice is to set the agent id explicitly, @workflowai.agent(id="say-hello").

Using different models

WorkflowAI supports a long list of models. The source of truth for models we support is on workflowai.com. The Model enum is a good indication of what models are supported at the time of the sdk release, although it may be missing some models since new ones are added all the time.

You can set the model explicitly in the agent decorator:

from workflowai import Model

@workflowai.agent(model=Model.GPT_4O_LATEST)
async def analyze_call_feedback(input: CallFeedbackInput) -> CallFeedbackOutput:
    ...

Models do not become invalid on WorkflowAI. When a model is retired, it will be replaced dynamically by a newer version of the same model with the same or a lower price so calling the api with a retired model will always work.

Version from code or deployments

Setting a docstring or a model in the agent decorator signals the client that the agent parameters are fixed and configured via code.

Handling the agent parameters in code is useful to get started but may be limited in the long run:

  • it is somewhat hard to measure the impact of different parameters
  • moving to new models or instructions requires a deployment
  • iterating on the agent parameters can be very tedious

Deployments allow you to refer to a version of an agent's parameters from your code that's managed from the workflowai.com UI. The following code will use the version of the agent named "production" which is a lot more flexible than changing the function parameters when running in production.

@workflowai.agent(deployment="production") # or simply @workflowai.agent()
def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
    ...

The Run object

Although having an agent only return the run output covers most use cases, some use cases require having more information about the run.

By changing the type annotation of the agent function to Run[Output], the generated function will return the full run object.

@workflowai.agent()
async def analyze_call_feedback(input: CallFeedbackInput) -> Run[CallFeedbackOutput]:
    ...


run = await analyze_call_feedback(feedback_input)
print(run.output) # the output, as before
print(run.model) # the model used for the run
print(run.cost_usd) # the cost of the run in USD
print(run.duration_seconds) # the duration of the inference in seconds

Streaming

You can configure the agent function to stream by changing the type annotation to an AsyncIterator.

Streaming the output only

Use AsyncIterator[Output] to get the output as it is generated.

from collections.abc import AsyncIterator

# Stream the output, the output is filled as it is generated
@workflowai.agent()
def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
    ...

async for chunk in analyze_call_feedback(feedback_input):
    # Just get the output as it's generated
    print(chunk.output)

Note: no need to mark the agent as async here ! It is already asynchronous since it returns an AsyncIterator. The type checkers some times get confused since they consider that an async function that returns an AsyncIterator is async twice. For example, a function with the signature async def foo() -> AsyncIterator[int] may be called async for c in await foo():... in certain cases...

Streaming the run object

Use AsyncIterator[Run[Output]] to get the run object as it is generated, which allows you, for the last chunk, to access the cost and duration of the run.

import workflowai
from workflowai import Run
from collections.abc import AsyncIterator

# Stream the run object, the output is filled as it is generated
@workflowai.agent()
def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
    ...

last_chunk = None

async for chunk in analyze_call_feedback(feedback_input):
    # Show output as it's generated
    print(chunk.output)
    last_chunk = chunk

if last_chunk:
    # Cost and duration are only available on the last chunk
    print(f"\nCost: ${last_chunk.cost_usd}")
    print(f"Latency: {last_chunk.duration_seconds:.2f}s")

Images

Add images as input to an agent by using the Image class. An image can either have:

  • a content, base64 encoded data
  • a url
from workflowai.fields import Image

class ImageInput(BaseModel):
    image: Image = Field(description="The image to analyze")

# use base64 to include the image inline
image = Image(content_type='image/jpeg', data='<base 64 encoded data>')

# You can also use the `url` property to pass an image URL.
image = Image(url="https://example.com/image.jpg")

An example of using image as input is available in city_identifier.py.

Files (PDF, .txt, ...)

Use the File class to pass files as input to an agent. Different LLMs support different file types.

from workflowai.fields import File
...

class PDFQuestionInput(BaseModel):
    pdf: File = Field(description="The PDF document to analyze")
    question: str = Field(description="The question to answer about the PDF content")

class PDFAnswerOutput(BaseModel):
    answer: str = Field(description="The answer to the question based on the PDF content")
    quotes: List[str] = Field(description="Relevant quotes from the PDF that support the answer")

@workflowai.agent(id="pdf-answer", model=Model.CLAUDE_3_5_SONNET_LATEST)
async def answer_pdf_question(input: PDFQuestionInput) -> PDFAnswerOutput:
    """
    Analyze the provided PDF document and answer the given question.
    Provide a clear and concise answer based on the content found in the PDF.
    """
    ...

pdf = File(content_type='application/pdf', data='<base 64 encoded data>')
question = "What are the key findings in this report?"

output = await answer_pdf_question(PDFQuestionInput(pdf=pdf, question=question))
# Print the answer and supporting quotes
print("Answer:", output.answer)
print("Supporting quotes:", "\n -".join(("", *quotes))
for quote in output.quotes:
    print(f"- {quote}")

An example of using a PDF as input is available in pdf_answer.py.

Audio

Use the File class to pass audio files as input to an agent. Note that only some models support audio input.

from workflowai.fields import File
...

class AudioInput(BaseModel):
    audio: File = Field(description="The audio recording to analyze for spam/robocall detection")

class AudioClassification(BaseModel):
    is_spam: bool = Field(description="Whether the audio is classified as spam/robocall")

@workflowai.agent(id="audio-classifier", model=Model.GEMINI_1_5_FLASH_LATEST)
async def classify_audio(input: AudioInput) -> AudioClassification:
    ...

# Example 1: Using base64 encoded data
audio = File(content_type='audio/mp3', data='<base 64 encoded data>')

# Example 2: Using a URL
# audio = File(url='https://example.com/audio/call.mp3')

run = await classify_audio(AudioInput(audio=audio))
print(run)

See an example of audio classification in audio_classifier.py.

Caching

By default, the cache settings is auto, meaning that agent runs are cached when the temperature is 0 (the default temperature value). Which means that, when running the same agent twice with the exact same input, the exact same output is returned and the underlying model is not called a second time.

The cache usage string literal is defined in cache_usage.py file. There are 3 possible values:

  • auto: (default) Use cached results only when temperature is 0
  • always: Always use cached results if available, regardless of model temperature
  • never: Never use cached results, always execute a new run

The cache usage can be passed to the agent function as a keyword argument:

@workflowai.agent(id="analyze-call-feedback")
async def analyze_call_feedback(_: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]: ...

run = await analyze_call_feedback(CallFeedbackInput(...), use_cache="always")

Replying to a run

Some use cases require the ability to have a back and forth between the client and the LLM. For example:

  • tools see below use the reply ability internally
  • chatbots
  • correcting the LLM output

In WorkflowAI, this is done by replying to a run. A reply can contain:

  • a user response
  • tool results
# Important: returning the full run object is required to use the reply feature
@workflowai.agent()
async def say_hello(input: Input) -> Run[Output]:
    ...

run = await say_hello(Input(name="John"))
run = await run.reply(user_message="Now say hello to his brother James")

The output of a reply to a run has the same type as the original run, which makes it easy to iterate towards the construction of a final output.

To allow run iterations, it is very important to have outputs that are tolerant to missing fields, aka that have default values for most of their fields. Otherwise the agent will throw a WorkflowAIError on missing fields and the run chain will be broken.

Tools

Tools allow enhancing an agent's capabilities by allowing it to call external functions.

WorkflowAI Hosted tools

WorkflowAI hosts a few tools:

  • @browser-text allows fetching the content of a web page
  • @search allows performing a web search

Hosted tools tend to be faster because there is no back and forth between the client and the WorkflowAI API. Instead, if a tool call is needed, the WorkflowAI API will call it within a single request.

A single run will be created for all tool iterations.

To use a tool, simply add it's handles to the instructions (the function docstring):

@workflowai.agent()
async def analyze_call_feedback(input: CallFeedbackInput) -> CallFeedbackOutput:
    """
    You can use @search and @browser-text to retrieve information about the name.
    """
    ...

Custom tools

Custom tools allow using most functions within a single agent call. If an agent has custom tools, and the model deems that tools are needed for a particular run, the agent will:

  • call all tools in parallel
  • wait for all tools to complete
  • reply to the run with the tool outputs
  • continue with the next step of the run, and re-execute tools if needed
  • ...
  • until either no tool calls are requested, the max iteration (10 by default) or the agent has run to completion

Tools are defined as regular python functions, and can be async or sync. Examples for tools are available in the tools end 2 end test file.

Important: It must be possible to determine the schema of a tool from the function signature. This means that the function must have type annotations and use standard types or BaseModel only for now.

# Annotations for parameters are passed as property descriptions in the tool schema
def get_current_time(timezone: Annotated[str, "The timezone to get the current time in. e-g Europe/Paris"]) -> str:
    """Return the current time in the given timezone in iso format"""
    return datetime.now(ZoneInfo(timezone)).isoformat()

# Tools can also be async
async def fetch_webpage(url: str) -> str:
    """Fetch the content of a webpage"""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text

@agent(
    id="answer-question",
    tools=[get_current_time, fetch_webpage],
    version=VersionProperties(model=Model.GPT_4O_LATEST),
)
async def answer_question(_: AnswerQuestionInput) -> Run[AnswerQuestionOutput]: ...

run = await answer_question(AnswerQuestionInput(question="What is the current time in Paris?"))
assert run.output.answer

It's important to understand that there are actually two runs in a single agent call:

  • the first run returns an empty output with a tool call request with a timezone
  • the second run returns the current time in the given timezone

Only the last run is returned to the caller.

Error handling

Agents can raise errors, for example when the underlying model fails to generate a response or when there are content moderation issues.

All errors are wrapped in a WorkflowAIError that contains details about what happened. The most interesting fields are:

  • code is a string that identifies the type of error, see the errors.py file for more details
  • message is a human readable message that describes the error

The WorkflowAIError is raised when the agent is called, so you can handle it like any other exception.

try:
    await analyze_call_feedback(
        CallFeedbackInput(
            transcript="[00:01:15] Customer: The product is great!",
            call_date=date(2024, 1, 15)
        )
    )
except WorkflowAIError as e:
    print(e.code)
    print(e.message)

Recoverable errors

Sometimes, the LLM outputs an object that is partially valid, good examples are:

  • the model context window was exceeded during the generation
  • the model decided that a tool call result was a failure

In this case, an agent that returns an output only will always raise an InvalidGenerationError which subclasses WorkflowAIError.

However, an agent that returns a full run object will try to recover from the error by using the partial output.

run = await agent(input=Input(name="John"))

# The run will have an error
assert run.error is not None

# The run will have a partial output
assert run.output is not None

Defining input and output types

There are some important subtleties when defining input and output types.

Descriptions and examples

Field description and examples are passed to the model and can help stir the output in the right direction. A good use case is to describe a format or style for a string field

# point has no examples or description so the model will be less guided
class BasicFeedbackPoint(BaseModel):
    point: str

# passing the description helps guide the model's output format
class DetailedFeedbackPoint(BaseModel):
    point: str = Field(
        description="A clear, specific point of feedback extracted from the transcript."
    )

# passing examples can help as well
class FeedbackPoint(BaseModel):
    point: str = Field(
        description="A clear, specific point of feedback extracted from the transcript.",
        examples=[
            "Dashboard feature saves significant time on reporting",
            "Export functionality is unstable with large reports"
        ]
    )

Some notes:

  • there are very little use cases for descriptions and examples in the input type. The model will most of the infer from the value that is passed. An example use case is to use the description for fields that can be missing.
  • adding examples that are too numerous or too specific can push the model to restrict the output value

Required versus optional fields

In short, we recommend using default values for most output fields.

Pydantic is by default rather strict on model validation. If there is no default value, the field must be provided. Although the fact that a field is required is passed to the model, the generation can sometimes omit null or empty values.

class CallFeedbackOutputStrict(BaseModel):
    positive_points: List[FeedbackPoint]
    negative_points: List[FeedbackPoint]

@workflowai.agent()
async def analyze_call_feedback_strict(input: CallFeedbackInput) -> CallFeedbackOutputStrict:
    ...

try:
    run = await analyze_call_feedback_strict(
        CallFeedbackInput(
            transcript="[00:01:15] Customer: The product is great!",
            call_date=date(2024, 1, 15)
        )
    )
except WorkflowAIError as e:
    print(e.code) # "invalid_generation" error code means that the generation did not match the schema

class CallFeedbackOutputTolerant(BaseModel):
    positive_points: List[FeedbackPoint] = Field(default_factory=list)
    negative_points: List[FeedbackPoint] = Field(default_factory=list)

@workflowai.agent()
async def analyze_call_feedback_tolerant(input: CallFeedbackInput) -> CallFeedbackOutputTolerant:
    ...

# The invalid_generation is less likely
run = await analyze_call_feedback_tolerant(
    CallFeedbackInput(
        transcript="[00:01:15] Customer: The product is great!",
        call_date=date(2024, 1, 15)
    )
)
if not run.positive_points and not run.negative_points:
    print("No feedback points were generated!")

WorkflowAI automatically retries invalid generations once. If a model outputs an object that does not match the schema, a new generation is triggered with the previous response and the error message as context.

Another reason to prefer optional fields in the output is for streaming. Partial outputs are constructed using BaseModel.model_construct when streaming. If a default value is not provided for a field, fields that are absent will cause AttributeError when queried.

@workflowai.agent()
async def analyze_call_feedback_stream(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
    ...

async for run in analyze_call_feedback_stream(
    CallFeedbackInput(
        transcript="[00:01:15] Customer: The product is great!",
        call_date=date(2024, 1, 15)
    )
):
    print(f"Positive points so far: {len(run.positive_points)}")
    print(f"Negative points so far: {len(run.negative_points)}")

Field properties

Pydantic allows a variety of other validation criteria for fields: minimum, maximum, pattern, etc. This additional criteria are included the JSON Schema that is sent to WorkflowAI, and are sent to the model.

class Input(BaseModel):
    name: str = Field(min_length=3, max_length=10)
    age: int = Field(ge=18, le=100)
    email: str = Field(pattern=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")

These arguments can be used to stir the model in the right direction. The caveat is have a validation that is too strict can lead to invalid generations. In case of an invalid generation:

  • WorkflowAI retries the inference once by providing the model with the invalid output and the validation error
  • if the model still fails to generate a valid output, the run will fail with an InvalidGenerationError. the partial output is available in the partial_output attribute of the InvalidGenerationError
@agent()
def my_agent(_: Input) -> :...

Workflows

For advanced workflow patterns and examples, please refer to the Workflows README for more details.

Contributing

See the CONTRIBUTING.md file for more details.

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

workflowai-0.6.0.dev9.tar.gz (40.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

workflowai-0.6.0.dev9-py3-none-any.whl (43.7 kB view details)

Uploaded Python 3

File details

Details for the file workflowai-0.6.0.dev9.tar.gz.

File metadata

  • Download URL: workflowai-0.6.0.dev9.tar.gz
  • Upload date:
  • Size: 40.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.8

File hashes

Hashes for workflowai-0.6.0.dev9.tar.gz
Algorithm Hash digest
SHA256 06e0a507191eb84d3801256a97e70bac581bd95ebd6cba816db7c5ef60cc0310
MD5 7185c76fb8ffe75e8bf0a36a574cf4f6
BLAKE2b-256 6b3a6e3290e95f3a938a135768977ad21c1d701aed36e352ed1b9229e2de0324

See more details on using hashes here.

Provenance

The following attestation bundles were made for workflowai-0.6.0.dev9.tar.gz:

Publisher: publish.yml on WorkflowAI/workflowai-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file workflowai-0.6.0.dev9-py3-none-any.whl.

File metadata

File hashes

Hashes for workflowai-0.6.0.dev9-py3-none-any.whl
Algorithm Hash digest
SHA256 24a0ad14165c04f2fc78cc7180001ad0384a78131f2f6fc60b40a2318ef839bd
MD5 8b8c5bc0b9389791242288e7d4670504
BLAKE2b-256 5cf13afe78153b8f270eaf808899d8d4c89cb379bc21fca4ae6aec522c0d5484

See more details on using hashes here.

Provenance

The following attestation bundles were made for workflowai-0.6.0.dev9-py3-none-any.whl:

Publisher: publish.yml on WorkflowAI/workflowai-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page