Skip to main content

Distributed Durable Functions in Python

Project description

Senpuki

Distributed durable functions for Python. Write reliable, stateful workflows using async/await.

pip install senpuki

Quick Example

import asyncio
from senpuki import Senpuki, Result

@Senpuki.durable()
async def process_order(order_id: str) -> dict:
    await asyncio.sleep(1)  # Simulate work
    return {"order_id": order_id, "status": "processed"}

@Senpuki.durable()
async def order_workflow(order_ids: list[str]) -> Result[list, Exception]:
    results = []
    for order_id in order_ids:
        result = await process_order(order_id)
        results.append(result)
    return Result.Ok(results)

async def main():
    backend = Senpuki.backends.SQLiteBackend("workflow.db")
    await backend.init_db()
    executor = Senpuki(backend=backend)
    
    worker = asyncio.create_task(executor.serve())
    
    exec_id = await executor.dispatch(order_workflow, ["ORD-001", "ORD-002"])
    result = await executor.wait_for(exec_id)
    print(result.value)

asyncio.run(main())

Why Senpuki?

Feature Temporal Celery Prefect Airflow Senpuki
Durable Execution Yes No Partial No Yes
Setup Complexity High Medium Medium High Very Low
Infrastructure Server cluster Broker Server Multi-component SQLite/Postgres
Native Async Yes No Yes Limited Yes

Senpuki fills the gap between simple task queues (Celery) and enterprise platforms (Temporal):

  • vs Temporal: Same durability guarantees, fraction of the infrastructure
  • vs Celery/Dramatiq: True workflow durability, not just task retries
  • vs Prefect/Airflow: Application workflows, not batch data pipelines

See full comparison for details.

Features

  • Durable Execution - Workflow state survives crashes and restarts
  • Automatic Retries - Configurable retry policies with exponential backoff
  • Distributed Workers - Scale horizontally across multiple processes
  • Parallel Execution - Fan-out/fan-in with asyncio.gather and Senpuki.map
  • Rate Limiting - Control concurrent executions per function
  • External Signals - Coordinate workflows with external events
  • Dead Letter Queue - Inspect and replay failed tasks
  • Idempotency & Caching - Prevent duplicate work
  • Multiple Backends - SQLite (dev) or PostgreSQL (production)
  • OpenTelemetry - Distributed tracing support

Key Concepts

from senpuki import Senpuki, RetryPolicy

# Configurable retry policies
@Senpuki.durable(
    retry_policy=RetryPolicy(max_attempts=5, initial_delay=1.0),
    queue="high_priority",
    max_concurrent=10,  # Rate limiting
    idempotent=True,    # Prevent duplicate execution
)
async def my_activity(data: dict) -> dict:
    ...

# Durable sleep (doesn't block workers)
await Senpuki.sleep("30m")

# Parallel execution
results = await asyncio.gather(*[process(item) for item in items])
# Or optimized for large batches:
results = await Senpuki.map(process, items)

# External signals
payload = await Senpuki.wait_for_signal("approval")
await executor.send_signal(exec_id, "approval", {"approved": True})

Backends

# SQLite (development)
backend = Senpuki.backends.SQLiteBackend("senpuki.db")

# PostgreSQL (production)
backend = Senpuki.backends.PostgresBackend("postgresql://user:pass@host/db")

# Optional: Redis for low-latency notifications
executor = Senpuki(
    backend=backend,
    notification_backend=Senpuki.notifications.RedisBackend("redis://localhost")
)

CLI

senpuki list                    # List executions
senpuki show <exec_id>          # Show execution details
senpuki dlq list                # List dead-lettered tasks
senpuki dlq replay <task_id>    # Replay failed task

Documentation

Full documentation available in docs/:

Examples

See examples/ for complete workflows:

  • simple_flow.py - Basic workflow
  • saga_trip_booking.py - Saga pattern with compensation
  • batch_processing.py - Fan-out/fan-in
  • media_pipeline.py - Complex multi-stage pipeline

Requirements

  • Python 3.12+
  • aiosqlite or asyncpg (backend)
  • redis (optional, for notifications)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

senpuki-0.3.0.tar.gz (79.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

senpuki-0.3.0-py3-none-any.whl (98.3 kB view details)

Uploaded Python 3

File details

Details for the file senpuki-0.3.0.tar.gz.

File metadata

  • Download URL: senpuki-0.3.0.tar.gz
  • Upload date:
  • Size: 79.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for senpuki-0.3.0.tar.gz
Algorithm Hash digest
SHA256 462a68074add7b066906af5fba030362015e10323b5f75bfdac801defea1ddf4
MD5 b7be081697c8293490ba5c1a9824e5b6
BLAKE2b-256 512b678e3309c79777ca09ea9899bf1d40ebec7de51a3eab641149cfa8234e54

See more details on using hashes here.

File details

Details for the file senpuki-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: senpuki-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 98.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for senpuki-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 591f4f9b96864f4a7e23bbd5806ca2881f44a4225cae70d25fb8eaafecec74a9
MD5 8347023319ce130dae53ba388fb16a64
BLAKE2b-256 d0714daf7f8d3f78fbe58f9ded29e766ac6a427d8a5ab55c3955c1c7c259a4e4

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page