Worker pool in Python with size and rate bounding options
werkpool
werkpool is a Python library for managing asynchronous task execution with fine-grained control over concurrency and rate limiting. Built on asyncio, it provides a flexible worker pool that can handle various workload patterns efficiently.
Features
- Concurrency Control: Limit the number of tasks executing simultaneously
- Rate Limiting: Control task execution rate (tasks per second)
- Hybrid Mode: Combine both concurrency and rate limits for precise control
- Task Retries: Built-in retry logic with configurable backoff strategies
- Timeout Support: Set per-task execution timeouts
- Context Manager: Clean resource management with `async with` syntax
Installation
pip install werkpool
Or with Poetry:
poetry add werkpool
Quick Start
import asyncio
from werkpool import WorkerPool

async def fetch_data(url):
    # Your async task here
    return f"Data from {url}"

async def main():
    pool = WorkerPool(size=10, rate=100)

    # Schedule tasks
    future = pool.run(lambda: fetch_data("https://example.com"))
    result = await future

    await pool.shutdown()

asyncio.run(main())
Usage Examples
1. Rate-Limited Pool (High Throughput)
Use Case: Making API requests with a rate limit of 100 requests per second. Ideal for services with generous rate limits where you want to maximize throughput without hitting rate caps.
import asyncio
from werkpool import worker_pool

async def call_api(item_id):
    # Simulated API call
    await asyncio.sleep(0.01)
    return f"Processed {item_id}"

async def main():
    async with worker_pool(rate=100) as pool:  # 100 tasks per second
        tasks = [pool.run(lambda i=i: call_api(i)) for i in range(1000)]
        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

asyncio.run(main())
2. Rate-Limited Pool (Low Throughput)
Use Case: Calling a rate-restricted API that allows only 1 request every 20 seconds (3 per minute). Perfect for free-tier APIs or services with strict rate limits.
import asyncio
from werkpool import WorkerPool

async def check_status(resource_id):
    # Expensive or rate-limited operation
    await asyncio.sleep(0.1)
    return f"Status of {resource_id}"

async def main():
    pool = WorkerPool(rate=0.05)  # 0.05 per second = 1 request per 20 seconds
    futures = []
    for i in range(10):
        future = pool.run(lambda i=i: check_status(i))
        futures.append(future)
    results = await asyncio.gather(*futures)
    print(f"All statuses checked: {len(results)} items")
    await pool.shutdown()

asyncio.run(main())
3. Worker-Limited Pool
Use Case: Preventing resource exhaustion by limiting concurrent connections. Ideal for database connections, file operations, or memory-intensive tasks where too many concurrent operations could overwhelm system resources.
import asyncio
from werkpool import worker_pool

async def process_file(filename):
    # Memory or I/O intensive operation
    await asyncio.sleep(0.5)
    return f"Processed {filename}"

async def main():
    # Maximum 5 concurrent tasks at any time
    async with worker_pool(size=5) as pool:
        files = [f"file_{i}.txt" for i in range(50)]
        tasks = [pool.run(lambda f=f: process_file(f)) for f in files]
        results = await asyncio.gather(*tasks)
        print(f"Processed {len(results)} files")

asyncio.run(main())
4. Combined Rate and Worker Limiting
Use Case: Scraping websites where you need to respect both the server's rate limit (10 requests/second) AND limit concurrent connections (5 simultaneous) to be a good citizen and avoid overwhelming the target server.
Rationale: This provides the most precise control. The size parameter ensures you don't open too many connections at once (which could trigger DDoS protection), while rate ensures you don't exceed the API's rate limit even if workers complete quickly.
import asyncio
from werkpool import WorkerPool

async def scrape_page(url):
    # Simulated web scraping
    await asyncio.sleep(0.2)
    return f"Content from {url}"

async def main():
    # Max 5 concurrent requests, but only 10 requests per second
    pool = WorkerPool(size=5, rate=10)
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    futures = [pool.run(lambda u=u: scrape_page(u)) for u in urls]
    results = await asyncio.gather(*futures)
    print(f"Scraped {len(results)} pages successfully")
    await pool.shutdown()

asyncio.run(main())
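The interplay of the two limits can be sketched with plain asyncio primitives: a semaphore caps how many tasks run at once, and a minimum spacing between start times enforces the rate. This is only an illustrative model of the semantics described above, not werkpool's actual implementation; `TinyPool` and its attributes are invented names.

```python
import asyncio
import time

# Illustrative model -- NOT werkpool's internals. A size limit maps to a
# semaphore; a rate limit maps to a minimum spacing between task starts.
class TinyPool:
    def __init__(self, size: int, rate: float):
        self._sem = asyncio.Semaphore(size)  # at most `size` tasks running
        self._interval = 1.0 / rate          # seconds between task starts
        self._next_start = 0.0
        self._lock = asyncio.Lock()          # serializes start-time bookkeeping

    async def run(self, task):
        # Reserve the next start slot, then wait until it arrives.
        async with self._lock:
            now = time.monotonic()
            delay = max(0.0, self._next_start - now)
            self._next_start = max(now, self._next_start) + self._interval
        if delay:
            await asyncio.sleep(delay)
        # Concurrency limit: wait for a free worker slot.
        async with self._sem:
            return await task()

async def demo():
    pool = TinyPool(size=5, rate=10)

    async def job(i):
        await asyncio.sleep(0.05)
        return i

    return await asyncio.gather(*(pool.run(lambda i=i: job(i)) for i in range(12)))

results = asyncio.run(demo())
print(f"completed {len(results)} jobs")
```

Note that both gates apply independently: even if all five worker slots are free, task starts stay 0.1 s apart, and even if the rate allows a start, a task still waits for a semaphore slot.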
5. Advanced: Retries and Timeouts
import asyncio
import random

from werkpool import WorkerPool

async def unreliable_api_call(endpoint):
    # Might fail occasionally
    if random.random() < 0.3:
        raise ConnectionError("Network error")
    await asyncio.sleep(0.1)
    return f"Data from {endpoint}"

async def main():
    pool = WorkerPool(size=10, rate=50)
    future = pool.run(
        lambda: unreliable_api_call("/data"),
        timeout=5,  # Max 5 seconds per attempt
        retries=3,  # Retry up to 3 times
        retryable_exceptions=[ConnectionError, TimeoutError],
    )
    try:
        result = await future
        print(f"Success: {result}")
    except Exception as e:
        print(f"Failed after retries: {e}")
    await pool.shutdown()

asyncio.run(main())
API Reference
WorkerPool
WorkerPool(size: int | None = None, rate: float | None = None)
- size: Maximum number of concurrent workers (None = unlimited)
- rate: Maximum executions per second (None = unlimited)
Methods
run() - Schedule a task for execution
pool.run(
    task: Callable[[], Awaitable[T]],
    timeout: int | None = None,
    retries: int = 0,
    retryable_exceptions: List[type[Exception]] = [Exception],
    backoff: Callable[[int], float] = lambda attempts: 2**attempts + random.uniform(0, 1)
) -> asyncio.Future[T]
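The default backoff shown in the signature is exponential with jitter. As a standalone sketch of the delays it produces (plain Python, independent of werkpool; `default_backoff` is just an illustrative name):

```python
import random

# The default backoff from run()'s signature: exponential with jitter.
def default_backoff(attempts: int) -> float:
    return 2 ** attempts + random.uniform(0, 1)

# Delays before retry attempts 1..3: roughly 2, 4, and 8 seconds, each
# plus up to one second of random jitter to avoid thundering herds.
delays = [default_backoff(n) for n in range(1, 4)]
```

A custom `backoff` callable with the same `(attempts) -> seconds` shape can be passed to `run()` to change this schedule.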
shutdown() - Wait for all tasks to complete
await pool.shutdown()
kill() - Cancel all pending tasks immediately
await pool.kill()
worker_pool (Context Manager)
async with worker_pool(size=10, rate=100) as pool:
    # Use pool
    pass
# shutdown() is called automatically on exit
When to Use Each Mode
| Mode | Parameters | Use Case |
|---|---|---|
| Rate-Limited Only | `rate=N` | API rate limits, throttling requests to external services |
| Worker-Limited Only | `size=N` | Resource constraints (DB connections, memory), CPU-bound tasks |
| Combined | `size=N, rate=M` | Web scraping, respecting both connection and rate limits |
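Since `rate` is expressed in tasks per second, per-minute or per-hour quotas need converting first. A tiny helper (hypothetical, not part of werkpool) makes the arithmetic explicit:

```python
def per_minute(n: float) -> float:
    # Convert an "n requests per minute" quota to a per-second rate.
    return n / 60.0

# The 3-requests-per-minute API from example 2 above:
rate = per_minute(3)  # 0.05 tasks/second, i.e. one request every 20 seconds
```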
Contributing
Found a bug? Have a feature request? Please open an issue on GitHub!
We welcome:
- 🐛 Bug reports
- 💡 Feature requests
- 📖 Documentation improvements
- 🔧 Pull requests
License
MIT License - see LICENSE file for details.
File details
Details for the file werkpool-0.1.4.tar.gz.
File metadata
- Download URL: werkpool-0.1.4.tar.gz
- Upload date:
- Size: 6.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.12.11 Linux/6.11.11-linuxkit
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `02fab2a1505fda51a8a7a71e4ae9aa552f62e60e1485becc26bdfc895ecc9038` |
| MD5 | `cadc2a42b6d816daee60cbba902c1341` |
| BLAKE2b-256 | `29cfb449b8be27488c78a66fda93167a533d2bfee712309762c98a9378fcc221` |
File details
Details for the file werkpool-0.1.4-py3-none-any.whl.
File metadata
- Download URL: werkpool-0.1.4-py3-none-any.whl
- Upload date:
- Size: 6.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.12.11 Linux/6.11.11-linuxkit
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `4d36926281124e74a0c78f8dccb9e33f28f8aeb7a22acc4d7e24401b7c13b47d` |
| MD5 | `fec2b719569443f84bdc82bf261db779` |
| BLAKE2b-256 | `af0d212b946cf5c66588aa71f623fd156148d29cf150e810e7b1b7e59c17a5b7` |