Community-maintained durable execution local and cloud test runner for Python
Async Durable Execution Runner for Python
Installation
pip install async-durable-execution-runner
Overview
Use Async Durable Execution Runner for Python to test your Python durable functions locally and in cloud environments.
This package is distributed from a community-maintained fork of the original Apache-2.0 licensed AWS project and continues under Apache License 2.0 with upstream notices preserved.
The test framework contains a local runner, so you can run and test your durable function locally before you deploy it.
Quick Start
A durable function under test
import asyncio
from typing import Any

from async_durable_execution import (
    DurableContext,
    durable_execution,
    durable_step,
    durable_with_child_context,
)
from async_durable_execution.config import Duration


# Each durable step is a small unit of work whose result is checkpointed.
@durable_step
async def one(a: int, b: int) -> str:
    await asyncio.sleep(0)
    return f"{a} {b}"


@durable_step
async def two_1(a: int, b: int) -> str:
    await asyncio.sleep(0)
    return f"{a} {b}"


@durable_step
async def two_2(a: int, b: int) -> str:
    await asyncio.sleep(0)
    return f"{b} {a}"


# A child context groups several steps under a single named operation.
@durable_with_child_context
async def two(ctx: DurableContext, a: int, b: int) -> str:
    two_1_result: str = ctx.step(two_1(a, b))
    two_2_result: str = ctx.step(two_2(a, b))
    return f"{two_1_result} {two_2_result}"


@durable_step
async def three(a: int, b: int) -> str:
    await asyncio.sleep(0)
    return f"{a} {b}"


# The durable function under test: a step, a wait, a child context, a step.
@durable_execution
async def function_under_test(event: Any, context: DurableContext) -> list[str]:
    results: list[str] = []

    result_one: str = context.step(one(1, 2))
    results.append(result_one)

    context.wait(duration=Duration.from_seconds(1))

    result_two: str = context.run_in_child_context(two(3, 4))
    results.append(result_two)

    result_three: str = context.step(three(5, 6))
    results.append(result_three)

    return results
Your test code
from async_durable_execution.execution import InvocationStatus
from async_durable_execution_runner.runner import (
    ContextOperation,
    DurableFunctionTestResult,
    DurableFunctionTestRunner,
    StepOperation,
)


def test_my_durable_functions():
    # function_under_test is the durable function defined above.
    with DurableFunctionTestRunner(handler=function_under_test) as runner:
        result: DurableFunctionTestResult = runner.run(input="input str", timeout=10)

    # Results are compared as JSON-serialized strings.
    assert result.status is InvocationStatus.SUCCEEDED
    assert result.result == '["1 2", "3 4 4 3", "5 6"]'

    one_result: StepOperation = result.get_step("one")
    assert one_result.result == '"1 2"'

    two_result: ContextOperation = result.get_context("two")
    assert two_result.result == '"3 4 4 3"'

    three_result: StepOperation = result.get_step("three")
    assert three_result.result == '"5 6"'
Architecture
Event Flow
- DurableTestRunner starts execution via Executor
- Executor creates Execution and schedules initial invocation
- During execution, checkpoints are processed by CheckpointProcessor
- Individual Processors transform operation updates and may trigger events
- ExecutionNotifier broadcasts events to Executor (observer)
- Executor updates Execution state based on events
- Execution completion triggers final event notifications
- DurableTestRunner run() blocks until it receives the completion event, and then returns a DurableFunctionTestResult.
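The last step of the flow, a run() call that blocks until a completion event arrives, can be sketched with a `threading.Event`. This is only an illustration of the idea under stated assumptions: `ToyExecution` and the free function `run` are hypothetical names, and the real runner may use a different synchronization mechanism.

```python
import threading
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class ToyExecution:
    """Hypothetical stand-in for Execution: holds the final result."""

    result: Optional[str] = None
    done: threading.Event = field(default_factory=threading.Event)

    def complete_success(self, result: str) -> None:
        self.result = result
        self.done.set()  # wakes up the caller blocked in run()


def run(handler: Callable[[str], str], payload: str, timeout: float) -> Optional[str]:
    """Start the handler on a worker thread and block until it completes."""
    execution = ToyExecution()
    worker = threading.Thread(
        target=lambda: execution.complete_success(handler(payload)),
        daemon=True,
    )
    worker.start()
    # wait() returns False if the timeout elapses before done is set.
    # In this sketch a handler that raises never sets done, so it also times out.
    if not execution.done.wait(timeout):
        raise TimeoutError(f"execution did not complete within {timeout}s")
    return execution.result
```

Calling `run(str.upper, "abc", timeout=1.0)` returns once the worker thread has signalled completion, which mirrors how a test can pass a `timeout` to the runner.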
Major Components
Core Execution Flow
- DurableTestRunner - Main entry point that orchestrates test execution
- Executor - Manages execution lifecycle. Mutates Execution.
- Execution - Represents the state and operations of a single durable execution
Service Client Integration
- InMemoryServiceClient - Replaces AWS Lambda service client for local testing. Injected into SDK via DurableExecutionInvocationInputWithClient
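As a rough sketch of what injecting a client through the invocation input means, the toy example below passes an in-memory client to a handler, which then checkpoints against it instead of a remote service. All names here (`ToyInMemoryClient`, `ToyInvocationInput`, `toy_handler`, and the update dictionary fields) are hypothetical and do not reflect the package's actual classes or signatures.

```python
from dataclasses import dataclass
from typing import Protocol


class CheckpointClient(Protocol):
    """Minimal interface the handler depends on."""

    def checkpoint(self, updates: list[dict]) -> list[dict]: ...


class ToyInMemoryClient:
    """Records checkpoints locally instead of calling a remote service."""

    def __init__(self) -> None:
        self.received: list[dict] = []

    def checkpoint(self, updates: list[dict]) -> list[dict]:
        self.received.extend(updates)
        return list(self.received)


@dataclass
class ToyInvocationInput:
    """Hypothetical invocation input that carries the client with it."""

    payload: str
    service_client: CheckpointClient


def toy_handler(invocation: ToyInvocationInput) -> str:
    # The handler never constructs a client; it uses whichever one was injected.
    invocation.service_client.checkpoint(
        [{"id": "step-1", "type": "STEP", "action": "SUCCEED"}]
    )
    return invocation.payload.upper()
```

Because the handler only sees the `CheckpointClient` interface, a test can swap in the in-memory client and inspect `received` afterwards.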
Checkpoint Processing Pipeline
- CheckpointProcessor - Orchestrates operation transformations and validation
- Individual Validators - Validate operation updates and state transitions
- Individual Processors - Transform operation updates into operations (step, wait, callback, context, execution)
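The validate-then-transform pipeline described above could look roughly like the following. It is a minimal sketch, not the package's implementation: `ToyUpdate`, `ToyOperation`, the validator and processor functions, and the action and status strings are all assumptions chosen for illustration.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ToyUpdate:
    operation_id: str
    operation_type: str  # e.g. "STEP" or "WAIT"
    action: str          # e.g. "START", "SUCCEED" or "FAIL"


@dataclass(frozen=True)
class ToyOperation:
    operation_id: str
    operation_type: str
    status: str


_STATUS_BY_ACTION = {"START": "STARTED", "SUCCEED": "SUCCEEDED", "FAIL": "FAILED"}


def validate_has_id(update: ToyUpdate) -> None:
    if not update.operation_id:
        raise ValueError("operation_id must not be empty")


def validate_action(update: ToyUpdate) -> None:
    if update.action not in _STATUS_BY_ACTION:
        raise ValueError(f"unknown action: {update.action}")


def process_step(update: ToyUpdate) -> ToyOperation:
    return ToyOperation(update.operation_id, "STEP", _STATUS_BY_ACTION[update.action])


def process_wait(update: ToyUpdate) -> ToyOperation:
    return ToyOperation(update.operation_id, "WAIT", _STATUS_BY_ACTION[update.action])


VALIDATORS: list[Callable[[ToyUpdate], None]] = [validate_has_id, validate_action]
PROCESSORS: dict[str, Callable[[ToyUpdate], ToyOperation]] = {
    "STEP": process_step,
    "WAIT": process_wait,
}


def process_checkpoint(updates: list[ToyUpdate]) -> list[ToyOperation]:
    """Run every validator, then dispatch to the processor for the operation type."""
    operations: list[ToyOperation] = []
    for update in updates:
        for validate in VALIDATORS:
            validate(update)
        processor = PROCESSORS.get(update.operation_type)
        if processor is None:
            raise ValueError(f"no processor for type: {update.operation_type}")
        operations.append(processor(update))
    return operations
```

Keeping validators and processors in separate registries means a new operation type only needs one new processor entry, while shared validation rules stay in one place.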
Execution status changes (Observer Pattern)
- ExecutionNotifier - Notifies observers of execution events
- ExecutionObserver - Interface for receiving execution lifecycle events
- Executor implements ExecutionObserver to handle completion events
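The notifier and observer roles listed above follow the classic observer pattern, which can be sketched in a few lines. The class and method names below (`ToyObserver`, `ToyNotifier`, `ToyExecutor`, `on_completed`, `notify_completed`) are hypothetical and are not taken from the package.

```python
from typing import Protocol


class ToyObserver(Protocol):
    """Interface for anything that wants to hear about completed executions."""

    def on_completed(self, execution_id: str, result: str) -> None: ...


class ToyNotifier:
    """Keeps a list of observers and broadcasts events to all of them."""

    def __init__(self) -> None:
        self._observers: list[ToyObserver] = []

    def add_observer(self, observer: ToyObserver) -> None:
        self._observers.append(observer)

    def notify_completed(self, execution_id: str, result: str) -> None:
        for observer in self._observers:
            observer.on_completed(execution_id, result)


class ToyExecutor:
    """Plays the observer role: records results as completion events arrive."""

    def __init__(self) -> None:
        self.results: dict[str, str] = {}

    def on_completed(self, execution_id: str, result: str) -> None:
        self.results[execution_id] = result
```

The notifier knows nothing about the executor beyond the `on_completed` method, which is what keeps checkpoint processing and execution management loosely coupled.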
Component Relationships
1. DurableTestRunner → Executor → Execution
- DurableTestRunner serves as the main API entry point and sets up all components
- Executor manages the execution lifecycle, handling invocations and state transitions
- Execution maintains the state of operations and completion status
2. Service Client Injection
- DurableTestRunner creates InMemoryServiceClient with CheckpointProcessor
- InProcessInvoker injects the service client into SDK via DurableExecutionInvocationInputWithClient
- When durable functions call checkpoint operations, they're intercepted by InMemoryServiceClient
- InMemoryServiceClient delegates to CheckpointProcessor for local processing
3. CheckpointProcessor → Individual Validators → Individual Processors
- CheckpointProcessor orchestrates the checkpoint processing pipeline
- Individual Validators (CheckpointValidator, TransitionsValidator, and operation-specific validators) ensure operation updates are valid
- Individual Processors (StepProcessor, WaitProcessor, etc.) transform OperationUpdate into Operation
4. Observer Pattern Flow
The observer pattern enables loose coupling between checkpoint processing and execution management:
- CheckpointProcessor processes operation updates
- Individual Processors detect state changes (completion, failures, timer scheduling)
- ExecutionNotifier broadcasts events to registered observers
- Executor (as ExecutionObserver) receives notifications and updates Execution state
- Execution complete_* methods finalize the execution state
Documentation
Error Handling
The testing framework implements AWS-compliant error responses that match the exact format expected by boto3 and AWS services. For detailed information about error response formats, exception types, and troubleshooting, see:
Key features:
- AWS-compliant JSON format: Matches boto3 expectations exactly
- Smithy model compliance: Field names follow AWS Smithy definitions
- HTTP status code mapping: Standard AWS service status codes
- Boto3 compatibility: Seamless integration with boto3 error handling
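For orientation, the sketch below builds a dictionary in the shape that botocore exposes on `ClientError.response`: an `Error` entry with `Code` and `Message`, plus `ResponseMetadata` with `HTTPStatusCode`. The helper `build_error_response` and the status mapping are hypothetical illustrations, not this package's API, and the exact fields the framework emits may differ.

```python
# Illustrative mapping from exception name to HTTP status code (an assumption
# for this sketch, not the framework's actual table).
STATUS_BY_CODE = {
    "InvalidParameterValueException": 400,
    "ResourceNotFoundException": 404,
    "ServiceException": 500,
}


def build_error_response(code: str, message: str) -> dict:
    """Return a dict shaped like the parsed error response boto3 callers inspect."""
    status = STATUS_BY_CODE.get(code, 500)  # unknown codes fall back to 500
    return {
        "Error": {"Code": code, "Message": message},
        "ResponseMetadata": {"HTTPStatusCode": status},
    }
```

Code written against boto3 typically branches on `response["Error"]["Code"]`, so matching this shape is what lets existing error handling work unchanged against a local test double.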
Developers
Please see CONTRIBUTING.md. It contains the testing guide, sample commands and instructions for how to contribute to this package.
TL;DR: use hatch, which manages virtual environments and dependencies for you so you don't have to do it manually.
License
This project is licensed under the Apache-2.0 License.