Distributed Durable Functions in Python
Senpuki
Distributed durable functions for Python. Write reliable, stateful workflows using async/await.
pip install senpuki
Quick Example
import asyncio
from senpuki import Senpuki, Result

@Senpuki.durable()
async def process_order(order_id: str) -> dict:
    await asyncio.sleep(1)  # Simulate work
    return {"order_id": order_id, "status": "processed"}

@Senpuki.durable()
async def order_workflow(order_ids: list[str]) -> Result[list, Exception]:
    results = []
    for order_id in order_ids:
        result = await process_order(order_id)
        results.append(result)
    return Result.Ok(results)

async def main():
    backend = Senpuki.backends.SQLiteBackend("workflow.db")
    await backend.init_db()
    executor = Senpuki(backend=backend)
    worker = asyncio.create_task(executor.serve())
    exec_id = await executor.dispatch(order_workflow, ["ORD-001", "ORD-002"])
    result = await executor.wait_for(exec_id)
    print(result.value)

asyncio.run(main())
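To make the idea of durable execution concrete, here is a minimal, synchronous sketch of step checkpointing using only the standard library. It is not Senpuki's implementation: `CheckpointStore` and `run_step` are hypothetical names invented for illustration, and the table layout is an assumption. The point is that a step whose result is already stored is not executed again when the workflow is re-run after a restart.

```python
import json
import sqlite3


# Hypothetical helper for illustration only; not part of Senpuki's API.
class CheckpointStore:
    """Persists step results keyed by (execution id, step name)."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS steps ("
            "exec_id TEXT, step TEXT, result TEXT, "
            "PRIMARY KEY (exec_id, step))"
        )

    def run_step(self, exec_id: str, step: str, fn, *args):
        """Return the stored result if the step already ran, else run and store it."""
        row = self.conn.execute(
            "SELECT result FROM steps WHERE exec_id = ? AND step = ?",
            (exec_id, step),
        ).fetchone()
        if row is not None:
            return json.loads(row[0])  # replay: reuse the recorded result
        result = fn(*args)
        self.conn.execute(
            "INSERT INTO steps VALUES (?, ?, ?)",
            (exec_id, step, json.dumps(result)),
        )
        self.conn.commit()
        return result


calls = []  # records which orders were actually processed


def process_order(order_id: str) -> dict:
    calls.append(order_id)
    return {"order_id": order_id, "status": "processed"}


def order_workflow(store: CheckpointStore, exec_id: str, order_ids: list[str]) -> list:
    return [
        store.run_step(exec_id, f"process:{oid}", process_order, oid)
        for oid in order_ids
    ]


store = CheckpointStore()
first = order_workflow(store, "exec-1", ["ORD-001", "ORD-002"])
# Simulate a restart: the workflow function runs again from the top,
# but every step is served from the checkpoint table.
second = order_workflow(store, "exec-1", ["ORD-001", "ORD-002"])
```

After both runs, `first` and `second` are equal and `calls` contains each order ID once, since the second pass never invoked `process_order`.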
Why Senpuki?
| Feature | Temporal | Celery | Prefect | Airflow | Senpuki |
|---|---|---|---|---|---|
| Durable Execution | Yes | No | Partial | No | Yes |
| Setup Complexity | High | Medium | Medium | High | Very Low |
| Infrastructure | Server cluster | Broker | Server | Multi-component | SQLite/Postgres |
| Native Async | Yes | No | Yes | Limited | Yes |
Senpuki fills the gap between simple task queues (Celery) and enterprise platforms (Temporal):
- vs Temporal: Same durability guarantees, fraction of the infrastructure
- vs Celery/Dramatiq: True workflow durability, not just task retries
- vs Prefect/Airflow: Application workflows, not batch data pipelines
See full comparison for details.
Features
- Durable Execution - Workflow state survives crashes and restarts
- Automatic Retries - Configurable retry policies with exponential backoff
- Distributed Workers - Scale horizontally across multiple processes
- Parallel Execution - Fan-out/fan-in with asyncio.gather and Senpuki.map
- Rate Limiting - Control concurrent executions per function
- External Signals - Coordinate workflows with external events
- Dead Letter Queue - Inspect and replay failed tasks
- Idempotency & Caching - Prevent duplicate work
- Multiple Backends - SQLite (dev) or PostgreSQL (production)
- OpenTelemetry - Distributed tracing support
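As an illustration of the "Automatic Retries" item above, the following stdlib-only sketch shows one common way to compute and apply exponential backoff. It is an assumption about the general technique, not Senpuki's retry logic: `backoff_delays` and `retry` are hypothetical helpers, and the doubling factor and 60-second cap are illustrative defaults that the source does not specify.

```python
import asyncio


def backoff_delays(
    max_attempts: int,
    initial_delay: float,
    factor: float = 2.0,      # assumed multiplier, not taken from Senpuki
    max_delay: float = 60.0,  # assumed cap, not taken from Senpuki
) -> list[float]:
    """Delays between attempts: initial_delay * factor**n, capped at max_delay."""
    return [min(initial_delay * factor**n, max_delay) for n in range(max_attempts - 1)]


async def retry(fn, *, max_attempts: int = 5, initial_delay: float = 1.0,
                sleep=asyncio.sleep):
    """Call the coroutine function fn until it succeeds or attempts run out."""
    delays = backoff_delays(max_attempts, initial_delay)
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await sleep(delays[attempt - 1])
```

With `max_attempts=5` and `initial_delay=1.0`, `backoff_delays` yields waits of 1, 2, 4, and 8 seconds between the five attempts. The `sleep` parameter exists only so the waiting can be replaced in tests.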
Key Concepts
import asyncio
from senpuki import Senpuki, RetryPolicy

# Configurable retry policies
@Senpuki.durable(
    retry_policy=RetryPolicy(max_attempts=5, initial_delay=1.0),
    queue="high_priority",
    max_concurrent=10,  # Rate limiting
    idempotent=True,    # Prevent duplicate execution
)
async def my_activity(data: dict) -> dict:
    ...

# The snippets below are fragments meant to run inside a durable function.

# Durable sleep (doesn't block workers)
await Senpuki.sleep("30m")

# Parallel execution
results = await asyncio.gather(*[process(item) for item in items])

# Or optimized for large batches:
results = await Senpuki.map(process, items)

# External signals: wait inside the workflow...
payload = await Senpuki.wait_for_signal("approval")

# ...and send the signal from outside it, via the executor
await executor.send_signal(exec_id, "approval", {"approved": True})
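The combination of fan-out and a concurrency limit can be sketched with plain asyncio. This is not how `Senpuki.map` or `max_concurrent` are implemented (the source does not say). `bounded_map` is a hypothetical helper showing the general pattern of gating `asyncio.gather` with a semaphore.

```python
import asyncio


async def bounded_map(fn, items, max_concurrent: int) -> list:
    """Apply the coroutine function fn to every item, running at most
    max_concurrent calls at once. Results keep the input order."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(item):
        async with semaphore:  # wait here while the limit is reached
            return await fn(item)

    return await asyncio.gather(*(run_one(item) for item in items))


# Track how many calls overlap so the limit can be observed.
stats = {"in_flight": 0, "peak": 0}


async def process(item: int) -> int:
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)  # stand-in for real work
    stats["in_flight"] -= 1
    return item * 2


results = asyncio.run(bounded_map(process, range(10), max_concurrent=3))
```

Here `results` holds the doubled inputs in their original order, and `stats["peak"]` never exceeds the limit of 3.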
Backends
# SQLite (development)
backend = Senpuki.backends.SQLiteBackend("senpuki.db")
# PostgreSQL (production)
backend = Senpuki.backends.PostgresBackend("postgresql://user:pass@host/db")
# Optional: Redis for low-latency notifications
executor = Senpuki(
    backend=backend,
    notification_backend=Senpuki.notifications.RedisBackend("redis://localhost"),
)
CLI
senpuki list # List executions
senpuki show <exec_id> # Show execution details
senpuki dlq list # List dead-lettered tasks
senpuki dlq replay <task_id> # Replay failed task
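For readers unfamiliar with dead letter queues, the sketch below models the list/replay cycle in memory. It is purely conceptual: `DeadLetterQueue`, `run`, `list_entries`, and `replay` are hypothetical names, and nothing here reflects how Senpuki stores or replays failed tasks.

```python
from dataclasses import dataclass, field
from typing import Callable


# Hypothetical in-memory model for illustration; not Senpuki's API.
@dataclass
class DeadLetterQueue:
    # task_id -> (callable to re-run, description of the last error)
    entries: dict[str, tuple[Callable[[], object], str]] = field(default_factory=dict)

    def run(self, task_id: str, fn: Callable[[], object], max_attempts: int = 3):
        """Try fn up to max_attempts times; park it in the queue if all fail."""
        last_error = None
        for _ in range(max_attempts):
            try:
                return fn()
            except Exception as exc:
                last_error = exc
        self.entries[task_id] = (fn, repr(last_error))
        return None

    def list_entries(self) -> dict[str, str]:
        """Map each dead-lettered task ID to its last error."""
        return {task_id: err for task_id, (_, err) in self.entries.items()}

    def replay(self, task_id: str):
        """Re-run a parked task; it leaves the queue only if it succeeds."""
        fn, _ = self.entries[task_id]
        result = fn()
        del self.entries[task_id]
        return result


service = {"up": False}  # simulated downstream dependency


def charge_card() -> str:
    if not service["up"]:
        raise ConnectionError("payment service down")
    return "charged"


dlq = DeadLetterQueue()
outcome = dlq.run("task-1", charge_card)  # fails three times, gets parked
parked = dlq.list_entries()
service["up"] = True                      # operator fixes the dependency
replayed = dlq.replay("task-1")           # succeeds and leaves the queue
```

In this model, listing corresponds to inspecting parked failures and replaying corresponds to re-running one of them once the underlying problem is fixed.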
Documentation
Full documentation available in docs/:
- Getting Started | Core Concepts | Comparison
- Guides: Durable Functions | Orchestration | Error Handling | Parallel Execution | Signals | Workers | Monitoring
- Patterns: Saga | Batch Processing
- Reference: API | Configuration | Deployment
Examples
See examples/ for complete workflows:
- simple_flow.py - Basic workflow
- saga_trip_booking.py - Saga pattern with compensation
- batch_processing.py - Fan-out/fan-in
- media_pipeline.py - Complex multi-stage pipeline
Requirements
- Python 3.12+
- aiosqlite or asyncpg (backend)
- redis (optional, for notifications)
License
MIT