AntFlow: Async execution library with concurrent.futures-style API and advanced pipelines

Project description

AntFlow

Why AntFlow?

The name 'AntFlow' is inspired by the efficiency of an ant colony, where each ant (worker) performs its specialized function, and together they contribute to the colony's collective goal. Similarly, AntFlow orchestrates independent workers to achieve complex asynchronous tasks seamlessly.

The Problem I Had to Solve

I was processing massive amounts of data using OpenAI's Batch API. The workflow was complex:

  1. Upload batches of data to OpenAI
  2. Wait for processing to complete
  3. Download the results
  4. Save to database
  5. Repeat for the next batch

Initially, I processed 10 batches at a time using basic async. But here's the problem: I had to wait for ALL 10 batches to complete before starting the next group.

The Bottleneck

Imagine this scenario:

  • 9 batches complete in 5 minutes
  • 1 batch gets stuck and takes 30 minutes
  • I waste 25 minutes waiting for that one slow batch while my system sits idle

With hundreds of batches to process, these delays accumulated into hours of wasted time. Even worse, one failed batch would block the entire pipeline.
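The difference between the two strategies can be sketched in plain asyncio (no AntFlow involved): `batch_barrier` below reproduces the original group-of-10 approach, where every group waits for its slowest member, while `worker_pool` lets each free worker pull the next item immediately. Both function names are illustrative, not part of any library:

```python
import asyncio

async def batch_barrier(items, task, group_size):
    # Process in fixed groups: each group waits for its slowest member.
    results = []
    for i in range(0, len(items), group_size):
        group = items[i:i + group_size]
        results.extend(await asyncio.gather(*(task(x) for x in group)))
    return results

async def worker_pool(items, task, workers):
    # A shared queue: each worker grabs the next item the moment it is free,
    # so one slow item only ever occupies one worker.
    queue = asyncio.Queue()
    for x in items:
        queue.put_nowait(x)
    results = []

    async def worker():
        while not queue.empty():
            x = queue.get_nowait()
            results.append(await task(x))

    await asyncio.gather(*(worker() for _ in range(workers)))
    return results
```

With the batch barrier, total time per group is the maximum of its members; with the worker pool, a 30-minute straggler costs one worker 30 minutes while the other workers keep draining the queue.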

The Solution: AntFlow

I built AntFlow to solve this exact problem. Instead of batch-by-batch processing, AntFlow uses worker pools where:

  • ✅ Each worker handles tasks independently
  • ✅ When a worker finishes, it immediately grabs the next task
  • ✅ Slow tasks don't block fast ones
  • ✅ Concurrency stays at the configured level (e.g., 10 tasks running simultaneously)
  • ✅ Built-in retry logic for failed tasks
  • ✅ Multi-stage pipelines for complex workflows

Result: My OpenAI batch processing went from taking hours to completing in a fraction of the time, with automatic retry handling and zero idle time.

Key Features

🚀 Worker Pool Architecture

  • Independent workers that never block each other
  • Automatic task distribution
  • Optimal resource utilization

🔄 Multi-Stage Pipelines

  • Chain operations with configurable worker pools per stage
  • Each stage runs independently
  • Data flows automatically between stages
  • Priority Queues: Assign priority to items to bypass sequential processing (NEW)
  • Interactive Control: Resume pipelines and inject items into any stage (NEW)
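Conceptually, a multi-stage pipeline is a chain of queues with an independent worker pool per stage. The following is a minimal plain-asyncio sketch of that data flow, not AntFlow's implementation; `run_pipeline` and its sentinel-based shutdown are illustrative assumptions:

```python
import asyncio

async def run_pipeline(items, stages):
    # stages: list of (task, worker_count); adjacent stages share a queue.
    queues = [asyncio.Queue() for _ in range(len(stages) + 1)]
    for x in items:
        queues[0].put_nowait(x)
    for _ in range(stages[0][1]):
        queues[0].put_nowait(None)  # one shutdown sentinel per first-stage worker

    async def run_stage(i, task, workers):
        inq, outq = queues[i], queues[i + 1]

        async def worker():
            while True:
                item = await inq.get()
                if item is None:  # sentinel: this worker shuts down
                    return
                await outq.put(await task(item))

        await asyncio.gather(*(worker() for _ in range(workers)))
        # All of this stage's workers are done: signal the next stage.
        next_workers = stages[i + 1][1] if i + 1 < len(stages) else 1
        for _ in range(next_workers):
            await outq.put(None)

    # All stages run concurrently; results flow between them as they appear.
    await asyncio.gather(*(run_stage(i, t, w) for i, (t, w) in enumerate(stages)))

    results = []
    while not queues[-1].empty():
        item = queues[-1].get_nowait()
        if item is not None:
            results.append(item)
    return results
```

Because every stage consumes from its input queue as soon as items arrive, a fast second stage never waits for the entire first stage to finish.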

💪 Built-in Resilience

  • Per-task retry with exponential backoff
  • Per-stage retry for transactional operations
  • Failed tasks don't stop the pipeline
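AntFlow builds on tenacity for its retry handling (see Requirements below); as an illustration of the per-task pattern only, a hand-rolled sketch of retry with exponential backoff looks like this (`retry_with_backoff` is a hypothetical helper, not AntFlow API):

```python
import asyncio

async def retry_with_backoff(task, *args, attempts=3, base_delay=0.1):
    # Retry an async task, doubling the pause after each failure:
    # base_delay, 2*base_delay, 4*base_delay, ... until attempts run out.
    for attempt in range(attempts):
        try:
            return await task(*args)
        except Exception:
            if attempt == attempts - 1:
                raise  # budget exhausted: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)
```

In a worker pool, a retrying task only occupies its own worker; the rest of the pool keeps processing, which is why failed tasks don't stall the pipeline.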

📊 Real-time Monitoring & Dashboards

  • Built-in Progress Bar - Simple progress=True flag for terminal progress
  • Three Dashboard Levels - Compact, Detailed, and Full dashboards
  • Custom Dashboards - Implement DashboardProtocol for your own UI
  • Worker State Tracking - Know what each worker is doing in real-time
  • Performance Metrics - Track items processed, failures, avg time per worker
  • Error Summary - Aggregated error statistics with get_error_summary()
  • StatusTracker - Real-time item tracking with full history

🎯 Familiar API

  • Drop-in async replacement for concurrent.futures
  • submit(), map(), as_completed() methods
  • Clean, intuitive interface
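The as_completed pattern these methods mirror can be shown with asyncio alone: results arrive in completion order, not submission order. This sketch uses only the standard library, not AntFlow's own `as_completed()`:

```python
import asyncio

async def work(x):
    # Higher x sleeps less, so completion order is the reverse of submission.
    await asyncio.sleep(0.01 * (5 - x))
    return x * 2

async def main():
    # The same completion-order pattern that concurrent.futures exposes:
    # collect each result as soon as its task finishes.
    tasks = [asyncio.create_task(work(x)) for x in range(5)]
    return [await fut for fut in asyncio.as_completed(tasks)]
```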

Fluent APIs (NEW)

  • Pipeline.quick() - One-liner for simple pipelines
  • Pipeline.create() - Fluent builder pattern
  • Stage Presets - io_bound(), cpu_bound(), rate_limited()
  • Result Streaming - pipeline.stream() for processing results as they complete

Use Cases

Perfect for:

  • Batch API Processing - OpenAI, Anthropic, any batch API
  • ETL Pipelines - Extract, transform, load at scale
  • Web Scraping - Fetch, parse, store web data efficiently
  • Data Processing - Process large datasets with retry logic
  • Microservices - Chain async service calls with error handling

Real-world Impact:

  • Process large batches without bottlenecks
  • Automatic retry for transient failures
  • Zero idle time = maximum throughput
  • Clear observability with metrics and callbacks

Quick Install

pip install antflow

Quick Start

AntFlow offers three equivalent ways to create pipelines. Choose based on your needs:

Method 1: Stage Objects (Full Control)

Use explicit Stage objects when you need fine-grained control:

from antflow import Pipeline, Stage

# Define stages explicitly
fetch_stage = Stage(name="Fetch", workers=10, tasks=[fetch_data], retry="per_task", task_attempts=3)
process_stage = Stage(name="Process", workers=5, tasks=[transform])
save_stage = Stage(name="Save", workers=3, tasks=[save_to_db])

# Build and run pipeline
pipeline = Pipeline(stages=[fetch_stage, process_stage, save_stage])
results = await pipeline.run(items, progress=True)

Method 2: Fluent Builder API (Concise)

Use Pipeline.create().add() for a chainable, readable syntax:

from antflow import Pipeline

results = await (
    Pipeline.create()
    .add("Fetch", fetch_data, workers=10, retries=3)
    .add("Process", transform, workers=5)
    .add("Save", save_to_db, workers=3)
    .run(items, progress=True)
)

Method 3: Quick One-Liner (Simple Cases)

Use Pipeline.quick() for simple scripts:

from antflow import Pipeline

# Single task
results = await Pipeline.quick(items, process, workers=10, progress=True)

# Multiple tasks (one stage per task)
results = await Pipeline.quick(items, [fetch, process, save], workers=5)

Which Method to Choose?

  • Stage objects: fine-grained control, custom callbacks, task concurrency limits
  • Fluent API: clean multi-stage pipelines, quick prototyping
  • Pipeline.quick(): simple scripts, single-task processing

All three methods produce the same result - they're just different ways to express the same thing.

Stage Presets

Pre-configured stages for common patterns:

from antflow import Stage

# I/O bound tasks (API calls, file ops) - 10 workers, 3 retries
stage = Stage.io_bound("Fetch", fetch_data, workers=20)

# CPU bound tasks - workers = CPU count, 1 retry
stage = Stage.cpu_bound("Process", transform_data)

# Rate-limited APIs - enforces RPS limit
stage = Stage.rate_limited("API", call_external_api, rps=5)

Built-in Progress & Dashboards

All display options are optional. By default, pipelines run silently.

# No display (silent) - default
results = await pipeline.run(items)

# Simple progress bar - shows END-TO-END progress only
results = await pipeline.run(items, progress=True)
# [████████████░░░░░░░░░░░░░░░░░░] 42% | 126/300 | 24.5/s

# Compact dashboard - overall progress + current stage activity
results = await pipeline.run(items, dashboard="compact")

# Detailed dashboard - PER-STAGE progress table (recommended for multi-stage)
results = await pipeline.run(items, dashboard="detailed")

# Full dashboard - everything + worker states + error log
results = await pipeline.run(items, dashboard="full")

Tip: For multi-stage pipelines, use dashboard="detailed" to see progress per stage and identify bottlenecks.

Stream Results

Process results as they complete:

async for result in pipeline.stream(items):
    print(f"Got: {result.value}")
    if some_condition:
        break  # Early exit supported

Traditional API

For full control, use the traditional Stage and Pipeline API:

import asyncio
from antflow import Pipeline, Stage

async def upload_batch(batch_data):
    return "batch_id"

async def check_status(batch_id):
    return "result_url"

async def download_results(result_url):
    return "processed_data"

async def save_to_db(processed_data):
    return "saved"

# Build the pipeline with explicit stages
upload_stage = Stage(name="Upload", workers=10, tasks=[upload_batch])
check_stage = Stage(name="Check", workers=10, tasks=[check_status])
download_stage = Stage(name="Download", workers=10, tasks=[download_results])
save_stage = Stage(name="Save", workers=5, tasks=[save_to_db])

pipeline = Pipeline(stages=[upload_stage, check_stage, download_stage, save_stage])

# Process with progress bar
batches = ["batch1", "batch2", "batch3"]
results = await pipeline.run(batches, progress=True)

What happens: Each stage has its own worker pool. Workers process tasks independently. As soon as a worker finishes, it picks the next task. No waiting. No idle time. Maximum throughput.


Core Concepts

AsyncExecutor: Simple Concurrent Execution

For straightforward parallel processing, AsyncExecutor provides a concurrent.futures-style API:

import asyncio
from antflow import AsyncExecutor

async def process_item(x):
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    async with AsyncExecutor(max_workers=10) as executor:
        # Using map() - returns list directly (like list(executor.map(...)) in concurrent.futures)
        # retries=3 means it will try up to 4 times total with exponential backoff
        results = await executor.map(process_item, range(100), retries=3)
        print(f"Processed {len(results)} items")

asyncio.run(main())

Pipeline: Multi-Stage Processing

For complex workflows with multiple steps, you can build a Pipeline:

import asyncio
from antflow import Pipeline, Stage

async def fetch(x):
    await asyncio.sleep(0.1)
    return f"data_{x}"

async def process(x):
    await asyncio.sleep(0.1)
    return x.upper()

async def save(x):
    await asyncio.sleep(0.1)
    return f"saved_{x}"

async def main():
    # Define stages with different worker counts
    fetch_stage = Stage(
        name="Fetch", workers=10, tasks=[fetch],
        # Limit a specific task to avoid rate limits
        task_concurrency_limits={
            "fetch": 2  # Only 2 concurrent fetch calls
        }
    )
    process_stage = Stage(name="Process", workers=5, tasks=[process])
    save_stage = Stage(name="Save", workers=3, tasks=[save])

    # Build pipeline
    pipeline = Pipeline(stages=[fetch_stage, process_stage, save_stage])

    # Process 100 items through all stages
    results = await pipeline.run(range(100))

    print(f"Completed: {len(results)} items")
    print(f"Stats: {pipeline.get_stats()}")

if __name__ == "__main__":
    asyncio.run(main())

Why different worker counts?

  • Fetch: I/O bound, use more workers (10)
  • Process: CPU bound, moderate workers (5)
  • Save: Rate-limited API, fewer workers (3)

Real-Time Monitoring with StatusTracker

Track every item as it flows through your pipeline with StatusTracker. Get real-time status updates, query current states, and access complete event history.

from antflow import Pipeline, Stage, StatusTracker
import asyncio

# Mock tasks
async def fetch(x): return x
async def process(x): return x * 2
async def save(x): return x

# 1. Define a callback for real-time updates
async def log_event(event):
    print(f"Item {event.item_id}: {event.status} @ {event.stage}")

tracker = StatusTracker(on_status_change=log_event)

# Define stages
stage1 = Stage(name="Fetch", workers=5, tasks=[fetch])
stage2 = Stage(name="Process", workers=3, tasks=[process])
stage3 = Stage(name="Save", workers=5, tasks=[save])

pipeline = Pipeline(
    stages=[stage1, stage2, stage3],
    status_tracker=tracker
)

# 2. Run pipeline (logs will print in real-time)
async def main():
    items = range(50)
    results = await pipeline.run(items)

    # 3. Get final statistics
    stats = tracker.get_stats()
    print(f"Completed: {stats['completed']}")
    print(f"Failed: {stats['failed']}")

    # Get full history for an item
    history = tracker.get_history(item_id=0)

asyncio.run(main())

See the examples/ directory for more advanced usage, including built-in dashboards (dashboard="compact", "detailed", "full") and a Web Dashboard example (examples/web_dashboard/).


Documentation

AntFlow has comprehensive documentation to help you get started and master advanced features:

🚀 Getting Started

📚 User Guides

💡 Examples

📖 API Reference

You can also build and serve the documentation locally using mkdocs:

pip install mkdocs-material
mkdocs serve

Then open your browser to http://127.0.0.1:8000.


Requirements

  • Python 3.9+
  • tenacity >= 8.0.0

Note: For Python 3.9-3.10, the taskgroup backport is automatically installed.


Running Tests

To run the test suite, first install the development dependencies from the project root:

pip install -e ".[dev]"

Then, you can run the tests using pytest:

pytest

Contributing

Contributions are welcome! Please see our Contributing Guidelines.


License

MIT License - see LICENSE file for details.


Made with ❤️ to solve real problems in production
