
SwarmFlow: A distributed multi-agent orchestration framework


SwarmFlow

A distributed multi-agent orchestration framework for building scalable AI workflows with comprehensive observability.

🚀 Features

  • Dead-Simple API: Minimal @swarm_task decorator and run() function
  • Auto-Dependency Inference: Dependencies automatically inferred from function parameters
  • Agent Orchestration: Create complex workflows with multiple AI agents
  • Retry Logic: Built-in retry mechanisms for resilient agent execution
  • Observability: OpenTelemetry integration for tracing and monitoring
  • Error Handling: Graceful failure propagation and recovery
  • Real-time Monitoring: Send task traces to your monitoring dashboard
  • Cycle Detection: Automatic detection of circular dependencies
  • Production Ready: Comprehensive error handling and logging
  • Hooks System: Powerful before/after/error/final hooks for custom orchestration logic
  • Shared Memory: Cross-task state sharing with flow.memory
  • Policy Enforcement: DAG-level rules for cost limits, abort conditions, and validation

📦 Installation

pip install swarmflow

🎯 Quick Start

from swarmflow import swarm_task, run

@swarm_task
def fetch_data():
    return "Some data from API"

@swarm_task
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task
def display_result(process_data):
    print(f"Final result: {process_data}")

# Run workflow - that's it!
run()

There is no complex setup and no manual dependency management. SwarmFlow automatically:

  • ✅ Registers your tasks
  • ✅ Infers dependencies from function parameters
  • ✅ Executes in the correct order
  • ✅ Handles retries and errors
  • ✅ Sends traces to your dashboard

🔧 Advanced Usage

Retry Logic

@swarm_task(retries=3)
def unreliable_task():
    # This task will retry up to 3 times on failure
    pass
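The README does not say whether `retries=3` means three attempts in total or three retries after the first failure, nor whether attempts are spaced out. The sketch below assumes the second reading with no backoff; `with_retries` is a hypothetical stand-in, not SwarmFlow's code.

```python
import functools
from typing import Any, Callable


def with_retries(retries: int) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator: one initial attempt plus up to `retries` retries."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for _ in range(retries):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    continue  # swallow the failure and try again
            # Final attempt: if this one fails, the exception propagates to the caller.
            return fn(*args, **kwargs)
        return wrapper
    return decorator


attempts = {"count": 0}


@with_retries(retries=3)
def unreliable_task():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(unreliable_task(), attempts["count"])  # ok 3
```

A production retry loop would usually add exponential backoff and retry only on errors known to be transient.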

Multiple Dependencies

@swarm_task
def step1():
    return "Step 1 completed"

@swarm_task
def step2():
    return "Step 2 completed"

@swarm_task
def step3():
    return "Step 3 completed"

@swarm_task
def final_step(step1, step2, step3):
    # Dependencies automatically inferred from parameter names
    return f"Combined: {step1}, {step2}, {step3}"

run()

Auto-Extracted Groq Metadata

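from groq import Groq
from swarmflow import swarm_task, run

@swarm_task
def llm_task():
    # SwarmFlow extracts metadata from the returned Groq response automatically.
    # Groq() reads your Groq key from the GROQ_API_KEY environment variable.
    client = Groq()
    response = client.chat.completions.create(
        model="llama-3-70b",  # use a model ID available on your Groq account
        messages=[{"role": "user", "content": "Hello"}],
    )
    return response

run()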

# SwarmFlow automatically detects and extracts:
# - Model name (llama-3-70b, llama-4-scout-17b, etc.)
# - Provider (Groq)
# - Token usage (prompt + completion tokens)
# - Precise cost calculation (USD)
# - Timing metrics (queue, prompt, completion, total time)
# - All added to task.metadata automatically

# Example output with metadata:
# [SwarmFlow] Task: llm_task
#   ↳ Status: success
#   ↳ Duration: 1234 ms
#   ↳ Output: <Groq ChatCompletion object>
#   ↳ Metadata: {'agent': 'LLMProcessor', 'provider': 'Groq', 'model': 'llama-3-70b', 'tokens_used': 150, 'cost_usd': 0.000089, 'queue_time_s': 0.1, 'prompt_time_s': 0.5, 'completion_time_s': 0.8, 'total_time_s': 1.4}
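The cost figure is token counts multiplied by per-token prices: prompt_tokens × input price + completion_tokens × output price. The sketch below assumes the response exposes `model` and a `usage` object with `prompt_tokens`, `completion_tokens`, `queue_time`, `prompt_time`, `completion_time` and `total_time`, mirroring the Groq SDK's usage fields. The price table and `extract_groq_metadata` are hypothetical placeholders, not SwarmFlow's pricing data or code; a `SimpleNamespace` stands in for a real response so it runs offline.

```python
from types import SimpleNamespace
from typing import Any

# Hypothetical USD prices per million tokens; real Groq prices differ by model and change over time.
PRICES_PER_MILLION = {"llama-3-70b": {"input": 0.59, "output": 0.79}}


def extract_groq_metadata(response: Any) -> dict[str, Any]:
    """Build a metadata dict from a chat completion's `model` and `usage` fields."""
    usage = response.usage
    price = PRICES_PER_MILLION.get(response.model, {"input": 0.0, "output": 0.0})
    cost = (
        usage.prompt_tokens * price["input"]
        + usage.completion_tokens * price["output"]
    ) / 1_000_000
    return {
        "provider": "Groq",
        "model": response.model,
        "tokens_used": usage.prompt_tokens + usage.completion_tokens,
        "cost_usd": cost,
        # Server-side timings in seconds; None if the response lacks them.
        "queue_time_s": getattr(usage, "queue_time", None),
        "prompt_time_s": getattr(usage, "prompt_time", None),
        "completion_time_s": getattr(usage, "completion_time", None),
        "total_time_s": getattr(usage, "total_time", None),
    }


# Stand-in object with the same attribute shape as a Groq response.
fake_response = SimpleNamespace(
    model="llama-3-70b",
    usage=SimpleNamespace(
        prompt_tokens=100, completion_tokens=50,
        queue_time=0.1, prompt_time=0.5, completion_time=0.8, total_time=1.4,
    ),
)
meta = extract_groq_metadata(fake_response)
print(meta["tokens_used"])  # 150
```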

API Key Configuration

SwarmFlow accepts an API key in three ways:

# Option 1 (recommended): set the environment variable in your shell
export SWARMFLOW_API_KEY="sk_abc123..."

# ...then call run() in Python; the key is read from the environment
run()

# Option 2: pass the key directly
run(api_key="sk_abc123...")

# Option 3: no key (SwarmFlow logs a warning but still executes the workflow)
run()
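The README lists the three options but not which wins when both an argument and the environment variable are present. The sketch below assumes an explicit `api_key` argument takes precedence; `resolve_api_key` is a hypothetical helper, not part of SwarmFlow.

```python
import os
import warnings
from typing import Optional


def resolve_api_key(api_key: Optional[str] = None) -> Optional[str]:
    """Hypothetical helper: explicit argument first, then SWARMFLOW_API_KEY, else warn."""
    key = api_key or os.environ.get("SWARMFLOW_API_KEY")
    if not key:
        # Option 3: a missing key is not fatal; warn and let the workflow run anyway.
        warnings.warn("No SwarmFlow API key found; continuing without one.")
    return key
```

Warning instead of raising keeps local experiments working without credentials, at the cost of traces possibly going unauthenticated.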

Hooks System & Shared Memory

SwarmFlow includes a hooks system for custom orchestration logic and a shared memory store (flow.memory) for sharing state across tasks:

from swarmflow import swarm_task, run
from swarmflow.hooks import write_output_to_memory, read_memory_into_arg, log_input_output

log_before, log_after = log_input_output()  # returns a (before, after) hook pair

@swarm_task(before=log_before, after=log_after)
def fetch_data():
    return "Some data from API"

@swarm_task(after=write_output_to_memory("processed_data"))
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task(before=read_memory_into_arg("processed_data", "input_data"))
def display_result(process_data, input_data=None):
    print(f"Final result: {process_data}")
    print(f"From memory: {input_data}")

# Run workflow - that's it!
run()

Available Hooks:

  • before: Execute before task runs
  • after: Execute after task succeeds
  • on_error: Execute when task fails
  • on_final: Execute after task completes (success or failure)
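The four hook points imply a fixed order around each task. The sketch below shows that order with `try`/`except`/`finally`; `run_with_hooks` and the arguments each hook receives are assumptions for illustration, not SwarmFlow's executor.

```python
from typing import Any, Callable, Optional


def run_with_hooks(
    fn: Callable[..., Any],
    kwargs: dict[str, Any],
    before: Optional[Callable[[dict[str, Any]], None]] = None,
    after: Optional[Callable[[Any], None]] = None,
    on_error: Optional[Callable[[BaseException], None]] = None,
    on_final: Optional[Callable[[], None]] = None,
) -> Any:
    """Hypothetical executor: before, task, then after (success) or on_error (failure), then on_final."""
    try:
        if before:
            before(kwargs)  # may mutate kwargs, e.g. to inject a value from memory
        result = fn(**kwargs)
        if after:
            after(result)
        return result
    except Exception as exc:
        if on_error:
            on_error(exc)
        raise  # re-raise so the caller still sees the failure
    finally:
        if on_final:
            on_final()  # runs on success and failure alike


events: list[tuple[Any, ...]] = []
run_with_hooks(
    lambda x: x * 2,
    {"x": 21},
    before=lambda kw: events.append(("before", kw["x"])),
    after=lambda out: events.append(("after", out)),
    on_final=lambda: events.append(("final",)),
)
print(events)  # [('before', 21), ('after', 42), ('final',)]
```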

Built-in Hook Utilities:

  • write_output_to_memory(key): Save task output to shared memory
  • read_memory_into_arg(mem_key, arg_name): Inject memory value into task arguments
  • log_input_output(): Log task inputs and outputs
  • enforce_max_cost(max_usd): Abort if total cost exceeds limit
  • set_flag_on_failure(flag_key): Set memory flag when task fails
  • skip_if_flag_set(flag_key): Skip task if memory flag is True
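Each utility is a factory that returns a hook closed over its configuration. The sketch below reimplements four of them under different, hypothetical names against a plain dict standing in for flow.memory. What each hook receives and the `SkipTask` exception used to signal a skip are assumptions, not SwarmFlow's API.

```python
from typing import Any, Callable

# Stand-in for flow.memory: one dict shared by every task in a run.
memory: dict[str, Any] = {}


class SkipTask(Exception):
    """Hypothetical signal a before-hook raises to skip its task."""


def write_output_hook(key: str) -> Callable[[Any], None]:
    def hook(output: Any) -> None:  # used as an `after` hook
        memory[key] = output
    return hook


def read_memory_hook(mem_key: str, arg_name: str) -> Callable[[dict[str, Any]], None]:
    def hook(kwargs: dict[str, Any]) -> None:  # used as a `before` hook
        kwargs[arg_name] = memory.get(mem_key)
    return hook


def flag_on_failure_hook(flag_key: str) -> Callable[[BaseException], None]:
    def hook(exc: BaseException) -> None:  # used as an `on_error` hook
        memory[flag_key] = True
    return hook


def skip_if_flag_hook(flag_key: str) -> Callable[[dict[str, Any]], None]:
    def hook(kwargs: dict[str, Any]) -> None:  # used as a `before` hook
        if memory.get(flag_key) is True:
            raise SkipTask(flag_key)
    return hook


write_output_hook("processed_data")("Processed: Some data from API")
task_kwargs: dict[str, Any] = {}
read_memory_hook("processed_data", "input_data")(task_kwargs)
print(task_kwargs)  # {'input_data': 'Processed: Some data from API'}
```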

Policy Enforcement

Set DAG-level policies for cost limits, abort conditions, and validation:

from swarmflow import swarm_task, run, SwarmFlow

# Create flow for policy configuration
flow = SwarmFlow()
flow.set_policy("max_cost", 0.10)  # Abort if total cost > $0.10
flow.set_policy("abort_on_flag", "error_detected")  # Abort if flag is True
flow.set_policy("require_outputs", ["final_result"])  # Abort if missing outputs

@swarm_task
def task1():
    return "Task 1 result"

@swarm_task
def task2(task1):
    return "Task 2 result"

# Run with policies enforced
run()
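The three policies reduce to simple checks over running cost, shared memory, and collected outputs. The sketch below assumes `require_outputs` names tasks and that all checks run together; `check_policies` and `PolicyViolation` are hypothetical, and the README does not say when SwarmFlow evaluates each policy.

```python
from typing import Any


class PolicyViolation(Exception):
    """Hypothetical error raised when a DAG-level policy is broken."""


def check_policies(
    policy: dict[str, Any],
    memory: dict[str, Any],
    outputs: dict[str, Any],
    total_cost_usd: float,
) -> None:
    """Raise PolicyViolation if any configured policy is broken; return None otherwise."""
    max_cost = policy.get("max_cost")
    if max_cost is not None and total_cost_usd > max_cost:
        raise PolicyViolation(f"total cost ${total_cost_usd:.4f} exceeds max_cost ${max_cost:.2f}")

    flag = policy.get("abort_on_flag")
    if flag is not None and memory.get(flag) is True:
        raise PolicyViolation(f"abort flag {flag!r} is set")

    missing = [name for name in policy.get("require_outputs", []) if name not in outputs]
    if missing:
        raise PolicyViolation(f"missing required outputs: {missing}")


policy = {"max_cost": 0.10, "abort_on_flag": "error_detected", "require_outputs": ["final_result"]}
check_policies(policy, memory={}, outputs={"final_result": "done"}, total_cost_usd=0.02)  # passes
```

Under this sketch's assumption, the example above would fail the `require_outputs` check because none of its tasks is named `final_result`; the list should name tasks that exist in your DAG.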

Real-time Monitoring

By default, SwarmFlow sends task traces to a SwarmFlow backend service at http://localhost:8000/api/trace for real-time monitoring and analytics.

Trace Structure:

{
  "id": "task-uuid",
  "run_id": "dag-run-uuid",
  "name": "task_name",
  "status": "success|failure|retrying|skipped",
  "duration_ms": 1234,
  "output": "task output",
  "metadata": {
    "agent": "LLMProcessor",
    "provider": "Groq",
    "model": "llama-3-70b",
    "tokens_used": 150,
    "cost_usd": 0.000089
  },
  "dependencies": ["dep1", "dep2"],
  "flow_memory": {"key": "value"},
  "flow_policy": {"max_cost": 0.10}
}

run_id is the same for every task in one DAG run, flow_memory holds the shared memory state, and flow_policy lists the active policies.
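A trace is a flat JSON object, so producing and posting one needs only the standard library. The sketch below builds records with the fields listed above and shows a sender; `build_trace` and `send_trace` are hypothetical, and the `Authorization: Bearer` header is an assumption, since the README does not say how the API key is transmitted. The sender is defined but not called, so nothing here requires a running backend.

```python
import json
import urllib.request
import uuid
from typing import Any, Optional

TRACE_URL = "http://localhost:8000/api/trace"  # default endpoint given in this README


def build_trace(run_id: str, name: str, status: str, duration_ms: int, output: Any,
                metadata: Optional[dict[str, Any]] = None,
                dependencies: Optional[list[str]] = None,
                flow_memory: Optional[dict[str, Any]] = None,
                flow_policy: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Assemble one trace record with the fields listed in the trace structure."""
    return {
        "id": str(uuid.uuid4()),  # unique per task
        "run_id": run_id,         # shared by every task in one DAG run
        "name": name,
        "status": status,
        "duration_ms": duration_ms,
        "output": output,
        "metadata": metadata or {},
        "dependencies": dependencies or [],
        "flow_memory": flow_memory or {},
        "flow_policy": flow_policy or {},
    }


def send_trace(trace: dict[str, Any], api_key: Optional[str] = None) -> int:
    """POST one trace as JSON; the Authorization header format is an assumption."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    # default=str keeps non-JSON outputs (such as SDK response objects) serializable.
    body = json.dumps(trace, default=str).encode("utf-8")
    request = urllib.request.Request(TRACE_URL, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status


run_id = str(uuid.uuid4())
traces = [
    build_trace(run_id, "fetch_data", "success", 12, "Some data from API"),
    build_trace(run_id, "process_data", "success", 8, "Processed: Some data from API",
                dependencies=["fetch_data"]),
]
```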

Observability

SwarmFlow automatically provides:

  • Task execution traces with OpenTelemetry
  • Performance metrics (execution time, success rates)
  • Dependency visualization and cycle detection
  • Error tracking and failure propagation
  • Auto-extracted Groq metadata (model, provider, token usage, precise cost calculation, timing metrics)

🏗️ Architecture

SwarmFlow is designed for production multi-agent systems with dead-simple usage:

User's Agent Functions → @swarm_task decorator → run() → Observability Dashboard
  • Minimal: Just decorator + run function
  • Scalable: Handles complex dependency graphs
  • Observable: Real-time monitoring and debugging
  • Resilient: Built-in retry logic and error handling

📊 Monitoring Dashboard

Get comprehensive insights into your multi-agent workflows:

  • Real-time execution monitoring
  • Performance analytics and optimization
  • Error tracking and debugging
  • Cost analysis for LLM usage (auto-calculated)
  • Workflow visualization and dependency graphs
  • Groq metadata extraction (comprehensive model support with timing and cost analytics)
  • DAG run tracking with unique run_id for grouping and analytics
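Because every trace carries a `run_id`, per-run analytics reduce to a group-by. The sketch below computes task counts, success rate, total cost, and summed task time per run. It assumes that a task re-reporting itself (for example `retrying`, then `success`) reuses the same trace `id`, and keeps only the last record per id; `summarize_runs` is hypothetical, not part of the dashboard.

```python
from collections import defaultdict
from typing import Any


def summarize_runs(traces: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Group trace records by run_id and compute per-run totals and success rate."""
    # Keep only the last record per task id, so a "retrying" record followed by
    # a "success" record for the same task counts once, as a success.
    latest = {t["id"]: t for t in traces}

    runs: dict[str, dict[str, Any]] = defaultdict(
        lambda: {"tasks": 0, "succeeded": 0, "cost_usd": 0.0, "task_time_ms": 0}
    )
    for t in latest.values():
        run = runs[t["run_id"]]
        run["tasks"] += 1
        run["succeeded"] += int(t["status"] == "success")
        run["cost_usd"] += t.get("metadata", {}).get("cost_usd", 0.0)
        run["task_time_ms"] += t["duration_ms"]  # summed task time, not wall-clock time

    return {
        run_id: {**run, "success_rate": run["succeeded"] / run["tasks"]}
        for run_id, run in runs.items()
    }


sample = [
    {"id": "t1", "run_id": "r1", "status": "retrying", "duration_ms": 500, "metadata": {}},
    {"id": "t1", "run_id": "r1", "status": "success", "duration_ms": 900, "metadata": {"cost_usd": 0.000089}},
    {"id": "t2", "run_id": "r1", "status": "failure", "duration_ms": 100, "metadata": {}},
]
summary = summarize_runs(sample)
print(summary["r1"]["success_rate"])  # 0.5
```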

🚀 Deployment Configuration


Backend Configuration

By default, SwarmFlow sends traces to http://localhost:8000/api/trace. For production deployments, change the backend URL in the SDK code to point to your centralized backend service.

🤝 Contributing

We welcome contributions! Please see our Contributing Guidelines.

📚 Documentation

For detailed documentation, visit: https://github.com/anirame128/swarmflow

📄 License

SDK License

The SwarmFlow SDK is licensed under the MIT License - see LICENSE file for details.

Backend Services

SwarmFlow backend services, dashboard, and infrastructure require separate service agreements and API keys. The SDK is designed to work with official SwarmFlow backend services only.

Why this model?

  • Free SDK: Developers can use the SDK without restrictions
  • Paid Services: Backend services and the dashboard require API keys
  • Industry Standard: Follows the same model as the Google Maps, Stripe, and AWS SDKs
  • Developer Friendly: Maximizes adoption while protecting the project's business model


Download files

Download the file for your platform.

Source Distribution

swarmflow-0.4.2.tar.gz (19.3 kB)

Built Distribution


swarmflow-0.4.2-py3-none-any.whl (15.2 kB)

File details

Details for the file swarmflow-0.4.2.tar.gz.

File metadata

  • Download URL: swarmflow-0.4.2.tar.gz
  • Upload date:
  • Size: 19.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.3

File hashes

Hashes for swarmflow-0.4.2.tar.gz
Algorithm Hash digest
SHA256 e87623dfb29ac3096677edf28043cc212b1dfc9985a0011f914606ee19fd9484
MD5 4db4dcd291032fd538139ddbc640a2be
BLAKE2b-256 b8db4d0f4edee614310233b85a3e1895e928b297845f0483bc5f337607d92d1f


File details

Details for the file swarmflow-0.4.2-py3-none-any.whl.

File metadata

  • Download URL: swarmflow-0.4.2-py3-none-any.whl
  • Upload date:
  • Size: 15.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.3

File hashes

Hashes for swarmflow-0.4.2-py3-none-any.whl
Algorithm Hash digest
SHA256 4e42600c2dea5fe087529ef412d29d154f4a55365854548f38f8e68a368f35ff
MD5 44f6c200ae2a28ab1b8e16d1e0570007
BLAKE2b-256 3741565a097bca0156cffdb73478810e9bf3abdf42ea5d8bd7fcdd8186653e4e

