
Production-ready orchestration for AI agents, built with Pydantic.


Beyond Agents: Build AI Employees.


TL;DR: Flujo is a Python framework that turns AI agents into production‑grade digital employees with persistent memory, proactive budgeting, and continuous learning.

Approaching v1.0: Core APIs (@step, >>, Flujo, state back‑ends) are stable. Pin your version and watch the changelog.

Flexible Licensing: Dual‑licensed AGPL‑3.0 / Commercial. See LICENSE for details.


Meet Flujo — The Framework for AI Employees That Deliver

Moving AI from prototype to production shouldn't mean chaos. Flujo provides the framework to build AI workflows as reliable, accountable, and effective as your best employees.

Give Your AI Agents the Traits of a Perfect Employee

✅ Never Forgets (Durability)

  • Problem: AI workflows often crash and lose progress.
  • Flujo's Solution: Automatically save state with built‑in SQLite, resuming exactly where tasks left off.
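
To make the idea of resuming from saved state concrete, here is a minimal sketch that uses only the standard-library sqlite3 module. It is not Flujo's implementation: the function name run_with_checkpoints, the checkpoints table, and its columns are hypothetical, and the sketch assumes every step takes and returns a string.

```python
import sqlite3


def run_with_checkpoints(db_path, run_id, steps, initial):
    """Run `steps` in order, committing progress after each one.

    Hypothetical helper for illustration; values must be strings here.
    """
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "run_id TEXT PRIMARY KEY, next_step INTEGER, value TEXT)"
    )
    row = conn.execute(
        "SELECT next_step, value FROM checkpoints WHERE run_id = ?", (run_id,)
    ).fetchone()
    # Resume from the saved position, or start from scratch on a first run
    next_step, value = row if row else (0, initial)
    try:
        for index in range(next_step, len(steps)):
            value = steps[index](value)
            # Commit after every step so a crash repeats at most one step
            with conn:
                conn.execute(
                    "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
                    (run_id, index + 1, value),
                )
    finally:
        conn.close()
    return value
```

Calling the function again with the same run_id after a failure skips the steps that already completed and retries only the one that crashed.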

✅ Keeps Spending in Check (Governance)

  • Problem: AI processes can overspend unpredictably.
  • Flujo's Solution: Set strict usage limits (e.g., $0.50 per run); proactive cost guards halt execution before you overspend.
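
The difference between a proactive guard and an after-the-fact report can be sketched in a few lines of plain Python. This is a conceptual illustration, not Flujo's cost governor: CostGuard, BudgetExceeded, reserve, and record are hypothetical names, and the per-call cost estimate is assumed to come from the caller.

```python
class BudgetExceeded(RuntimeError):
    """Raised when a planned call would exceed the run's budget."""


class CostGuard:
    """Hypothetical budget tracker that checks before money is spent."""

    def __init__(self, limit_usd: float):
        self.limit_usd = limit_usd
        self.spent_usd = 0.0

    def reserve(self, estimated_cost_usd: float) -> None:
        # Refuse up front if the estimate would push spending past the limit
        if self.spent_usd + estimated_cost_usd > self.limit_usd:
            raise BudgetExceeded(
                f"would spend {self.spent_usd + estimated_cost_usd:.2f} USD, "
                f"limit is {self.limit_usd:.2f} USD"
            )

    def record(self, actual_cost_usd: float) -> None:
        # Book the real cost once the call has completed
        self.spent_usd += actual_cost_usd
```

A caller would invoke reserve() before each model call and record() afterwards, so the run stops before the call that would break the budget rather than after it.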

✅ Learns from Mistakes (Improvement Loop)

  • Problem: Debugging AI is slow and manual.
  • Flujo's Solution: flujo improve analyzes failures and auto‑generates concrete prompt and config suggestions.

✅ Knows When to Escalate (Safety Rails)

  • Problem: AI can't handle every edge case alone.
  • Flujo's Solution: Route edge cases to human approval with Step.branch_on and Step.human_in_the_loop.

✅ Communicates Clearly (Observability)

  • Problem: AI tasks are a black box until they fail.
  • Flujo's Solution: Get real‑time updates via event hooks and full run histories with the flujo lens CLI.

Simple Python Workflow, Powerful Results

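from flujo import step, Flujo, Step, make_agent_async

# A plain async function becomes a pipeline step via @step
@step
async def validate_input(text: str) -> str:
    if not text:
        raise ValueError("Input required.")
    return text

# An LLM-backed agent that returns its summary as a string
summariser = make_agent_async(
    model="openai:gpt-4o-mini",
    system_prompt="You are an expert summariser.",
    output_type=str,
)

# `>>` chains the validation step with a step that wraps the agent
pipeline = validate_input >> Step.model_validate({"name": "Summarise", "agent": summariser})

# Run the pipeline and print the output of its last step
print(Flujo(pipeline).run("Flujo is...").step_history[-1].output)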
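
For readers curious how a `>>` operator can compose steps, the following self-contained sketch shows one way to do it with Python's `__rshift__` hook. MiniStep, mini_step, and shout are hypothetical stand-ins written for this illustration; they are not Flujo classes and make no claim about how Flujo implements composition.

```python
import asyncio


class MiniStep:
    """Hypothetical stand-in for a pipeline step (not Flujo's Step)."""

    def __init__(self, *funcs):
        self.funcs = funcs

    def __rshift__(self, other: "MiniStep") -> "MiniStep":
        # a >> b builds a new step that runs a's functions, then b's
        return MiniStep(*self.funcs, *other.funcs)

    async def run(self, value):
        # Feed each function's output into the next one
        for func in self.funcs:
            value = await func(value)
        return value


def mini_step(func):
    """Wrap an async function as a one-function MiniStep."""
    return MiniStep(func)


@mini_step
async def validate_input(text: str) -> str:
    if not text:
        raise ValueError("Input required.")
    return text


@mini_step
async def shout(text: str) -> str:
    return text.upper()


pipeline = validate_input >> shout
```

Running `asyncio.run(pipeline.run("flujo"))` passes the text through validation and then through shout.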

The Flujo Advantage

  • Build Autonomous Systems: Compose agents that handle routine work and escalate edge cases with Step.human_in_the_loop.
  • Run with Production-Grade Efficiency: Execute tasks concurrently with Step.parallel, eliminate redundant work with Step.cached, and rely on a high‑performance runtime.
  • Maintain Full Accountability: Get a complete, persistent history of every run. Use the flujo lens CLI to trace decisions and debug failures.
  • Integrate with Your Stack: @step turns any async Python code into a durable workflow component. Event hooks connect Flujo to your existing monitoring and notification tools.
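
The concurrency and caching ideas in the list above can be illustrated with the standard-library asyncio module. This is a conceptual sketch only: cached, fetch_report, and analyse_all are hypothetical names and do not reflect how Step.parallel or Step.cached work internally.

```python
import asyncio

calls = {"count": 0}  # counts how often the slow function really runs


def cached(func):
    """Memoise an async function by its positional arguments."""
    results = {}

    async def wrapper(*args):
        # Note: identical calls that start concurrently may both miss the cache
        if args not in results:
            results[args] = await func(*args)
        return results[args]

    return wrapper


@cached
async def fetch_report(company: str) -> str:
    calls["count"] += 1
    await asyncio.sleep(0.01)  # stands in for a slow network or model call
    return f"Report for {company}"


async def analyse_all(companies):
    # gather runs the calls concurrently and preserves input order
    return await asyncio.gather(*(fetch_report(c) for c in companies))
```

A second call to analyse_all with the same companies is served from the cache, so fetch_report's body does not run again.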

Showcase: A Stateful, Budget‑Aware AI Financial Analyst

This example builds a multi‑agent workflow that analyzes a list of companies, persists its state to SQLite, and halts if the total cost exceeds a 15¢ budget.

# examples/financial_analyst.py
from flujo import (
    Flujo, Step, step, UsageLimits, make_agent_async
)
from flujo.state import SQLiteBackend
from flujo.domain.models import PipelineContext
from flujo.domain.dsl.step import adapter_step
from pydantic import Field

# 1️⃣ Define a shared context ("memory") for the run
class MarketCtx(PipelineContext):
    companies: list[str] = Field(default_factory=list)

# 2️⃣ Define steps using Python code and AI agents
@step
async def fetch_data(company: str) -> dict:
    return {"company": company, "data": f"Financial info for {company}"}

@adapter_step
async def select_data(fetched: dict) -> str:
    return fetched.get("data", "")

summariser = make_agent_async(
    model="openai:gpt-4o-mini",
    system_prompt="Summarise the financial data point.",
    output_type=str,
)

# 3️⃣ Compose steps into a high‑level workflow
analysis_flow = fetch_data >> select_data >> Step.model_validate({
    "name": "Summarise",
    "agent": summariser
})

pipeline = Step.map_over(
    "AnalyseAllCompanies",
    analysis_flow,
    iterable_input="companies",
)

# 4️⃣ Execute the workflow with durability and governance
runner = Flujo(
    pipeline,
    context_model=MarketCtx,
    state_backend=SQLiteBackend("financial_reports.db"),
    usage_limits=UsageLimits(total_cost_usd_limit=0.15),
)

result = runner.run(
    initial_input=None,
    initial_context_data={"companies": ["Alpha", "Beta", "Gamma"]},
)
print(result.step_history[-1].output)

This example is fully runnable. Copy it to a file, set the OPENAI_API_KEY environment variable, and run it with Python.


Flujo vs Alternatives

Feature Flujo LangChain / LangGraph Crew AI n8n / Make
Built‑in Persistent State ✅ SQLite ⚠️ External store ⚠️ External 🔒 Hidden SaaS
Proactive Cost Governor
Self‑Improvement Loop flujo improve ⚠️ Observe‑only
Self‑Hosting Friendly ⚠️ Needs infra ⚠️ Needs DB
Licence AGPL / Commercial MIT MIT Proprietary

Roadmap — Reliability at Scale

Capability Status
✅ Persistent & Durable Done
✅ Budget Controls Done
✅ Parallel & Caching Done
✅ Conditional Routing Done
🟡 Notifications & Hooks In Progress
🗺️ Security & Compliance Upcoming

Quick Start (60 seconds)

For Users

pip install flujo

echo '
from flujo import step

@step
async def hello(name: str) -> str:
    return f"Hello, {name}!"

# The `flujo run` CLI looks for a top‑level variable named "pipeline"
pipeline = hello
' > hello_pipeline.py

flujo run hello_pipeline.py --input "Flujo"

Expected output: Hello, Flujo!
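
The comment in hello_pipeline.py says the CLI looks for a top-level variable named "pipeline". One plausible way for a tool to do that kind of lookup is the standard-library runpy module, sketched below. The helper load_pipeline is hypothetical, and how flujo run actually locates the variable is an assumption not confirmed here.

```python
import runpy


def load_pipeline(path: str, variable: str = "pipeline"):
    """Execute a Python file and return one of its top-level variables.

    Hypothetical helper for illustration; not part of Flujo.
    """
    # run_path executes the file and returns its module-level namespace
    namespace = runpy.run_path(path)
    if variable not in namespace:
        raise LookupError(
            f"{path} defines no top-level variable named {variable!r}"
        )
    return namespace[variable]
```

With a file like the one above, load_pipeline("hello_pipeline.py") would return the object bound to pipeline, and a file without that variable would raise LookupError.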

For Developers

# Clone and install with robust verification
git clone https://github.com/aandresalvarez/flujo.git
cd flujo
make install-robust

# Run tests
make test

# Try the quickstart example
python examples/00_quickstart.py

See INSTALLATION.md for comprehensive setup instructions.


Get Involved


Licensing

Flujo is dual‑licensed under AGPL‑3.0 and a commercial licence. See the LICENSE file for details.
