Skip to main content

Distributed Durable Functions in Python

Project description

Senpuki

Distributed durable functions for Python. Write reliable, stateful workflows using async/await.

pip install senpuki

Quick Example

import asyncio
from senpuki import Senpuki, Result

@Senpuki.durable()
async def process_order(order_id: str) -> dict:
    await asyncio.sleep(1)  # Simulate work
    return {"order_id": order_id, "status": "processed"}

@Senpuki.durable()
async def order_workflow(order_ids: list[str]) -> Result[list, Exception]:
    results = []
    for order_id in order_ids:
        result = await process_order(order_id)
        results.append(result)
    return Result.Ok(results)

async def main():
    backend = Senpuki.backends.SQLiteBackend("workflow.db")
    await backend.init_db()
    executor = Senpuki(backend=backend)
    
    worker = asyncio.create_task(executor.serve())
    
    exec_id = await executor.dispatch(order_workflow, ["ORD-001", "ORD-002"])
    result = await executor.wait_for(exec_id)
    print(result.value)

asyncio.run(main())

Why Senpuki?

Feature Temporal Celery Prefect Airflow Senpuki
Durable Execution Yes No Partial No Yes
Setup Complexity High Medium Medium High Very Low
Infrastructure Server cluster Broker Server Multi-component SQLite/Postgres
Native Async Yes No Yes Limited Yes

Senpuki fills the gap between simple task queues (Celery) and enterprise platforms (Temporal):

  • vs Temporal: Same durability guarantees, fraction of the infrastructure
  • vs Celery/Dramatiq: True workflow durability, not just task retries
  • vs Prefect/Airflow: Application workflows, not batch data pipelines

See full comparison for details.

Features

  • Durable Execution - Workflow state survives crashes and restarts
  • Automatic Retries - Configurable retry policies with exponential backoff
  • Distributed Workers - Scale horizontally across multiple processes
  • Parallel Execution - Fan-out/fan-in with asyncio.gather and Senpuki.map
  • Rate Limiting - Control concurrent executions per function
  • External Signals - Coordinate workflows with external events
  • Dead Letter Queue - Inspect and replay failed tasks
  • Idempotency & Caching - Prevent duplicate work
  • Multiple Backends - SQLite (dev) or PostgreSQL (production)
  • OpenTelemetry - Distributed tracing support

Key Concepts

from senpuki import Senpuki, RetryPolicy

# Configurable retry policies
@Senpuki.durable(
    retry_policy=RetryPolicy(max_attempts=5, initial_delay=1.0),
    queue="high_priority",
    max_concurrent=10,  # Rate limiting
    idempotent=True,    # Prevent duplicate execution
)
async def my_activity(data: dict) -> dict:
    ...

# Durable sleep (doesn't block workers)
await Senpuki.sleep("30m")

# Parallel execution
results = await asyncio.gather(*[process(item) for item in items])
# Or optimized for large batches:
results = await Senpuki.map(process, items)

# External signals
payload = await Senpuki.wait_for_signal("approval")
await executor.send_signal(exec_id, "approval", {"approved": True})

Backends

# SQLite (development)
backend = Senpuki.backends.SQLiteBackend("senpuki.db")

# PostgreSQL (production)
backend = Senpuki.backends.PostgresBackend("postgresql://user:pass@host/db")

# Optional: Redis for low-latency notifications
executor = Senpuki(
    backend=backend,
    notification_backend=Senpuki.notifications.RedisBackend("redis://localhost")
)

CLI

senpuki list                    # List executions
senpuki show <exec_id>          # Show execution details
senpuki dlq list                # List dead-lettered tasks
senpuki dlq replay <task_id>    # Replay failed task

Documentation

Full documentation available in docs/:

Examples

See examples/ for complete workflows:

  • simple_flow.py - Basic workflow
  • saga_trip_booking.py - Saga pattern with compensation
  • batch_processing.py - Fan-out/fan-in
  • media_pipeline.py - Complex multi-stage pipeline

Requirements

  • Python 3.12+
  • aiosqlite or asyncpg (backend)
  • redis (optional, for notifications)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

senpuki-0.2.0.tar.gz (60.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

senpuki-0.2.0-py3-none-any.whl (75.6 kB view details)

Uploaded Python 3

File details

Details for the file senpuki-0.2.0.tar.gz.

File metadata

  • Download URL: senpuki-0.2.0.tar.gz
  • Upload date:
  • Size: 60.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for senpuki-0.2.0.tar.gz
Algorithm Hash digest
SHA256 fcfaff608782f3388602191e06b3ee910528e1acebdf7658bf0e3c62923b6ccb
MD5 ff63be120f70068b769e24491190fb44
BLAKE2b-256 35786f770a714c61db4a89a34771eaaa58ddcc342dafd39878a73f12134fee2b

See more details on using hashes here.

File details

Details for the file senpuki-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: senpuki-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 75.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for senpuki-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 46fa68b6c6625e877354a262778d7fddccc51c4361d288f1b61282a266d8f7fd
MD5 ebe5d7997c6d938cf6774b3ad4f68ca6
BLAKE2b-256 c42663481427b51cca416794c2f73b000c8d8a09d61c0d92fee0f356db3ceb3d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page