async_orch: Asynchronous Task Pipelines
async_orch is a Python library designed for building and managing asynchronous task pipelines. It provides a flexible framework to define individual tasks, sequence them, run them in parallel with concurrency controls, and apply resilience patterns like circuit breakers. The library is built on asyncio (Python 3.11+) to handle non-blocking operations efficiently.
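To make "run them in parallel with concurrency controls" concrete, here is a minimal sketch using only the standard library. It is not how async_orch implements Parallel; the helper name run_bounded and its limit parameter are assumptions made for illustration.

```python
import asyncio


async def run_bounded(coros, limit: int) -> list:
    """Run coroutines concurrently, at most `limit` at a time.

    Hypothetical helper for illustration; not part of async_orch.
    """
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        # Each coroutine waits for a free slot before it starts running.
        async with semaphore:
            return await coro

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(guarded(c) for c in coros))


async def square(n: int) -> int:
    await asyncio.sleep(0.01)  # Simulate I/O
    return n * n


if __name__ == "__main__":
    print(asyncio.run(run_bounded([square(n) for n in range(5)], limit=2)))
```

The semaphore caps how many coroutines are in flight at once, while gather keeps the results aligned with the inputs.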
Key Features
- Asynchronous by Design: Leverages asyncio for efficient I/O-bound operations and concurrency.
- Composability: Build complex workflows by nesting Task, Sequence, Parallel, and CircuitGroup components.
- State Management & Observability: A global EventBus allows for monitoring task states and circuit breaker events.
- Resilience Patterns: Includes CircuitGroup for circuit breaker functionality (using aiobreaker).
- Extensible Policies: Customize task execution with retry mechanisms or other strategies (e.g., using backoff as shown in examples).
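For readers new to the circuit breaker pattern mentioned above, the following is a rough standard-library sketch of the idea. It does not use aiobreaker and does not reflect CircuitGroup's actual interface; SimpleBreaker, CircuitOpenError, fail_max, and reset_timeout are all hypothetical names chosen for this example.

```python
import asyncio
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleBreaker:
    """Illustrative breaker: opens after `fail_max` consecutive failures."""

    def __init__(self, fail_max: int = 3, reset_timeout: float = 1.0):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # monotonic timestamp while the circuit is open

    async def call(self, func, *args):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit is open")
            # Timeout elapsed: allow one trial call; a single failure reopens.
            self.opened_at = None
            self.failures = self.fail_max - 1
        try:
            result = await func(*args)
        except Exception:
            self.failures += 1
            if self.failures >= self.fail_max:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # A success closes the circuit fully.
        return result


async def flaky():
    raise ConnectionError("backend down")


async def demo() -> list:
    breaker = SimpleBreaker(fail_max=2, reset_timeout=60.0)
    outcomes = []
    for _ in range(3):
        try:
            await breaker.call(flaky)
        except CircuitOpenError:
            outcomes.append("rejected")
        except ConnectionError:
            outcomes.append("failed")
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the demo the first two calls reach the failing backend; the third is rejected immediately because the breaker has opened, which is the behavior that protects a struggling dependency from further load.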
Installation
You can install async_orch using pip:
pip install async_orch
Requires Python 3.11 or newer.
Quick Start
Here's a simple example of defining and running a sequence of tasks:
import asyncio
from async_orch import Task, Sequence, run

# Define some tasks
async def fetch_user_data(user_id: int) -> dict:
    print(f"Fetching data for user {user_id}...")
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}", "status": "active"}

def process_user_data(data: dict) -> dict:
    print(f"Processing data for {data['name']}...")
    data["processed"] = True
    return data

async def notify_user(data: dict):
    print(f"Notifying {data['name']} about processing completion...")
    await asyncio.sleep(0.05)  # Simulate I/O
    print(f"Notification sent for {data['name']}.")

# Create a pipeline
user_pipeline = Sequence(
    Task(fetch_user_data, 101, name="FetchUserData"),
    Task(process_user_data, name="ProcessUserData"),  # Takes output from previous task
    Task(notify_user, name="NotifyUser")  # Takes output from previous task
)

async def main():
    print("Starting user pipeline...")
    results = await run(user_pipeline)
    print("\nPipeline completed.")
    if results and len(results) > 1:
        final_data = results[1]  # Result from process_user_data
        print(f"Final processed data from pipeline: {final_data}")

if __name__ == "__main__":
    asyncio.run(main())
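The pipeline above mixes async and plain functions and hands each task's output to the next. As a conceptual illustration only, that hand-off can be sketched in plain asyncio as follows. This is not async_orch's implementation of Sequence; run_chain and the step functions are hypothetical names.

```python
import asyncio
import inspect


async def run_chain(first_input, *steps) -> list:
    """Feed each step's return value into the next step.

    Returns every intermediate result, one per step (hypothetical helper).
    """
    results = []
    value = first_input
    for step in steps:
        outcome = step(value)
        # Plain functions return a value; async functions return an awaitable.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        results.append(outcome)
        value = outcome
    return results


async def fetch(user_id: int) -> dict:
    await asyncio.sleep(0.01)  # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}"}


def mark_processed(data: dict) -> dict:
    return {**data, "processed": True}


async def notify(data: dict) -> None:
    await asyncio.sleep(0.01)  # Simulate I/O


if __name__ == "__main__":
    print(asyncio.run(run_chain(101, fetch, mark_processed, notify)))
```

As in the Quick Start, the second entry of the result list holds the processed data, and the last step contributes None because it returns nothing.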
Documentation
For detailed information on all components, advanced usage, and more examples, please refer to the async_orch Wiki.
The wiki covers:
- Core Components: Task, EventBus, Sequence, Parallel, CircuitGroup, TaskState.
- Advanced Patterns: Implementing retries, using the event bus for logging.
- Codebase structure and how to run the bundled examples.
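As a rough idea of what a retry policy does, here is a standard-library sketch of retrying an async call with exponential backoff. It uses neither the backoff package nor any async_orch API; retry, max_tries, base_delay, and the Flaky class are assumptions made for this example, and the wiki remains the reference for the library's own approach.

```python
import asyncio


async def retry(func, *args, max_tries: int = 3, base_delay: float = 0.01):
    """Call `func` until it succeeds or `max_tries` attempts are used up."""
    for attempt in range(1, max_tries + 1):
        try:
            return await func(*args)
        except Exception:
            if attempt == max_tries:
                raise  # Out of attempts: surface the last error.
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


class Flaky:
    """Async callable that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int):
        self.remaining = failures
        self.calls = 0

    async def __call__(self, value):
        self.calls += 1
        if self.remaining > 0:
            self.remaining -= 1
            raise TimeoutError("transient failure")
        return value


if __name__ == "__main__":
    flaky = Flaky(failures=2)
    print(asyncio.run(retry(flaky, "ok")), flaky.calls)
```

Doubling the delay between attempts gives a struggling service progressively more time to recover instead of hammering it at a fixed rate.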
Examples
The examples/ directory contains scripts demonstrating various features of async_orch. You can run them directly.
Contributing
Contributions are welcome! Please feel free to open an issue or submit a pull request.
License
This project is licensed under the MIT License - see the LICENSE file for details.