Worker pool in Python with size and rate bounding options

werkpool

werkpool is a Python library for managing asynchronous task execution with fine-grained control over concurrency and rate limiting. Built on asyncio, it provides a flexible worker pool that can handle various workload patterns efficiently.

Features

  • Concurrency Control: Limit the number of tasks executing simultaneously
  • Rate Limiting: Control task execution rate (tasks per second)
  • Hybrid Mode: Combine both concurrency and rate limits for precise control
  • Task Retries: Built-in retry logic with configurable backoff strategies
  • Timeout Support: Set per-task execution timeouts
  • Context Manager: Clean resource management with async with syntax

Installation

pip install werkpool

Or with Poetry:

poetry add werkpool

Quick Start

import asyncio
from werkpool import WorkerPool

async def fetch_data(url):
    # Your async task here
    return f"Data from {url}"

async def main():
    pool = WorkerPool(size=10, rate=100)
    
    # Schedule tasks
    future = pool.run(lambda: fetch_data("https://example.com"))
    result = await future
    print(result)  # "Data from https://example.com"
    
    await pool.shutdown()

asyncio.run(main())

Usage Examples

1. Rate-Limited Pool (High Throughput)

Use Case: Making API requests with a rate limit of 100 requests per second. Ideal for services with generous rate limits where you want to maximize throughput without hitting rate caps.

import asyncio
from werkpool import worker_pool

async def call_api(item_id):
    # Simulated API call
    await asyncio.sleep(0.01)
    return f"Processed {item_id}"

async def main():
    async with worker_pool(rate=100) as pool:  # 100 tasks per second
        tasks = [pool.run(lambda i=i: call_api(i)) for i in range(1000)]
        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

asyncio.run(main())

2. Rate-Limited Pool (Low Throughput)

Use Case: Calling a rate-restricted API that allows only 1 request every 20 seconds (3 per minute). Perfect for free-tier APIs or services with strict rate limits.

import asyncio
from werkpool import WorkerPool

async def check_status(resource_id):
    # Expensive or rate-limited operation
    await asyncio.sleep(0.1)
    return f"Status of {resource_id}"

async def main():
    pool = WorkerPool(rate=0.05)  # 0.05 per second = 1 request per 20 seconds
    
    futures = []
    for i in range(10):
        future = pool.run(lambda i=i: check_status(i))
        futures.append(future)
    
    results = await asyncio.gather(*futures)
    print(f"All statuses checked: {len(results)} items")
    
    await pool.shutdown()

asyncio.run(main())

3. Worker-Limited Pool

Use Case: Preventing resource exhaustion by limiting concurrent connections. Ideal for database connections, file operations, or memory-intensive tasks where too many concurrent operations could overwhelm system resources.

import asyncio
from werkpool import worker_pool

async def process_file(filename):
    # Memory or I/O intensive operation
    await asyncio.sleep(0.5)
    return f"Processed {filename}"

async def main():
    # Maximum 5 concurrent tasks at any time
    async with worker_pool(size=5) as pool:
        files = [f"file_{i}.txt" for i in range(50)]
        
        tasks = [pool.run(lambda f=f: process_file(f)) for f in files]
        results = await asyncio.gather(*tasks)
        
        print(f"Processed {len(results)} files")

asyncio.run(main())

4. Combined Rate and Worker Limiting

Use Case: Scraping websites where you need to respect both the server's rate limit (10 requests/second) AND limit concurrent connections (5 simultaneous) to be a good citizen and avoid overwhelming the target server.

Rationale: This provides the most precise control. The size parameter ensures you don't open too many connections at once (which could trigger DDoS protection), while rate ensures you don't exceed the API's rate limit even if workers complete quickly.

import asyncio
from werkpool import WorkerPool

async def scrape_page(url):
    # Simulated web scraping
    await asyncio.sleep(0.2)
    return f"Content from {url}"

async def main():
    # Max 5 concurrent requests, but only 10 requests per second
    pool = WorkerPool(size=5, rate=10)
    
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    
    futures = [pool.run(lambda u=u: scrape_page(u)) for u in urls]
    results = await asyncio.gather(*futures)
    
    print(f"Scraped {len(results)} pages successfully")
    
    await pool.shutdown()

asyncio.run(main())

5. Advanced: Retries and Timeouts

Use Case: Calling an unreliable endpoint where transient failures are worth retrying with backoff, and each attempt should be bounded by a timeout.

import asyncio
import random
from werkpool import WorkerPool

async def unreliable_api_call(endpoint):
    # Fails randomly about 30% of the time to exercise the retry logic
    if random.random() < 0.3:
        raise ConnectionError("Network error")
    await asyncio.sleep(0.1)
    return f"Data from {endpoint}"

async def main():
    pool = WorkerPool(size=10, rate=50)
    
    future = pool.run(
        lambda: unreliable_api_call("/data"),
        timeout=5,  # Max 5 seconds per attempt
        retries=3,  # Retry up to 3 times
        retryable_exceptions=[ConnectionError, TimeoutError]
    )
    
    try:
        result = await future
        print(f"Success: {result}")
    except Exception as e:
        print(f"Failed after retries: {e}")
    
    await pool.shutdown()

asyncio.run(main())

API Reference

WorkerPool

WorkerPool(size: int | None = None, rate: float | None = None)
  • size: Maximum number of concurrent workers (None = unlimited)
  • rate: Maximum executions per second (None = unlimited)
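
For reference, the three limiting modes map directly onto the constructor; a quick sketch with illustrative values:

from werkpool import WorkerPool

pool = WorkerPool(size=5)           # worker-limited: at most 5 tasks in flight
pool = WorkerPool(rate=10)          # rate-limited: at most 10 task starts per second
pool = WorkerPool(size=5, rate=10)  # combined: both limits enforced together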

Methods

run() - Schedule a task for execution

pool.run(
    task: Callable[[], Awaitable[T]],
    timeout: int | None = None,
    retries: int = 0,
    retryable_exceptions: List[type[Exception]] = [Exception],
    backoff: Callable[[int], float] = lambda attempts: 2**attempts + random.uniform(0, 1)
) -> asyncio.Future[T]
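
The backoff callable maps the attempt count to a delay in seconds before the next retry; the default shown above is exponential with random jitter. A minimal sketch of supplying a custom strategy (the linear delay and the flaky_call helper are illustrative, not part of the library):

import asyncio
from werkpool import WorkerPool

async def flaky_call():
    # Always fails so the retry path is exercised
    raise ConnectionError("simulated failure")

async def main():
    pool = WorkerPool(size=5)
    future = pool.run(
        lambda: flaky_call(),
        retries=3,
        retryable_exceptions=[ConnectionError],
        backoff=lambda attempts: 0.5 * attempts,  # linear: 0.5s, 1.0s, 1.5s between attempts
    )
    try:
        await future
    except ConnectionError:
        print("Gave up after the retries were exhausted")
    await pool.shutdown()

asyncio.run(main())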

shutdown() - Wait for all tasks to complete

await pool.shutdown()

kill() - Cancel all pending tasks immediately

await pool.kill()
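
For example, a batch can be abandoned once a deadline passes; a minimal sketch (the one-second deadline and the slow_task helper are illustrative):

import asyncio
from werkpool import WorkerPool

async def slow_task(i):
    await asyncio.sleep(10)
    return i

async def main():
    pool = WorkerPool(size=2)
    for i in range(20):
        pool.run(lambda i=i: slow_task(i))
    await asyncio.sleep(1)  # decide to abort after one second
    await pool.kill()       # cancels everything still queued or running
    print("Pool killed; pending tasks were cancelled")

asyncio.run(main())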

worker_pool (Context Manager)

async with worker_pool(size=10, rate=100) as pool:
    # Use pool
    pass
# Automatically calls shutdown

When to Use Each Mode

Mode             Parameters        Use Case
Rate-Limited     Only rate=N       API rate limits, throttling requests to external services
Worker-Limited   Only size=N       Resource constraints (DB connections, memory), CPU-bound tasks
Combined         size=N, rate=M    Web scraping, respecting both connection and rate limits

Contributing

Found a bug? Have a feature request? Please open an issue on GitHub!

We welcome:

  • 🐛 Bug reports
  • 💡 Feature requests
  • 📖 Documentation improvements
  • 🔧 Pull requests

License

MIT License - see LICENSE file for details.
