Skip to main content

async_orch is a Python library designed for building and managing asynchronous task pipelines.

Project description

async_orch: Asynchronous Task Pipelines

PyPI version codecov License: MIT

async_orch is a Python library designed for building and managing asynchronous task pipelines. It provides a flexible framework to define individual tasks, sequence them, run them in parallel with concurrency controls, and apply resilience patterns like circuit breakers. The library is built on asyncio (Python 3.11+) to handle non-blocking operations efficiently.

Key Features

  • Asynchronous by Design: Leverages asyncio for efficient I/O-bound operations and concurrency.
  • Composability: Build complex workflows by nesting Sequence, Parallel, and CircuitDefinition components.
  • State Management & Observability: A global EventBus allows for monitoring task states and circuit breaker events.
  • Resilience Patterns: Includes CircuitDefinition for circuit breaker functionality (using aiobreaker).
  • Extensible Policies: Tasks can be wrapped with retry mechanisms or other strategies (e.g., using backoff as shown in examples).

Installation

You can install async_orch using pip:

pip install async_orch

Requires Python 3.11 or newer.

Quick Start

Here's a simple example of defining and running a sequence of tasks:

import asyncio
from async_orch import Sequence, run # TaskRunner is now internal

# Define some tasks
async def fetch_user_data(user_id: int) -> dict:
    print(f"Fetching data for user {user_id}...")
    await asyncio.sleep(0.1) # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}", "status": "active"}

def process_and_notify(data: dict) -> dict: # Combined for simplicity in example
    print(f"Processing data for {data['name']}...")
    data["processed"] = True
    print(f"Notifying {data['name']} about processing completion...")
    # await asyncio.sleep(0.05) # If this part were async
    print(f"Notification sent for {data['name']}.")
    return data # Return the processed data

# Create a pipeline using the new API
user_pipeline = Sequence(
    lambda: fetch_user_data(101), # Use lambda to pass arguments to the first task
    process_and_notify,           # Receives output from fetch_user_data
    name="UserProfileWorkflow"    # Optional name for the sequence
)

async def main():
    print("Starting user pipeline...")
    # The 'run' function executes the sequence and returns the result of the last task.
    final_data = await run(user_pipeline)
    print("\nPipeline completed.")
    if final_data:
        print(f"Final processed data from pipeline: {final_data}")

if __name__ == "__main__":
    asyncio.run(main())

Documentation

For detailed information on all components, advanced usage, and more examples, please refer to the async_orch Wiki.

The wiki covers:

  • Core Components: Sequence, Parallel, CircuitDefinition, EventBus, TaskState.
  • Advanced Patterns: Implementing retries, using the event bus for logging.
  • Codebase structure and how to run the bundled examples.

Examples

The examples/ directory contains scripts demonstrating various features of async_orch. You can run them directly:

Contributing

Contributions are welcome! Please feel free to open an issue or submit a pull request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_orch-0.1.1.tar.gz (13.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

async_orch-0.1.1-py3-none-any.whl (8.2 kB view details)

Uploaded Python 3

File details

Details for the file async_orch-0.1.1.tar.gz.

File metadata

  • Download URL: async_orch-0.1.1.tar.gz
  • Upload date:
  • Size: 13.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.11

File hashes

Hashes for async_orch-0.1.1.tar.gz
Algorithm Hash digest
SHA256 8e3c20427449eff7665f06b441bcb54ff976189b838da5f95fe83e78d7ae2720
MD5 f87fb0aa8f8ff700bb9988d3451e57b0
BLAKE2b-256 02a671e934b321315ee4169bef89bf60d3a07f55c39bf5bc5c9f651c191df28a

See more details on using hashes here.

File details

Details for the file async_orch-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: async_orch-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 8.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.11

File hashes

Hashes for async_orch-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 464e93a36bbbacfd42e1c961fbcbcca56a4b843b0f4e39b6dfdea3cfc83451a8
MD5 face9d2e3061f52a3b87dcb89bd1bb3e
BLAKE2b-256 6adf775657974b222425e01bfccd557a733c92a0da249e631c96d863c0ffb582

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page