Skip to main content

Production-grade AI agent orchestration framework with web scraping, document extraction, webhooks, rate limiting, plugins, workflows, semantic caching, and hybrid memory. The complete alternative to LangGraph, CrewAI, and AutoGen.

Project description

FluxGraph

Production-grade AI agent orchestration framework for building secure, scalable multi-agent systems

PyPI version Python License Documentation


Overview

FluxGraph is the most complete open-source AI agent framework for production deployment, combining cutting-edge innovations with enterprise-grade reliability. Built for developers who need sophisticated AI agent systems without complexity or vendor lock-in.

Why FluxGraph?

Graph-Based Workflows - Visual agent orchestration with conditional routing and loops
Semantic Caching - Intelligent response caching reduces LLM costs by 70%+
Hybrid Memory System - Short-term + long-term + episodic memory with semantic search
Circuit Breakers - Built-in fault tolerance and resilience
Real-time Cost Tracking - Per-agent, per-model cost analytics
Blockchain Audit Logs - Immutable execution history
PII Detection - Automatic detection and redaction of 9 sensitive data types
Streaming Support - Server-Sent Events for real-time responses
Production Ready - Security, monitoring, and scaling out of the box

Installation

# Full installation with v3.0 features
pip install fluxgraph[full]

# Minimal installation
pip install fluxgraph

# Specific features
pip install fluxgraph[p0]           # Graph workflows, memory, caching
pip install fluxgraph[production]   # Streaming, sessions, retry
pip install fluxgraph[security]     # RBAC, audit logs, PII detection
pip install fluxgraph[rag]          # RAG with ChromaDB
pip install fluxgraph[all]          # Everything

Quick Start

Hello World

from fluxgraph import FluxApp

app = FluxApp(title="My AI App")

@app.agent()
async def assistant(message: str) -> dict:
    return {"response": f"You said: {message}"}

# Run: flux run app.py
# Test: curl -X POST http://localhost:8000/ask/assistant -d '{"message":"Hello!"}'

LLM-Powered Agent

from fluxgraph import FluxApp
from fluxgraph.models import OpenAIProvider

app = FluxApp(title="Smart Assistant")
llm = OpenAIProvider(api_key="your-key", model="gpt-4")

@app.agent()
async def assistant(query: str) -> dict:
    response = await llm.generate(f"Answer: {query}")
    return {"answer": response.get("text")}

Key Features

1. Graph-Based Workflows

Create complex agent workflows with conditional routing:

from fluxgraph.core import WorkflowBuilder

workflow = (WorkflowBuilder("research")
    .add_agent("researcher", research_agent)
    .add_agent("analyzer", analysis_agent)
    .connect("researcher", "analyzer")
    .branch("analyzer", quality_check, {
        "retry": "researcher",
        "complete": "__end__"
    })
    .start_from("researcher")
    .build())

result = await workflow.execute({"query": "AI trends 2025"})

2. Advanced Memory System

Hybrid memory with semantic search:

app = FluxApp(enable_advanced_memory=True)

@app.agent()
async def smart_agent(query: str, advanced_memory) -> dict:
    # Store in short-term memory
    advanced_memory.store(query, MemoryType.SHORT_TERM, importance=0.8)
    
    # Semantic search across all memories
    similar = advanced_memory.recall_similar(query, k=5)
    
    # Automatic consolidation to long-term
    advanced_memory.consolidate()
    
    return {"context": [e.content for e, score in similar]}

3. Semantic Caching

Reduce costs by 70%+ with intelligent caching:

app = FluxApp(enable_agent_cache=True, cache_strategy="hybrid")

@app.agent()
async def expensive_agent(query: str, cache) -> dict:
    # Automatically checks cache for similar queries
    result = await expensive_llm_call(query)
    return {"answer": result}

# Cache statistics
stats = cache.get_stats()  # Hit rate, size, savings

4. Multi-Agent Orchestration

@app.agent()
async def supervisor(task: str, call_agent, broadcast) -> dict:
    research = await call_agent("research_agent", query=task)
    analyses = await broadcast(
        ["technical_analyst", "business_analyst"],
        data=research
    )
    return {"results": analyses}

5. Security Features

Automatic protection against threats:

app = FluxApp(enable_security=True)

@app.agent()
async def secure_agent(user_input: str) -> dict:
    # Automatic PII detection (9 types)
    # Prompt injection shield (7 techniques)
    # Immutable audit logging
    # RBAC + JWT authentication
    
    return await process(user_input)

PII Detection: EMAIL, PHONE, SSN, CREDIT_CARD, IP_ADDRESS, PASSPORT, DRIVER_LICENSE, DATE_OF_BIRTH, MEDICAL_RECORD

Injection Prevention: IGNORE_PREVIOUS, ROLE_PLAY, ENCODED_INJECTION, DELIMITER_INJECTION, PRIVILEGE_ESCALATION, CONTEXT_OVERFLOW, PAYLOAD_SPLITTING

