Skip to main content

Minimal wrapper over OpenAI Agent SDK with method chaining and ChatKit integration

Project description

AF - Agentic Flow Framework

PyPI Python 3.12+ License: MIT Docs

Write agent workflows like regular Python code.

async def research_flow(query: str) -> str:
    # Internal thinking - not saved to session
    async with af.phase("Research"):
        findings = await researcher(query).stream()

    # persist=True saves the final response to session
    async with af.phase("Analysis", persist=True):
        return await analyst(findings).stream()

No graphs. No YAML. No state machines. Just Python.


✨ Why the agentic flow approach?

OpenAI Agents SDK is powerful, but multi-agent workflows get verbose fast:

Pure SDK (~50 lines) AF (~15 lines)
# Manual orchestration
result1 = await Runner.run(
    researcher, messages
)
research = result1.final_output

result2 = await Runner.run(
    analyst,
    [{"role": "user", "content": research}]
)
# No streaming, no phases,
# no session management...
async def my_flow(query: str) -> str:
    # Internal thinking - not saved to session
    async with af.phase("Research"):
        research = await researcher(query).stream()

    # persist=True saves the final response to session
    async with af.phase("Analysis", persist=True):
        return await analyst(research).stream()

runner = af.Runner(flow=my_flow)
result = await runner(query)

The agentic flow approach gives you:

  • Callable agents - agent(prompt).stream() like PyTorch modules
  • Workflow phases - Structure and visibility for multi-step processes
  • Automatic streaming - Real-time output with zero configuration
  • Session injection - Conversation persistence without global state
  • ChatKit integration - Production-ready UI streaming

📋 Requirements

  • Python 3.12+
  • uv (recommended) or pip
  • OpenAI API key

📦 Installation

With uv (recommended)

# Install uv if you don't have it
curl -LsSf https://astral.sh/uv/install.sh | sh

# Add AF to your project
uv add ds-agentic-flow

With pip (alternative)

pip install ds-agentic-flow

Set your API key

Create a .env.local file from the example:

cp .env.example .env.local
# Edit .env.local and add your OpenAI API key

Or set it directly in your environment:

export OPENAI_API_KEY="your-api-key"

🚀 Quickstart Code

import agentic_flow as af

# Define agents
researcher = af.Agent(
    name="researcher",
    instructions="Research the topic thoroughly.",
    model="gpt-5.2",
)

writer = af.Agent(
    name="writer",
    instructions="Write clear, engaging content.",
    model="gpt-5.2",
)

# Define workflow as a regular async function
async def blog_flow(topic: str) -> str:
    # Internal thinking - not saved to session
    async with af.phase("Research"):
        research = await researcher(topic).stream()

    # persist=True saves the final response to session
    async with af.phase("Writing", persist=True):
        return await writer(f"Write about: {research}").stream()

# Run
runner = af.Runner(flow=blog_flow)
article = await runner("quantum computing")

💡 Core Concepts

AF is built on three primitives:

  • Agent - Callable wrapper around SDK Agent. Returns ExecutionSpec for deferred execution.
  • ExecutionSpec - Lazy specification configured with modifiers (.stream(), .isolated(), .silent(), .max_turns())
  • phase - Context manager for workflow boundaries. Controls session persistence with persist=True.

For details, see Concepts.


🔄 Patterns

Sequential Pipeline

async def pipeline(input: str) -> str:
    # Internal processing - not saved to session
    async with af.phase("Extract"):
        entities = await extractor(input).stream()

    async with af.phase("Enrich"):
        enriched = await enricher(entities).stream()

    # persist=True saves the final response to session
    async with af.phase("Format", persist=True):
        return await formatter(enriched).stream()

Conditional Branching

async def smart_process(data: str) -> str:
    # Internal classification - not saved to session
    async with af.phase("Classify"):
        category = await classifier(data).stream()

    # persist=True on whichever branch returns the final response
    if "technical" in category:
        async with af.phase("Technical Analysis", persist=True):
            return await tech_analyst(data).stream()
    else:
        async with af.phase("General Summary", persist=True):
            return await summarizer(data).stream()

Iterative Refinement

async def refine(draft: str) -> str:
    for i in range(3):
        # Internal review - not saved to session
        async with af.phase(f"Review #{i+1}"):
            feedback = await critic(draft).stream()

        if "APPROVED" in feedback:
            return draft

        # Internal revision - not saved to session
        async with af.phase(f"Revise #{i+1}"):
            draft = await reviser(f"{draft}\n\nFeedback: {feedback}").stream()

    # Note: This pattern returns the draft directly without session save.
    # Add a final phase with persist=True if you need to save the result.
    return draft

