Skip to main content

async_orch is a Python library designed for building and managing asynchronous task pipelines.

Project description

async_orch: Asynchronous Task Pipelines

PyPI version License: MIT

async_orch is a Python library designed for building and managing asynchronous task pipelines. It provides a flexible framework to define individual tasks, sequence them, run them in parallel with concurrency controls, and apply resilience patterns like circuit breakers. The library is built on asyncio (Python 3.11+) to handle non-blocking operations efficiently.

Key Features

  • Asynchronous by Design: Leverages asyncio for efficient I/O-bound operations and concurrency.
  • Composability: Build complex workflows by nesting Task, Sequence, Parallel, and CircuitGroup components.
  • State Management & Observability: A global EventBus allows for monitoring task states and circuit breaker events.
  • Resilience Patterns: Includes CircuitGroup for circuit breaker functionality (using aiobreaker).
  • Extensible Policies: Customize task execution with retry mechanisms or other strategies (e.g., using backoff as shown in examples).

Installation

You can install async_orch using pip:

pip install async_orch

Requires Python 3.11 or newer.

Quick Start

Here's a simple example of defining and running a sequence of tasks:

import asyncio
from async_orch import Task, Sequence, run

# Define some tasks
async def fetch_user_data(user_id: int) -> dict:
    print(f"Fetching data for user {user_id}...")
    await asyncio.sleep(0.1) # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}", "status": "active"}

def process_user_data(data: dict) -> dict:
    print(f"Processing data for {data['name']}...")
    data["processed"] = True
    return data

async def notify_user(data: dict):
    print(f"Notifying {data['name']} about processing completion...")
    await asyncio.sleep(0.05) # Simulate I/O
    print(f"Notification sent for {data['name']}.")

# Create a pipeline
user_pipeline = Sequence(
    Task(fetch_user_data, 101, name="FetchUserData"),
    Task(process_user_data, name="ProcessUserData"), # Takes output from previous task
    Task(notify_user, name="NotifyUser")            # Takes output from previous task
)

async def main():
    print("Starting user pipeline...")
    results = await run(user_pipeline)
    print("\nPipeline completed.")
    if results and len(results) > 1:
        final_data = results[1] # Result from process_user_data
        print(f"Final processed data from pipeline: {final_data}")


if __name__ == "__main__":
    asyncio.run(main())

Documentation

For detailed information on all components, advanced usage, and more examples, please refer to the async_orch Wiki.

The wiki covers:

  • Core Components: Task, EventBus, Sequence, Parallel, CircuitGroup, TaskState.
  • Advanced Patterns: Implementing retries, using the event bus for logging.
  • Codebase structure and how to run the bundled examples.

Examples

The examples/ directory contains scripts demonstrating various features of async_orch. You can run them directly:

Contributing

Contributions are welcome! Please feel free to open an issue or submit a pull request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_orch-0.1.0.tar.gz (9.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

async_orch-0.1.0-py3-none-any.whl (6.6 kB view details)

Uploaded Python 3

File details

Details for the file async_orch-0.1.0.tar.gz.

File metadata

  • Download URL: async_orch-0.1.0.tar.gz
  • Upload date:
  • Size: 9.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.11

File hashes

Hashes for async_orch-0.1.0.tar.gz
Algorithm Hash digest
SHA256 d8ce5ce52b7694b1ca8f3c2aae603c9d347d397fb9ecdaecd9860732b6acb58b
MD5 2dd943ae690d256576b7099a6dc895f0
BLAKE2b-256 b0f0f1a78b32f8ce75adad1c45e01c13284f16e5fd146c910737479a38b39311

See more details on using hashes here.

File details

Details for the file async_orch-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: async_orch-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 6.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.11

File hashes

Hashes for async_orch-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 99fc2f598d26a1a9d1665cb31567083974a05eb6c5d8f081fdf830167b1e0111
MD5 e7796e17ac12e033d7c250bd5620ed38
BLAKE2b-256 5a46d126fc46694c024a4e1e9ea18ee4e5dd505a6762d8b57c1b28245b407677

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page