Durable execution framework for Python
PyErgon - Durable Execution Framework for Python
Pure Python implementation of durable execution with Temporal-like semantics.
Features
- Durable Steps: Automatically cached and retried on failure
- Durable Timers: Event-driven timers survive process restarts
- Distributed Workers: Multiple workers process flows from shared queue
- Event-Driven Architecture: Workers wake on events (new work, timer expiry, new timer)
- Storage Backends: SQLite, Redis, and in-memory implementations
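The "cached and retried" behavior of a durable step can be illustrated with a small standalone sketch. This is not PyErgon's implementation: the `durable_step` decorator, the `step_results` dictionary, and the `charge` function are hypothetical names used only for illustration, and a plain dictionary stands in for a persistent execution log.

```python
import asyncio
import functools

# Hypothetical in-memory stand-in for a persistent execution log:
# maps (flow_id, step_name) -> cached result.
step_results: dict[tuple[str, str], object] = {}


def durable_step(max_attempts: int = 3):
    """Cache a step's result per flow and retry it on failure (illustrative only)."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(flow_id: str, *args, **kwargs):
            key = (flow_id, func.__name__)
            if key in step_results:
                # Replay path: the step already completed, so skip re-execution.
                return step_results[key]
            last_error = None
            for _ in range(max_attempts):
                try:
                    result = await func(flow_id, *args, **kwargs)
                except Exception as error:
                    last_error = error
                    continue
                step_results[key] = result
                return result
            raise RuntimeError(f"step {func.__name__} failed") from last_error

        return wrapper

    return decorator


calls = {"charge": 0}


@durable_step(max_attempts=3)
async def charge(flow_id: str, amount: float) -> str:
    calls["charge"] += 1
    if calls["charge"] == 1:
        raise ConnectionError("transient failure")  # first attempt fails
    return f"payment-{flow_id}"


async def demo() -> tuple[str, str, int]:
    first = await charge("order-001", 100.0)   # fails once, then succeeds
    second = await charge("order-001", 100.0)  # served from the cache
    return first, second, calls["charge"]
```

Running `asyncio.run(demo())` calls the step body twice in total: once for the failed attempt and once for the successful retry. The second `charge` call never reaches the body because the result is already recorded.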
Design Philosophy
PyErgon follows practical software engineering principles:
- Simple, readable code over clever abstractions
- Clear naming without cryptic abbreviations
- Explicit dependencies and no global state
- Errors handled once at the appropriate level
- Type hints and protocols for structural typing
- Composable components with focused responsibilities
Installation
# Clone repository
git clone <repo-url>
cd pyergon
# Install dependencies with uv
uv sync
Dependencies:
- Python 3.11+
- aiosqlite >= 0.19.0
Quick Start
import asyncio
from dataclasses import dataclass

from pyergon import flow, flow_type, step, Executor
from pyergon.storage.sqlite import SqliteExecutionLog


@dataclass
@flow_type
class OrderProcessor:
    order_id: str
    amount: float

    @step
    async def validate(self):
        print(f"[{self.order_id}] Validating...")
        return self.amount > 0

    @step
    async def process_payment(self):
        print(f"[{self.order_id}] Processing ${self.amount}...")
        return f"payment-{self.order_id}"

    @flow
    async def run(self):
        await self.validate()
        return await self.process_payment()


async def main():
    # Setup storage
    storage = await SqliteExecutionLog.in_memory()

    # Execute workflow
    order = OrderProcessor("ORD-001", 100.0)
    executor = Executor(order, storage, "order-001")
    outcome = await executor.run(lambda o: o.run())
    print(f"Result: {outcome.result}")

    await storage.close()


asyncio.run(main())
Distributed Workers with Timers
import asyncio
from dataclasses import dataclass

from pyergon import flow, flow_type, step, Scheduler, Worker
from pyergon.storage.sqlite import SqliteExecutionLog
from pyergon.executor.timer import schedule_timer_named
from pyergon.core import TaskStatus


@dataclass
@flow_type
class TimedOrderProcessor:
    order_id: str

    @step
    async def wait_for_fraud_check(self):
        print(f"[{self.order_id}] Waiting for fraud check...")
        # Durable timer: survives a process restart
        await schedule_timer_named(2.0, f"fraud-{self.order_id}")
        print(f"[{self.order_id}] Fraud check complete")

    @flow
    async def process(self):
        await self.wait_for_fraud_check()
        return "completed"


async def main():
    # Setup
    storage = SqliteExecutionLog("distributed.db")
    await storage.connect()
    scheduler = Scheduler(storage)

    # Start workers with timer processing
    worker1 = Worker(storage, "worker-1", enable_timers=True)
    await worker1.register(TimedOrderProcessor)
    handle1 = await worker1.start()

    worker2 = Worker(storage, "worker-2", enable_timers=True)
    await worker2.register(TimedOrderProcessor)
    handle2 = await worker2.start()

    # Schedule flows
    for i in range(3):
        order = TimedOrderProcessor(f"ORD-{i:03d}")
        await scheduler.schedule(order)

    # Wait for completion
    await asyncio.sleep(5)

    # Shutdown
    await handle1.shutdown()
    await handle2.shutdown()
    await storage.close()


asyncio.run(main())
Architecture
Core Components
- Core Types (pyergon.core)
  - Invocation: Single step execution record
  - InvocationStatus: Step state machine
  - ScheduledFlow: Distributed queue item
- Storage Layer (pyergon.storage)
  - ExecutionLog: Abstract protocol for persistence
  - SqliteExecutionLog: SQLite backend
  - InMemoryExecutionLog: In-memory backend for testing
- Executor (pyergon.executor)
  - Executor: Execute flows with durable context
  - Scheduler: Enqueue flows for distributed processing
  - Worker: Process flows from queue
  - schedule_timer: Durable timers
  - await_external_signal: External event coordination
- Decorators (pyergon.decorators)
  - @flow_type: Mark workflow class
  - @flow: Mark flow entry point method
  - @step: Mark durable step method
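The relationship between a scheduler that enqueues work and workers that wake on new work can be sketched in-process with `asyncio.Event`. This is a simplified illustration under stated assumptions, not PyErgon's `Scheduler` or `Worker`: the `WorkQueue` class and the `worker` and `demo` coroutines are hypothetical, and a real deployment would use a shared storage backend rather than a single process.

```python
import asyncio
from collections import deque


class WorkQueue:
    """Hypothetical in-process queue that signals waiting workers via asyncio.Event."""

    def __init__(self) -> None:
        self._items: deque[str] = deque()
        self._has_work = asyncio.Event()

    def put(self, flow_id: str) -> None:
        self._items.append(flow_id)
        self._has_work.set()  # wake any sleeping worker

    async def get(self) -> str:
        while not self._items:
            # Nothing queued: reset the flag and sleep until put() signals.
            self._has_work.clear()
            await self._has_work.wait()
        return self._items.popleft()


async def worker(name: str, queue: WorkQueue, done: list[tuple[str, str]]) -> None:
    """Process flows forever; the task is cancelled on shutdown."""
    while True:
        flow_id = await queue.get()
        done.append((name, flow_id))


async def demo() -> list[str]:
    queue = WorkQueue()
    done: list[tuple[str, str]] = []
    tasks = [
        asyncio.create_task(worker(f"worker-{i}", queue, done)) for i in (1, 2)
    ]
    for i in range(3):
        queue.put(f"ORD-{i:03d}")
    while len(done) < 3:
        await asyncio.sleep(0)  # yield control so the workers can run
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return sorted(flow_id for _, flow_id in done)
```

Workers in this sketch consume no CPU while the queue is empty, which is the main advantage of waking on events over polling at a fixed interval.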
Examples
See examples/ directory for complete examples:
PYTHONPATH=src uv run python examples/simple_timer_sqlite.py
Testing
# Run all tests (60 tests, 48% coverage)
uv run pytest tests/
# Run specific test file
uv run pytest tests/test_durability.py -v
# Type checking
mypy src/pyergon/
# Linting
ruff check src/pyergon/
Development
Code Quality
- Comprehensive docstrings
- Type hints throughout
- Protocol-based interfaces
- Example usage in every module
- Test coverage with property-based testing
License
MIT / Apache 2.0 (dual license)
Credits
Inspired by:
- Temporal - Durable execution engine
- Dave Cheney - Practical Go principles