Worker pool in Python with size and rate bounding options

Project description

werkpool

werkpool is a Python library for managing asynchronous task execution with fine-grained control over concurrency and rate limiting. Built on asyncio, it provides a flexible worker pool that can handle various workload patterns efficiently.

Features

  • Concurrency Control: Limit the number of tasks executing simultaneously
  • Rate Limiting: Control task execution rate (tasks per second)
  • Hybrid Mode: Combine both concurrency and rate limits
  • Task Retries: Built-in retry logic with configurable backoff strategies
  • Timeout Support: Set per-task execution timeouts
  • Context Manager: Clean resource management with async with syntax
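The size and rate bounds are conceptually independent: a concurrency bound caps how many tasks run at once, while a pacing interval caps how often new tasks may start. A minimal stdlib sketch of the same idea (illustrative only, not werkpool's actual implementation):

```python
import asyncio

async def bounded_gather(coros, size, rate):
    """Run coroutines with at most `size` in flight, starting at most `rate` per second."""
    sem = asyncio.Semaphore(size)      # concurrency bound
    interval = 1.0 / rate              # minimum gap between task starts

    async def paced(coro, delay):
        await asyncio.sleep(delay)     # the i-th task may not start before i * interval
        async with sem:                # at most `size` task bodies run concurrently
            return await coro

    return await asyncio.gather(
        *(paced(coro, i * interval) for i, coro in enumerate(coros))
    )

async def double(n):
    await asyncio.sleep(0.01)
    return n * 2

results = asyncio.run(bounded_gather([double(n) for n in range(10)], size=3, rate=100))
print(results)  # -> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

werkpool wraps this pattern behind `WorkerPool` and `worker_pool`, as shown in the examples below.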

Installation

pip install werkpool

Quick Start

import asyncio
from werkpool import WorkerPool

async def fetch_data(url):
    # Your async task here
    return f"Data from {url}"

async def main():
    pool = WorkerPool(size=10, rate=100)
    
    # Schedule tasks
    future = pool.run(lambda: fetch_data("https://example.com"))
    result = await future
    
    await pool.shutdown()

asyncio.run(main())

Usage Examples

1. Rate-Limited Pool (High Throughput)

Making requests with a rate limit of 100 requests per second.

import asyncio
from werkpool import worker_pool

async def call_api(item_id):
    # Simulated API call
    await asyncio.sleep(0.01)
    return f"Processed {item_id}"

async def main():
    async with worker_pool(rate=100) as pool:  # 100 tasks per second
        tasks = [pool.run(lambda i=i: call_api(i)) for i in range(1000)]
        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

asyncio.run(main())

2. Rate-Limited Pool (Low Throughput)

Calling a rate-restricted API that allows fewer than one request per second.

import asyncio
from werkpool import WorkerPool

async def check_status(resource_id):
    # Expensive or rate-limited operation
    await asyncio.sleep(0.1)
    return f"Status of {resource_id}"

async def main():
    pool = WorkerPool(rate=0.2)  # 0.2 per second = 1 request per 5 seconds
    
    futures = []
    for i in range(10):
        future = pool.run(lambda i=i: check_status(i))
        futures.append(future)
    
    results = await asyncio.gather(*futures)
    print(f"All statuses checked: {len(results)} items")
    
    await pool.shutdown()

asyncio.run(main())

3. Worker-Limited Pool

Preventing resource exhaustion by limiting concurrent workers.

import asyncio
from werkpool import worker_pool

async def process_file(filename):
    # Memory or I/O intensive operation
    await asyncio.sleep(0.5)
    return f"Processed {filename}"

async def main():
    # Maximum 5 concurrent tasks at any time
    async with worker_pool(size=5) as pool:
        files = [f"file_{i}.txt" for i in range(50)]
        
        tasks = [pool.run(lambda f=f: process_file(f)) for f in files]
        results = await asyncio.gather(*tasks)
        
        print(f"Processed {len(results)} files")

asyncio.run(main())

4. Combined Rate and Worker Limiting

In some situations, such as web scraping, you may need to respect both rate and concurrency limits.

import asyncio
from werkpool import WorkerPool

async def scrape_page(url):
    # Simulated web scraping
    await asyncio.sleep(0.2)
    return f"Content from {url}"

async def main():
    # Max 5 concurrent requests, but only 10 requests per second
    pool = WorkerPool(size=5, rate=10)
    
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    
    futures = [pool.run(lambda u=u: scrape_page(u)) for u in urls]
    results = await asyncio.gather(*futures)
    
    print(f"Scraped {len(results)} pages successfully")
    
    await pool.shutdown()

asyncio.run(main())

5. Advanced: Retries and Timeouts

import asyncio
from werkpool import WorkerPool

async def unreliable_api_call(endpoint):
    # Might fail occasionally
    import random
    if random.random() < 0.3:
        raise ConnectionError("Network error")
    await asyncio.sleep(0.1)
    return f"Data from {endpoint}"

async def main():
    pool = WorkerPool(size=10, rate=50)
    
    future = pool.run(
        lambda: unreliable_api_call("/data"),
        timeout=5,  # Max 5 seconds per attempt
        retries=3,  # Retry up to 3 times
        retryable_exceptions=[ConnectionError, TimeoutError]
    )
    
    try:
        result = await future
        print(f"Success: {result}")
    except Exception as e:
        print(f"Failed after retries: {e}")
    
    await pool.shutdown()

asyncio.run(main())

API Reference

WorkerPool

WorkerPool(size: int | None = None, rate: float | None = None)
  • size: Maximum number of concurrent workers (None = unlimited)
  • rate: Maximum executions per second (None = unlimited)
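Because rate is expressed in executions per second, values below 1 spread tasks out rather than batching them: the minimum interval between task starts is simply the reciprocal of the rate. A quick illustration:

```python
def start_interval(rate: float) -> float:
    """Minimum seconds between task starts for a given rate (tasks/second)."""
    return 1.0 / rate

print(start_interval(100))  # 0.01 -> one start every 10 ms
print(start_interval(0.2))  # 5.0  -> one start every 5 seconds
```

This is why `WorkerPool(rate=0.2)` in the example above issues at most one request every five seconds.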

Methods

run() - Schedule a task for execution

pool.run(
    task: Callable[[], Awaitable[T]],
    timeout: int | None = None,
    retries: int = 0,
    retryable_exceptions: List[type[Exception]] = [Exception],
    backoff: Callable[[int], float] = lambda attempts: 2**attempts + random.uniform(0, 1)
) -> asyncio.Future[T]
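The default backoff is exponential with jitter. Per the signature above, any callable mapping the attempt number to a delay in seconds can be supplied instead; for example, a capped exponential backoff with full jitter (the helper name here is illustrative, not part of werkpool):

```python
import random

def capped_backoff(attempts: int, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter, never exceeding `cap` seconds."""
    return random.uniform(0, min(cap, 2 ** attempts))

# The delay grows with each attempt but stays bounded:
for attempt in range(1, 10):
    delay = capped_backoff(attempt)
    assert 0 <= delay <= 30.0
```

You would then pass it as, e.g., `pool.run(task, retries=5, backoff=capped_backoff)`.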

shutdown() - Wait for all tasks to complete

await pool.shutdown()

kill() - Cancel all pending tasks immediately

await pool.kill()

worker_pool (Context Manager)

async with worker_pool(size=10, rate=100) as pool:
    # Use pool
    pass
# Automatically calls shutdown

Contributing

Found a bug? Have a feature request? Please open an issue on GitHub!

We welcome:

  • 🐛 Bug reports
  • 💡 Feature requests
  • 📖 Documentation improvements
  • 🔧 Pull requests

License

MIT License - see LICENSE file for details.

Download files

Download the file for your platform.

Source Distribution

werkpool-0.2.0.tar.gz (5.5 kB)

Uploaded Source

Built Distribution


werkpool-0.2.0-py3-none-any.whl (6.4 kB)

Uploaded Python 3

File details

Details for the file werkpool-0.2.0.tar.gz.

File metadata

  • Download URL: werkpool-0.2.0.tar.gz
  • Upload date:
  • Size: 5.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.12.11 Linux/6.11.11-linuxkit

File hashes

Hashes for werkpool-0.2.0.tar.gz
Algorithm Hash digest
SHA256 2f5372886a2795f8dd1fc5b0f70d28c74b7db3bf1c502a0f8dd32ef627334932
MD5 1a516a5a5042e8f1ab4d8cf088c19f15
BLAKE2b-256 8fb3f52c18b18aea65b385f146c42b95feb3ea390535eb40ffc68d8b2d497c89


File details

Details for the file werkpool-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: werkpool-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 6.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.12.11 Linux/6.11.11-linuxkit

File hashes

Hashes for werkpool-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e640188db8b8a7d8705467182ad0b6631c9361de970b2793697a60c2a2ad720f
MD5 5a4c59d8d2b25bc270389c1aa4da7a80
BLAKE2b-256 5272fe13ab9495b6351c356374fed13c6bc5d3b32e9ee54f9905cc1d2fb67107

