Skip to main content

Distributed Durable Functions in Python

Project description

Senpuki: Distributed Durable Functions for Python

Senpuki is a lightweight, asynchronous, distributed task orchestration library for Python. It allows you to write stateful, reliable workflows ("durable functions") using standard Python async/await syntax. Senpuki handles the complexity of persisting state, retrying failures, and distributing work across a pool of workers.

Table of Contents


Core Concepts

  • Durable Functions: Python async functions decorated with @Senpuki.durable(). They can be orchestrators (calling other functions) or activities (doing work).
  • Orchestrator: A durable function that schedules other durable functions. It sleeps while waiting for sub-tasks to complete, freeing up worker resources.
  • Activity: A leaf-node durable function that performs a specific action (e.g., API call, DB operation).
  • Execution: A single run of a workflow. It has a unique ID and persistent state.
  • Worker: A process that polls the backend storage for pending tasks and executes them.

Installation

pip install senpuki

Requirements:

  • Python 3.12+
  • aiosqlite (optional, for SQLite backend async support)
  • asyncpg (optional, for PostgreSQL backend async support)
  • redis (optional, for Redis notification support)

Quick Start

  1. Define your workflow:

    import asyncio
    from senpuki import Senpuki, Result
    
    # 1. Define an activity
    @Senpuki.durable()
    async def greet(name: str) -> str:
        await asyncio.sleep(0.1) # Simulate work
        return f"Hello, {name}!"
    
    # 2. Define an orchestrator
    @Senpuki.durable()
    async def workflow(names: list[str]) -> Result[list[str], Exception]:
        results = []
        for name in names:
            # Call activity (awaiting it schedules it and waits for result)
            res = await greet(name) 
            results.append(res)
        return Result.Ok(results)
    
  2. Run the system:

    async def main():
        # Setup Backend
        backend = Senpuki.backends.SQLiteBackend("senpuki.sqlite")
        await backend.init_db()
        executor = Senpuki(backend=backend)
    
        # Start a Worker (in background)
        worker = asyncio.create_task(executor.serve())
    
        # Dispatch Workflow
        exec_id = await executor.dispatch(workflow, ["Alice", "Bob"])
        print(f"Started execution: {exec_id}")
    
        # Wait for Result
        while True:
            state = await executor.state_of(exec_id)
            if state.state in ("completed", "failed"):
                break
            await asyncio.sleep(0.5)
    
        result = await executor.result_of(exec_id)
        print(result.value) # ['Hello, Alice!', 'Hello, Bob!']
    
    if __name__ == "__main__":
        asyncio.run(main())
    

Features Guide

Defining Durable Functions

Use the @Senpuki.durable decorator. You can configure retry policies, caching, and queues here.

from senpuki import Senpuki, RetryPolicy

@Senpuki.durable(
    retry_policy=RetryPolicy(max_attempts=3, initial_delay=1.0),
    queue="high_priority",
    tags=["billing"]
)
async def charge_card(amount: int):
    ...

Orchestration & Activities

When a durable function calls another durable function (e.g., await other_func()), Senpuki intercepts this call.

  • It persists a Task record for the child function.
  • The parent function "sleeps" (suspends) until the child task is completed by a worker.
  • This allows workflows to run over days or weeks without consuming memory while waiting.

Retries & Error Handling

Failures happen. Senpuki allows declarative retry policies.

policy = RetryPolicy(
    max_attempts=5,
    backoff_factor=2.0, # Exponential backoff
    jitter=0.1,         # Add randomness to prevent thundering herd
    retry_for=(ConnectionError, ExpiryError) # Only retry these exceptions
)

@Senpuki.durable(retry_policy=policy)
async def unstable_api_call():
    ...

If the function fails after all retries, the Execution is marked as failed, and the error is propagated to the parent orchestrator (if any), which can catch it using standard try/except.

Idempotency & Caching

