An asynchronous task manager that handles concurrent task execution with automatic retries, status monitoring, and graceful cancellation.

AsyncWeaver

AsyncWeaver is a Python library for managing asynchronous tasks with retry capabilities and status monitoring. It provides a way to queue, execute, and track multiple concurrent tasks while handling failures and retries automatically.

Features

  • Concurrent task execution with configurable number of workers
  • Automatic task retries with customizable delay
  • Task status monitoring
  • Support for task cancellation (individual or bulk)
  • Callback support for task completion, failure, and cancellation
  • Custom task ID generation
  • Type hints for better IDE support

Installation

pip install async-weave

Basic Usage

import asyncio
from async_weave import AsyncWeaver

async def example():
    # Create a manager with 5 workers, max 3 retries, and 5 minute retry delay
    async with AsyncWeaver(worker_count=5, max_retries=3, retry_delay=300) as manager:
        # Define an async task
        async def my_task(param):
            # Your task logic here
            return f"Processed {param}"
        
        # Add tasks to the queue
        task_id = await manager.add_task(my_task, "data")
        
        # Wait for result
        result = await manager.get_task_result(task_id)
        print(f"Task completed with result: {result}")

# Run the example
asyncio.run(example())

Advanced Features

Custom Task IDs

# my_task takes one argument, so pass it before the task_id keyword
task_id = await manager.add_task(my_task, "data", task_id="custom_id_123")

Callbacks

Each callback receives the task_id. By default it is a str, but if you pass a custom task_id or a custom ID generator, it has whatever type you supplied.

async def example_with_callbacks():
    async with AsyncWeaver() as manager:
        def on_complete(task_id: str, result):
            print(f"Task completed with: {result}")
            
        def on_error(task_id: str, error):
            print(f"Task failed with: {error}")
            
        def on_cancel(task_id: str):
            print("Task was cancelled")
            
        # my_task is the one-argument coroutine from Basic Usage
        await manager.add_task(
            my_task,
            "data",
            on_complete=on_complete,
            on_error=on_error,
            on_cancel=on_cancel
        )
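
To show which callback fires in which situation, here is a minimal sketch built on plain asyncio tasks. It is not AsyncWeaver's implementation: attach_callbacks and the demo coroutines are hypothetical names, and the sketch only assumes the callback signatures shown above.

```python
import asyncio


def attach_callbacks(task, task_id, on_complete=None, on_error=None, on_cancel=None):
    """Route the outcome of an asyncio.Task to one of three callbacks."""

    def _dispatch(done):
        # Check cancellation first: exception() raises on a cancelled task.
        if done.cancelled():
            if on_cancel:
                on_cancel(task_id)
        elif done.exception() is not None:
            if on_error:
                on_error(task_id, done.exception())
        elif on_complete:
            on_complete(task_id, done.result())

    task.add_done_callback(_dispatch)


events = []


async def demo():
    async def ok():
        return "value"

    async def boom():
        raise RuntimeError("boom")

    async def forever():
        await asyncio.sleep(3600)

    tasks = {
        "t-ok": asyncio.create_task(ok()),
        "t-err": asyncio.create_task(boom()),
        "t-cancel": asyncio.create_task(forever()),
    }
    for task_id, task in tasks.items():
        attach_callbacks(
            task,
            task_id,
            on_complete=lambda tid, result: events.append(("complete", tid, result)),
            on_error=lambda tid, err: events.append(("error", tid, str(err))),
            on_cancel=lambda tid: events.append(("cancel", tid)),
        )

    tasks["t-cancel"].cancel()
    # return_exceptions=True keeps the failure and the cancellation from propagating
    await asyncio.gather(*tasks.values(), return_exceptions=True)
    await asyncio.sleep(0)  # give any pending done-callbacks a chance to run


asyncio.run(demo())
```

Exactly one callback runs per task, so each task ends up with a single recorded outcome.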

Task Cancellation

# Cancel a specific task
await manager.cancel_task(task_id)

# Cancel all running tasks
await manager.cancel_all_tasks()

Custom ID Generator

import time

def custom_id_generator():
    return f"task_{time.time()}"

manager = AsyncWeaver(id_generator=custom_id_generator)
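
A timestamp-based generator may return the same ID twice if two tasks are added within the clock's resolution. The sketch below shows two collision-resistant alternatives. The names uuid_id_generator and make_counter_id_generator are hypothetical, and the sketch assumes only what the example above shows: id_generator is a zero-argument callable that returns the ID.

```python
import itertools
import uuid


def uuid_id_generator() -> str:
    """Random ID; collisions are practically impossible."""
    return f"task_{uuid.uuid4().hex}"


def make_counter_id_generator(prefix: str = "task"):
    """Build a generator that yields prefix_1, prefix_2, ... within one process."""
    counter = itertools.count(1)

    def generate() -> str:
        return f"{prefix}_{next(counter)}"

    return generate


# Assumed usage, mirroring the example above:
# manager = AsyncWeaver(id_generator=uuid_id_generator)
# manager = AsyncWeaver(id_generator=make_counter_id_generator("job"))
```

The counter variant gives short, ordered IDs but restarts from 1 in every process, so the UUID variant is the safer choice when IDs are stored or shared.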

Configuration Options

  • worker_count: Number of concurrent workers (default: 10)
  • max_retries: Maximum number of retry attempts for failed tasks (default: 1)
  • retry_delay: Time to wait before retrying failed tasks in seconds (default: 300)
  • monitor_interval: Interval for monitoring task status in seconds (default: 60)
  • id_generator: Custom function for generating task IDs
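
To make the interplay of worker_count, max_retries, and retry_delay concrete, here is a minimal sketch of a retrying worker pool in plain asyncio. It is not AsyncWeaver's implementation. run_with_retries, worker, run_pool, and the demo coroutines are hypothetical names, and the sketch assumes max_retries counts retries after the first attempt, which this README does not spell out.

```python
import asyncio


async def run_with_retries(func, *args, max_retries=1, retry_delay=0.0):
    """Await func(*args); on failure, retry up to max_retries more times."""
    attempt = 0
    while True:
        try:
            return await func(*args)
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)


async def worker(queue, results, max_retries, retry_delay):
    """Pull (key, func, args) jobs off the queue until cancelled."""
    while True:
        key, func, args = await queue.get()
        try:
            results[key] = await run_with_retries(
                func, *args, max_retries=max_retries, retry_delay=retry_delay
            )
        except Exception as exc:
            results[key] = exc  # record the final failure
        finally:
            queue.task_done()


async def run_pool(jobs, worker_count=2, max_retries=1, retry_delay=0.0):
    queue = asyncio.Queue()
    results = {}
    for job in jobs:
        queue.put_nowait(job)
    workers = [
        asyncio.create_task(worker(queue, results, max_retries, retry_delay))
        for _ in range(worker_count)
    ]
    await queue.join()  # returns once every job has been marked done
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


calls = {"flaky": 0}


async def flaky(value):
    """Fails on the first call only, so one retry is enough."""
    calls["flaky"] += 1
    if calls["flaky"] == 1:
        raise RuntimeError("transient failure")
    return value * 2


async def always_fails():
    raise ValueError("permanent failure")


results = asyncio.run(run_pool([("a", flaky, (21,)), ("b", always_fails, ())]))
```

In this sketch worker_count bounds how many jobs run at once, while max_retries and retry_delay apply to each job independently.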

Task States

Tasks can be in the following states:

  • PENDING: Task is queued but not yet started
  • IN_PROGRESS: Task is currently running
  • COMPLETED: Task finished successfully
  • FAILED: Task failed and exceeded retry attempts
  • CANCELLED: Task was cancelled
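
The states above can be modelled as an enum, as in the sketch below. The class name TaskState, the string values, and the transition table are assumptions for illustration. The library's own enum may be named and structured differently, and the IN_PROGRESS to PENDING transition is only a guess at how a retry is represented.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# States a task never leaves once it has reached them.
TERMINAL_STATES = frozenset(
    {TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED}
)

# ASSUMPTION: transitions inferred from the state descriptions.
# IN_PROGRESS -> PENDING models a failed attempt being re-queued for a retry.
ALLOWED_TRANSITIONS = {
    TaskState.PENDING: {TaskState.IN_PROGRESS, TaskState.CANCELLED},
    TaskState.IN_PROGRESS: {
        TaskState.COMPLETED,
        TaskState.FAILED,
        TaskState.CANCELLED,
        TaskState.PENDING,
    },
}


def is_terminal(state: TaskState) -> bool:
    return state in TERMINAL_STATES


def can_transition(current: TaskState, new: TaskState) -> bool:
    # Terminal states have no entry in the table, so nothing is allowed from them.
    return new in ALLOWED_TRANSITIONS.get(current, set())
```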

Waiting for Task Completion

To wait for all tasks to complete, use the wait_for_completion method:

# Wait indefinitely for all tasks to complete
await manager.wait_for_completion()

# Wait with a timeout
await manager.wait_for_completion(timeout=60)  # Wait up to 60 seconds

For individual tasks, you can wait for specific results:

# Wait for a specific task to complete
result = await manager.get_task_result(task_id, timeout=30)
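
This README does not say what happens when a timeout elapses. As a point of comparison only, the sketch below shows the plain asyncio behaviour: asyncio.wait_for cancels the awaited coroutine and raises asyncio.TimeoutError. Whether get_task_result or wait_for_completion behave the same way is an assumption to verify against the library. wait_with_timeout and slow_task are hypothetical names.

```python
import asyncio


async def slow_task(delay):
    await asyncio.sleep(delay)
    return "done"


async def wait_with_timeout(coro, timeout):
    """Return the coroutine's result, or None if it does not finish in time."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the coroutine at this point
        return None


finished = asyncio.run(wait_with_timeout(slow_task(0.01), timeout=1))
timed_out = asyncio.run(wait_with_timeout(slow_task(1), timeout=0.01))
```

Wrapping the wait in try/except like this keeps a slow task from crashing the caller, whatever exception the library turns out to raise.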

Note: When shutting down with shutdown(), you can control whether to wait for completion:

# Wait for completion during shutdown
await manager.shutdown(timeout=60, cancel_running=False)

# Cancel running tasks during shutdown
await manager.shutdown(cancel_running=True)

Testing

The project includes a comprehensive test suite. To run the tests:

pytest tests/

License

[Insert your license here]