6. Human-in-the-Loop

@app.agent()
async def critical_agent(action: str) -> dict:
    approval = await app.hitl_manager.request_approval(
        agent_name="critical_agent",
        task_description=f"Execute: {action}",
        risk_level="HIGH",
        timeout_seconds=300
    )
    
    if await approval.wait_for_approval():
        return {"status": "executed"}
    return {"status": "rejected"}

7. Streaming Responses

from fastapi.responses import StreamingResponse

@app.api.get("/stream/{agent}")
async def stream(agent: str, query: str):
    async def generate():
        async for chunk in app.orchestrator.run_streaming(agent, {"query": query}):
            yield f"data: {chunk}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")

8. Batch Processing

# Submit 1000 tasks
job_id = await app.batch_processor.submit_batch(
    agent_name="processor",
    payloads=tasks,
    max_concurrent=50
)

# Check progress
status = app.batch_processor.get_job_status(job_id)

9. Cost Tracking

# Automatic per-agent cost tracking
costs = app.orchestrator.cost_tracker.get_summary()
# {"research_agent": {"cost": "$2.34", "calls": 145}}

10. RAG Integration

app = FluxApp(enable_rag=True)

@app.agent()
async def rag_agent(query: str, rag) -> dict:
    # Add documents
    await rag.add_documents([
        {"text": "FluxGraph is awesome", "metadata": {"source": "docs"}}
    ])
    
    # Query with semantic search
    results = await rag.query(query, top_k=5)
    return {"sources": results}

Supported Integrations

LLM Providers

  • OpenAI: GPT-3.5, GPT-4, GPT-4 Turbo
  • Anthropic: Claude 3 (Haiku, Sonnet, Opus)
  • Google: Gemini Pro, Ultra
  • Groq: Mixtral, Llama 3
  • Ollama: All local models
  • Azure OpenAI: GPT models

Memory Backends

  • PostgreSQL: Production persistence
  • Redis: Fast session storage
  • SQLite: Development/testing
  • In-Memory: Temporary stateless

Production Configuration

app = FluxApp(
    # v3.0 Features
    enable_workflows=True,
    enable_advanced_memory=True,
    enable_agent_cache=True,
    cache_strategy="hybrid",
    
    # Production
    enable_streaming=True,
    enable_sessions=True,
    
    # Security
    enable_security=True,
    
    # Orchestration
    enable_orchestration=True
)

Performance

v3.0 Benchmarks:

  • Cache Hit Rate: 85-95% on similar queries
  • Cost Reduction: 70%+ with semantic caching
  • Memory Consolidation: <50ms for 1000 entries
  • Workflow Execution: 100+ steps/second
  • Latency: <10ms overhead

API Endpoints

Endpoint Method Description
/ask/{agent} POST Execute agent
/stream/{agent} GET Stream response
/workflows GET List workflows
/workflows/{name}/execute POST Execute workflow
/memory/stats GET Memory statistics
/cache/stats GET Cache statistics
/system/status GET System health
/system/costs GET Cost summary

Docker Deployment

version: '3.8'
services:
  fluxgraph:
    image: fluxgraph:3.0.0
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db/fluxgraph
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - db
      - redis

Documentation & Support

License

MIT License - Free and open-source forever.


FluxGraph 3.0 - The most advanced open-source AI agent framework

Graph workflows • Semantic caching • Hybrid memory • Enterprise security

⭐ Star us on GitHub if FluxGraph powers your AI systems!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fluxgraph-2.3.0.tar.gz (227.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fluxgraph-2.3.0-py3-none-any.whl (232.4 kB view details)

Uploaded Python 3

File details

Details for the file fluxgraph-2.3.0.tar.gz.

File metadata

  • Download URL: fluxgraph-2.3.0.tar.gz
  • Upload date:
  • Size: 227.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for fluxgraph-2.3.0.tar.gz
Algorithm Hash digest
SHA256 2d0b6fb9be5bc4ca73eb2acd6bd461f4d7b7bf47f133996098225535486a6192
MD5 8952080bf4f2eb79eb7b293a41a78d25
BLAKE2b-256 54672f3f27338865afa1d0b23f77624b995b89026361171d0dcc1222231f88d4

See more details on using hashes here.

File details

Details for the file fluxgraph-2.3.0-py3-none-any.whl.

File metadata

  • Download URL: fluxgraph-2.3.0-py3-none-any.whl
  • Upload date:
  • Size: 232.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for fluxgraph-2.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0b5036386dba2e4b069c061c7c9a06e29a18a274b62bb1fa5fdb09d098179b30
MD5 63ef2e1d05756ae68c55e94281b7db3f
BLAKE2b-256 4855d8335a1fc96ae33ae2c4c4057905cc0d1cdc53c94b772baaba50dcb7854f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page