To prevent duplicate side-effects (like charging a card twice) or re-doing expensive work:

  1. Idempotency: Results are stored permanently. If a task is scheduled again with the same arguments (and version), the stored result is returned immediately without running the function.
  2. Caching: Similar to idempotency but implies the result can be reused across different executions if the key matches.
@Senpuki.durable(idempotent=True)
async def send_email(user_id: str, subject: str):
    # Safe to call multiple times; will only execute once per unique arguments
    ...

@Senpuki.durable(cached=True, version="v1")
async def heavy_compute(data_hash: str):
    # Result stored in cache table; subsequent calls return immediately
    ...

Parallel Execution (Fan-out/Fan-in)

Use standard asyncio.gather to run tasks in parallel. Senpuki schedules them all, and the worker pool executes them concurrently.

@Senpuki.durable()
async def batch_processor(items: list[int]):
    tasks = []
    for item in items:
        # Schedule all tasks
        tasks.append(process_item(item))
    
    # Wait for all to complete
    results = await asyncio.gather(*tasks)
    return sum(results)

Timeouts & Expirys

You can set a expiry for the entire execution. If it exceeds this duration, it is cancelled.

exec_id = await executor.dispatch(long_workflow, expiry="1h 30m")

Architecture & Backends

Senpuki is backend-agnostic.

SQLite Backend

Included by default. Stores state in a local SQLite file.

  • Best for: Development, testing, single-node deployments, embedded workflows.
  • Features: Full persistence, async support.

Postgres Backend

  • Best for: Production environments, concurrent access, high reliability.
  • Features: Uses asyncpg for high performance.

Mongo Backend (Planned)

  • Best for: Distributed production clusters, high availability.

Redis (Notifications)

Optional. Uses Redis Pub/Sub to notify orchestrators immediately when a task finishes, reducing polling latency.


Running Workers

The executor.serve() method runs the worker loop. In production, you typically run this in a separate process or container.

# worker.py
async def run_worker():
    backend = Senpuki.backends.SQLiteBackend("prod.db")
    executor = Senpuki(backend=backend)
    
    # Consume only specific queues
    await executor.serve(
        queues=["default", "high_priority"],
        max_concurrency=50
    )

You can scale horizontally by running multiple worker instances pointing to the same database.


Examples

See the examples/ folder for complete code:

  1. simple_flow.py: Basic parent-child function calls.
  2. failing_flow.py: Demonstrates automatic retries and Dead Letter Queue (DLQ) behavior.
  3. complex_workflow.py: A data pipeline showcasing caching, retries, and expirys.
  • batch_processing.py: Fan-out/fan-in pattern (processing multiple items in parallel).
  • saga_trip_booking.py: Saga pattern with compensation (rollback) logic.
  • media_pipeline.py: A complex 5-minute simulation of a media processing pipeline (Validation -> Safety -> Transcode/AI -> Package) with a live progress dashboard.

Requirements

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

senpuki-0.1.0.tar.gz (45.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

senpuki-0.1.0-py3-none-any.whl (58.4 kB view details)

Uploaded Python 3

File details

Details for the file senpuki-0.1.0.tar.gz.

File metadata

  • Download URL: senpuki-0.1.0.tar.gz
  • Upload date:
  • Size: 45.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.7

File hashes

Hashes for senpuki-0.1.0.tar.gz
Algorithm Hash digest
SHA256 6aa0da0e04465b32d0bba8d5706f91ee550383696dc727a4f2b32709b8c8493a
MD5 2371f3f24adfbcdcab65c756a4ec0783
BLAKE2b-256 d0616a16a2f8630ec9c36e90b3b97dfd2da09cdd2d561922b53373ac252fbcc2

See more details on using hashes here.

File details

Details for the file senpuki-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: senpuki-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 58.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.7

File hashes

Hashes for senpuki-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 be5da24f971787bf2ecf6704d2de453cfc9bca0a9f82b88e0476b71f1d8d5819
MD5 81a7950e893176773feaa4c32e5a273f
BLAKE2b-256 8bec08e78b2c4e5a8a67f0dec60dccc04885808a7087d8bbef57a9f59c8502a7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page