Production-ready orchestration for AI agents, built with Pydantic.
Beyond Agents: Build AI Employees.
TL;DR: Flujo is a Python framework that turns AI agents into production‑grade digital employees, with persistent memory, proactive budgeting, and continuous learning.
Approaching v1.0: Core APIs (`@step`, `>>`, `Flujo`, state back‑ends) are stable. Pin your version and watch the changelog.
Flexible Licensing: Dual‑licensed AGPL‑3.0 / Commercial. See `LICENSE` for details.
Meet Flujo — The Framework for AI Employees That Deliver
Moving AI from prototype to production shouldn't mean chaos. Flujo provides the framework to build AI workflows as reliable, accountable, and effective as your best employees.
Give Your AI Agents the Traits of a Perfect Employee
✅ Never Forgets (Durability)
- Problem: AI workflows often crash and lose progress.
- Flujo's Solution: Automatically save state with built‑in SQLite, resuming exactly where tasks left off.
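To make the durability idea concrete, here is a minimal, library-agnostic sketch of step-level checkpointing with Python's built-in `sqlite3`. It is not Flujo's implementation or API: `run_with_checkpoints`, the `checkpoints` table, and its columns are hypothetical names chosen for illustration, and step outputs are assumed to be strings.

```python
import sqlite3
from typing import Callable, Sequence


def run_with_checkpoints(
    db_path: str,
    run_id: str,
    steps: Sequence[Callable[[str], str]],
    initial_input: str,
) -> str:
    """Run steps in order, saving each output so a rerun resumes after the last saved step."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "run_id TEXT, step_index INTEGER, output TEXT, "
            "PRIMARY KEY (run_id, step_index))"
        )
        # Look up the most recent completed step for this run, if any.
        row = conn.execute(
            "SELECT step_index, output FROM checkpoints "
            "WHERE run_id = ? ORDER BY step_index DESC LIMIT 1",
            (run_id,),
        ).fetchone()
        start, value = (row[0] + 1, row[1]) if row else (0, initial_input)
        for index in range(start, len(steps)):
            value = steps[index](value)
            # Commit after every step so a crash loses at most the step in flight.
            conn.execute(
                "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
                (run_id, index, value),
            )
            conn.commit()
        return value
    finally:
        conn.close()
```

Calling the function again with the same `run_id` after a failure skips every step that already has a saved output and continues from the next one.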
✅ Keeps Spending in Check (Governance)
- Problem: AI processes can overspend unpredictably.
- Flujo's Solution: Set strict usage limits (e.g., `$0.50` per run); proactive cost guards halt execution before you overspend.
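The difference between a proactive guard and after-the-fact accounting can be sketched in a few lines. This is an illustration of the general pattern only, not Flujo's cost governor: `CostGuard`, `BudgetExceeded`, and `reserve` are hypothetical names, and the per-call cost estimate is assumed to be supplied by the caller.

```python
class BudgetExceeded(RuntimeError):
    """Raised before a call runs when its estimated cost would cross the limit."""


class CostGuard:
    def __init__(self, limit_usd: float) -> None:
        self.limit_usd = limit_usd
        self.spent_usd = 0.0

    def reserve(self, estimated_usd: float) -> None:
        # Check *before* spending: refuse the call if it would exceed the limit.
        if self.spent_usd + estimated_usd > self.limit_usd:
            raise BudgetExceeded(
                f"estimated ${estimated_usd:.2f} would exceed the "
                f"${self.limit_usd:.2f} limit (spent ${self.spent_usd:.2f})"
            )
        self.spent_usd += estimated_usd
```

A caller would invoke `guard.reserve(estimate)` immediately before each paid model call, so the run halts with the budget still intact rather than after the money is gone.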
✅ Learns from Mistakes (Improvement Loop)
- Problem: Debugging AI is slow and manual.
- Flujo's Solution: `flujo improve` analyzes failures and auto‑generates concrete prompt and config suggestions.
✅ Knows When to Escalate (Safety Rails)
- Problem: AI can't handle every edge‑case alone.
- Flujo's Solution: Route edge cases to human approval with `Step.branch_on` and `Step.human_in_the_loop`.
✅ Communicates Clearly (Observability)
- Problem: AI tasks are a black box until they fail.
- Flujo's Solution: Get real‑time updates via event hooks and full run histories with the `flujo lens` CLI.
Simple Python Workflow, Powerful Results
    from flujo import step, Flujo, Step, make_agent_async

    # The @step decorator turns a plain async function into a pipeline step.
    @step
    async def validate_input(text: str) -> str:
        if not text:
            raise ValueError("Input required.")
        return text

    # An agent that returns its summary as a plain string.
    summariser = make_agent_async(
        model="openai:gpt-4o-mini",
        system_prompt="You are an expert summariser.",
        output_type=str,
    )

    # Chain the steps with >>, run the pipeline, and print the last step's output.
    pipeline = validate_input >> Step.model_validate({"name": "Summarise", "agent": summariser})
    print(Flujo(pipeline).run("Flujo is...").step_history[-1].output)
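For readers curious how a `>>` composition operator can work in Python at all, the sketch below shows the general operator-overloading technique. It is a toy under stated assumptions, not Flujo's `Step` class: `MiniStep` and its `run` method are hypothetical names introduced only for this illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable


class MiniStep:
    """Hypothetical stand-in for a pipeline step; not Flujo's Step class."""

    def __init__(self, *funcs: Callable[[Any], Awaitable[Any]]) -> None:
        self.funcs = funcs

    def __rshift__(self, other: "MiniStep") -> "MiniStep":
        # a >> b builds a new step that runs a's functions, then b's.
        return MiniStep(*self.funcs, *other.funcs)

    async def run(self, value: Any) -> Any:
        # Feed each function's output into the next one.
        for func in self.funcs:
            value = await func(value)
        return value
```

With two async functions `strip` and `shout`, `MiniStep(strip) >> MiniStep(shout)` yields a single object whose `run` applies both in order, for example via `asyncio.run(pipe.run("  hi "))`.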
The Flujo Advantage
- ✅ Build Autonomous Systems: Compose agents that handle routine work and escalate edge cases with `Step.human_in_the_loop`.
- ✅ Run with Production-Grade Efficiency: Execute tasks concurrently with `Step.parallel`, eliminate redundant work with `Step.cached`, and rely on a high‑performance runtime.
- ✅ Maintain Full Accountability: Get a complete, persistent history of every run. Use the `flujo lens` CLI to trace decisions and debug failures.
- ✅ Integrate with Your Stack: `@step` turns any `async` Python code into a durable workflow component. Event hooks connect Flujo to your existing monitoring and notification tools.
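The two efficiency ideas named above, concurrent execution and caching, can be illustrated with the standard library alone. The sketch below is not how `Step.parallel` or `Step.cached` are implemented, and their signatures are not shown here; `run_parallel` and `cached` are hypothetical helpers, and the cache is assumed to be keyed on hashable positional arguments.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


async def run_parallel(tasks: List[Callable[[], Awaitable[Any]]]) -> List[Any]:
    """Run independent coroutine factories concurrently; results keep input order."""
    return await asyncio.gather(*(task() for task in tasks))


def cached(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Memoise an async function by its positional arguments."""
    store: Dict[Tuple[Any, ...], Any] = {}

    async def wrapper(*args: Any) -> Any:
        # Only call the wrapped function the first time these arguments are seen.
        if args not in store:
            store[args] = await func(*args)
        return store[args]

    return wrapper
```

Wrapping an expensive lookup with `@cached` and fanning out with `run_parallel` means a repeated request for the same input is answered from memory instead of triggering another call.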
Showcase: A Stateful, Budget‑Aware AI Financial Analyst
This example builds a multi‑agent workflow that analyzes a list of companies, persists its state to SQLite, and halts if the total cost exceeds a 15¢ budget.
    # examples/financial_analyst.py
    from flujo import (
        Flujo, Step, step, UsageLimits, make_agent_async
    )
    from flujo.state import SQLiteBackend
    from flujo.domain.models import PipelineContext
    from flujo.domain.dsl.step import adapter_step
    from pydantic import Field

    # 1️⃣ Define a shared context ("memory") for the run
    class MarketCtx(PipelineContext):
        companies: list[str] = Field(default_factory=list)

    # 2️⃣ Define steps using Python code and AI agents
    @step
    async def fetch_data(company: str) -> dict:
        return {"company": company, "data": f"Financial info for {company}"}

    @adapter_step
    async def select_data(fetched: dict) -> str:
        return fetched.get("data", "")

    summariser = make_agent_async(
        model="openai:gpt-4o-mini",
        system_prompt="Summarise the financial data point.",
        output_type=str,
    )

    # 3️⃣ Compose steps into a high‑level workflow
    analysis_flow = fetch_data >> select_data >> Step.model_validate({
        "name": "Summarise",
        "agent": summariser
    })

    pipeline = Step.map_over(
        "AnalyseAllCompanies",
        analysis_flow,
        iterable_input="companies",
    )

    # 4️⃣ Execute the workflow with durability and governance
    runner = Flujo(
        pipeline,
        context_model=MarketCtx,
        state_backend=SQLiteBackend("financial_reports.db"),
        usage_limits=UsageLimits(total_cost_usd_limit=0.15),
    )

    result = runner.run(
        initial_input=None,
        initial_context_data={"companies": ["Alpha", "Beta", "Gamma"]},
    )
    print(result.step_history[-1].output)
This example is fully runnable. Copy it to a file and run it with your `OPENAI_API_KEY` set in the environment.
Flujo vs Alternatives
| Feature | Flujo | LangChain / LangGraph | Crew AI | n8n / Make |
|---|---|---|---|---|
| Built‑in Persistent State | ✅ SQLite | ⚠️ External store | ⚠️ External | 🔒 Hidden SaaS |
| Proactive Cost Governor | ✅ | ❌ | ❌ | ❌ |
| Self‑Improvement Loop | ✅ `flujo improve` | ⚠️ Observe‑only | ❌ | ❌ |
| Self‑Hosting Friendly | ✅ | ⚠️ Needs infra | ⚠️ Needs DB | ❌ |
| Licence | AGPL / Commercial | MIT | MIT | Proprietary |
Roadmap — Reliability at Scale
| Capability | Status |
|---|---|
| ✅ Persistent & Durable | Done |
| ✅ Budget Controls | Done |
| ✅ Parallel & Caching | Done |
| ✅ Conditional Routing | Done |
| 🟡 Notifications & Hooks | In Progress |
| 🗺️ Security & Compliance | Upcoming |
Quick Start (60 seconds)
For Users
    pip install flujo

    echo '
    from flujo import step

    @step
    async def hello(name: str) -> str:
        return f"Hello, {name}!"

    # The `flujo run` CLI looks for a top‑level variable named "pipeline"
    pipeline = hello
    ' > hello_pipeline.py

    flujo run hello_pipeline.py --input "Flujo"

Expected output:

    Hello, Flujo!
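The quick start relies on a convention: the file must expose a top-level variable named `pipeline`. As a rough sketch of how a command-line tool can discover such a variable, the snippet below loads a Python file by path with the standard `importlib` machinery. It is an assumption-laden illustration, not Flujo's actual loader; `load_pipeline` is a hypothetical helper name.

```python
import importlib.util
from pathlib import Path
from typing import Any


def load_pipeline(path: str, variable: str = "pipeline") -> Any:
    """Import a Python file by path and return its top-level `variable`."""
    file_path = Path(path)
    spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    # Executing the module runs the file's top-level code, defining its variables.
    spec.loader.exec_module(module)
    try:
        return getattr(module, variable)
    except AttributeError:
        raise LookupError(
            f"{path} defines no top-level variable named {variable!r}"
        ) from None
```

Under this convention, a file that names its pipeline anything other than `pipeline` would fail fast with a clear error instead of running nothing.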
For Developers
    # Clone and install with robust verification
    git clone https://github.com/aandresalvarez/flujo.git
    cd flujo
    make install-robust

    # Run tests
    make test

    # Try the quickstart example
    python examples/00_quickstart.py
See INSTALLATION.md for comprehensive setup instructions.
Get Involved
- 📖 Documentation — Guides, tutorials, API reference
- 💰 Budget-Aware Workflows — Production cost control with loops and parallel execution
- 🤝 Contribute — Join the community and shape Flujo's future
Licensing
Flexible AGPL‑3.0 / Commercial. See the LICENSE file for details.