# SwarmFlow
A distributed multi-agent orchestration framework for building scalable AI workflows with comprehensive observability.
## 🚀 Features

- Dead-Simple API: Minimal `@swarm_task` decorator and `run()` function
- Auto-Dependency Inference: Dependencies automatically inferred from function parameters
- Agent Orchestration: Create complex workflows with multiple AI agents
- Retry Logic: Built-in retry mechanisms for resilient agent execution
- Observability: OpenTelemetry integration for tracing and monitoring
- Error Handling: Graceful failure propagation and recovery
- Real-time Monitoring: Send task traces to your monitoring dashboard
- Cycle Detection: Automatic detection of circular dependencies
- Production Ready: Comprehensive error handling and logging
- Hooks System: Powerful before/after/error/final hooks for custom orchestration logic
- Shared Memory: Cross-task state sharing with `flow.memory`
- Policy Enforcement: DAG-level rules for cost limits, abort conditions, and validation
## 📦 Installation

```bash
pip install swarmflow
```
## 🎯 Quick Start

```python
from swarmflow import swarm_task, run

@swarm_task
def fetch_data():
    return "Some data from API"

@swarm_task
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task
def display_result(process_data):
    print(f"Final result: {process_data}")

# Run the workflow - that's it!
run()
```
That's it! No complex setup, no manual dependency management. SwarmFlow automatically:
- ✅ Registers your tasks
- ✅ Infers dependencies from function parameters
- ✅ Executes in the correct order
- ✅ Handles retries and errors
- ✅ Sends traces to your dashboard
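For the curious, parameter-name-based dependency inference can be approximated with `inspect.signature`. This is an illustrative sketch under assumed names (`infer_dependencies`, `registry`), not SwarmFlow's actual implementation:

```python
import inspect

def infer_dependencies(func, registry):
    """Return the names of registered tasks that match func's parameters."""
    params = inspect.signature(func).parameters
    return [name for name in params if name in registry]

def fetch_data():
    return "Some data from API"

def process_data(fetch_data):
    return f"Processed: {fetch_data}"

# Hypothetical registry mapping task names to functions, as a decorator might build.
registry = {"fetch_data": fetch_data, "process_data": process_data}

print(infer_dependencies(process_data, registry))  # → ['fetch_data']
```

Any parameter whose name matches a registered task becomes an edge in the DAG; everything else is treated as an ordinary argument.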
## 🔧 Advanced Usage

### Retry Logic

```python
@swarm_task(retries=3)
def unreliable_task():
    # This task will retry up to 3 times on failure
    pass
```
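Under the hood, retry semantics like this can be implemented with a plain decorator. The sketch below is illustrative only; `with_retries` is a hypothetical stand-in, not SwarmFlow's internal implementation:

```python
import functools
import time

def with_retries(retries=3, delay_s=0.0):
    """Re-invoke the wrapped function up to `retries` times before giving up."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for _attempt in range(retries):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    last_exc = exc
                    time.sleep(delay_s)
            raise last_exc
        return wrapper
    return decorator

calls = {"count": 0}

@with_retries(retries=3)
def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(flaky())  # → ok  (succeeds on the third attempt)
```

Exceptions are swallowed until the attempt budget is exhausted, at which point the last one is re-raised so failure still propagates to dependents.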
### Multiple Dependencies

```python
@swarm_task
def step1():
    return "Step 1 completed"

@swarm_task
def step2():
    return "Step 2 completed"

@swarm_task
def step3():
    return "Step 3 completed"

@swarm_task
def final_step(step1, step2, step3):
    # Dependencies automatically inferred from parameter names
    return f"Combined: {step1}, {step2}, {step3}"

run()
```
### Auto-Extracted Groq Metadata

```python
from groq import Groq

@swarm_task
def llm_task():
    # Metadata is extracted automatically from Groq responses
    client = Groq()
    response = client.chat.completions.create(
        model="llama-3-70b",
        messages=[{"role": "user", "content": "Hello"}]
    )
    return response
```

SwarmFlow automatically detects and extracts:

- Model name (`llama-3-70b`, `llama-4-scout-17b`, etc.)
- Provider (Groq)
- Token usage (prompt + completion tokens)
- Precise cost calculation (USD)
- Timing metrics (queue, prompt, completion, total time)

All of this is added to `task.metadata` automatically. Example output with metadata:

```text
[SwarmFlow] Task: llm_task
  ↳ Status: success
  ↳ Duration: 1234 ms
  ↳ Output: <Groq ChatCompletion object>
  ↳ Metadata: {'agent': 'LLMProcessor', 'provider': 'Groq', 'model': 'llama-3-70b', 'tokens_used': 150, 'cost_usd': 0.000089, 'queue_time_s': 0.1, 'prompt_time_s': 0.5, 'completion_time_s': 0.8, 'total_time_s': 1.4}
```
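Cost extraction boils down to multiplying token counts by per-model rates. The sketch below shows the general shape with made-up prices; actual Groq pricing and SwarmFlow's real rate table will differ:

```python
# Hypothetical per-million-token prices (USD); NOT actual Groq rates.
PRICE_PER_MTOK = {
    "llama-3-70b": {"prompt": 0.59, "completion": 0.79},
}

def estimate_cost_usd(model, prompt_tokens, completion_tokens):
    """Sum token counts times per-token rates for prompt and completion."""
    rate = PRICE_PER_MTOK[model]
    cost = prompt_tokens * rate["prompt"] + completion_tokens * rate["completion"]
    return cost / 1_000_000

print(estimate_cost_usd("llama-3-70b", 100, 50))
```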
### API Key Configuration

SwarmFlow keeps API key handling simple:

```bash
# Option 1: Set an environment variable
export SWARMFLOW_API_KEY="sk_abc123..."
```

```python
# Option 1: Automatically uses the key from the environment
run()

# Option 2: Pass directly
run(api_key="sk_abc123...")

# Option 3: No key (logs a warning but continues)
run()
```
### Hooks System & Shared Memory

SwarmFlow includes a powerful hooks system for custom orchestration logic and shared memory for cross-task state management:

```python
from swarmflow import swarm_task, run
from swarmflow.hooks import write_output_to_memory, read_memory_into_arg, log_input_output

@swarm_task(before=log_input_output()[0], after=log_input_output()[1])
def fetch_data():
    return "Some data from API"

@swarm_task(after=write_output_to_memory("processed_data"))
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task(before=read_memory_into_arg("processed_data", "input_data"))
def display_result(process_data, input_data=None):
    print(f"Final result: {process_data}")
    print(f"From memory: {input_data}")

# Run the workflow - that's it!
run()
```
**Available Hooks:**

- `before`: Execute before the task runs
- `after`: Execute after the task succeeds
- `on_error`: Execute when the task fails
- `on_final`: Execute after the task completes (success or failure)
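To make the lifecycle concrete, here is a minimal pure-Python sketch of the order in which these four hooks fire around a task. The signatures are illustrative assumptions, not SwarmFlow's actual hook API:

```python
def run_with_hooks(func, before=None, after=None, on_error=None, on_final=None):
    """Fire hooks around a task: before → task → after/on_error → on_final."""
    if before:
        before()
    try:
        result = func()
        if after:
            after(result)
        return result
    except Exception as exc:
        if on_error:
            on_error(exc)
        raise
    finally:
        if on_final:
            on_final()

events = []
run_with_hooks(
    lambda: "ok",
    before=lambda: events.append("before"),
    after=lambda result: events.append("after"),
    on_final=lambda: events.append("final"),
)
print(events)  # → ['before', 'after', 'final']
```

Note that `on_final` runs in a `finally` block, so it fires whether the task succeeded or raised.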
**Built-in Hook Utilities:**

- `write_output_to_memory(key)`: Save task output to shared memory
- `read_memory_into_arg(mem_key, arg_name)`: Inject a memory value into task arguments
- `log_input_output()`: Log task inputs and outputs
- `enforce_max_cost(max_usd)`: Abort if total cost exceeds the limit
- `set_flag_on_failure(flag_key)`: Set a memory flag when a task fails
- `skip_if_flag_set(flag_key)`: Skip the task if the memory flag is True
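These utilities are essentially closures over the flow's shared state. The sketch below illustrates the semantics of two of them with a plain dict standing in for `flow.memory`; the real factories are wired into the task lifecycle rather than called directly like this:

```python
memory = {}  # stand-in for flow.memory

def write_output_to_memory(key):
    """Return a hook that stores a task's output under `key`."""
    def hook(output):
        memory[key] = output
    return hook

def skip_if_flag_set(flag_key):
    """Return a predicate that is True when the flag is set (task should skip)."""
    def hook():
        return memory.get(flag_key) is True
    return hook

write_output_to_memory("processed_data")("Processed: Some data from API")
memory["error_detected"] = True
print(skip_if_flag_set("error_detected")())  # → True
```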
### Policy Enforcement

Set DAG-level policies for cost limits, abort conditions, and validation:

```python
from swarmflow import swarm_task, run, SwarmFlow

# Create a flow for policy configuration
flow = SwarmFlow()
flow.set_policy("max_cost", 0.10)                     # Abort if total cost > $0.10
flow.set_policy("abort_on_flag", "error_detected")    # Abort if flag is True
flow.set_policy("require_outputs", ["final_result"])  # Abort if outputs are missing

@swarm_task
def task1():
    return "Task 1 result"

@swarm_task
def task2(task1):
    return "Task 2 result"

# Run with policies enforced
run()
```
### Real-time Monitoring

SwarmFlow automatically sends task traces to the SwarmFlow backend service at `http://localhost:8000/api/trace` for real-time monitoring and analytics.
**Trace Structure:**

```jsonc
{
  "id": "task-uuid",
  "run_id": "dag-run-uuid",        // Consistent across all tasks in the same DAG run
  "name": "task_name",
  "status": "success|failure|retrying|skipped",
  "duration_ms": 1234,
  "output": "task output",
  "metadata": {
    "agent": "LLMProcessor",
    "provider": "Groq",
    "model": "llama-3-70b",
    "tokens_used": 150,
    "cost_usd": 0.000089
  },
  "dependencies": ["dep1", "dep2"],
  "flow_memory": {"key": "value"},   // Shared memory state
  "flow_policy": {"max_cost": 0.10}  // Active policies
}
```
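Reporting a trace amounts to a single JSON POST. The sketch below builds such a request with the standard library; the `Bearer` header scheme and the `build_trace_request` helper are assumptions for illustration, not the SDK's verified transport code:

```python
import json
import urllib.request

def build_trace_request(trace, api_key=None, url="http://localhost:8000/api/trace"):
    """Build (but do not send) a POST request carrying one task trace."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"  # assumed auth scheme
    data = json.dumps(trace).encode("utf-8")
    return urllib.request.Request(url, data=data, headers=headers, method="POST")

req = build_trace_request({"name": "llm_task", "status": "success"}, api_key="sk_abc123")
print(req.get_method(), req.full_url)
```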
### Observability

SwarmFlow automatically provides:

- Task execution traces with OpenTelemetry
- Performance metrics (execution time, success rates)
- Dependency visualization and cycle detection
- Error tracking and failure propagation
- Auto-extracted Groq metadata (model, provider, token usage, precise cost calculation, timing metrics)
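Cycle detection over a task dependency graph is a standard depth-first search with a three-color marking scheme. A minimal sketch (illustrative, not SwarmFlow's internals):

```python
def has_cycle(graph):
    """Return True if the task -> dependencies mapping contains a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2          # unvisited, in progress, finished
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for dep in graph.get(node, ()):
            state = color.get(dep, BLACK)  # unknown deps can't close a cycle
            if state == GRAY or (state == WHITE and visit(dep)):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in graph)

print(has_cycle({"final": ["step1", "step2"], "step1": [], "step2": []}))  # → False
print(has_cycle({"a": ["b"], "b": ["a"]}))                                # → True
```

Hitting a GRAY node means the search re-entered a task still on the current DFS path, which is exactly a circular dependency.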
## 🏗️ Architecture

SwarmFlow is designed for production multi-agent systems with a dead-simple interface:

```text
User's Agent Functions → @swarm_task decorator → run() → Observability Dashboard
```

- Minimal: Just a decorator plus a run function
- Scalable: Handles complex dependency graphs
- Observable: Real-time monitoring and debugging
- Resilient: Built-in retry logic and error handling
## 📊 Monitoring Dashboard

Get comprehensive insights into your multi-agent workflows:

- Real-time execution monitoring
- Performance analytics and optimization
- Error tracking and debugging
- Cost analysis for LLM usage (auto-calculated)
- Workflow visualization and dependency graphs
- Groq metadata extraction (comprehensive model support with timing and cost analytics)
- DAG run tracking with a unique `run_id` for grouping and analytics
## 🚀 Deployment Configuration

### API Key Authentication

SwarmFlow supports API key authentication for secure trace reporting. Configuration is identical to the API Key Configuration section above: set the `SWARMFLOW_API_KEY` environment variable, pass `api_key` directly to `run()`, or run with no key (a warning is logged but execution continues).
### Backend Configuration

SwarmFlow automatically sends traces to `http://localhost:8000/api/trace`. For production deployment, update the backend URL in the SDK code to point to your centralized backend service.
## 🤝 Contributing

We welcome contributions! Please see our Contributing Guidelines.
## 📚 Documentation

For detailed documentation, visit: https://github.com/anirame128/swarmflow
## 📄 License

### SDK License

The SwarmFlow SDK is licensed under the MIT License - see the LICENSE file for details.

### Backend Services

SwarmFlow backend services, dashboard, and infrastructure require separate service agreements and API keys. The SDK is designed to work with official SwarmFlow backend services only.

**Why this model?**

- ✅ Free SDK: Developers can use the SDK without restrictions
- ✅ Paid Services: Backend services and the dashboard require API keys
- ✅ Industry Standard: Follows the same model as Google Maps, Stripe, and AWS SDKs
- ✅ Developer Friendly: Maximizes adoption while protecting the business model
## File details

Details for the file `swarmflow-0.4.2.tar.gz`.

- Size: 19.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.3

| Algorithm | Hash digest |
|---|---|
| SHA256 | `e87623dfb29ac3096677edf28043cc212b1dfc9985a0011f914606ee19fd9484` |
| MD5 | `4db4dcd291032fd538139ddbc640a2be` |
| BLAKE2b-256 | `b8db4d0f4edee614310233b85a3e1895e928b297845f0483bc5f337607d92d1f` |
Details for the file `swarmflow-0.4.2-py3-none-any.whl`.

- Size: 15.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.3

| Algorithm | Hash digest |
|---|---|
| SHA256 | `4e42600c2dea5fe087529ef412d29d154f4a55365854548f38f8e68a368f35ff` |
| MD5 | `44f6c200ae2a28ab1b8e16d1e0570007` |
| BLAKE2b-256 | `3741565a097bca0156cffdb73478810e9bf3abdf42ea5d8bd7fcdd8186653e4e` |