Ergon - Durable Execution Framework for Python

Pure Python implementation of durable execution with Temporal-like semantics.

Features

  • Durable Steps: Automatically cached and retried on failure
  • Durable Timers: Timers survive process restarts
  • Distributed Workers: Multiple workers process flows in parallel
  • Work Stealing: Fair load distribution across workers
  • Storage Backends: SQLite and in-memory implementations
  • Pure Python: No Rust/PyO3 dependencies, just aiosqlite
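The first feature above — steps that are cached and retried — rests on a simple idea: every step result is recorded in an execution log, so a replay after a crash skips completed work instead of re-running side effects. A rough sketch of that idea in plain Python (illustrative only, not the pyergon API):

```python
# Illustrative sketch (not the pyergon API): a durable step records its
# result in an execution log; on replay, the cached result is returned
# and the side effect is not repeated.
execution_log: dict[str, object] = {}  # step name -> cached result

def durable_step(name, fn):
    """Run fn once; on replay, return the logged result without re-executing."""
    if name in execution_log:
        return execution_log[name]   # replay: skip the side effect
    result = fn()                    # first run: execute the step
    execution_log[name] = result     # persist before moving on
    return result

calls = []
def charge_card():
    calls.append("charged")
    return "payment-123"

# First execution runs the step...
assert durable_step("process_payment", charge_card) == "payment-123"
# ...a replay (e.g. after a worker restart) returns the cached result.
assert durable_step("process_payment", charge_card) == "payment-123"
assert calls == ["charged"]  # the side effect happened exactly once
```

In the real library the log lives in SQLite rather than a dict, which is what lets replays span process restarts.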

Design Philosophy

Ergon follows practical software engineering principles:

  • Simple, readable code over clever abstractions
  • Clear naming without cryptic abbreviations
  • Explicit dependencies and no global state
  • Errors handled once at the appropriate level
  • Type hints and protocols for structural typing
  • Composable components with focused responsibilities

Installation

# Clone repository
git clone <repo-url>
cd pyergon

# Install dependencies with uv
uv sync

Dependencies:

  • Python 3.11+
  • aiosqlite >= 0.19.0

Quick Start

import asyncio
from dataclasses import dataclass
from pyergon import flow, flow_type, step, Executor
from pyergon.storage.sqlite import SqliteExecutionLog

@dataclass
@flow_type
class OrderProcessor:
    order_id: str
    amount: float

    @step
    async def validate(self):
        print(f"[{self.order_id}] Validating...")
        return self.amount > 0

    @step
    async def process_payment(self):
        print(f"[{self.order_id}] Processing ${self.amount}...")
        return f"payment-{self.order_id}"

    @flow
    async def run(self):
        await self.validate()
        return await self.process_payment()

async def main():
    # Setup storage
    storage = await SqliteExecutionLog.in_memory()

    # Execute workflow
    order = OrderProcessor("ORD-001", 100.0)
    executor = Executor(order, storage, "order-001")
    outcome = await executor.run(lambda o: o.run())

    print(f"Result: {outcome.result}")

    await storage.close()

asyncio.run(main())

Distributed Workers with Timers

import asyncio
from dataclasses import dataclass
from pyergon import flow, flow_type, step, Scheduler, Worker
from pyergon.storage.sqlite import SqliteExecutionLog
from pyergon.executor.timer import schedule_timer_named

@dataclass
@flow_type
class TimedOrderProcessor:
    order_id: str

    @step
    async def wait_for_fraud_check(self):
        print(f"[{self.order_id}] Waiting for fraud check...")
        await schedule_timer_named(2.0, f"fraud-{self.order_id}")
        print(f"[{self.order_id}] Fraud check complete")

    @flow
    async def process(self):
        await self.wait_for_fraud_check()
        return "completed"

async def main():
    # Setup
    storage = SqliteExecutionLog("distributed.db")
    await storage.connect()

    scheduler = Scheduler(storage)

    # Start workers with timer processing
    worker1 = Worker(storage, "worker-1", enable_timers=True)
    await worker1.register(TimedOrderProcessor)
    handle1 = await worker1.start()

    worker2 = Worker(storage, "worker-2", enable_timers=True)
    await worker2.register(TimedOrderProcessor)
    handle2 = await worker2.start()

    # Schedule flows
    for i in range(3):
        order = TimedOrderProcessor(f"ORD-{i:03d}")
        await scheduler.schedule(order)

    # Wait for completion
    await asyncio.sleep(5)

    # Shutdown
    await handle1.shutdown()
    await handle2.shutdown()
    await storage.close()

asyncio.run(main())

Project Structure

pyergon/
├── src/pyergon/            # Pure Python implementation
│   ├── core/              # Core types (Invocation, Status, Context)
│   ├── storage/           # Storage backends (SQLite, Memory, Redis)
│   ├── executor/          # Execution engine (Scheduler, Worker, Timer, Signal)
│   ├── decorators.py      # @flow, @flow_type, and @step decorators
│   └── __init__.py        # Public API
│
├── examples/               # Example workflows
│   ├── simple_timer_sqlite.py
│   ├── complex_multi_worker_load_sqlite.py
│   └── dag_limit_parallel.py
│
├── tests/                  # Test suite (60 tests, 48% coverage)
│   ├── test_basic.py
│   ├── test_properties.py
│   ├── test_durability.py
│   ├── test_concurrency.py
│   └── ...
│
└── pyproject.toml          # Project configuration

Architecture

Core Components

  1. Core Types (pyergon.core)

    • Invocation: Single step execution record
    • InvocationStatus: Step state machine
    • ScheduledFlow: Distributed queue item
  2. Storage Layer (pyergon.storage)

    • ExecutionLog: Abstract protocol for persistence
    • SqliteExecutionLog: SQLite backend
    • InMemoryExecutionLog: In-memory backend for testing
  3. Executor (pyergon.executor)

    • Executor: Execute flows with durable context
    • Scheduler: Enqueue flows for distributed processing
    • Worker: Process flows from queue
    • schedule_timer: Durable timers
    • await_external_signal: External event coordination
  4. Decorators (pyergon.decorators)

    • @flow_type: Mark workflow class
    • @flow: Mark flow entry point method
    • @step: Mark durable step method
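The reason durable timers survive restarts is that the deadline is persisted as an absolute time, so a restarted worker recomputes the remaining wait rather than losing the timer. A minimal sketch of that mechanism (illustrative, not pyergon's implementation):

```python
# Illustrative sketch (not pyergon's implementation) of durable timers:
# persist an absolute deadline keyed by timer name; a replayed schedule
# call is a no-op, and the remaining wait is recomputed from the store.
timer_store: dict[str, float] = {}  # timer name -> absolute deadline

def schedule(name: str, delay: float, now: float) -> None:
    # Only set the deadline on the first call, so replaying the step
    # after a restart does not push the deadline back.
    timer_store.setdefault(name, now + delay)

def remaining(name: str, now: float) -> float:
    return max(0.0, timer_store[name] - now)

schedule("fraud-ORD-001", delay=2.0, now=100.0)   # deadline = 102.0
# Process crashes and restarts 1.5 "seconds" later; the deadline
# survives because it lives in the store, not in process memory.
schedule("fraud-ORD-001", delay=2.0, now=101.5)   # replay is a no-op
assert remaining("fraud-ORD-001", now=101.5) == 0.5
```

In the library the store is the SQLite execution log, and workers started with `enable_timers=True` poll it for expired deadlines.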

Examples

See the examples/ directory for complete examples:

  • simple_timer_sqlite.py - Durable timers with SQLite
  • complex_multi_worker_load_sqlite.py - Multi-worker stress test
  • dag_limit_parallel.py - Parallel DAG execution

Run an example:

PYTHONPATH=src uv run python examples/simple_timer_sqlite.py

Testing

# Run all tests (60 tests, 48% coverage)
uv run pytest tests/

# Run specific test file
uv run pytest tests/test_durability.py -v

# Type checking
mypy src/pyergon/

# Linting
ruff check src/pyergon/

Development

Code Quality

  • Comprehensive docstrings
  • Type hints throughout
  • Protocol-based interfaces
  • Example usage in every module
  • Test coverage with property-based testing

Project Stats

  • 2,400+ lines of pure Python
  • 60 tests passing across 8 test files
  • 48% code coverage
  • Zero external compiled dependencies

License

MIT / Apache 2.0 (dual license)

Credits

Inspired by Temporal's durable execution model.
