Community-maintained durable execution local and cloud test runner for Python

Async Durable Execution Runner for Python

Installation

pip install async-durable-execution-runner

Overview

Use Async Durable Execution Runner for Python to test your Python durable functions locally and in cloud environments.

This package is distributed from a community-maintained fork of the original Apache-2.0 licensed AWS project and continues under Apache License 2.0 with upstream notices preserved.

The test framework contains a local runner, so you can run and test your durable function locally before you deploy it.

Quick Start

A durable function under test

import asyncio
from typing import Any

from async_durable_execution import (
    DurableContext,
    durable_execution,
    durable_step,
    durable_with_child_context,
)
from async_durable_execution.config import Duration


@durable_step
async def one(a: int, b: int) -> str:
    await asyncio.sleep(0)
    return f"{a} {b}"


@durable_step
async def two_1(a: int, b: int) -> str:
    await asyncio.sleep(0)
    return f"{a} {b}"


@durable_step
async def two_2(a: int, b: int) -> str:
    await asyncio.sleep(0)
    return f"{b} {a}"


@durable_with_child_context
async def two(ctx: DurableContext, a: int, b: int) -> str:
    two_1_result: str = ctx.step(two_1(a, b))
    two_2_result: str = ctx.step(two_2(a, b))
    return f"{two_1_result} {two_2_result}"


@durable_step
async def three(a: int, b: int) -> str:
    await asyncio.sleep(0)
    return f"{a} {b}"


@durable_execution
async def function_under_test(event: Any, context: DurableContext) -> list[str]:
    results: list[str] = []

    result_one: str = context.step(one(1, 2))
    results.append(result_one)

    context.wait(duration=Duration.from_seconds(1))

    result_two: str = context.run_in_child_context(two(3, 4))
    results.append(result_two)

    result_three: str = context.step(three(5, 6))
    results.append(result_three)

    return results

Your test code

from async_durable_execution.execution import InvocationStatus
from async_durable_execution_runner.runner import (
    ContextOperation,
    DurableFunctionTestResult,
    DurableFunctionTestRunner,
    StepOperation,
)


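# function_under_test is the durable function defined above; import it from the module where you defined it.
def test_my_durable_functions():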
    with DurableFunctionTestRunner(handler=function_under_test) as runner:
        result: DurableFunctionTestResult = runner.run(input="input str", timeout=10)

    assert result.status is InvocationStatus.SUCCEEDED
    assert result.result == '["1 2", "3 4 4 3", "5 6"]'

    one_result: StepOperation = result.get_step("one")
    assert one_result.result == '"1 2"'

    two_result: ContextOperation = result.get_context("two")
    assert two_result.result == '"3 4 4 3"'

    three_result: StepOperation = result.get_step("three")
    assert three_result.result == '"5 6"'
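
The assertions above compare against literals such as '"1 2"' and '["1 2", "3 4 4 3", "5 6"]', which suggests that results are reported as serialized strings. Assuming they are JSON (an inference from those literals, not something this README states), decoding them first can make assertions easier to read. The helper name `decode_result` below is hypothetical and is not part of the runner's API.

```python
import json
from typing import Any, Optional


def decode_result(serialized: Optional[str]) -> Any:
    """Hypothetical helper: turn a serialized result string into a Python value.

    Assumes the string is JSON, as the literals in the test above suggest.
    """
    if serialized is None:
        return None
    return json.loads(serialized)


# The same literals asserted in the test above, decoded into Python values.
step_value = decode_result('"1 2"')
final_value = decode_result('["1 2", "3 4 4 3", "5 6"]')
```

With such a helper, a test could compare `decode_result(result.result)` against a plain Python list instead of a hand-written JSON string.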

Architecture

Durable Functions Python Test Framework Architecture

Event Flow

Event Flow Sequence Diagram

  1. DurableFunctionTestRunner starts the execution via Executor
  2. Executor creates an Execution and schedules the initial invocation
  3. During execution, checkpoints are processed by CheckpointProcessor
  4. Individual Processors transform operation updates and may trigger events
  5. ExecutionNotifier broadcasts events to Executor (the observer)
  6. Executor updates the Execution state based on those events
  7. Execution completion triggers the final event notifications
  8. DurableFunctionTestRunner.run() blocks until it receives the completion event, then returns a DurableFunctionTestResult
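
The last step of the flow above, a `run()` call that blocks until a completion event arrives, can be illustrated with a small stand-alone sketch. This is not the runner's implementation: `SketchRunner` and `SketchResult` are hypothetical names, and using a background thread with `threading.Event` is an assumption chosen only to show the blocking-until-signalled idea.

```python
import threading
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for a test result object."""

    status: str
    result: Optional[str]


class SketchRunner:
    """Hypothetical runner: starts work in the background and waits for completion."""

    def __init__(self, handler: Callable[[str], str]) -> None:
        self._handler = handler
        self._completed = threading.Event()
        self._result: Optional[SketchResult] = None

    def _execute(self, payload: str) -> None:
        # Stands in for steps 1-7: invoke the handler, record the outcome,
        # then signal completion to whoever is waiting in run().
        try:
            self._result = SketchResult("SUCCEEDED", self._handler(payload))
        except Exception as exc:
            self._result = SketchResult("FAILED", str(exc))
        finally:
            self._completed.set()

    def run(self, payload: str, timeout: float) -> SketchResult:
        self._completed.clear()
        worker = threading.Thread(target=self._execute, args=(payload,), daemon=True)
        worker.start()
        # Step 8: block until the completion event arrives or the timeout expires.
        if not self._completed.wait(timeout):
            raise TimeoutError(f"execution did not complete within {timeout}s")
        assert self._result is not None
        return self._result


outcome = SketchRunner(lambda payload: payload.upper()).run("input str", timeout=10)
```

The `timeout` argument mirrors the `timeout=10` passed to `runner.run(...)` in the Quick Start; how the real runner enforces it is not described here.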

Major Components

Core Execution Flow

  • DurableFunctionTestRunner - Main entry point that orchestrates test execution
  • Executor - Manages execution lifecycle. Mutates Execution.
  • Execution - Represents the state and operations of a single durable execution

Service Client Integration

  • InMemoryServiceClient - Replaces the AWS Lambda service client for local testing. Injected into the SDK via DurableExecutionInvocationInputWithClient.

Checkpoint Processing Pipeline

  • CheckpointProcessor - Orchestrates operation transformations and validation
  • Individual Validators - Validate operation updates and state transitions
  • Individual Processors - Transform operation updates into operations (step, wait, callback, context, execution)

Execution status changes (Observer Pattern)

  • ExecutionNotifier - Notifies observers of execution events
  • ExecutionObserver - Interface for receiving execution lifecycle events
  • Executor implements ExecutionObserver to handle completion events

Component Relationships

1. DurableFunctionTestRunner → Executor → Execution

  • DurableFunctionTestRunner serves as the main API entry point and sets up all components
  • Executor manages the execution lifecycle, handling invocations and state transitions
  • Execution maintains the state of operations and completion status

2. Service Client Injection

  • DurableFunctionTestRunner creates InMemoryServiceClient with CheckpointProcessor
  • InProcessInvoker injects the service client into SDK via DurableExecutionInvocationInputWithClient
  • When durable functions call checkpoint operations, they're intercepted by InMemoryServiceClient
  • InMemoryServiceClient delegates to CheckpointProcessor for local processing
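
The injection idea described above can be sketched in isolation: the code under test depends only on a client interface, and the test harness supplies an in-memory implementation through the invocation input. Every name below (`CheckpointClient`, `InMemoryClientSketch`, `InvocationInputSketch`, `durable_code`) is hypothetical, and the `checkpoint` signature is an assumption made for illustration; none of this is the SDK's actual API.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Protocol


class CheckpointClient(Protocol):
    """Hypothetical interface shared by a remote client and an in-memory one."""

    def checkpoint(self, execution_id: str, updates: List[str]) -> int: ...


@dataclass
class InMemoryClientSketch:
    """Hypothetical in-memory client: records updates locally instead of calling a service."""

    stored: Dict[str, List[str]] = field(default_factory=dict)

    def checkpoint(self, execution_id: str, updates: List[str]) -> int:
        self.stored.setdefault(execution_id, []).extend(updates)
        # Return how many updates have been recorded for this execution so far.
        return len(self.stored[execution_id])


@dataclass
class InvocationInputSketch:
    """Hypothetical invocation input that carries the client to use."""

    execution_id: str
    client: CheckpointClient


def durable_code(invocation: InvocationInputSketch) -> int:
    # The code under test only sees the interface, so it cannot tell
    # whether the client talks to a remote service or to local memory.
    invocation.client.checkpoint(invocation.execution_id, ["step:one:STARTED"])
    return invocation.client.checkpoint(invocation.execution_id, ["step:one:SUCCEEDED"])


client = InMemoryClientSketch()
total = durable_code(InvocationInputSketch(execution_id="exec-1", client=client))
```

Because the client travels with the invocation input, swapping implementations requires no change to the code under test.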

3. CheckpointProcessor → Individual Validators → Individual Processors

  • CheckpointProcessor orchestrates the checkpoint processing pipeline
  • Individual Validators (CheckpointValidator, TransitionsValidator, and operation-specific validators) ensure operation updates are valid
  • Individual Processors (StepProcessor, WaitProcessor, etc.) transform OperationUpdate into Operation
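
A validate-then-transform pipeline of the kind described above might look roughly like the following. It is a minimal sketch under stated assumptions: the data classes, the action and status names, and the two validators are all hypothetical and do not reflect the framework's real OperationUpdate or Operation types.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class UpdateSketch:
    """Hypothetical operation update sent in a checkpoint."""

    operation_id: str
    kind: str    # e.g. "step" or "wait"
    action: str  # e.g. "START", "SUCCEED", "FAIL"


@dataclass(frozen=True)
class OperationSketch:
    """Hypothetical operation produced from an update."""

    operation_id: str
    kind: str
    status: str


STATUS_BY_ACTION = {"START": "STARTED", "SUCCEED": "SUCCEEDED", "FAIL": "FAILED"}


def validate_has_id(update: UpdateSketch) -> None:
    if not update.operation_id:
        raise ValueError("operation_id must not be empty")


def validate_known_action(update: UpdateSketch) -> None:
    if update.action not in STATUS_BY_ACTION:
        raise ValueError(f"unknown action: {update.action}")


def to_operation(update: UpdateSketch) -> OperationSketch:
    # Transform an update into an operation with the matching status.
    return OperationSketch(update.operation_id, update.kind, STATUS_BY_ACTION[update.action])


VALIDATORS: List[Callable[[UpdateSketch], None]] = [validate_has_id, validate_known_action]
# One processor per operation kind; both kinds share the same transform in this sketch.
PROCESSORS: Dict[str, Callable[[UpdateSketch], OperationSketch]] = {
    "step": to_operation,
    "wait": to_operation,
}


def process_checkpoint(updates: List[UpdateSketch]) -> List[OperationSketch]:
    operations = []
    for update in updates:
        for validate in VALIDATORS:  # every validator must pass first
            validate(update)
        processor = PROCESSORS.get(update.kind)
        if processor is None:
            raise ValueError(f"no processor for kind: {update.kind}")
        operations.append(processor(update))
    return operations


operations = process_checkpoint(
    [UpdateSketch("op-1", "step", "SUCCEED"), UpdateSketch("op-2", "wait", "START")]
)
```

Keeping validators separate from processors means an invalid update is rejected before any operation state is produced.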

4. Observer Pattern Flow

The observer pattern enables loose coupling between checkpoint processing and execution management:

  1. CheckpointProcessor processes operation updates
  2. Individual Processors detect state changes (completion, failures, timer scheduling)
  3. ExecutionNotifier broadcasts events to registered observers
  4. Executor (as ExecutionObserver) receives notifications and updates Execution state
  5. Execution complete_* methods finalize the execution state
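
The observer flow above can be sketched with a notifier, an observer interface, and an executor that implements it. The class names end in `Sketch` to mark them as hypothetical, and the method names (`on_completed`, `on_failed`, `complete_success`, `complete_fail`) are assumptions for illustration rather than the framework's actual signatures.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class ObserverSketch(ABC):
    """Hypothetical interface for receiving execution lifecycle events."""

    @abstractmethod
    def on_completed(self, execution_id: str, result: Optional[str]) -> None: ...

    @abstractmethod
    def on_failed(self, execution_id: str, error: str) -> None: ...


class NotifierSketch:
    """Hypothetical notifier: broadcasts events to every registered observer."""

    def __init__(self) -> None:
        self._observers: List[ObserverSketch] = []

    def add_observer(self, observer: ObserverSketch) -> None:
        self._observers.append(observer)

    def notify_completed(self, execution_id: str, result: Optional[str]) -> None:
        for observer in self._observers:
            observer.on_completed(execution_id, result)

    def notify_failed(self, execution_id: str, error: str) -> None:
        for observer in self._observers:
            observer.on_failed(execution_id, error)


class ExecutionSketch:
    """Hypothetical execution state with complete_* style finalizers."""

    def __init__(self) -> None:
        self.status = "RUNNING"
        self.result: Optional[str] = None
        self.error: Optional[str] = None

    def complete_success(self, result: Optional[str]) -> None:
        self.status, self.result = "SUCCEEDED", result

    def complete_fail(self, error: str) -> None:
        self.status, self.error = "FAILED", error


class ExecutorSketch(ObserverSketch):
    """Hypothetical executor: owns executions and reacts to notifications."""

    def __init__(self) -> None:
        self.executions: Dict[str, ExecutionSketch] = {}

    def on_completed(self, execution_id: str, result: Optional[str]) -> None:
        self.executions[execution_id].complete_success(result)

    def on_failed(self, execution_id: str, error: str) -> None:
        self.executions[execution_id].complete_fail(error)


notifier = NotifierSketch()
executor = ExecutorSketch()
executor.executions["exec-1"] = ExecutionSketch()
executor.executions["exec-2"] = ExecutionSketch()
notifier.add_observer(executor)

# A processor that detects a state change would call the notifier like this.
notifier.notify_completed("exec-1", '"done"')
notifier.notify_failed("exec-2", "step failed")
```

The notifier never imports the executor, which is the loose coupling the section describes: checkpoint processing only knows that some observer is listening.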

Documentation

Error Handling

The testing framework implements AWS-compliant error responses that match the format expected by boto3 and AWS services. Detailed information about error response formats, exception types, and troubleshooting is available in the project documentation.

Key features:

  • AWS-compliant JSON format: Matches boto3 expectations exactly
  • Smithy model compliance: Field names follow AWS Smithy definitions
  • HTTP status code mapping: Standard AWS service status codes
  • Boto3 compatibility: Seamless integration with boto3 error handling
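
To make the boto3 compatibility point concrete: botocore exposes a parsed error on `ClientError.response` as a dictionary with an `Error` entry (`Code`, `Message`) and a `ResponseMetadata` entry (`HTTPStatusCode`). The sketch below builds a dictionary of that shape using only the standard library. The function names and the status code table are illustrative assumptions; the framework's own mapping and wire format are not shown in this README.

```python
from typing import Any, Dict

# Illustrative mapping only; the framework's actual table is an unknown here.
STATUS_BY_ERROR_CODE = {
    "InvalidParameterValueException": 400,
    "ResourceNotFoundException": 404,
    "ServiceException": 500,
}


def build_error_response(code: str, message: str) -> Dict[str, Any]:
    """Hypothetical helper: build a parsed error in the shape botocore exposes."""
    return {
        "Error": {"Code": code, "Message": message},
        # Unknown codes fall back to 500 in this sketch.
        "ResponseMetadata": {"HTTPStatusCode": STATUS_BY_ERROR_CODE.get(code, 500)},
    }


def describe(response: Dict[str, Any]) -> str:
    """Read the fields the same way boto3 error-handling code typically does."""
    error = response["Error"]
    status = response["ResponseMetadata"]["HTTPStatusCode"]
    return f"{status} {error['Code']}: {error['Message']}"


response = build_error_response("ResourceNotFoundException", "Execution exec-1 not found")
summary = describe(response)
```

Test code that catches `botocore.exceptions.ClientError` usually branches on `e.response["Error"]["Code"]`, so matching this shape is what lets existing error handling work unchanged.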

Developers

Please see CONTRIBUTING.md. It contains the testing guide, sample commands and instructions for how to contribute to this package.

TL;DR: use hatch. It manages virtual environments and dependencies for you, so you don't have to do it manually.

License

This project is licensed under the Apache-2.0 License.
