Production-grade enterprise AI agent orchestration framework
Project description
FluxGraph
Production-grade AI agent orchestration framework for building secure, scalable multi-agent systems
Overview
FluxGraph is the most complete open-source AI agent framework for production deployment, combining enterprise-grade features with zero vendor lock-in. Built for developers who need sophisticated AI agent systems without complexity.
Why FluxGraph?
Unique capabilities not found in other open-source frameworks:
- Native circuit breakers for automatic failure recovery
- Real-time cost tracking per agent and model
- Blockchain-style immutable audit logs for compliance (GDPR, HIPAA, SOC2)
- Built-in PII detection (9 types) and prompt injection shields (7 techniques)
- Agent-to-agent handoff protocol with context preservation
- MCP support for ecosystem compatibility
- Production-ready from installation: streaming, sessions, retry, validation
Feature Comparison
| Feature | FluxGraph | LangGraph | CrewAI | AutoGen |
|---|---|---|---|---|
| Streaming Responses | ✅ Native SSE | ⚠️ Callbacks | ❌ | ❌ |
| Session Management | ✅ SQL-backed | ⚠️ External | ⚠️ Basic | ⚠️ Manual |
| Immutable Audit Logs | ✅ Blockchain-style | ❌ | ❌ | ❌ |
| PII Detection | ✅ 9 types | ❌ | ❌ | ❌ |
| Prompt Injection Shield | ✅ 7 techniques | ❌ | ❌ | ❌ |
| Circuit Breakers | ✅ Unique | ❌ | ❌ | ❌ |
| Real-time Cost Tracking | ✅ Per-agent | ❌ | ❌ | ❌ |
| Agent Handoffs | ✅ Context-aware | ⚠️ Basic | ⚠️ Basic | ✅ |
| HITL Workflows | ✅ Built-in | ❌ | ❌ | ✅ |
| MCP Support | ✅ Full protocol | ⚠️ Adapters | ❌ | ❌ |
| RAG Integration | ✅ Native | ✅ Via adapters | ⚠️ Basic | ⚠️ Basic |
| Agent Versioning | ✅ A/B testing | ❌ | ❌ | ❌ |
| Batch Processing | ✅ Priority queues | ❌ | ❌ | ❌ |
| RBAC + JWT Auth | ✅ Granular | ❌ | ❌ | ❌ |
Architecture
System Components
┌─────────────────────────────────────────────────────────────────────┐
│ Client Applications │
│ (Web, Mobile, CLI, API Clients) │
└────────────────────────────────┬────────────────────────────────────┘
│
┌────────────▼────────────┐
│ Load Balancer │
│ (Nginx/CloudFlare) │
└────────────┬────────────┘
│
┌────────────────────────────────┼────────────────────────────────────┐
│ │ │
│ ┌─────────────────▼─────────────────┐ │
│ │ FluxGraph API Layer │ │
│ │ (FastAPI + Gunicorn Workers) │ │
│ └─────────────────┬─────────────────┘ │
│ │ │
│ ┌─────────────────▼─────────────────┐ │
│ │ FluxGraph Core Orchestrator │ │
│ │ • Agent Registry & Routing │ │
│ │ • Circuit Breakers │ │
│ │ • Cost Tracking │ │
│ │ • Security Layer │ │
│ └─────────────────┬─────────────────┘ │
│ │ │
│ ┌────────────┬───────────┼───────────┬────────────┐ │
│ │ │ │ │ │ │
│ ┌────▼────┐ ┌───▼────┐ ┌──▼───┐ ┌────▼─────┐ ┌───▼────┐ │
│ │ Agent │ │ Tool │ │Memory│ │ RAG │ │ MCP │ │
│ │Registry │ │Registry│ │Store │ │ System │ │ Server │ │
│ └────┬────┘ └───┬────┘ └──┬───┘ └────┬─────┘ └───┬────┘ │
│ │ │ │ │ │ │
└───────┼───────────┼──────────┼───────────┼───────────┼──────────┘
│ │ │ │ │
┌────▼─────┐┌───▼────┐┌────▼─────┐┌────▼─────┐┌───▼────┐
│ Custom ││External││PostgreSQL││ ChromaDB ││Protocol│
│ Agents ││ APIs ││ Redis ││Embeddings││ Tools │
└──────────┘└────────┘└──────────┘└──────────┘└────────┘
│
┌─────▼─────┐
│ LLM │
│ Providers │
│ (OpenAI, │
│Anthropic, │
│ Groq) │
└───────────┘
Data Flow
User Request → API Gateway → Security Layer → Agent Router
↓
┌───────────────────────┴───────────────────────┐
│ │
┌───────▼───────┐ ┌──────────▼────────┐
│ Audit Logger │ │ Cost Tracker │
│ (Blockchain) │ │ (Real-time) │
└───────────────┘ └───────────────────┘
│ │
└───────────────────┬───────────────────────────┘
▼
┌───────────────┐
│ Agent Engine │
│ • PII Check │
│ • Injection │
│ • Circuit │
└───────┬───────┘
│
┌───────────┼───────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Tools │ │ Memory │ │ RAG │
└─────────┘ └─────────┘ └─────────┘
│ │ │
└───────────┼───────────┘
▼
┌───────────────┐
│ LLM Provider │
└───────┬───────┘
│
Response Generated
│
▼
Stream/Return to User
Installation
Quick Start
pip install fluxgraph[all]
Feature-Specific Installations
# Production features (streaming, sessions, retry)
pip install fluxgraph[production]
# Security features (audit, PII, RBAC)
pip install fluxgraph[security]
# Advanced orchestration (handoffs, HITL, batch)
pip install fluxgraph[orchestration]
# RAG capabilities
pip install fluxgraph[rag]
# Memory persistence
pip install fluxgraph[postgres]
# Standard installation
pip install fluxgraph
Development Installation
git clone https://github.com/ihtesham-jahangir/fluxgraph.git
cd fluxgraph
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -e ".[dev]"
Quick Start Guide
1. Basic Agent (Hello World)
from fluxgraph import FluxApp
# Initialize application
app = FluxApp(title="My First Agent API", version="1.0.0")
@app.agent()
def echo_agent(message: str) -> dict:
"""Simple echo agent that returns your message."""
return {"response": f"You said: {message}"}
# Run: flux run app.py
# Test: curl -X POST http://localhost:8000/ask/echo_agent -d '{"message":"Hello!"}'
2. LLM-Powered Agent
import os
from fluxgraph import FluxApp
from fluxgraph.models import OpenAIProvider
app = FluxApp(title="Smart Assistant")
# Configure LLM
llm = OpenAIProvider(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4",
temperature=0.7
)
@app.agent()
async def assistant(query: str) -> dict:
"""AI assistant powered by GPT-4."""
response = await llm.generate(
prompt=f"You are a helpful assistant. Answer: {query}",
max_tokens=150
)
return {
"query": query,
"answer": response.get("text", "No response"),
"model": "gpt-4"
}
3. Agent with Tools
@app.tool()
def calculate(expression: str) -> float:
"""Safely evaluate math expressions."""
import ast
return ast.literal_eval(expression)
@app.tool()
def get_weather(city: str) -> dict:
"""Get weather information (mock example)."""
return {"city": city, "temp": 72, "condition": "Sunny"}
@app.agent()
async def smart_agent(task: str, tools) -> dict:
"""Agent that can use tools."""
if "weather" in task.lower():
result = tools["get_weather"]("New York")
return {"task": task, "weather": result}
elif "calculate" in task.lower():
result = tools["calculate"]("2+2")
return {"task": task, "result": result}
return {"task": task, "message": "Task not recognized"}
4. Multi-Agent System
@app.agent()
async def supervisor(task: str, call_agent, broadcast) -> dict:
"""Supervisor that delegates to other agents."""
# Delegate to specific agent
research = await call_agent("research_agent", query=task)
# Broadcast to multiple agents
results = await broadcast(
["analysis_agent", "summary_agent"],
data=research
)
return {"task": task, "results": results}
@app.agent()
async def research_agent(query: str) -> dict:
"""Specialized research agent."""
return {"findings": f"Research results for: {query}"}
@app.agent()
async def analysis_agent(data: dict) -> dict:
"""Analyzes research data."""
return {"analysis": f"Analysis of {data}"}
5. Agent with Memory
from fluxgraph.core import PostgresMemory, DatabaseManager
from datetime import datetime
# Setup memory
db_manager = DatabaseManager(os.getenv("DATABASE_URL"))
memory_store = PostgresMemory(db_manager)
app = FluxApp(memory_store=memory_store)
@app.agent()
async def conversation_agent(message: str, session_id: str, memory) -> dict:
"""Agent with conversation memory."""
# Store user message
await memory.add(session_id, {
"role": "user",
"content": message,
"timestamp": datetime.utcnow().isoformat()
})
# Get conversation history
history = await memory.get(session_id, limit=10)
# Generate response with context
context = "\n".join([f"{msg['role']}: {msg['content']}"
for msg in reversed(history)])
response = await llm.generate(f"Context:\n{context}\n\nRespond to: {message}")
# Store response
await memory.add(session_id, {
"role": "assistant",
"content": response,
"timestamp": datetime.utcnow().isoformat()
})
return {"response": response, "history_length": len(history)}
6. RAG-Enabled Agent
@app.agent()
async def knowledge_agent(question: str, rag) -> dict:
"""Agent with knowledge base access."""
# Query knowledge base
docs = await rag.query(question, top_k=5)
if not docs:
return {"answer": "No information found"}
# Build context from retrieved documents
context = "\n\n".join([doc.get("content", "") for doc in docs])
# Generate answer with LLM
prompt = f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
answer = await llm.generate(prompt, max_tokens=200)
return {
"question": question,
"answer": answer,
"sources": [doc.get("metadata", {}) for doc in docs]
}
# Ingest documents
@app.api.post("/knowledge/ingest")
async def ingest_document(file: UploadFile = File(...)):
"""Add documents to knowledge base."""
temp_path = f"/tmp/{file.filename}"
with open(temp_path, "wb") as buffer:
buffer.write(await file.read())
await app.rag_connector.ingest(temp_path, metadata={
"filename": file.filename,
"upload_time": datetime.utcnow().isoformat()
})
os.remove(temp_path)
return {"message": f"Ingested {file.filename}"}
Enterprise Features
Streaming Responses
from fastapi.responses import StreamingResponse
app = FluxApp(enable_streaming=True)
@app.api.get("/stream/{agent_name}")
async def stream_agent(agent_name: str, query: str):
"""Stream agent responses in real-time."""
async def generate():
async for chunk in app.orchestrator.run_streaming(
agent_name,
{"query": query}
):
yield f"data: {chunk}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
Human-in-the-Loop
app = FluxApp(enable_orchestration=True)
@app.agent()
async def critical_agent(action: str) -> dict:
"""Agent requiring human approval."""
approval = await app.hitl_manager.request_approval(
agent_name="critical_agent",
task_description=f"Execute: {action}",
risk_level="HIGH",
timeout_seconds=300
)
if await approval.wait_for_approval():
result = execute_action(action)
return {"status": "executed", "result": result}
return {"status": "rejected", "reason": approval.rejection_reason}
Batch Processing
@app.api.post("/batch/process")
async def batch_process(agent_name: str, tasks: list):
"""Process multiple tasks in batch."""
job_id = await app.batch_processor.submit_batch(
agent_name=agent_name,
payloads=tasks,
priority=0,
max_concurrent=50
)
return {"job_id": job_id, "total_tasks": len(tasks)}
@app.api.get("/batch/{job_id}")
async def get_batch_status(job_id: str):
"""Check batch job status."""
status = app.batch_processor.get_job_status(job_id)
return status # {completed: 850, failed: 2, pending: 148}
Security Features
app = FluxApp(enable_security=True)
@app.agent()
async def secure_agent(user_input: str) -> dict:
"""Agent with automatic security checks."""
# Automatic prompt injection detection
is_safe, detections = app.prompt_shield.is_safe(user_input)
if not is_safe:
return {"error": "Security violation detected"}
# Automatic PII redaction
redacted_input, pii_found = app.pii_detector.redact(user_input)
# Immutable audit logging
app.audit_logger.log(
AuditEventType.AGENT_EXECUTION,
user_id="user_123",
details={"pii_count": len(pii_found)}
)
response = await process(redacted_input)
return {"response": response, "pii_redacted": len(pii_found)}
Agent Versioning & A/B Testing
# Register versions
app.version_manager.register_version(
agent_name="assistant",
version="1.0.0",
agent_code=agent_v1,
description="Original"
)
app.version_manager.register_version(
agent_name="assistant",
version="2.0.0",
agent_code=agent_v2,
description="Improved"
)
# Start A/B test
test_id = app.version_manager.start_ab_test(
agent_name="assistant",
version_a="1.0.0",
version_b="2.0.0",
split_ratio=0.5 # 50/50 split
)
# Route user to version
version = app.version_manager.select_ab_version(test_id, user_id)
MCP Protocol Integration
app = FluxApp(enable_mcp=True)
# Register MCP tool
app.mcp_server.register_fluxgraph_tool(
tool_name="web_search",
tool_func=web_search_function,
description="Search the web",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string"}
}
}
)
@app.agent()
async def mcp_agent(task: str, tools) -> dict:
"""Agent using MCP tools."""
result = await app.mcp_server.execute_tool("web_search", {"query": task})
return result
Security & Compliance
Immutable Audit Logs
# Automatic blockchain-style logging
app.audit_logger.log(
AuditEventType.AGENT_EXECUTION,
user_id="user_123",
details={"agent": "assistant"},
severity="INFO"
)
# Verify integrity
integrity = app.audit_logger.verify_integrity()
# {is_valid: True, total_entries: 1543}
# Export compliance report
app.audit_logger.export_compliance_report(
start_date=datetime(2025, 1, 1),
end_date=datetime(2025, 12, 31),
output_path="./compliance_2025.json"
)
PII Detection (9 Types)
# Supported: EMAIL, PHONE, SSN, CREDIT_CARD, IP_ADDRESS,
# PASSPORT, DRIVER_LICENSE, DATE_OF_BIRTH, MEDICAL_RECORD
detections = app.pii_detector.detect(user_input)
redacted, detected = app.pii_detector.redact(user_input)
# "Contact me at [REDACTED:email]"
# Scan entire payloads
data = {"message": "My SSN is 123-45-6789"}
clean_data, findings = app.pii_detector.scan_dict(data, redact=True)
Prompt Injection Shield (7 Techniques)
# Detects: IGNORE_PREVIOUS, ROLE_PLAY, ENCODED_INJECTION,
# DELIMITER_INJECTION, PRIVILEGE_ESCALATION, CONTEXT_OVERFLOW
is_safe, detections = app.prompt_shield.is_safe(user_input)
if not is_safe:
raise SecurityException("Prompt injection detected")
# Sanitize input
sanitized, removed = app.prompt_shield.sanitize(user_input)
RBAC with JWT
# Create user
user = app.rbac_manager.create_user(
username="john",
email="john@company.com",
password="secure_password",
roles=[Role.DEVELOPER]
)
# Generate JWT
token = app.rbac_manager.generate_jwt_token(user, expires_in_hours=24)
# Check permissions
if app.rbac_manager.check_permission(user, Permission.EXECUTE_AGENT):
result = await agent.run(query)
Configuration
Environment Variables
# LLM providers
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
# Database
DATABASE_URL=postgresql://user:password@localhost:5432/fluxgraph
# RAG
RAG_PERSIST_DIR=./knowledge_base
RAG_EMBEDDING_MODEL=sentence-transformers/all-MiniLM-L6-v2
# Application
FLUXGRAPH_DEBUG=false
FLUXGRAPH_LOG_LEVEL=INFO
FLUXGRAPH_CORS_ORIGINS=["http://localhost:3000"]
Production Configuration
from fluxgraph import FluxApp
from fluxgraph.core import PostgresMemory, DatabaseManager
import logging
logging.basicConfig(level=logging.INFO)
app = FluxApp(
title="Production AI API",
version="1.0.0",
memory_store=PostgresMemory(DatabaseManager(os.getenv("DATABASE_URL"))),
cors_enabled=True,
auto_init_rag=True,
debug=False
)
Production Deployment
Local Development
# FluxGraph CLI
flux run app.py --host 0.0.0.0 --port 8000
# Uvicorn
uvicorn app:app --reload --host 0.0.0.0 --port 8000
# Gunicorn
gunicorn app:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
Docker Compose
version: '3.8'
services:
fluxgraph:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/fluxgraph
- OPENAI_API_KEY=${OPENAI_API_KEY}
- FLUXGRAPH_ENABLE_SECURITY=true
depends_on:
- db
- redis
command: gunicorn app:app -w 4 -k uvicorn.workers.UvicornWorker
db:
image: postgres:15
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
name: fluxgraph
spec:
replicas: 5
template:
spec:
containers:
- name: fluxgraph
image: fluxgraph:latest
env:
- name: FLUXGRAPH_ENABLE_STREAMING
value: "true"
- name: FLUXGRAPH_ENABLE_SECURITY
value: "true"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 8000
Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["gunicorn", "app:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
API Reference
| Endpoint | Method | Description | Enterprise Feature |
|---|---|---|---|
/ask/{agent_name} |
POST | Execute agent | Cost tracking, audit |
/stream/{agent_name} |
GET | Stream responses | SSE streaming |
/sessions |
POST | Create session | Session management |
/sessions/{id} |
GET | Get history | Conversation persistence |
/approvals |
GET | Pending HITL | Human-in-the-loop |
/approvals/{id}/approve |
POST | Approve request | HITL workflows |
/batch |
POST | Submit batch | Batch processing |
/batch/{job_id} |
GET | Batch status | Job monitoring |
/agents |
GET | List agents | Agent registry |
/system/status |
GET | System health | Circuit breakers |
/system/costs |
GET | Cost summary | Real-time tracking |
/audit/verify |
GET | Verify integrity | Blockchain logs |
/knowledge/ingest |
POST | Upload documents | RAG system |
Request/Response Format
Request:
{
"parameter1": "value1",
"parameter2": "value2"
}
Success Response:
{
"success": true,
"result": {
"response": "Agent output"
},
"metadata": {
"agent": "agent_name",
"execution_time": 0.145,
"request_id": "uuid-string"
}
}
Error Response:
{
"success": false,
"error": {
"message": "Error description",
"code": "ERROR_CODE"
},
"metadata": {
"request_id": "uuid-string",
"timestamp": "2025-01-01T00:00:00Z"
}
}
Supported Integrations
LLM Providers
| Provider | Models | Streaming | Cost Tracking |
|---|---|---|---|
| OpenAI | GPT-3.5, GPT-4, GPT-4 Turbo | ✅ | ✅ |
| Anthropic | Claude 3 (Haiku, Sonnet, Opus) | ✅ | ✅ |
| Gemini Pro, Gemini Ultra | ✅ | ✅ | |
| Groq | Mixtral, Llama 3 | ✅ | ✅ |
| Ollama | All local models | ✅ | ❌ |
| Azure OpenAI | GPT models | ✅ | ✅ |
Memory Backends
| Backend | Use Case | Configuration |
|---|---|---|
| PostgreSQL | Production persistence | DATABASE_URL |
| Redis | Fast session storage | REDIS_URL |
| SQLite | Development/testing | Local file |
| In-Memory | Temporary stateless | None |
RAG Components
| Component | Implementation | Purpose |
|---|---|---|
| Vector Store | ChromaDB | Embedding storage |
| Embeddings | SentenceTransformers | Vectorization |
| Loaders | Unstructured | Document parsing |
| Chunking | LangChain | Text segmentation |
Performance
- Execution Speed: 45 simple tasks/min, 32 multi-agent tasks/min
- Cost Tracking: <1% error rate across all providers
- Memory Overhead: Low with FastAPI async architecture
- Scalability: Horizontal scaling with Kubernetes
Real-time Cost Breakdown
{
"research_agent": {"total_cost": 2.34, "calls": 145},
"summary_agent": {"total_cost": 0.87, "calls": 89},
"assistant_agent": {"total_cost": 5.12, "calls": 312}
}
Development Roadmap
✅ v2.0 (Current - October 2025)
Complete enterprise feature set: streaming, security, orchestration, MCP support
🚧 v2.1 (Q1 2026)
- Visual workflow designer
- Enterprise SSO (SAML, OAuth2)
- Analytics dashboard
- SOC2/HIPAA certifications
📋 v2.2 (Q2 2026)
- Distributed execution
- Auto-scaling infrastructure
- Multi-cloud deployment
Community & Support
- Documentation: fluxgraph.readthedocs.io
- Discord: Join Community
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Enterprise: enterprise@fluxgraph.com
Enterprise Support
For companies requiring custom integrations, dedicated SLA, security audits, compliance assistance, or training:
Contact: enterprise@fluxgraph.com
Contributing
We welcome contributions from the community. FluxGraph is built by developers, for developers.
Development Setup
# Clone repository
git clone https://github.com/ihtesham-jahangir/fluxgraph.git
cd fluxgraph
# Setup development environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -e ".[dev]"
# Run tests
pytest tests/
# Run linting
black fluxgraph/
flake8 fluxgraph/
Contribution Areas
- Core Features: Enhance orchestrator, add LLM providers
- Security: Improve PII detection, add injection patterns
- Analytics: Build monitoring dashboards
- Documentation: Improve guides and tutorials
- Testing: Increase test coverage
- Integrations: Add new tool integrations
Contribution Guidelines
- Follow PEP 8 style guidelines
- Include comprehensive tests for new features
- Update documentation for API changes
- Submit detailed pull request descriptions
Examples & Use Cases
Customer Support Bot
@app.agent()
async def support_bot(query: str, rag, memory, session_id: str) -> dict:
"""Customer support with knowledge base and conversation history."""
# Check knowledge base
kb_results = await rag.query(query, top_k=3)
# Get conversation history
history = await memory.get(session_id, limit=5)
# Generate response
context = f"Knowledge base: {kb_results}\nHistory: {history}"
response = await llm.generate(f"{context}\n\nUser: {query}")
return {"response": response, "sources": kb_results}
Content Moderation Agent
@app.agent()
async def moderation_agent(content: str) -> dict:
"""Automatically moderate user content for safety."""
# Check for PII
redacted, pii = app.pii_detector.redact(content)
# Check for prompt injection
is_safe, detections = app.prompt_shield.is_safe(content)
# Check content policy with LLM
policy_check = await llm.generate(
f"Is this content appropriate? {redacted}"
)
return {
"original_flagged": not is_safe,
"pii_found": len(pii),
"policy_compliant": "appropriate" in policy_check.lower(),
"safe_content": redacted
}
Data Analysis Pipeline
@app.agent()
async def analysis_pipeline(data_query: str, call_agent) -> dict:
"""Multi-step data analysis pipeline."""
# Step 1: Collect data
raw_data = await call_agent("data_collector", query=data_query)
# Step 2: Clean data
clean_data = await call_agent("data_cleaner", data=raw_data)
# Step 3: Analyze
analysis = await call_agent("data_analyzer", data=clean_data)
# Step 4: Generate insights
insights = await call_agent("insight_generator", analysis=analysis)
return {
"query": data_query,
"insights": insights,
"pipeline_steps": 4
}
Research Assistant
@app.agent()
async def research_assistant(topic: str, rag, tools) -> dict:
"""Comprehensive research assistant with web search and RAG."""
# Search internal knowledge base
internal_docs = await rag.query(topic, top_k=5)
# Search external sources
external_results = tools["web_search"](topic, max_results=10)
# Synthesize findings
all_sources = internal_docs + external_results
synthesis = await llm.generate(
f"Synthesize research on {topic} from: {all_sources}"
)
return {
"topic": topic,
"synthesis": synthesis,
"internal_sources": len(internal_docs),
"external_sources": len(external_results)
}
Testing
Unit Tests
import pytest
from fluxgraph import FluxApp
@pytest.fixture
def app():
return FluxApp(title="Test App")
def test_agent_registration(app):
@app.agent()
def test_agent(message: str) -> dict:
return {"response": message}
assert "test_agent" in app.orchestrator.agents
def test_agent_execution(app):
@app.agent()
def echo_agent(message: str) -> dict:
return {"echo": message}
result = app.orchestrator.run("echo_agent", {"message": "test"})
assert result["echo"] == "test"
Integration Tests
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_api_endpoint():
async with AsyncClient(app=app, base_url="http://test") as client:
response = await client.post(
"/ask/echo_agent",
json={"message": "Hello"}
)
assert response.status_code == 200
assert "response" in response.json()
Troubleshooting
Common Issues
Issue: Agent not found
# Ensure agent is registered before running
@app.agent() # Decorator must be present
def my_agent(query: str) -> dict:
return {"response": query}
Issue: Memory not persisting
# Check DATABASE_URL is set
import os
assert os.getenv("DATABASE_URL"), "DATABASE_URL not set"
# Verify connection
from fluxgraph.core import DatabaseManager
db = DatabaseManager(os.getenv("DATABASE_URL"))
db.test_connection()
Issue: RAG not working
# Install RAG dependencies
# pip install fluxgraph[rag]
# Verify ChromaDB is initialized
assert app.rag_connector is not None, "RAG not initialized"
Issue: Cost tracking inaccurate
# Ensure LLM provider reports token usage
llm = OpenAIProvider(
api_key=os.getenv("OPENAI_API_KEY"),
track_costs=True # Enable cost tracking
)
Debug Mode
# Enable detailed logging
app = FluxApp(
debug=True,
log_level="DEBUG"
)
# Check system status
status = app.orchestrator.get_system_status()
print(status)
Best Practices
Agent Design
- Keep agents focused: One responsibility per agent
- Use type hints: Enable better validation and documentation
- Handle errors gracefully: Return error messages in response dict
- Document agents: Use docstrings for API documentation
@app.agent()
async def good_agent(query: str, tools) -> dict:
"""
Well-designed agent with clear purpose.
Args:
query: User query string
tools: Available tool functions
Returns:
dict: Response with 'answer' and 'metadata' keys
"""
try:
result = await process_query(query)
return {"answer": result, "metadata": {"status": "success"}}
except Exception as e:
return {"error": str(e), "metadata": {"status": "failed"}}
Security
- Always enable security features in production
- Validate all user inputs
- Use environment variables for secrets
- Enable audit logging for compliance
- Regular security reviews
Performance
- Use async agents for I/O operations
- Enable caching for repeated queries
- Batch similar requests
- Monitor circuit breaker status
- Scale horizontally with Kubernetes
Monitoring
# Enable comprehensive monitoring
app = FluxApp(
enable_streaming=True,
enable_security=True,
enable_orchestration=True
)
# Regular health checks
@app.api.get("/health")
async def health_check():
return {
"status": "healthy",
"agents": len(app.orchestrator.agents),
"circuit_breakers": app.orchestrator.get_circuit_status(),
"costs": app.cost_tracker.get_summary()
}
FAQ
Q: How do I migrate from LangGraph/CrewAI? A: FluxGraph provides adapters and migration guides in our documentation.
Q: Can I use local LLMs? A: Yes, FluxGraph supports Ollama for local models.
Q: Is FluxGraph production-ready? A: Yes, FluxGraph v2.0 includes all enterprise features for production deployment.
Q: How much does FluxGraph cost? A: FluxGraph is free and open-source (MIT License). You only pay for LLM API usage.
Q: Can I contribute? A: Absolutely! Check our Contributing section above.
Q: What's the difference between FluxGraph and Microsoft Agent Framework? A: FluxGraph is open-source with no vendor lock-in, free cost tracking, and unique features like circuit breakers.
Q: How do I get enterprise support? A: Contact enterprise@fluxgraph.com for dedicated support, SLAs, and custom integrations.
License
MIT License
Copyright (c) 2025 FluxGraph
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
See LICENSE file for full details.
Acknowledgments
Built with contributions from the open-source community. Special thanks to all contributors, early adopters, and the AI developer community.
Technologies:
- FastAPI for high-performance API
- PostgreSQL for reliable persistence
- ChromaDB for vector storage
- LangChain for RAG components
- Various LLM providers for AI capabilities
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fluxgraph-2.0.0.tar.gz.
File metadata
- Download URL: fluxgraph-2.0.0.tar.gz
- Upload date:
- Size: 121.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
50f46f8994aa2f49d4667fa812963e9cb8be6db43e4f81387f3a3a1bb7877fc4
|
|
| MD5 |
41dc2b0f2f17207d5b012cbe6a0b8972
|
|
| BLAKE2b-256 |
75e68ecf9fe8d329c173fce3dc00edf6d79bc354f799f784392b44cacca601cd
|
File details
Details for the file fluxgraph-2.0.0-py3-none-any.whl.
File metadata
- Download URL: fluxgraph-2.0.0-py3-none-any.whl
- Upload date:
- Size: 86.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3792abd3aba8a53547688302d81aed9ba9ebcc93645be035ba5cc610617a6b67
|
|
| MD5 |
5f59a76c11101da74a4e8bf46cdef921
|
|
| BLAKE2b-256 |
94e4c4f469cb35579d77ba8c5218b75fa0f196037526b3cab50aa6ca0d3d9d21
|