Skip to main content

An asynchronous task manager that handles concurrent task execution with automatic retries, status monitoring, and graceful cancellation.

Project description

AsyncWeaver

AsyncWeaver is a Python library for managing asynchronous tasks with retry capabilities and status monitoring. It provides a way to queue, execute, and track multiple concurrent tasks while handling failures and retries automatically.

Features

  • Concurrent task execution with configurable number of workers
  • Automatic task retries with customizable delay
  • Task status monitoring
  • Support for task cancellation (individual or bulk)
  • Callback support for task completion, failure, and cancellation
  • Custom task ID generation
  • Type hints for better IDE support

Installation

pip install async-weave

Basic Usage

import asyncio
from async_weave import AsyncWeaver

async def example():
    # Create a manager with 5 workers, max 3 retries, and 5 minute retry delay
    async with AsyncWeaver(worker_count=5, max_retries=3, retry_delay=300) as manager:
        # Define an async task
        async def my_task(param):
            # Your task logic here
            return f"Processed {param}"
        
        # Add tasks to the queue
        task_id = await manager.add_task(my_task, "data")
        
        # Wait for result
        result = await manager.get_task_result(task_id)
        print(f"Task completed with result: {result}")

# Run the example
asyncio.run(example())

Advanced Features

Custom Task IDs

task_id = await manager.add_task(my_task, task_id="custom_id_123")

Callbacks

By default the task_id is a str, but if you set a custom task_id or a custom task id generator it will be the type you specified.

async def example_with_callbacks():
    async with AsyncWeaver() as manager:
        def on_complete(task_id: str, result):
            print(f"Task completed with: {result}")
            
        def on_error(task_id: str, error):
            print(f"Task failed with: {error}")
            
        def on_cancel(task_id: str):
            print("Task was cancelled")
            
        await manager.add_task(
            my_task,
            on_complete=on_complete,
            on_error=on_error,
            on_cancel=on_cancel
        )

Task Cancellation

# Cancel a specific task
await manager.cancel_task(task_id)

# Cancel all running tasks
await manager.cancel_all_tasks()

Custom ID Generator

def custom_id_generator():
    return f"task_{time.time()}"

manager = AsyncWeaver(id_generator=custom_id_generator)

Configuration Options

  • worker_count: Number of concurrent workers (default: 10)
  • max_retries: Maximum number of retry attempts for failed tasks (default: 1)
  • retry_delay: Time to wait before retrying failed tasks in seconds (default: 300)
  • monitor_interval: Interval for monitoring task status in seconds (default: 60)
  • id_generator: Custom function for generating task IDs

Task States

Tasks can be in the following states:

  • PENDING: Task is queued but not yet started
  • IN_PROGRESS: Task is currently running
  • COMPLETED: Task finished successfully
  • FAILED: Task failed and exceeded retry attempts
  • CANCELLED: Task was cancelled

Waiting for Task Completion

To wait for all tasks to complete, use the wait_for_completion method:

# Wait indefinitely for all tasks to complete
await manager.wait_for_completion()

# Wait with a timeout
await manager.wait_for_completion(timeout=60)  # Wait up to 60 seconds

For individual tasks, you can wait for specific results:

# Wait for a specific task to complete
result = await manager.get_task_result(task_id, timeout=30)

Note: When shutting down with shutdown(), you can control whether to wait for completion:

# Wait for completion during shutdown
await manager.shutdown(timeout=60, cancel_running=False)

# Cancel running tasks during shutdown
await manager.shutdown(cancel_running=True)

## Testing

The project includes a comprehensive test suite. To run the tests:

```bash
pytest tests/

License

[Insert your license here]

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_weave-0.1.0.tar.gz (10.2 kB view details)

Uploaded Source

Built Distribution

async_weave-0.1.0-py3-none-any.whl (6.3 kB view details)

Uploaded Python 3

File details

Details for the file async_weave-0.1.0.tar.gz.

File metadata

  • Download URL: async_weave-0.1.0.tar.gz
  • Upload date:
  • Size: 10.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.13

File hashes

Hashes for async_weave-0.1.0.tar.gz
Algorithm Hash digest
SHA256 75444d03f132e67e2d41ad9ffcbc3c3c92383fb2f3900d13fd18be1c2ef14813
MD5 1950ead8fa34f1e3614cd3f5b38086db
BLAKE2b-256 a3b4d08d4a6a02261fe5028786f115ca2dca9d5ed6cc5c3708fc76903e71ea84

See more details on using hashes here.

File details

Details for the file async_weave-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for async_weave-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 daec66ff6c73e4a6b1b0e10741345343a25d4895a0e7cd2924373fb3b649b3f2
MD5 18c1856375548557bc1525e0c25233fb
BLAKE2b-256 d4cc685d6724be359a2c25ab0954e4417f99ba0452c78d387fd35c7c823cc7f8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page