Dynamiq

Dynamiq is an orchestration framework for agentic AI and LLM applications

Requires Python 3.10+.

Welcome to Dynamiq! 🤖

Dynamiq is your all-in-one Gen AI framework, designed to streamline the development of AI-powered applications. Dynamiq specializes in orchestrating retrieval-augmented generation (RAG) and large language model (LLM) agents.

Getting Started

Ready to dive in? Here's how you can get started with Dynamiq:

Installation

First, let's get Dynamiq installed. You'll need Python, so make sure that's set up on your machine. Then run:

pip install dynamiq

Or install from source with Poetry:

git clone https://github.com/dynamiq-ai/dynamiq.git
cd dynamiq
poetry install
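
To confirm that the install worked, one option is to ask the standard library for the installed version. This is a minimal sketch; `installed_version` is a hypothetical helper name and not part of Dynamiq:

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Prints a version string if Dynamiq is installed in this environment, otherwise None.
print(installed_version("dynamiq"))
```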

Documentation

For more examples and detailed guides, please refer to our documentation.

Examples

Simple LLM Flow

Here's a simple example to get you started with Dynamiq:

from dynamiq.nodes.llms.openai import OpenAI
from dynamiq.connections import OpenAI as OpenAIConnection
from dynamiq import Workflow
from dynamiq.prompts import Prompt, Message

# Define the prompt template for translation
prompt_template = """
Translate the following text into English: {{ text }}
"""

# Create a Prompt object with the defined template
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])

# Setup your LLM (Large Language Model) Node
llm = OpenAI(
    id="openai",  # Unique identifier for the node
    connection=OpenAIConnection(api_key="$OPENAI_API_KEY"),  # Connection using API key
    model="gpt-4o",  # Model to be used
    temperature=0.3,  # Sampling temperature for the model
    max_tokens=1000,  # Maximum number of tokens in the output
    prompt=prompt  # Prompt to be used for the model
)

# Create a Workflow object
workflow = Workflow()

# Add the LLM node to the workflow
workflow.flow.add_nodes(llm)

# Run the workflow with the input data
result = workflow.run(
    input_data={
        "text": "Hola Mundo!"  # Text to be translated
    }
)

# Print the result of the translation
print(result.output)
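
The `"$OPENAI_API_KEY"` string above is a placeholder for a real key. One common pattern is to read the key from an environment variable and fail early when it is missing. A minimal sketch, assuming the key is exported as `OPENAI_API_KEY`; `require_env` is a hypothetical helper, not a Dynamiq function:

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, failing early if unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

The returned value could then be passed as `api_key=require_env("OPENAI_API_KEY")` in place of the placeholder string.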

Simple ReAct Agent

An agent that has access to the E2B Code Interpreter and can solve complex coding tasks.

from dynamiq.nodes.llms.openai import OpenAI
from dynamiq.connections import OpenAI as OpenAIConnection, E2B as E2BConnection
from dynamiq.nodes.agents.react import ReActAgent
from dynamiq.nodes.tools.e2b_sandbox import E2BInterpreterTool

# Initialize the E2B tool
e2b_tool = E2BInterpreterTool(
    connection=E2BConnection(api_key="$E2B_API_KEY")
)

# Setup your LLM
llm = OpenAI(
    id="openai",
    connection=OpenAIConnection(api_key="$OPENAI_API_KEY"),
    model="gpt-4o",
    temperature=0.3,
    max_tokens=1000,
)

# Create the ReAct agent
agent = ReActAgent(
    name="react-agent",
    llm=llm,
    tools=[e2b_tool],
    role="Senior Data Scientist",
    max_loops=10,
)

# Run the agent with an input
result = agent.run(
    input_data={
        "input": "Add the first 10 numbers and tell if the result is prime.",
    }
)

print(result.output.get("content"))
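
To sanity-check the agent's answer, the same task can be computed directly in plain Python. This sketch assumes "the first 10 numbers" means the integers 1 through 10; the prompt does not say so explicitly, and the agent may read it differently:

```python
def is_prime(n: int) -> bool:
    """Trial-division primality check, adequate for small n."""
    if n < 2:
        return False
    divisor = 2
    while divisor * divisor <= n:
        if n % divisor == 0:
            return False
        divisor += 1
    return True


# Assumption: "the first 10 numbers" are the integers 1..10.
total = sum(range(1, 11))
print(total, is_prime(total))  # 55 False, since 55 = 5 * 11
```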

Multi-agent orchestration

from dynamiq.connections import (OpenAI as OpenAIConnection,
                                 ScaleSerp as ScaleSerpConnection,
                                 E2B as E2BConnection)
from dynamiq.nodes.llms import OpenAI
from dynamiq.nodes.agents.orchestrators.adaptive import AdaptiveOrchestrator
from dynamiq.nodes.agents.orchestrators.adaptive_manager import AdaptiveAgentManager
from dynamiq.nodes.agents.react import ReActAgent
from dynamiq.nodes.agents.reflection import ReflectionAgent
from dynamiq.nodes.tools.e2b_sandbox import E2BInterpreterTool
from dynamiq.nodes.tools.scale_serp import ScaleSerpTool


# Initialize tools
python_tool = E2BInterpreterTool(
    connection=E2BConnection(api_key="$E2B_API_KEY")
)
search_tool = ScaleSerpTool(
    connection=ScaleSerpConnection(api_key="$SCALESERP_API_KEY")
)

# Initialize LLM
llm = OpenAI(
    connection=OpenAIConnection(api_key="$OPENAI_API_KEY"),
    model="gpt-4o",
    temperature=0.1,
)

# Define agents
coding_agent = ReActAgent(
    name="coding-agent",
    llm=llm,
    tools=[python_tool],
    role=("Expert agent with coding skills. "
          "Goal is to provide the solution to the input task "
          "using Python software engineering skills."),
    max_loops=15,
)

planner_agent = ReflectionAgent(
    name="planner-agent",
    llm=llm,
    role=("Expert agent with planning skills. "
          "Goal is to analyze complex requests "
          "and provide a detailed action plan."),
)

search_agent = ReActAgent(
    name="search-agent",
    llm=llm,
    tools=[search_tool],
    role=("Expert agent with web search skills. "
          "Goal is to provide the solution to the input task "
          "using web search and summarization skills."),
    max_loops=10,
)

# Initialize the adaptive agent manager
agent_manager = AdaptiveAgentManager(llm=llm)

# Create the orchestrator
orchestrator = AdaptiveOrchestrator(
    name="adaptive-orchestrator",
    agents=[coding_agent, planner_agent, search_agent],
    manager=agent_manager,
)

# Define the input task
input_task = (
    "Use coding skills to gather data about Nvidia and Intel stock prices for the last 10 years, "
    "calculate the average per year for each company, and create a table. Then craft a report "
    "and add a conclusion: what would have been better if I had invested $100 ten years ago?"
)

# Run the orchestrator
result = orchestrator.run(
    input_data={"input": input_task},
)

# Print the result
print(result.output.get("content"))
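
The multi-line `role` values above rely on Python's implicit concatenation of adjacent string literals, which inserts no separator between the pieces. A short standalone illustration of why each fragment needs its own trailing space:

```python
# Adjacent literals are joined exactly as written, with nothing in between.
without_spaces = ("Expert agent with coding skills."
                  "Goal is to provide the solution to the input task")
with_spaces = ("Expert agent with coding skills. "
               "Goal is to provide the solution to the input task")

print(without_spaces)  # ...coding skills.Goal is to provide...
print(with_spaces)     # ...coding skills. Goal is to provide...
```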

RAG - document indexing flow

This workflow takes input PDF files, pre-processes them, converts them to vector embeddings, and stores them in the Pinecone vector database. The example provided is for an existing index in Pinecone. You can find examples for index creation on the docs/tutorials/rag page.

from io import BytesIO

from dynamiq import Workflow
from dynamiq.connections import OpenAI as OpenAIConnection, Pinecone as PineconeConnection
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.writers import PineconeDocumentWriter

rag_wf = Workflow()

# PyPDF document converter
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
rag_wf.flow.add_nodes(converter)  # add node to the DAG

# Document splitter
document_splitter = (
    DocumentSplitter(
        split_by="sentence",
        split_length=10,
        split_overlap=1,
    )
    .inputs(documents=converter.outputs.documents)  # map converter node output to the expected input of the current node
    .depends_on(converter)
)
rag_wf.flow.add_nodes(document_splitter)

# OpenAI vector embeddings
embedder = (
    OpenAIDocumentEmbedder(
        connection=OpenAIConnection(api_key="$OPENAI_API_KEY"),
        model="text-embedding-3-small",
    )
    .inputs(documents=document_splitter.outputs.documents)
    .depends_on(document_splitter)
)
rag_wf.flow.add_nodes(embedder)

# Pinecone vector storage
vector_store = (
    PineconeDocumentWriter(
        connection=PineconeConnection(api_key="$PINECONE_API_KEY"),
        index_name="default",
        dimension=1536,
    )
    .inputs(documents=embedder.outputs.documents)
    .depends_on(embedder)
)
rag_wf.flow.add_nodes(vector_store)

# Prepare input PDF files
file_paths = ["example.pdf"]
input_data = {
    "files": [
        BytesIO(open(path, "rb").read()) for path in file_paths
    ],
    "metadata": [
        {"filename": path} for path in file_paths
    ],
}

# Run RAG indexing flow
rag_wf.run(input_data=input_data)
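
To build intuition for the `split_length` and `split_overlap` settings, here is a plain-Python sketch of sliding-window chunking. It is a conceptual illustration only, not Dynamiq's `DocumentSplitter` implementation; it assumes `length` counts units (for example sentences) per chunk and `overlap` counts units shared by neighboring chunks. `split_with_overlap` is a hypothetical name:

```python
def split_with_overlap(units: list, length: int, overlap: int) -> list:
    """Group `units` into chunks of up to `length` items, sharing `overlap` items between neighbors."""
    if length <= 0 or not 0 <= overlap < length:
        raise ValueError("need length > 0 and 0 <= overlap < length")
    step = length - overlap
    chunks = []
    for start in range(0, len(units), step):
        chunks.append(units[start:start + length])
        if start + length >= len(units):
            break  # the last chunk already reaches the end of the input
    return chunks


sentences = [f"s{i}" for i in range(25)]
chunks = split_with_overlap(sentences, length=10, overlap=1)
print([len(chunk) for chunk in chunks])  # [10, 10, 7]
```

With an overlap of 1, the last sentence of each chunk reappears as the first sentence of the next, which helps preserve context across chunk boundaries.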

RAG - document retrieval flow

A simple RAG retrieval flow that searches for relevant documents and uses them to answer the user's question.

from dynamiq import Workflow
from dynamiq.connections import OpenAI as OpenAIConnection, Pinecone as PineconeConnection
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.retrievers import PineconeDocumentRetriever
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Message, Prompt

# Initialize the RAG retrieval workflow
retrieval_wf = Workflow()

# Shared OpenAI connection
openai_connection = OpenAIConnection(api_key="$OPENAI_API_KEY")

# OpenAI text embedder for query embedding
embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)
retrieval_wf.flow.add_nodes(embedder)

# Pinecone document retriever
document_retriever = (
    PineconeDocumentRetriever(
        connection=PineconeConnection(api_key="$PINECONE_API_KEY"),
        index_name="default",
        dimension=1536,
        top_k=5,
    )
    .inputs(embedding=embedder.outputs.embedding)
    .depends_on(embedder)
)
retrieval_wf.flow.add_nodes(document_retriever)

# Define the prompt template
prompt_template = """
Please answer the question based on the provided context.

Question: {{ query }}

Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}

"""

# OpenAI LLM for answer generation
prompt = Prompt(messages=[Message(content=prompt_template, role="user")])

answer_generator = (
    OpenAI(
        connection=openai_connection,
        model="gpt-4o",
        prompt=prompt,
    )
    .inputs(
        documents=document_retriever.outputs.documents,
        query=embedder.outputs.query,
    )  # take documents from the vector store node and query from the embedder
    .depends_on([document_retriever, embedder])
)
retrieval_wf.flow.add_nodes(answer_generator)

# Run the RAG retrieval flow
question = "What are the line items provided in the invoice?"
result = retrieval_wf.run(input_data={"query": question})

answer = result.output.get(answer_generator.id).get("output", {}).get("content")
print(answer)
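
Conceptually, the retriever step ranks stored document vectors by similarity to the query embedding and keeps the `top_k` best matches. The sketch below shows that idea in plain Python with toy two-dimensional vectors. It is not how Pinecone or Dynamiq implement retrieval, and it assumes cosine similarity, whereas the actual metric depends on how the index is configured. All names and data here are hypothetical:

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between vectors a and b (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: list, documents: list, k: int) -> list:
    """Return the names of the k documents most similar to the query vector."""
    ranked = sorted(documents, key=lambda doc: cosine_similarity(query, doc[1]), reverse=True)
    return [name for name, _ in ranked[:k]]


# Toy (name, embedding) pairs standing in for real 1536-dimensional vectors.
docs = [("invoice", [1.0, 0.0]), ("receipt", [0.8, 0.6]), ("poem", [0.0, 1.0])]
print(top_k([1.0, 0.1], docs, k=2))  # ['invoice', 'receipt']
```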

Simple Chatbot with Memory

A simple chatbot that uses the Memory module to store and retrieve conversation history.

from dynamiq.connections import OpenAI as OpenAIConnection
from dynamiq.memory import Memory
from dynamiq.memory.backends.in_memory import InMemory
from dynamiq.nodes.agents.simple import SimpleAgent
from dynamiq.nodes.llms import OpenAI

AGENT_ROLE = "helpful assistant, goal is to provide useful information and answer questions"
llm = OpenAI(
    connection=OpenAIConnection(api_key="$OPENAI_API_KEY"),
    model="gpt-4o",
    temperature=0.1,
)

memory = Memory(backend=InMemory())
agent = SimpleAgent(
    name="Agent",
    llm=llm,
    role=AGENT_ROLE,
    id="agent",
    memory=memory,
)


def main():
    print("Welcome to the AI Chat! (Type 'exit' to end)")
    while True:
        user_input = input("You: ")
        if user_input.lower() == "exit":
            break

        response = agent.run({"input": user_input})
        response_content = response.output.get("content")
        print(f"AI: {response_content}")


if __name__ == "__main__":
    main()
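
As a mental model for what a conversation memory does, the sketch below stores messages in a list and returns the most recent ones. `ChatHistory` is a hypothetical stand-in written for illustration; it is not Dynamiq's `Memory` class and makes no claims about its behavior:

```python
from dataclasses import dataclass, field


@dataclass
class ChatHistory:
    """Hypothetical in-process store of conversation turns."""
    messages: list = field(default_factory=list)

    def add(self, role: str, content: str) -> None:
        """Append one message to the history."""
        self.messages.append({"role": role, "content": content})

    def last(self, n: int) -> list:
        """Return up to the n most recent messages, oldest first."""
        return self.messages[-n:] if n > 0 else []


history = ChatHistory()
history.add("user", "Hi, my name is Ada.")
history.add("assistant", "Nice to meet you, Ada!")
history.add("user", "What is my name?")
print(history.last(2))
```

Because earlier turns are kept and can be replayed to the model, a follow-up question such as "What is my name?" can be answered from context.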

Graph Orchestrator

Graph Orchestrator lets you build an architecture tailored to a specific use case. The example below is a simple workflow that manages an iterative cycle of feedback and refinement for an email draft.

from typing import Any

from dynamiq.connections import OpenAI as OpenAIConnection
from dynamiq.nodes.agents.orchestrators.graph import END, START, GraphOrchestrator
from dynamiq.nodes.agents.orchestrators.graph_manager import GraphAgentManager
from dynamiq.nodes.agents.simple import SimpleAgent
from dynamiq.nodes.llms import OpenAI

llm = OpenAI(
    connection=OpenAIConnection(api_key="$OPENAI_API_KEY"),
    model="gpt-4o",
    temperature=0.1,
)

email_writer = SimpleAgent(
    name="email-writer-agent",
    llm=llm,
    role="Write personalized emails taking into account feedback.",
)


def gather_feedback(context: dict[str, Any]):
    """Gather feedback about email draft."""
    feedback = input(
        f"Email draft:\n"
        f"{context['messages'][-1]['content']}\n"
        f"Type in SEND to send email, CANCEL to exit, or provide feedback to refine email: \n"
    )

    reiterate = True

    result = f"Gathered feedback: {feedback}"
    if feedback == "SEND":
        print("####### Email was sent! #######")
        result = "Email was sent!"
        reiterate = False
    elif feedback == "CANCEL":
        print("####### Email was canceled! #######")
        result = "Email was canceled!"
        reiterate = False

    return {"result": result, "reiterate": reiterate}


def router(context: dict[str, Any]):
    """Determines next state based on provided feedback."""
    if context.get("reiterate", False):
        return "generate_sketch"

    return END


orchestrator = GraphOrchestrator(
    name="Graph orchestrator",
    manager=GraphAgentManager(llm=llm),
)

orchestrator.add_state_by_tasks("generate_sketch", [email_writer])
orchestrator.add_state_by_tasks("gather_feedback", [gather_feedback])

orchestrator.add_edge(START, "generate_sketch")
orchestrator.add_edge("generate_sketch", "gather_feedback")

orchestrator.add_conditional_edge("gather_feedback", ["generate_sketch", END], router)


if __name__ == "__main__":
    print("Welcome to email writer.")
    email_details = input("Provide email details: ")
    orchestrator.run(input_data={"input": f"Write and post email: {email_details}"})
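
The control flow in this example (a fixed edge into feedback, then a router that either loops back or ends) can be traced with a small plain-Python state machine. This is a sketch of the pattern only, not of `GraphOrchestrator` internals; `run_graph`, the `"START"`/`"END"` string constants, and the scripted feedback are all assumptions made for illustration:

```python
START, END = "START", "END"


def run_graph(edges, routers, handlers, context, max_steps=20):
    """Walk states from START until END (or max_steps), returning the visited states."""
    state = edges[START]
    visited = []
    for _ in range(max_steps):
        if state == END:
            break
        visited.append(state)
        context.update(handlers[state](context))
        # A router, if present, picks the next state; otherwise follow the fixed edge.
        state = routers[state](context) if state in routers else edges[state]
    return visited


def generate_sketch(context):
    return {"drafts": context.get("drafts", 0) + 1}


def gather_feedback(context):
    # Scripted feedback instead of input(): ask for one revision, then accept.
    return {"reiterate": context["drafts"] < 2}


def router(context):
    return "generate_sketch" if context.get("reiterate", False) else END


edges = {START: "generate_sketch", "generate_sketch": "gather_feedback"}
routers = {"gather_feedback": router}
handlers = {"generate_sketch": generate_sketch, "gather_feedback": gather_feedback}

context = {}
visited = run_graph(edges, routers, handlers, context)
print(visited)
```

The `max_steps` cap is a design choice for the sketch: it guarantees termination even if a router never returns `END`.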

Contributing

We love contributions! Whether it's bug reports, feature requests, or pull requests, head over to our CONTRIBUTING.md to see how you can help.

License

Dynamiq is open-source and available under the Apache 2 License.

Happy coding! 🚀
