Skip to main content

No project description provided

Project description

Synth Machine

python tests pre-commit

AI Agents are State Machines not DAGs

Synth Machines lets users create and run AI agent state machines (Synth) by providing a SynthDefinition to define a structured AI workflow.
State machines are a powerful construct as they enable a domain expert to deconstruct the problem into sets of states and transitions.
Transitions between states can then call an LLM, tool, data process or a mixture of many outputs.

Installation

API Models

Install the package. pip install synth_machine[openai,togetherai,anthropic] or poetry add synth_machine[openai,togetherai,anthropic]

Add either setup your API provider environment keys for which

# You only need to set the API providers you want to use.
export OPENAI_API_KUY=secret
export ANTHROPIC_API_KEY=secret
export TOGETHER_API_KEY=secret

(soon) Local Models

pip install synth_machine[vllm,llamacpp] or poetry add synth_machine[vllm,llamacpp]

You will likely need to setup CUDA, VLLM or Llama.cpp for local use.

Helpful links:

Define a Synth

agent = Synth(
    config: dict[SynthDefinition], # Synth state machine defining states, transitions and prompts.
    tools=[], # List of tools the agent will use
    memory={}, # Any existing memory to add on top of any model_config.initial_memory
    rag_runner: Optional[RAG] = None # Define a RAG integration for your agent.
    postprocess_functions = [] # Any glue code functions
    store : ObjectStore = ObjectStore(":memory:") # Any files created by tools will automatically go to you object store    

The SynthDefinition can be found in SynthDefinition Docs or synth_machine/synth_definition.py. The Pydantic BaseModels which make up SynthDefinition will be the most accurate representation of a Synth.
We expect the specification to have updates between major versions.

Agent state and possible triggers

At any point, you can check the current state and next triggers

# Check state
agent.current_state()

# Triggers
agent.interfaces_for_available_triggers()

Run a Synth

Batch

await agent.trigger(
    "[trigger_name]",
    params={
        "input_1": "hello"
    }
)

Batch transition calls will output any output variable generated in that transition.

Streaming

await agent.streaming_trigger(
    "[trigger_name]",
    params={
        "input_1": "hello"
    }
)

Streaming responses yield any of the following events:

class YieldTasks(StrEnum):
    CHUNK = "CHUNK"
    MODEL_CONFIG = "MODEL_CONFIG"
    SET_MEMORY = "SET_MEMORY"
    SET_ACTIVE_OUTPUT = "SET_ACTIVE_OUTPUT"

  • CHUNK : LLM generations are sent by chunks one token at a time.
  • MODEL_CONFIG : Yields which executor is currently being used for any provider specific frontend interfaces.
  • SET_MEMORP : Sends events setting new memory variables
  • SET_ACTIVE_OUTPUT : Yields the current transition output trigger.

This lets users experiment using trigger and then integrate to real time stream LLM generations to users using Server Side Events (SSE) and trigger_streaming.

LLMs

We offer multiple executors to generate local or API driven LLM chat completions.

API Models

Local (soon)

Model Config

You can specify the provider and model in either default-model-config and the synth base or model_config on transition output.

ModelConfig:
...
executor: [openai|togetherai|anthropic|vllm|llamacpp]
llm_name: [model_name]

Memory

Agent memory is a dictionary containing all interim variables creates in previous states and human / system inputs.

agent.memory
# -> {
#   "[memory_key]": [memory_value]
# }

Tools

Postprocess functions should only be used for basic glue code, all major functionality should be built into Tools.

Tools are RestAPIs and can be added by providing a JSON API schema

Go to "./tools/tofuTool/api.py to view the functionality.

Start API

cd tools/tofuTool
poetry install
poetry run uvicorn api:app --port=5001 --reload

Retrieve API spec

curl -X GET http://localhost:5001/openapi.json > openapi_schema.json

Define Tool

You can define a Tool as such with only the name, API endpoint and tool openapi schema.

tofu_tool = Tool(
    name="tofu_tool",
    api_endpoint="http://localhost:5001",
    api_spec=tool_spec
)

Synth Machine RAG

Retrieval augemented generation is a powerful tool to improve LLM responses by providing semantically similar examples or exerts to the material the LLM is attempting to generate.

synth_machine is flexibly in such that as long as you inherit from synth_machine.RAG and create:

  • embed(documents: List[str]) and
  • query(prompt: str, rag_config: Optional[synth_machine.RAGConfig])

It is easy to integrate multiple providers and vector databases. Over time there will be supported and community RAG implementations across a wide variety of embeddings providers and vector databases.

RAG Example Qdrant & FastEmbed

The following RAG class is ideal for experimenting with local RAG setups on CPU.

pip install qdrant-client, fastembed

Define RAG class

from synth_machine.rag import RAG
from qdrant_client import AsyncQdrantClient
from fastembed import TextEmbedding
from typing import List, Optional
from qdrant_client.models import Distance, VectorParams, PointStruct


class Qdrant(RAG):

    """
    VectorDB: Qdrant - https://github.com/qdrant/qdrant
    Embeddings: FastEmbed - https://github.com/qdrant/fastembed

    This provides fast and lightweight on-device CPU embeddings creation and 
    similarity search using Qdrant in memory.
    """
    
    def __init__(
        self,
        collection_name: str,
        embedding_model: str="BAAI/bge-small-en-v1.5",
        embedding_dimensions: int=384,
        embedding_threads: int=-1,
        qdrant_location: str=":memory:",
    ):
        self.embedding_model = TextEmbedding(
            model_name=embedding_model,
            threads=embedding_threads
        )
        self.embedding_dimensions = embedding_dimensions
        self.qdrant = AsyncQdrantClient(qdrant_location)
        self.collection_name = collection_name
    
    async def create_collection(self) -> bool:
        if await self.qdrant.collection_exists(self.collection_name):
            return True
        else:
            return await self.qdrant.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.embedding_dimensions, # maps to 'BAAI/bge-small-en-v1.5' model dimensions
                    distance=Distance.COSINE
                ) 
            )
    
    async def embed(self, documents: List[str], metadata: Optional[List[dict]]=None):
        if metadata and len(documents) != len(metadata):
            raise ValueError("documents and metadata must be the same length")
        embedding_list = list(
            self.embedding_model.embed(documents)
        )
        upsert_response = await self.qdrant.upsert(
            collection_name=self.collection_name,
            points=[
                PointStruct(
                    id=i,
                    vector=list(vector),
                    payload=metadata[i]
                )
                for i, vector in enumerate(embedding_list)
            ]
        )
        return upsert_response.status
        

    async def query(self, prompt: str, rag_config: RAGConfig) -> List[dict]:
        embedding = next(self.embedding_model.embed([prompt]))
            
        similar_responses = await self.qdrant.search(
            collection_name=self.collection_name,
            query_vector=embedding,
            limit=rag_config.n
        )
        return [
            point.payload for point in similar_responses
        ]