Parallel Execution

import asyncio

async def parallel_analysis(data: str) -> dict:
    # persist=True saves the final response to session
    # isolated() runs each agent without shared context
    async with af.phase("Parallel Analysis", persist=True):
        results = await asyncio.gather(
            sentiment_agent(data).isolated(),
            entity_agent(data).isolated(),
            summary_agent(data).isolated(),
        )

    return {
        "sentiment": results[0],
        "entities": results[1],
        "summary": results[2],
    }

🔌 ChatKit Integration

from pathlib import Path
import agentic_flow as af
from agents import SQLiteSession
from chatkit.server import ChatKitServer
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

# Persist data in a known location
DATA_DIR = Path(__file__).parent / "data"
DATA_DIR.mkdir(exist_ok=True)

app = FastAPI()


class MyServer(ChatKitServer):
    async def respond(self, thread, item, context):
        user_message = item.content[0].text if item else ""
        session = SQLiteSession(
            session_id=thread.id,
            db_path=str(DATA_DIR / "chat.db"),
        )
        runner = af.Runner(flow=my_flow, session=session)
        async for event in af.chatkit.run_with_chatkit_context(
            runner, thread, self.store, context, user_message
        ):
            yield event


server = MyServer(store)


@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
    result = await server.process(await request.body(), {})
    return StreamingResponse(result, media_type="text/event-stream")

Each af.phase() automatically creates workflow boundaries for proper reasoning display.


🎮 Try the Demos

First, install sample dependencies:

uv sync --group sample

CLI Quickstart

Experience the core concepts interactively:

uv run --group sample python sample/quickstart.py

Demonstrates:

  1. Flow & Runner separation
  2. Declaration vs execution (agent(prompt) returns spec, await executes)
  3. Modifiers (.stream(), .isolated(), .silent(), .max_turns())
  4. Typed output with Pydantic

Guide TUI

Interactive Textual UI that answers questions about AF:

uv run --group sample python -m sample.guide.cli

Guide Web Server (FastAPI + ChatKit)

Start the backend API:

uv run --group sample uvicorn sample.guide.server:app --reload --port 8000

Guide Frontend (Next.js + ChatKit)

In a separate terminal:

cd sample/guide/frontend
npm install
npm run dev

Visit http://localhost:3000


📚 Documentation

For the complete documentation site, visit https://daiichisankyo.github.io/AgenticFlow/


Contributing

# Clone and setup
git clone https://github.com/daiichisankyo/AgenticFlow.git
cd AgenticFlow
uv sync --group dev

# Run tests
uv run pytest

# Run linter
uv run ruff check src/
  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Run tests and linter
  5. Submit a pull request

📄 License

MIT - see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ds_agentic_flow-0.36.tar.gz (248.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ds_agentic_flow-0.36-py3-none-any.whl (20.3 kB view details)

Uploaded Python 3

File details

Details for the file ds_agentic_flow-0.36.tar.gz.

File metadata

  • Download URL: ds_agentic_flow-0.36.tar.gz
  • Upload date:
  • Size: 248.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.26 {"installer":{"name":"uv","version":"0.9.26","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for ds_agentic_flow-0.36.tar.gz
Algorithm Hash digest
SHA256 b97f9f3fcbda06a61c3f209abac48758d9958418229cf9c16899f45f05548797
MD5 1eacae19b59186de1cbef6fc2ad788ee
BLAKE2b-256 c3881410d67fe414cd709e461d24385f9caa88cc4b5e5c1f4e4f67a5f9939161

See more details on using hashes here.

File details

Details for the file ds_agentic_flow-0.36-py3-none-any.whl.

File metadata

  • Download URL: ds_agentic_flow-0.36-py3-none-any.whl
  • Upload date:
  • Size: 20.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.26 {"installer":{"name":"uv","version":"0.9.26","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for ds_agentic_flow-0.36-py3-none-any.whl
Algorithm Hash digest
SHA256 8a8a0ce5b3d7842b2b5b012cff55a043c6fa058b127c8ae4c9889d3ea58fbd33
MD5 653a4d46cfe7822c9c047c1d7d47c8b1
BLAKE2b-256 1d79069f43fa57bf724dee57f9e70cf0c4549a145e75fa3834c9efd4eff1de25

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page