A lightweight, high-performance, in-process micro-orchestrator for structured, declarative, and parallel asynchronous tasks in Python.

sdax - Structured Declarative Async eXecution

Requires Python 3.11+. License: MIT.

sdax is a lightweight, high-performance, in-process micro-orchestrator for Python's asyncio. It is designed to manage complex, tiered, parallel asynchronous tasks with a declarative API, guaranteeing a correct and predictable order of execution.

It is ideal for building the internal logic of a single, fast operation, such as a complex API endpoint, where multiple dependent I/O calls (to databases, feature flags, or other services) must be reliably initialized, executed, and torn down.

Key Features

  • Structured Lifecycle: Enforces a rigid pre-execute -> execute -> post-execute lifecycle for all tasks.
  • Tiered Parallel Execution: Tasks are grouped into integer "levels." All tasks at a given level are executed in parallel, and the framework ensures all tasks at level N complete successfully before level N+1 begins.
  • Guaranteed Cleanup: post-execute runs for any task whose pre-execute was started, regardless of whether it succeeded, failed, or was cancelled. This ensures resources are always released.
  • Declarative & Flexible: Define tasks as simple data classes. Methods for each phase are optional, and each can have its own timeout and retry configuration.
  • Lightweight & Dependency-Free: Runs directly inside your application's event loop with minimal overhead and no external dependencies, schedulers, or databases (see the Performance section for details).

Installation

pip install sdax

Or for development:

git clone https://github.com/owebeeone/sdax.git
cd sdax
pip install -e .

Quick Start

import asyncio
from dataclasses import dataclass
from sdax import AsyncTaskProcessor, AsyncTask, TaskFunction

# 1. Define your context class with typed fields
@dataclass
class TaskContext:
    user_id: int | None = None
    feature_flags: dict | None = None
    db_connection: object | None = None  # annotated so the dataclass treats it as a field

# 2. Define your task functions
async def check_auth(ctx: TaskContext):
    print("Level 1: Checking authentication...")
    await asyncio.sleep(0.1)
    ctx.user_id = 123
    print("Auth successful.")

async def load_feature_flags(ctx: TaskContext):
    print("Level 1: Loading feature flags...")
    await asyncio.sleep(0.2)
    ctx.feature_flags = {"new_api": True}
    print("Flags loaded.")

async def fetch_user_data(ctx: TaskContext):
    print("Level 2: Fetching user data...")
    if not ctx.user_id:
        raise ValueError("Auth failed, cannot fetch user data.")
    await asyncio.sleep(0.1)
    print("User data fetched.")

async def close_db_connection(ctx: TaskContext):
    print("Tearing down db connection...")
    await asyncio.sleep(0.05)
    print("Connection closed.")

async def main():
    # 3. Create a processor and a context
    processor = AsyncTaskProcessor()
    ctx = TaskContext()

    # 4. Declaratively define your workflow
    processor.add_task(
        level=1,
        task=AsyncTask(
            name="Authentication",
            pre_execute=TaskFunction(function=check_auth),
            post_execute=TaskFunction(function=close_db_connection)
        )
    )
    processor.add_task(
        level=1,
        task=AsyncTask(
            name="FeatureFlags",
            pre_execute=TaskFunction(function=load_feature_flags)
        )
    )
    processor.add_task(
        level=2,
        task=AsyncTask(
            name="UserData",
            execute=TaskFunction(function=fetch_user_data)
        )
    )

    # 5. Run the processor
    try:
        await processor.process_tasks(ctx)
        print("\nWorkflow completed successfully!")
    except* Exception as e:
        print(f"\nWorkflow failed: {e.exceptions[0]}")

if __name__ == "__main__":
    asyncio.run(main())

Important: Cleanup Guarantees & Resource Management

⚠️ Critical Behavior: post_execute runs for any task whose pre_execute was started, even if:

  • pre_execute raised an exception
  • pre_execute was cancelled (due to a sibling task failure)
  • pre_execute timed out

This is by design for resource management. If your pre_execute acquires resources (opens files, database connections, locks), your post_execute must be idempotent and handle partial initialization.

Example: Safe Resource Management

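import asyncio
from dataclasses import dataclass

# Stand-in for a lock shared with other parts of the application
some_lock = asyncio.Lock()

@dataclass
class TaskContext:
    lock: asyncio.Lock | None = None
    lock_acquired: bool = False

async def acquire_lock(ctx: TaskContext):
    ctx.lock = some_lock
    # If cancelled or timed out while waiting here, the lock is not held
    # and the flag stays False
    await ctx.lock.acquire()
    ctx.lock_acquired = True

async def release_lock(ctx: TaskContext):
    # ✅ GOOD: Release only if we actually acquired the lock
    if ctx.lock_acquired and ctx.lock is not None:
        ctx.lock.release()  # asyncio.Lock.release() is a plain method, not a coroutine
        ctx.lock_acquired = False  # a second call becomes a no-op (idempotent)
    # Wrapping an unconditional release() in try/except RuntimeError also avoids
    # a crash, but it can release a lock that another task holds, so the flag
    # check is the safer choice.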

Why this matters: In parallel execution, if one task fails, all other tasks in that level are cancelled. Without guaranteed cleanup, you'd leak resources.

Execution Model

The "Elevator" Pattern

Tasks execute in a strict "elevator up, elevator down" pattern:

Level 1: [A-pre, B-pre, C-pre] ─┐
                                 ├─→ (parallel)
Level 2: [D-pre, E-pre] ────────┘
                                 ├─→ (parallel)  
Execute: [A-exec, B-exec, D-exec, E-exec] ─┘
                                 
Teardown: ┌─ [D-post, E-post] (parallel)
          └─ [A-post, B-post, C-post] (parallel)

Key Rules:

  1. Within a level, tasks run in parallel
  2. Levels execute sequentially (level N+1 waits for level N)
  3. execute phase runs after all pre_execute phases complete
  4. post_execute runs in reverse level order (LIFO)
  5. If any task fails, remaining tasks are cancelled but cleanup still runs

Task Phases

Each task can define up to 3 optional phases:

| Phase | When It Runs | Purpose | Cleanup Guarantee |
|---|---|---|---|
| pre_execute | First, by level | Initialize resources, setup | post_execute runs if started |
| execute | After all pre_execute | Do main work | post_execute runs if pre_execute started |
| post_execute | Last, reverse order | Cleanup, release resources | Always runs if pre_execute started |

Performance

Benchmarks (single-threaded, zero-work tasks):

| Python Version | Multi-level | Single Large Level | Framework Overhead |
|---|---|---|---|
| Python 3.13 | ~137,000 tasks/sec | ~21,500 tasks/sec | ~7µs per task |
| Python 3.11 | ~15,000 tasks/sec | ~159 tasks/sec | ~67µs per task |

Python 3.13 has significantly improved asyncio performance compared to 3.11. In the multi-level benchmark, throughput is roughly 9x higher (~137,000 vs ~15,000 tasks/sec).

Key Observations:

  • Multi-level execution: ~79% of raw asyncio performance (Python 3.13)
  • Scalability: Tested with 1,000+ tasks across 100 levels
  • Real-world performance: For typical I/O-bound tasks (10ms+), the ~7µs per-task overhead on Python 3.13 is under 0.1% of task time; on Python 3.11 the ~67µs overhead is under 0.7%
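
The overhead estimate can be checked with a quick calculation from the benchmark figures quoted in this section. The helper overhead_fraction is a hypothetical name used only for this illustration.

```python
def overhead_fraction(overhead_us: float, task_ms: float) -> float:
    """Framework overhead as a fraction of one task's total time."""
    overhead_s = overhead_us * 1e-6
    task_s = task_ms * 1e-3
    return overhead_s / (task_s + overhead_s)

# Per-task overhead figures from the benchmark table, applied to a 10 ms task
print(f"Python 3.13: {overhead_fraction(7, 10):.3%}")
print(f"Python 3.11: {overhead_fraction(67, 10):.3%}")
```

For a 10 ms task this gives about 0.07% on Python 3.13 and about 0.7% on Python 3.11, and the share shrinks further as tasks get longer.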

When to use:

  • ✅ I/O-bound workflows (database, HTTP, file operations)
  • ✅ Complex multi-step operations with dependencies
  • ✅ Multiple levels with reasonable task counts (5-50 tasks/level)
  • ✅ Tasks where guaranteed cleanup is critical

When NOT to use:

  • ❌ CPU-bound work (use ProcessPoolExecutor instead)
  • ❌ Single level with 100+ parallel tasks (use raw asyncio.TaskGroup)
  • ❌ Simple linear async/await (unnecessary overhead)
  • ❌ Ultra high-frequency operations (>100k ops/sec needed)

Use Cases

✅ Perfect For

  1. Complex API Endpoints

    Level 1: [Auth, RateLimit, FeatureFlags]  # Parallel
    Level 2: [FetchUser, FetchPermissions]     # Depends on Level 1
    Level 3: [LoadData, ProcessRequest]        # Depends on Level 2
    
  2. Data Pipeline Steps

    Level 1: [OpenDBConnection, OpenFileHandle]
    Level 2: [ReadData, TransformData]
    Level 3: [WriteResults]
    Post: Always close connections/files
    
  3. Build/Deploy Systems

    Level 1: [CheckoutCode, ValidateConfig]
    Level 2: [RunTests, BuildArtifacts]
    Level 3: [Deploy, NotifySlack]
    

❌ Not Suitable For

  • Simple sequential operations (just use await)
  • Fire-and-forget background tasks (use asyncio.create_task)
  • Distributed workflows (use Celery, Airflow)
  • Event-driven systems (use message queues)

Error Handling

Tasks can fail at any phase. The framework:

  1. Cancels remaining tasks at the same level
  2. Runs cleanup for all tasks that started pre_execute
  3. Collects all exceptions into an ExceptionGroup
  4. Raises the group after cleanup completes

try:
    await processor.process_tasks(ctx)
except* ValueError as eg:
    # Handle specific exception type
    for exc in eg.exceptions:
        print(f"Validation error: {exc}")
except* TimeoutError as eg:
    # Handle timeouts
    for exc in eg.exceptions:
        print(f"Task timed out: {exc}")
except* Exception as eg:
    # Handle any remaining errors (a try block cannot mix except and except*)
    print(f"Other failures: {eg.exceptions}")

Advanced Features

Per-Task Configuration

Each task function can have its own timeout and retry settings:

AsyncTask(
    name="FlakeyAPI",
    execute=TaskFunction(
        function=call_external_api,
        timeout=5.0,        # 5 second timeout
        retries=3,          # Retry 3 times
        backoff_factor=2.0  # Exponential backoff: 2s, 4s, 8s
    )
)

Shared Context

You define your own context class with typed fields:

from dataclasses import dataclass, field
from typing import Any

@dataclass
class TaskContext:
    user_id: int | None = None
    permissions: list[str] = field(default_factory=list)
    db_connection: Any = None

async def task_a(ctx: TaskContext):
    ctx.user_id = 123  # Set data

async def task_b(ctx: TaskContext):
    user_id = ctx.user_id  # Read data from task_a, with full type hints!

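Note: The context is shared but not thread-safe. Because all tasks run in a single asyncio event loop, plain attribute reads and writes need no locking. Tasks at the same level do interleave at await points, so avoid read-modify-write sequences on shared fields that span an await.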
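
One caveat worth illustrating: single-threaded does not mean interleaving-free. The sketch below uses a hypothetical Counter context in plain asyncio (no sdax) to show how an update that reads a field, awaits, and then writes it back can lose increments, while an update with no await between the read and the write cannot.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass

@dataclass
class Counter:
    hits: int = 0

async def unsafe_increment(ctx: Counter) -> None:
    current = ctx.hits      # read
    await asyncio.sleep(0)  # other tasks may run here
    ctx.hits = current + 1  # write back a possibly stale value

async def safe_increment(ctx: Counter) -> None:
    await asyncio.sleep(0)
    ctx.hits += 1           # no await between the read and the write

async def run(worker: Callable[[Counter], Awaitable[None]], n: int = 10) -> int:
    ctx = Counter()
    await asyncio.gather(*(worker(ctx) for _ in range(n)))
    return ctx.hits

print("unsafe:", asyncio.run(run(unsafe_increment)))
print("safe:  ", asyncio.run(run(safe_increment)))
```

The unsafe version ends with fewer hits than workers because every task read the field before any of them wrote it back. Having each task write to its own field, as in the Quick Start, avoids the problem entirely.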

Testing

Run the test suite:

pytest tests/ -v

Performance benchmarks:

python tests/test_performance.py -v

Monte Carlo stress testing (runs ~2,750 tasks with random failures):

python tests/test_monte_carlo.py -v

Comparison to Alternatives

| Feature | sdax | Celery | Airflow | Raw asyncio |
|---|---|---|---|---|
| Setup complexity | Minimal | High | Very High | None |
| External dependencies | None | Redis/RabbitMQ | PostgreSQL/MySQL | None |
| Throughput | ~137k tasks/sec | ~500 tasks/sec | ~50 tasks/sec | ~174k ops/sec |
| Overhead | ~7µs/task | Varies | High | Minimal |
| Use case | In-process workflows | Distributed tasks | Complex DAGs | Simple async |
| Guaranteed cleanup | ✅ Yes | ❌ No | ❌ No | Manual |
| Level-based execution | ✅ Yes | ❌ No | ✅ Yes | Manual |

License

MIT License - see LICENSE file for details.