Now initiate the Qdrant class and provide when defining Synth.

qdrant = Qdrant(collection_name="tofu_examples")
await qdrant.create_collection()

agent = Synth(
    ...
    rag_runner=Qdrant
)

Store

Tools can return a variety of different objects. Any file created by a tool will automatically go to your agent.store. We use ObjectStore for file storage, with ObjectStore(":memory:")as the default.

To retrieve a file: agent.store.get(file_name)

ObjectStore allowing easy integration to:

  • Local file store
  • S3
  • GCS
  • Azure

Example GCS object store

from synth_machine.machine import ObjectStore

agent = Agent(
    ...
    store=ObjectStore("gs://[bucket_name]/[prefix]))
)

User Defined Functions

Any custom functionality can be defined as a user defined function (UDF).
These take Synth.memoryas input and allows you to run custom functionality as part of the synth-machine.

# Define postprocess function

from synth_machine.user_defined_functions import udf

@udf
def abc_postprocesss(memory):
    ...
    return memory["variable_key"]

agent = Synth(
  ...
  user_defined_functions = {
    "abc": abc_postprocess
  }
)

Example UDF Transition Config

...
- key: trigger_udf
  inputs:
    - key: variable_key
  outputs:
    - key: example_udf
      udf: abc

Note: Any non trivial functionality should be a tool and not UDF.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

synth_machine-0.4.0.tar.gz (32.5 kB view hashes)

Uploaded Source

Built Distribution

synth_machine-0.4.0-py3-none-any.whl (35.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page