Minimal wrapper over the OpenAI Agents SDK with method chaining and ChatKit integration
AF - Agentic Flow Framework
Write agent workflows like regular Python code.
async def research_flow(query: str) -> str:
    # Internal thinking - not saved to session
    async with af.phase("Research"):
        findings = await researcher(query).stream()

    # persist=True saves the final response to session
    async with af.phase("Analysis", persist=True):
        return await analyst(findings).stream()
No graphs. No YAML. No state machines. Just Python.
✨ Why the agentic flow approach?
OpenAI Agents SDK is powerful, but multi-agent workflows get verbose fast:
Pure SDK (~50 lines):

# Manual orchestration
result1 = await Runner.run(
    researcher, messages
)
research = result1.final_output

result2 = await Runner.run(
    analyst,
    [{"role": "user", "content": research}]
)
# No streaming, no phases,
# no session management...

AF (~15 lines):

async def my_flow(query: str) -> str:
    # Internal thinking - not saved to session
    async with af.phase("Research"):
        research = await researcher(query).stream()

    # persist=True saves the final response to session
    async with af.phase("Analysis", persist=True):
        return await analyst(research).stream()

runner = af.Runner(flow=my_flow)
result = await runner(query)
The agentic flow approach gives you:
- Callable agents - `agent(prompt).stream()` like PyTorch modules
- Workflow phases - Structure and visibility for multi-step processes
- Automatic streaming - Real-time output with zero configuration
- Session injection - Conversation persistence without global state
- ChatKit integration - Production-ready UI streaming
📋 Requirements
- Python 3.12+
- uv (recommended) or pip
- OpenAI API key
📦 Installation
With uv (recommended)
# Install uv if you don't have it
curl -LsSf https://astral.sh/uv/install.sh | sh
# Add AF to your project
uv add ds-agentic-flow
With pip (alternative)
pip install ds-agentic-flow
Set your API key
Create a .env.local file from the example:
cp .env.example .env.local
# Edit .env.local and add your OpenAI API key
Or set it directly in your environment:
export OPENAI_API_KEY="your-api-key"
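If your entry point does not load `.env.local` on its own (whether AF does so is not stated here, so treat that as an open question), the key can be copied into the environment before any agent runs. The sketch below uses only the standard library; `parse_env` and `apply_env` are hypothetical helper names, not part of AF, and the parser handles only plain `KEY=VALUE` lines.

```python
import os


def parse_env(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines (hypothetical helper, not part of AF)."""
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Skip blanks, comments, and lines without an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        values[key] = value.strip().strip('"').strip("'")
    return values


def apply_env(values: dict[str, str], environ=os.environ) -> None:
    """Copy parsed values into the environment without overriding existing ones."""
    for key, value in values.items():
        environ.setdefault(key, value)


sample_text = '# local secrets\nexport OPENAI_API_KEY="your-api-key"\n'
sample_values = parse_env(sample_text)
sample_environ: dict[str, str] = {}
apply_env(sample_values, sample_environ)
```

In a real script you would read the file with `Path(".env.local").read_text()` and call `apply_env(parse_env(...))` with the default `os.environ`; variables already exported in the shell take precedence because of `setdefault`.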
🚀 Quickstart Code
import agentic_flow as af

# Define agents
researcher = af.Agent(
    name="researcher",
    instructions="Research the topic thoroughly.",
    model="gpt-5.2",
)
writer = af.Agent(
    name="writer",
    instructions="Write clear, engaging content.",
    model="gpt-5.2",
)

# Define workflow as a regular async function
async def blog_flow(topic: str) -> str:
    # Internal thinking - not saved to session
    async with af.phase("Research"):
        research = await researcher(topic).stream()

    # persist=True saves the final response to session
    async with af.phase("Writing", persist=True):
        return await writer(f"Write about: {research}").stream()

# Run
runner = af.Runner(flow=blog_flow)
article = await runner("quantum computing")
💡 Core Concepts
AF is built on three primitives:
- Agent - Callable wrapper around SDK Agent. Returns `ExecutionSpec` for deferred execution.
- ExecutionSpec - Lazy specification configured with modifiers (`.stream()`, `.isolated()`, `.silent()`, `.max_turns()`)
- phase - Context manager for workflow boundaries. Controls session persistence with `persist=True`.
For details, see Concepts.
🔄 Patterns
Sequential Pipeline
async def pipeline(input: str) -> str:
    # Internal processing - not saved to session
    async with af.phase("Extract"):
        entities = await extractor(input).stream()

    async with af.phase("Enrich"):
        enriched = await enricher(entities).stream()

    # persist=True saves the final response to session
    async with af.phase("Format", persist=True):
        return await formatter(enriched).stream()
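The persistence rule in this pipeline (intermediate phases are not saved, the `persist=True` phase saves its final response) can be modelled with a toy context manager. This is only a mental model under stated assumptions: `ToySession` and `toy_phase` are hypothetical, the body has to append its outputs by hand, and AF's real `phase` may work quite differently.

```python
import asyncio
from contextlib import asynccontextmanager


class ToySession:
    """Hypothetical in-memory session used only for this sketch."""

    def __init__(self) -> None:
        self.saved: list[tuple[str, str]] = []


@asynccontextmanager
async def toy_phase(name: str, session: ToySession, persist: bool = False):
    outputs: list[str] = []
    yield outputs  # the body appends each response it produces
    # Only a persisting phase writes its final response to the session.
    if persist and outputs:
        session.saved.append((name, outputs[-1]))


async def toy_pipeline(text: str, session: ToySession) -> str:
    async with toy_phase("Extract", session) as out:
        out.append(text.upper())
        extracted = out[-1]

    async with toy_phase("Format", session, persist=True) as out:
        out.append(f"<{extracted}>")
        return out[-1]


toy_session = ToySession()
toy_output = asyncio.run(toy_pipeline("abc", toy_session))
```

After the run, `toy_session.saved` holds a single entry for the `Format` phase; the `Extract` output was used by the next step but never stored.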
Conditional Branching
async def smart_process(data: str) -> str:
    # Internal classification - not saved to session
    async with af.phase("Classify"):
        category = await classifier(data).stream()

    # persist=True on whichever branch returns the final response
    if "technical" in category:
        async with af.phase("Technical Analysis", persist=True):
            return await tech_analyst(data).stream()
    else:
        async with af.phase("General Summary", persist=True):
            return await summarizer(data).stream()
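The substring test `"technical" in category` is case-sensitive and would also match an answer such as "non-technical". If the classifier is instructed to reply with a single label, a stricter mapping can be sketched as follows; `pick_branch` and `ALLOWED_LABELS` are hypothetical names introduced for this example, not part of AF.

```python
ALLOWED_LABELS = {"technical", "general"}


def pick_branch(category: str, default: str = "general") -> str:
    """Map free-form classifier text to a known branch label."""
    # Normalize whitespace, a trailing period, and letter case.
    label = category.strip().strip(".").lower()
    # Anything outside the allowed set falls back to the default branch.
    return label if label in ALLOWED_LABELS else default
```

The flow would then branch on `pick_branch(category) == "technical"` instead of the raw substring check.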
Iterative Refinement
async def refine(draft: str) -> str:
    for i in range(3):
        # Internal review - not saved to session
        async with af.phase(f"Review #{i+1}"):
            feedback = await critic(draft).stream()

        if "APPROVED" in feedback:
            return draft

        # Internal revision - not saved to session
        async with af.phase(f"Revise #{i+1}"):
            draft = await reviser(f"{draft}\n\nFeedback: {feedback}").stream()

    # Note: This pattern returns the draft directly without session save.
    # Add a final phase with persist=True if you need to save the result.
    return draft
Parallel Execution
import asyncio

async def parallel_analysis(data: str) -> dict:
    # persist=True saves the final response to session
    # isolated() runs each agent without shared context
    async with af.phase("Parallel Analysis", persist=True):
        results = await asyncio.gather(
            sentiment_agent(data).isolated(),
            entity_agent(data).isolated(),
            summary_agent(data).isolated(),
        )
    return {
        "sentiment": results[0],
        "entities": results[1],
        "summary": results[2],
    }
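Indexing `results[0]`, `results[1]`, and `results[2]` is safe because `asyncio.gather` returns results in the order the awaitables were passed, regardless of which finishes first. The sketch below shows this with stub coroutines; `fake_agent` is a hypothetical stand-in for an agent call and the delays are arbitrary.

```python
import asyncio


async def fake_agent(label: str, delay: float) -> str:
    # Stand-in for an agent call; the delay simulates uneven latency.
    await asyncio.sleep(delay)
    return f"{label} done"


async def run_parallel() -> dict[str, str]:
    labels = ["sentiment", "entities", "summary"]
    results = await asyncio.gather(
        fake_agent("sentiment", 0.03),  # slowest, still first in results
        fake_agent("entities", 0.01),
        fake_agent("summary", 0.02),
    )
    # gather preserves argument order, so zipping with labels is safe.
    return dict(zip(labels, results))


parallel_result = asyncio.run(run_parallel())
```

Zipping against a list of labels keeps the mapping in one place, which helps when agents are added or reordered.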
🔌 ChatKit Integration
from pathlib import Path

import agentic_flow as af
from agents import SQLiteSession
from chatkit.server import ChatKitServer
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

# Persist data in a known location
DATA_DIR = Path(__file__).parent / "data"
DATA_DIR.mkdir(exist_ok=True)

app = FastAPI()

class MyServer(ChatKitServer):
    async def respond(self, thread, item, context):
        # Use the text of the incoming item, or an empty prompt if there is none
        user_message = item.content[0].text if item else ""

        # One SQLite-backed session per ChatKit thread
        session = SQLiteSession(
            session_id=thread.id,
            db_path=str(DATA_DIR / "chat.db"),
        )
        # my_flow is a flow function defined elsewhere (see the examples above)
        runner = af.Runner(flow=my_flow, session=session)

        async for event in af.chatkit.run_with_chatkit_context(
            runner, thread, self.store, context, user_message
        ):
            yield event

# store is a ChatKit store instance that you create elsewhere
server = MyServer(store)

@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
    result = await server.process(await request.body(), {})
    return StreamingResponse(result, media_type="text/event-stream")
Each af.phase() automatically creates workflow boundaries for proper reasoning display.
🎮 Try the Demos
First, install sample dependencies:
uv sync --group sample
CLI Quickstart
Experience the core concepts interactively:
uv run --group sample python sample/quickstart.py
Demonstrates:
- Flow & Runner separation
- Declaration vs execution (`agent(prompt)` returns spec, `await` executes)
- Modifiers (`.stream()`, `.isolated()`, `.silent()`, `.max_turns()`)
- Typed output with Pydantic
Guide TUI
Interactive Textual UI that answers questions about AF:
uv run --group sample python -m sample.guide.cli
Guide Web Server (FastAPI + ChatKit)
Start the backend API:
uv run --group sample uvicorn sample.guide.server:app --reload --port 8000
Guide Frontend (Next.js + ChatKit)
In a separate terminal:
cd sample/guide/frontend
npm install
npm run dev
Visit http://localhost:3000
📚 Documentation
- Concepts - Call-Spec Discipline, ExecutionSpec, Phase, Modifiers
- API Reference - Complete API documentation
- Context Resolution - Advanced context management
For the complete documentation site, visit https://daiichisankyo.github.io/AgenticFlow/
Contributing
# Clone and setup
git clone https://github.com/daiichisankyo/AgenticFlow.git
cd AgenticFlow
uv sync --group dev
# Run tests
uv run pytest
# Run linter
uv run ruff check src/
- Fork the repository
- Create a feature branch
- Make your changes
- Run tests and linter
- Submit a pull request
📄 License
MIT - see LICENSE for details.