
Flock: A declarative framework for building and orchestrating AI agents.



Flock 0.5: Declarative Blackboard Multi-Agent Orchestration

Stop engineering prompts. Start declaring contracts.

Flock is a production-focused framework for orchestrating AI agents through declarative type contracts and blackboard architecture—proven patterns from distributed systems, decades of microservice experience, and classical AI—now applied to modern LLMs.

📖 Read the full documentation →


The Problem With Current Approaches

Building production multi-agent systems today means dealing with:

🔥 Prompt Engineering Hell

prompt = """You are an expert code reviewer. When you receive code, you should...
[498 more lines of instructions that the LLM ignores half the time]"""

# 500-line prompt that breaks when models update
# How do I know this is the best prompt? (you don't)
# Proving 'best possible performance' is impossible

🧪 Testing Nightmares

# How do you unit test this?
result = llm.invoke(prompt)  # Hope for valid JSON
data = json.loads(result.content)  # Crashes in production

📐 Rigid Topology & Tight Coupling

# Want to add a new agent? Rewrite the entire graph.
workflow.add_edge("agent_a", "agent_b")
workflow.add_edge("agent_b", "agent_c")
# Add agent_d? Start rewiring...

💀 Single Point of Failure

# Orchestrator dies? Everything dies.

🧠 God Object Anti-Pattern

# One orchestrator needs domain knowledge of 20+ agents to route correctly
# Orchestrator 'guesses' next agent based on natural language
# Not suitable for critical systems

These aren't framework limitations—they're architectural choices that don't scale. Decades of microservice experience have taught us about decoupling, orchestration, and reliability. Let's apply those lessons!


The Flock Approach

Flock combines two proven patterns:

1. Declarative Type Contracts (Not Prompts)

Traditional approach:

prompt = """You are an expert bug analyst. Analyze bug reports and provide structured diagnostics.

INSTRUCTIONS:
1. Read the bug report carefully
2. Determine severity (Critical|High|Medium|Low)
3. Classify bug category
4. Formulate root cause hypothesis (minimum 50 characters)
5. Assign confidence score (0.0-1.0)

OUTPUT FORMAT:
You MUST return valid JSON with this exact structure:
{
  "severity": "string (Critical|High|Medium|Low)",
  "category": "string",
  "root_cause_hypothesis": "string (minimum 50 characters)",
  "confidence_score": "number (0.0 to 1.0)"
}

VALIDATION RULES:
- severity: Must be exactly one of: Critical, High, Medium, Low
- category: Must be a single word or short phrase
- root_cause_hypothesis: Must be at least 50 characters
- confidence_score: Must be between 0.0 and 1.0

[...hundreds more lines...]"""

result = llm.invoke(prompt)  # 500-line prompt that breaks
data = json.loads(result.content)  # Crashes in production 🔥

The Flock way:

@flock_type
class BugDiagnosis(BaseModel):
    severity: str = Field(pattern="^(Critical|High|Medium|Low)$")
    category: str = Field(description="Bug category")
    root_cause_hypothesis: str = Field(min_length=50)
    confidence_score: float = Field(ge=0.0, le=1.0)

# The schema IS the instruction. No 500-line prompt needed.
agent.consumes(BugReport).publishes(BugDiagnosis)


Why this matters:

  • Survives model upgrades - GPT-6 will still understand Pydantic schemas
  • Runtime validation - Errors caught at parse time, not in production
  • Testable - Mock inputs/outputs with concrete types
  • Self-documenting - The code tells you what agents do
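
To make the "runtime validation" and "testable" points concrete, here is a minimal sketch that uses only Pydantic v2 (the `@flock_type` decorator is left out so the snippet runs without Flock installed). `parse_diagnosis` is a hypothetical helper written for this illustration, not part of Flock's API.

```python
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class BugDiagnosis(BaseModel):
    severity: str = Field(pattern="^(Critical|High|Medium|Low)$")
    category: str = Field(description="Bug category")
    root_cause_hypothesis: str = Field(min_length=50)
    confidence_score: float = Field(ge=0.0, le=1.0)


def parse_diagnosis(raw_json: str) -> Optional[BugDiagnosis]:
    """Hypothetical helper: return a validated model, or None on a contract violation."""
    try:
        return BugDiagnosis.model_validate_json(raw_json)
    except ValidationError:
        # Malformed JSON and out-of-range fields both end up here.
        return None


valid = parse_diagnosis(
    '{"severity": "High", "category": "resource leak", '
    '"root_cause_hypothesis": "The connection pool is exhausted because '
    'sessions are never closed.", "confidence_score": 0.8}'
)
invalid = parse_diagnosis(
    '{"severity": "Urgent", "category": "resource leak", '
    '"root_cause_hypothesis": "too short", "confidence_score": 1.7}'
)
```

Because the contract is an ordinary class, a unit test can feed it fixed JSON strings like the ones above without calling a model.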

2. Blackboard Architecture (Not Directed Graphs)

Graph-based approach:

# Explicit workflow with hardcoded edges
workflow.add_edge("radiologist", "diagnostician")
workflow.add_edge("lab_tech", "diagnostician")
# Add performance_analyzer? Rewrite the graph.

The Flock way (blackboard):

# Agents subscribe to types, workflows emerge
radiologist = flock.agent("radiologist").consumes(Scan).publishes(XRayAnalysis)
lab_tech = flock.agent("lab_tech").consumes(Scan).publishes(LabResults)
diagnostician = flock.agent("diagnostician").consumes(XRayAnalysis, LabResults).publishes(Diagnosis)

# Add performance_analyzer? Just subscribe it:
performance = flock.agent("perf").consumes(Scan).publishes(PerfAnalysis)
# Done. No graph rewiring. Diagnostician can optionally consume it.

What just happened:

  • Parallel execution - Radiologist and lab_tech run concurrently (automatic)
  • Dependency resolution - Diagnostician waits for both inputs (automatic)
  • Loose coupling - Agents don't know about each other, just data types
  • Scalable - O(n) complexity, not O(n²) edges

This is not a new idea. Blackboard architecture has powered AI systems since the 1970s (Hearsay-II, HASP/SIAP, BB1). We're applying proven patterns to modern LLMs.
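
The subscription idea can be illustrated with a toy blackboard in plain Python. This is a sketch under simplifying assumptions, not Flock's implementation: `ToyBlackboard` and its methods are hypothetical names, handlers run synchronously rather than in parallel, and the AND gate simply waits until one artifact of each subscribed type has arrived.

```python
from dataclasses import dataclass


@dataclass
class Scan:
    patient: str


@dataclass
class XRayAnalysis:
    finding: str


@dataclass
class LabResults:
    marker: str


@dataclass
class Diagnosis:
    summary: str


class ToyBlackboard:
    """Hypothetical, synchronous stand-in for a type-routed blackboard."""

    def __init__(self):
        self.subscriptions = []  # list of (input_types, handler)
        self.pending = {}        # subscription index -> {type: artifact}
        self.artifacts = []      # everything ever published

    def subscribe(self, input_types, handler):
        self.subscriptions.append((tuple(input_types), handler))

    def publish(self, artifact):
        self.artifacts.append(artifact)
        for index, (input_types, handler) in enumerate(self.subscriptions):
            if type(artifact) not in input_types:
                continue
            slot = self.pending.setdefault(index, {})
            slot[type(artifact)] = artifact
            if len(slot) == len(input_types):  # AND gate satisfied
                inputs = [slot[t] for t in input_types]
                self.pending[index] = {}
                self.publish(handler(*inputs))


board = ToyBlackboard()
board.subscribe([Scan], lambda s: XRayAnalysis(f"xray ok for {s.patient}"))
board.subscribe([Scan], lambda s: LabResults(f"labs ok for {s.patient}"))
board.subscribe(
    [XRayAnalysis, LabResults],
    lambda x, l: Diagnosis(f"{x.finding}; {l.marker}"),
)
board.publish(Scan("p1"))
diagnoses = [a for a in board.artifacts if isinstance(a, Diagnosis)]
```

No handler names another handler. Adding a fourth subscriber to `Scan` would be one more `subscribe` call with no change to the existing three.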


Quick Start (60 Seconds)

pip install flock-core
export OPENAI_API_KEY="sk-..."
export DEFAULT_MODEL="openai/gpt-4.1"  # Optional, has defaults

import os
import asyncio
from pydantic import BaseModel, Field
from flock import Flock, flock_type

# 1. Define typed artifacts
@flock_type
class CodeSubmission(BaseModel):
    code: str
    language: str

@flock_type
class BugAnalysis(BaseModel):
    bugs_found: list[str]
    severity: str = Field(pattern="^(Critical|High|Medium|Low|None)$")
    confidence: float = Field(ge=0.0, le=1.0)

@flock_type
class SecurityAnalysis(BaseModel):
    vulnerabilities: list[str]
    risk_level: str = Field(pattern="^(Critical|High|Medium|Low|None)$")

@flock_type
class FinalReview(BaseModel):
    overall_assessment: str = Field(pattern="^(Approve|Approve with Changes|Reject)$")
    action_items: list[str]

# 2. Create the blackboard
flock = Flock(os.getenv("DEFAULT_MODEL", "openai/gpt-4.1"))

# 3. Agents subscribe to types (NO graph wiring!)
bug_detector = flock.agent("bug_detector").consumes(CodeSubmission).publishes(BugAnalysis)
security_auditor = flock.agent("security_auditor").consumes(CodeSubmission).publishes(SecurityAnalysis)

# AND gate: This agent AUTOMATICALLY waits for BOTH analyses
final_reviewer = flock.agent("final_reviewer").consumes(BugAnalysis, SecurityAnalysis).publishes(FinalReview)

# 4. Run with real-time dashboard
async def main():
    await flock.serve(dashboard=True)

asyncio.run(main())

What happened:

  • Bug detector and security auditor ran in parallel
  • Final reviewer automatically waited for both
  • Zero prompts written - types defined the behavior
  • Zero graph edges - subscriptions created the workflow
  • Full type safety - Pydantic validates all outputs

Core Features

Typed Artifacts

Every piece of data is a validated Pydantic model:

@flock_type
class PatientDiagnosis(BaseModel):
    condition: str = Field(min_length=10)
    confidence: float = Field(ge=0.0, le=1.0)
    recommended_treatment: list[str] = Field(min_length=1)
    follow_up_required: bool

Benefits:

  • Runtime validation ensures quality
  • Field constraints prevent bad outputs
  • Self-documenting data structures
  • Version-safe (types survive model updates)

Agent Subscriptions with Logic Gates

AND Gates - Wait for ALL types:

# Wait for BOTH types before triggering
diagnostician = flock.agent("diagnostician").consumes(XRayAnalysis, LabResults).publishes(Diagnosis)

OR Gates - Trigger on ANY type:

# Trigger when EITHER type arrives (via chaining)
alert_handler = flock.agent("alerts").consumes(SystemAlert).consumes(UserAlert).publishes(Response)

Count-Based AND Gates:

# Wait for THREE Orders
aggregator = flock.agent("aggregator").consumes(Order, Order, Order).publishes(BatchSummary)

# Wait for TWO Images AND ONE Metadata
validator = flock.agent("validator").consumes(Image, Image, Metadata).publishes(ValidationResult)
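
One way to think about count-based gates is as a multiset comparison: the gate opens once the buffered artifacts contain at least the required number of each type. The sketch below shows that reading with `collections.Counter`. `and_gate_ready` is a hypothetical helper, and how Flock tracks its buffers internally is not covered here.

```python
from collections import Counter


def and_gate_ready(required, buffered):
    """Return True once the buffer holds the required count of every type."""
    needed = Counter(required)
    available = Counter(buffered)
    return all(available[name] >= count for name, count in needed.items())


# Type names are plain strings here to keep the sketch dependency-free.
required = ["Image", "Image", "Metadata"]
not_yet = and_gate_ready(required, ["Image", "Metadata"])       # one Image short
ready = and_gate_ready(required, ["Image", "Metadata", "Image"])
```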

Fan-Out & Dynamic Fan-Out

Flock supports fan-out publishing so a single agent execution can generate multiple artifacts:

  • fan_out=10 → fixed count (10 artifacts of a type).
  • fan_out=(min, max) → dynamic fan-out, where the engine decides how many artifacts to generate within a range, based on input complexity and quality filters.

from flock.core import FanOutRange

idea_generator = (
    flock.agent("idea_generator")
    .consumes(ProductBrief)
    .publishes(
        ProductIdea,
        fan_out=(5, 20),              # engine decides 5–20 ideas
        where=lambda i: i.score >= 8,  # filter AFTER range checks
    )
)

Dynamic fan-out is fully backward compatible with existing fan_out=int usage and is described in detail in the Fan-Out Publishing guide and examples/02-patterns/publish/06_dynamic_fan_out.py.

🧠 Semantic Subscriptions (New in 0.5!)

Match artifacts by MEANING, not keywords:

# Install semantic extras
pip install flock-core[semantic]

# Agents route based on semantic similarity
security_team = (
    flock.agent("security_team")
    .consumes(SupportTicket, semantic_match="security vulnerability exploit")
    .publishes(SecurityAlert)
)

billing_team = (
    flock.agent("billing_team")
    .consumes(SupportTicket, semantic_match="payment charge refund billing")
    .publishes(BillingResponse)
)

# Tickets route automatically based on MEANING!
# "SQL injection" → Security Team (no keyword "security" needed!)
# "charged twice" → Billing Team (semantic match to "payment")

Advanced semantic filtering:

# Custom threshold (0.0-1.0, default 0.4)
.consumes(Ticket, semantic_match="urgent", semantic_threshold=0.7)  # Strict

# Multiple criteria (ALL must match)
.consumes(Doc, semantic_match=["security", "compliance"])  # AND logic

# Field-specific matching
.consumes(Article, semantic_match={
    "query": "machine learning",
    "threshold": 0.6,
    "field": "abstract"  # Only match this field
})

Why this is revolutionary:

  • No keyword brittleness - "SQL injection" matches "security vulnerability"
  • Better recall - Catches semantically similar content
  • Local embeddings - all-MiniLM-L6-v2 model (~90MB), no external API
  • Fast & cached - LRU cache with 10k entries, ~15ms per embedding
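
Threshold-based semantic matching usually comes down to comparing embedding vectors by cosine similarity. The sketch below uses tiny hand-written vectors in place of real embeddings, so the numbers are purely illustrative. `cosine_similarity` and `semantic_match` are hypothetical helpers, and only the 0.4 default threshold is taken from the text above.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def semantic_match(artifact_vec, query_vec, threshold=0.4):
    """Accept the artifact when similarity reaches the threshold."""
    return cosine_similarity(artifact_vec, query_vec) >= threshold


# Made-up 2-d "embeddings": axis 0 ~ security topics, axis 1 ~ billing topics.
security_query = [1.0, 0.0]
sql_injection_ticket = [0.9, 0.1]
charged_twice_ticket = [0.1, 0.9]

routes_to_security = semantic_match(sql_injection_ticket, security_query)
billing_ticket_to_security = semantic_match(charged_twice_ticket, security_query)
```

Raising the threshold (as in the `semantic_threshold=0.7` example) makes the match stricter, so fewer artifacts pass.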

📖 Full Semantic Guide →

Advanced Subscription Patterns


Predicates - Smart Filtering:

# Only process critical cases
urgent_care = flock.agent("urgent").consumes(
    Diagnosis,
    where=lambda d: d.severity in ["Critical", "High"]
)

BatchSpec - Cost Optimization:

# Process 25 per call = 96% fewer API calls!
payment_processor = flock.agent("payments").consumes(
    Transaction,
    batch=BatchSpec(size=25, timeout=timedelta(seconds=30))
)
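
The 96% figure follows from simple arithmetic: with 25 items per call, 100 transactions need 4 calls instead of 100. A small sketch of that calculation follows (the helper names are illustrative, and it counts calls only, not tokens or price):

```python
import math


def api_calls(n_items, batch_size):
    """Number of calls needed when items are grouped into batches."""
    return math.ceil(n_items / batch_size)


def call_reduction(n_items, batch_size):
    """Fraction of calls saved compared with one call per item."""
    return 1 - api_calls(n_items, batch_size) / n_items


calls = api_calls(100, 25)        # 4 calls instead of 100
saved = call_reduction(100, 25)   # 0.96, i.e. 96% fewer calls
```

The saving shrinks when a timeout flushes a batch before it is full, which is the trade-off the `timeout` parameter controls.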

JoinSpec - Data Correlation:

# Match orders + shipments by ID
customer_service = flock.agent("notifications").consumes(
    Order,
    Shipment,
    join=JoinSpec(by=lambda x: x.order_id, within=timedelta(hours=24))
)
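
Conceptually, a keyed join pairs artifacts that share a correlation key and arrive within a time window. The following is a naive sketch of that idea, not Flock's join engine: `correlate` is a hypothetical function, artifacts are plain dicts, and the timestamp field `at` is an assumption made for the example.

```python
from datetime import datetime, timedelta


def correlate(left, right, by, timestamp, within):
    """Pair items with equal keys whose timestamps differ by at most `within`."""
    matches = []
    for a in left:
        for b in right:
            same_key = by(a) == by(b)
            close_in_time = abs(timestamp(a) - timestamp(b)) <= within
            if same_key and close_in_time:
                matches.append((a, b))
    return matches


t0 = datetime(2025, 1, 1, 12, 0)
orders = [{"order_id": 1, "at": t0}, {"order_id": 2, "at": t0}]
shipments = [
    {"order_id": 1, "at": t0 + timedelta(hours=3)},   # inside the 24h window
    {"order_id": 2, "at": t0 + timedelta(hours=30)},  # too late to join
]
pairs = correlate(
    orders,
    shipments,
    by=lambda x: x["order_id"],
    timestamp=lambda x: x["at"],
    within=timedelta(hours=24),
)
```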

Combined - Production Pipelines:

# Correlate sensors, THEN batch for analysis
quality_control = flock.agent("qc").consumes(
    TemperatureSensor,
    PressureSensor,
    join=JoinSpec(by=lambda x: x.device_id, within=timedelta(seconds=30)),
    batch=BatchSpec(size=5, timeout=timedelta(seconds=45))
)


🌟 Fan-Out Publishing

Produce multiple outputs from a single execution:

# Generate 10 diverse product ideas from one brief
idea_generator = (
    flock.agent("generator")
    .consumes(ProductBrief)
    .publishes(ProductIdea, fan_out=10)
)

# With quality filtering
idea_generator = (
    flock.agent("generator")
    .consumes(ProductBrief)
    .publishes(
        ProductIdea,
        fan_out=20,  # Generate 20 candidates
        where=lambda idea: idea.score >= 8.0  # Only publish score >= 8
    )
)

Multi-Output Fan-Out (The Mind-Blowing Part):

# Generate 3 of EACH type = 9 total artifacts in ONE LLM call!
multi_master = (
    flock.agent("multi_master")
    .consumes(Idea)
    .publishes(Movie, MovieScript, MovieCampaign, fan_out=3)
)

# Single execution produces:
# - 3 complete Movies (title, genre, cast, plot)
# - 3 complete MovieScripts (characters, scenes, pages)
# - 3 complete MovieCampaigns (taglines, posters)
# = 9 complex artifacts, 100+ fields, full validation, ONE LLM call!

📖 Full Fan-Out Guide →

⏰ Timer-Based Agent Scheduling (New in 0.5.30!)

Run agents on schedules, not just events:

from datetime import datetime, time, timedelta

# Periodic health checks (every 30 seconds)
health_monitor = (
    flock.agent("health_monitor")
    .schedule(every=timedelta(seconds=30))
    .publishes(HealthStatus)
)

# Daily reports (5 PM every day)
daily_report = (
    flock.agent("daily_report")
    .schedule(at=time(hour=17, minute=0))
    .publishes(DailyReport)
)

# Cron expressions (every weekday at 9 AM UTC)
workday_report = (
    flock.agent("workday_report")
    .schedule(cron="0 9 * * 1-5")  # Mon-Fri at 9 AM
    .publishes(WorkdayReport)
)

# One-time scheduled task
scheduled_task = (
    flock.agent("scheduled_task")
    .schedule(at=datetime(2025, 12, 25, 9, 0))  # Christmas 9 AM
    .publishes(TaskResult)
)

Timer agents receive empty input with timer metadata:

async def health_check(ctx: AgentContext) -> HealthStatus:
    # ctx.artifacts = []  # Empty for timer triggers
    # ctx.trigger_type == "timer"  # Know it's timer-triggered
    # ctx.timer_iteration  # How many times fired (0, 1, 2...)
    # ctx.fire_time  # When timer fired
    
    # Access filtered blackboard context
    recent_errors = ctx.get_artifacts(LogEntry)  # Only ERROR logs
    
    return HealthStatus(healthy=len(recent_errors) == 0)

Why this is powerful:

  • No event dependency - Agents run independently on time
  • Context filtering - Combine .schedule() + .consumes() for filtered context
  • Precise timing - Interval, daily, cron, or one-time execution
  • Lifecycle control - Initial delays, repeat limits, graceful shutdown
  • Production-ready - Timer state tracking, drift prevention, crash recovery
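
Two of the timing ideas above can be sketched with the standard library alone. Deriving each fire time from the start time (rather than from "now plus interval") is one common way to avoid drift, though whether Flock does it this way is not stated here. The second helper spells out what the cron expression `0 9 * * 1-5` means. Both function names are hypothetical.

```python
from datetime import datetime, timedelta


def interval_fire_time(start, every, iteration):
    """Fire time for the n-th iteration, anchored to the start to avoid drift."""
    return start + every * iteration


def matches_workday_cron(moment):
    """True when `moment` satisfies cron '0 9 * * 1-5' (minute 0, hour 9, Mon-Fri)."""
    return moment.minute == 0 and moment.hour == 9 and moment.isoweekday() <= 5


start = datetime(2025, 1, 1, 0, 0, 0)
fifth_fire = interval_fire_time(start, timedelta(seconds=30), 4)

thursday_9am = matches_workday_cron(datetime(2025, 12, 25, 9, 0))
saturday_9am = matches_workday_cron(datetime(2025, 12, 27, 9, 0))
```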

📖 Timer Scheduling Guide →

🔒 Zero-Trust Visibility Controls

Built-in security (not bolt-on):

# Multi-tenancy (SaaS isolation)
agent.publishes(CustomerData, visibility=TenantVisibility(tenant_id="customer_123"))

# Explicit allowlist (HIPAA compliance)
agent.publishes(MedicalRecord, visibility=PrivateVisibility(agents={"physician", "nurse"}))

# Role-based access control
agent.identity(AgentIdentity(name="analyst", labels={"clearance:secret"}))
agent.publishes(IntelReport, visibility=LabelledVisibility(required_labels={"clearance:secret"}))

# Time-delayed release
artifact.visibility = AfterVisibility(ttl=timedelta(hours=24), then=PublicVisibility())

Architecturally impossible to bypass: Every context provider inherits from BaseContextProvider, which enforces visibility filtering automatically. You literally cannot create a provider that forgets to check permissions.
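
The visibility kinds shown above can be read as small predicates over the consuming agent. The sketch below is an interpretation for illustration only: the function names are hypothetical, and the exact semantics (labels as a subset check, release once the TTL has elapsed) are assumptions rather than a description of Flock's classes.

```python
from datetime import datetime, timedelta


def private_allows(agent_name, allowed_agents):
    """Allowlist: only named agents may read the artifact."""
    return agent_name in allowed_agents


def tenant_allows(agent_tenant, artifact_tenant):
    """Tenant isolation: agent and artifact must share a tenant."""
    return agent_tenant == artifact_tenant


def labels_allow(agent_labels, required_labels):
    """Label check (assumed): the agent must carry every required label."""
    return set(required_labels) <= set(agent_labels)


def after_allows(published_at, ttl, now):
    """Time-delayed release (assumed): readable once the TTL has elapsed."""
    return now >= published_at + ttl


published = datetime(2025, 1, 1, 9, 0)
nurse_can_read = private_allows("nurse", {"physician", "nurse"})
billing_can_read = private_allows("billing", {"physician", "nurse"})
analyst_cleared = labels_allow({"clearance:secret", "team:intel"}, {"clearance:secret"})
released_early = after_allows(published, timedelta(hours=24), published + timedelta(hours=1))
```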

Context Providers (Smart Filtering)

Control what agents see:

from flock.context_provider import FilteredContextProvider, PasswordRedactorProvider

# Global filtering - all agents see only urgent items
flock = Flock(
    "openai/gpt-4.1",
    context_provider=FilteredContextProvider(FilterConfig(tags={"urgent"}))
)

# Per-agent overrides
error_agent.context_provider = FilteredContextProvider(FilterConfig(tags={"ERROR"}))

# Production-ready password filtering
flock = Flock(
    "openai/gpt-4.1",
    context_provider=PasswordRedactorProvider()  # Auto-redacts secrets!
)

Built-in providers (all visibility-filtered):

  • DefaultContextProvider - Full blackboard access
  • CorrelatedContextProvider - Workflow isolation
  • RecentContextProvider - Token cost control
  • TimeWindowContextProvider - Time-based filtering
  • SemanticContextProvider - Similarity-based retrieval (New!)
  • EmptyContextProvider - Stateless agents
  • FilteredContextProvider - Custom filtering

Semantic Context Provider:

from flock.semantic import SemanticContextProvider

# Find similar historical incidents
provider = SemanticContextProvider(
    query_text="database connection timeout",
    threshold=0.4,
    limit=5,
    artifact_type=Incident,
    where=lambda a: a.payload["resolved"] is True
)
similar = await provider.get_context(store)

📖 Context Providers Guide →

Persistent Blackboard

Production durability with SQLite:

from flock.store import SQLiteBlackboardStore

store = SQLiteBlackboardStore(".flock/blackboard.db")
await store.ensure_schema()
flock = Flock("openai/gpt-4.1", store=store)

What you get:

  • Long-lived artifacts with full history
  • Historical APIs with pagination
  • Dashboard integration with retention windows
  • CLI tools for maintenance and retention policies

Parallel Execution Control

Batch-then-execute pattern:

# ✅ EFFICIENT: Batch publish, then run in parallel
for review in customer_reviews:
    await flock.publish(review)  # Just scheduling work

await flock.run_until_idle()  # All sentiment_analyzer agents run concurrently!

# Get all results
analyses = await flock.store.get_by_type(SentimentAnalysis)
# 100 analyses in ~1x single review time!
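
The "~1x single review time" claim rests on I/O-bound work overlapping when it runs concurrently. The stand-alone sketch below shows the effect with plain `asyncio`. `analyze` is a hypothetical stand-in that sleeps instead of calling a model, and real-world speedups depend on provider rate limits.

```python
import asyncio
import time


async def analyze(review):
    """Stand-in for one agent execution: waits like a network call would."""
    await asyncio.sleep(0.05)
    return f"analysis of {review}"


async def run_batch(reviews):
    """Schedule every analysis first, then wait for all of them together."""
    return await asyncio.gather(*(analyze(r) for r in reviews))


started = time.perf_counter()
analyses = asyncio.run(run_batch([f"review {i}" for i in range(100)]))
elapsed = time.perf_counter() - started
# Sequentially this would take about 100 * 0.05 = 5 seconds.
```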

Agent & Orchestrator Components

Composable lifecycle hooks:

from flock.components import AgentComponent

class LoggingComponent(AgentComponent):
    async def on_pre_evaluate(self, agent, ctx, inputs):
        logger.info(f"Agent {agent.name} evaluating: {inputs}")
        return inputs

    async def on_post_evaluate(self, agent, ctx, inputs, result):
        logger.info(f"Agent {agent.name} produced: {result}")
        return result

analyzer.with_utilities(LoggingComponent())

Built-in components: Rate limiting, caching, metrics, budget tracking, circuit breakers, deduplication

📖 Agent Components Guide →

🛠️ Server Components (New in 0.5.30!)

Extend Flock's HTTP API with custom middleware, routes, and lifecycle management:

from flock.components.server import ServerComponent

class CustomAPIComponent(ServerComponent):
    async def on_startup(self, orchestrator):
        # Add custom routes, middleware, or startup logic
        pass
    
    async def on_shutdown(self, orchestrator):
        # Cleanup resources
        pass

# Register server component
flock.add_server_component(CustomAPIComponent())

Built-in server components:

  • TimerComponent - Manages scheduled agent execution
  • ControlRoutesComponent - Agent/artifact management API
  • GraphRoutesComponent - Dashboard graph data API
  • TraceComponent - OpenTelemetry trace viewer
  • StaticFilesComponent - Dashboard UI serving

Why this matters:

  • Modular architecture - Add features without modifying core
  • Lifecycle hooks - Startup/shutdown coordination
  • Custom endpoints - Extend API with domain-specific routes
  • Middleware support - Authentication, logging, rate limiting
  • Production-ready - Proper initialization order, error handling

📖 Orchestrator Components Guide →

Production Safety

Built-in safeguards:

# Circuit breakers (auto-added)
flock = Flock("openai/gpt-4.1")  # CircuitBreakerComponent(max_iterations=1000)

# Feedback loop protection
critic.prevent_self_trigger(True)  # Won't trigger itself infinitely

# Best-of-N execution
agent.best_of(5, score=lambda result: result.metrics["confidence"])
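
Both safeguards are simple to state in plain Python. The sketch below is illustrative and not Flock's implementation: `best_of` runs a callable several times and keeps the highest-scoring result, and `run_with_limit` stops a loop that exceeds an iteration budget. All names here are hypothetical.

```python
def best_of(n, run, score):
    """Run `run` n times and return the candidate with the highest score."""
    candidates = [run() for _ in range(n)]
    return max(candidates, key=score)


class IterationLimitExceeded(RuntimeError):
    """Raised when a loop runs past its iteration budget."""


def run_with_limit(step, max_iterations):
    """Call `step` until it returns False; raise if the budget runs out."""
    for iteration in range(max_iterations):
        if not step():
            return iteration + 1
    raise IterationLimitExceeded(f"stopped after {max_iterations} iterations")


# Canned outputs stand in for repeated model calls.
outputs = iter([{"confidence": 0.4}, {"confidence": 0.9}, {"confidence": 0.7}])
best = best_of(3, lambda: next(outputs), score=lambda r: r["confidence"])
```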

Production Observability

Real-Time Dashboard

Start with one line:

await flock.serve(dashboard=True)

Agent View: Real-time communication patterns

Features:

  • Dual Modes: Agent view & Blackboard view
  • Real-Time Updates: WebSocket streaming with live activation
  • Interactive Graph: Drag, zoom, pan, 5 auto-layout algorithms
  • Advanced Filtering: Correlation ID tracking, time ranges, autocomplete
  • Control Panel: Publish artifacts, invoke agents from UI
  • Keyboard Shortcuts: WCAG 2.1 AA compliant

Blackboard View: Data lineage and transformations

Production-Grade Trace Viewer

Jaeger-style tracing with 7 modes:

Trace Viewer: Timeline view with span hierarchies

7 Trace Modes:

  1. Timeline - Waterfall visualization
  2. Statistics - Sortable duration/error tracking
  3. RED Metrics - Rate, Errors, Duration monitoring
  4. Dependencies - Service communication analysis
  5. DuckDB SQL - Interactive query editor with CSV export
  6. Configuration - Real-time filtering
  7. Guide - Built-in documentation

Dependencies: Dependency analysis

OpenTelemetry + DuckDB Tracing

Two environment variables enable tracing and write traces to a file:

export FLOCK_AUTO_TRACE=true
export FLOCK_TRACE_FILE=true

python your_app.py
# Traces stored in .flock/traces.duckdb

AI-queryable debugging:

import duckdb
conn = duckdb.connect('.flock/traces.duckdb', read_only=True)

# Find bottlenecks
slow_ops = conn.execute("""
    SELECT name, AVG(duration_ms) as avg_ms, COUNT(*) as count
    FROM spans
    WHERE duration_ms > 1000
    GROUP BY name
    ORDER BY avg_ms DESC
""").fetchall()

# Find errors with full context
errors = conn.execute("""
    SELECT name, status_description,
           json_extract(attributes, '$.input') as input,
           json_extract(attributes, '$.output') as output
    FROM spans
    WHERE status_code = 'ERROR'
""").fetchall()

Real debugging:

You: "My pizza agent is slow"
AI: [queries DuckDB]
    "DSPyEngine.evaluate takes 23s on average.
     Input size: 50KB of conversation history.
     Recommendation: Limit context to last 5 messages."
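
To try the bottleneck query without a real trace file, the sketch below runs the same aggregation against an in-memory table. It uses the standard-library `sqlite3` module purely as a stand-in for DuckDB so that it runs with no extra dependencies. The two-column `spans` table and its rows are made up for the example, and the real trace schema has more columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE spans (name TEXT, duration_ms REAL)")
conn.executemany(
    "INSERT INTO spans VALUES (?, ?)",
    [
        ("DSPyEngine.evaluate", 23000),
        ("DSPyEngine.evaluate", 21000),
        ("agent.run", 1500),
        ("store.publish", 5),  # fast span, filtered out below
    ],
)

# Same shape as the bottleneck query: slow spans, averaged per name.
slow_ops = conn.execute(
    """
    SELECT name, AVG(duration_ms) AS avg_ms, COUNT(*) AS span_count
    FROM spans
    WHERE duration_ms > 1000
    GROUP BY name
    ORDER BY avg_ms DESC
    """
).fetchall()
conn.close()
```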

DuckDB Query: DuckDB SQL query interface

REST API

Production-ready HTTP endpoints:

await flock.serve(dashboard=True)  # API + Dashboard on port 8344
# API docs: http://localhost:8344/docs

Key endpoints:

  • POST /api/v1/artifacts - Publish to blackboard
  • GET /api/v1/artifacts - Query with filtering/pagination
  • POST /api/v1/agents/{name}/run - Direct agent invocation
  • GET /api/v1/correlations/{id}/status - Workflow tracking
  • GET /health and GET /metrics - Monitoring

Features:

  • ✅ OpenAPI 3.0 documentation at /docs
  • ✅ Pydantic validation
  • ✅ Correlation tracking
  • ✅ Consumption metadata
  • ✅ Prometheus-compatible metrics

Framework Comparison

| Dimension | Graph-Based | Chat-Based | Flock (Blackboard) |
|---|---|---|---|
| Pattern | Directed graph | Round-robin chat | Blackboard subscriptions |
| Coordination | Manual edges | Message passing | Type subscriptions |
| Parallelism | Manual split/join | Sequential | Automatic |
| Type Safety | Varies | Text messages | Pydantic + validation |
| Coupling | Tight | Medium | Loose |
| Adding Agents | Rewrite graph | Update flow | Just subscribe |
| Testing | Full graph | Full group | Individual isolation |
| Security | DIY | DIY | Built-in (5 types) |
| Scalability | O(n²) | Limited | O(n) |

When Flock Wins

✅ Use Flock when you need:

  • Parallel agent execution (automatic)
  • Type-safe outputs (Pydantic validation)
  • Minimal prompt engineering (schemas define behavior)
  • Dynamic agent addition (no rewiring)
  • Testing in isolation (unit test individual agents)
  • Built-in security (HIPAA, SOC2, multi-tenancy)
  • 10+ agents (linear complexity)
  • Semantic routing (meaning-based matching)

When Alternatives Win

⚠️ Consider graph-based frameworks:

  • Extensive ecosystem integration needed
  • Workflow is inherently sequential
  • Battle-tested maturity required
  • Team has existing expertise

⚠️ Consider chat-based frameworks:

  • Conversation-based development preferred
  • Turn-taking dialogue use case
  • Specific ecosystem features needed

Honest Trade-offs

You trade:

  • Ecosystem maturity (smaller community)
  • Extensive documentation (catching up)
  • Battle-tested age (newer architecture)

You gain:

  • Better scalability (O(n) vs O(n²))
  • Type safety (validation vs hope)
  • Cleaner architecture (loose coupling)
  • Production safety (built-in circuit breakers)
  • Security model (5 visibility types)
  • Semantic intelligence (meaning-based routing)

Different frameworks for different priorities. Choose based on what matters to your team.


Production Readiness

What Works Today (v0.5.0)

✅ Production-ready core:

  • 1300+ tests with >75% coverage (>90% on critical paths)
  • Blackboard orchestrator with typed artifacts
  • Parallel + sequential execution (automatic)
  • Zero-trust security (5 visibility types)
  • Semantic subscriptions with local embeddings
  • Timer-based agent scheduling (interval, daily, cron, one-time)
  • Server components for extensible HTTP API
  • Circuit breakers and feedback prevention
  • OpenTelemetry + DuckDB tracing
  • Real-time dashboard with 7-mode trace viewer
  • MCP integration (Model Context Protocol)
  • Best-of-N, batching, joins, fan-out
  • Type-safe retrieval API
  • SQLite persistent store

⚠️ What's missing for large-scale:

  • Advanced retry logic (basic only)
  • Event replay (no Kafka yet)
  • Kubernetes-native deployment (no Helm)
  • OAuth/RBAC (dashboard has no auth)

All missing features are planned for v1.0 (Q4 2025).

Recommended Use Cases Today

✅ Good fit right now:

  • Startups/MVPs (fast iteration, type safety)
  • Internal tools (in-memory acceptable)
  • Research/prototyping (clean architecture)
  • Medium-scale systems (10-50 agents, 1000s of artifacts)

⚠️ Wait for 1.0 if you need:

  • Enterprise persistence (multi-region, HA)
  • Compliance auditing (immutable logs)
  • Multi-tenancy SaaS (OAuth/SSO)
  • Mission-critical 99.99% uptime

Flock 0.5.0 is production-ready for the right use cases. Know your requirements.


Getting Started

# Install
pip install flock-core

# With semantic features
pip install flock-core[semantic]

# Set API key
export OPENAI_API_KEY="sk-..."

# Try examples
git clone https://github.com/whiteducksoftware/flock-flow.git
cd flock-flow

# CLI examples
uv run python examples/01-cli/01_declarative_pizza.py

# Dashboard examples
uv run python examples/02-dashboard/01_declarative_pizza.py

# Semantic routing
uv run python examples/08-semantic/01_intelligent_ticket_routing.py


Production Use Cases

Financial Services: Multi-Signal Trading

Challenge: Analyze signals in parallel, correlate within time windows, maintain audit trails.

# Parallel signal analyzers
volatility = flock.agent("volatility").consumes(MarketData).publishes(VolatilityAlert)
sentiment = flock.agent("sentiment").consumes(NewsArticle).publishes(SentimentAlert)

# Trade execution waits for CORRELATED signals
trader = flock.agent("trader").consumes(
    VolatilityAlert, SentimentAlert,
    join=JoinSpec(within=timedelta(minutes=5))
).publishes(TradeOrder)

Healthcare: HIPAA-Compliant Diagnostics

Challenge: Multi-modal fusion with access controls, audit trails, zero-trust.

# Privacy controls built-in
radiology.publishes(XRayAnalysis, visibility=PrivateVisibility(agents={"diagnostician"}))
lab.publishes(LabResults, visibility=TenantVisibility(tenant_id="patient_123"))

# Diagnostician waits for BOTH with role-based access
diagnostician = flock.agent("diagnostician").consumes(XRayAnalysis, LabResults).publishes(Diagnosis)

E-Commerce: Intelligent Support Routing

Challenge: Route support tickets to specialized teams based on meaning.

# Semantic routing (NO keyword matching!)
security_team.consumes(Ticket, semantic_match="security vulnerability exploit")
billing_team.consumes(Ticket, semantic_match="payment charge refund billing")
tech_support.consumes(Ticket, semantic_match="technical issue error bug")

# "SQL injection" → Security (no "security" keyword needed!)
# "charged twice" → Billing (semantic match!)
# "app crashes" → Tech Support (semantic understanding!)

📖 Full Use Cases →


Contributing

We're building Flock in the open. See Contributing Guide.

Quality standards:

  • All tests must pass
  • Coverage requirements met
  • Code formatted with Ruff

Roadmap to 1.0

Target: Q4 2025

See ROADMAP.md for detailed status and tracking.

Key initiatives:

  • Reliability: Advanced retry, error recovery, distributed tracing
  • Persistence: Multi-region stores, event replay, Kafka integration
  • Security: OAuth/RBAC, audit logging, compliance tooling
  • Operations: Kubernetes deployment, Helm charts, monitoring
  • Quality: Performance benchmarks, stress testing, migration tools

The Bottom Line

Flock makes different architectural choices:

Instead of:

  • ❌ Prompt engineering → ✅ Declarative type contracts
  • ❌ Workflow graphs → ✅ Blackboard subscriptions
  • ❌ Keyword matching → ✅ Semantic intelligence
  • ❌ Manual parallelization → ✅ Automatic concurrent execution
  • ❌ Bolt-on security → ✅ Zero-trust visibility controls
  • ❌ Hope-based debugging → ✅ AI-queryable distributed traces

These are architectural decisions with real tradeoffs.

Different frameworks for different priorities. Choose based on what matters to your team.


Built with ❤️ by white duck GmbH

"Declarative contracts eliminate prompt hell. Blackboard architecture eliminates graph spaghetti. Semantic intelligence eliminates keyword brittleness. Proven patterns applied to modern LLMs."

⭐ Star on GitHub | 📖 Documentation | 🚀 Try Examples | 💼 Enterprise Support


Last Updated: October 19, 2025
Version: Flock 0.5.0 (Blackboard Edition)
Status: Production-Ready Core, Enterprise Features Roadmapped
