werkpool

Worker pool in Python with size and rate bounding options
werkpool is a Python library for managing asynchronous task execution with fine-grained control over concurrency and rate limiting. Built on asyncio, it provides a flexible worker pool that can handle various workload patterns efficiently.
Features
- Concurrency Control: Limit the number of tasks executing simultaneously
- Rate Limiting: Control task execution rate (tasks per second)
- Hybrid Mode: Combine both concurrency and rate limits
- Task Retries: Built-in retry logic with configurable backoff strategies
- Timeout Support: Set per-task execution timeouts
- Context Manager: Clean resource management with `async with` syntax
Installation
```shell
pip install werkpool
```
Quick Start
```python
import asyncio
from werkpool import WorkerPool

async def fetch_data(url):
    # Your async task here
    return f"Data from {url}"

async def main():
    pool = WorkerPool(size=10, rate=100)

    # Schedule tasks
    future = pool.run(lambda: fetch_data("https://example.com"))
    result = await future

    await pool.shutdown()

asyncio.run(main())
```
Usage Examples
1. Rate-Limited Pool (High Throughput)
Making requests with a rate limit of 100 requests per second.
```python
import asyncio
from werkpool import worker_pool

async def call_api(item_id):
    # Simulated API call
    await asyncio.sleep(0.01)
    return f"Processed {item_id}"

async def main():
    async with worker_pool(rate=100) as pool:  # 100 tasks per second
        tasks = [pool.run(lambda i=i: call_api(i)) for i in range(1000)]
        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

asyncio.run(main())
```
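Under the hood, a rate limit like this is typically enforced by spacing task starts at least `1/rate` seconds apart. The sketch below illustrates that idea in plain asyncio; it is not werkpool's actual implementation, and `RateLimiter` is a hypothetical name:

```python
import asyncio
import time

class RateLimiter:
    """Minimal interval-based rate limiter: each acquire() waits so that
    successive starts are spaced at least 1/rate seconds apart."""

    def __init__(self, rate: float):
        self.interval = 1.0 / rate
        self._next_start = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            wait = self._next_start - now
            # Reserve the next start slot before releasing the lock
            self._next_start = max(now, self._next_start) + self.interval
        if wait > 0:
            await asyncio.sleep(wait)

async def demo():
    limiter = RateLimiter(rate=100)  # 100 starts per second
    start = time.monotonic()
    for _ in range(10):
        await limiter.acquire()
    return time.monotonic() - start

elapsed = asyncio.run(demo())  # 10 starts spaced ~10 ms apart
```

With `rate=100`, ten sequential acquisitions take roughly 90 ms, since each start after the first waits for its 10 ms slot.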
2. Rate-Limited Pool (Low Throughput)
Calling a rate-restricted API that allows fewer than one request per second.
```python
import asyncio
from werkpool import WorkerPool

async def check_status(resource_id):
    # Expensive or rate-limited operation
    await asyncio.sleep(0.1)
    return f"Status of {resource_id}"

async def main():
    pool = WorkerPool(rate=0.2)  # 0.2 per second = 1 request per 5 seconds
    futures = []
    for i in range(10):
        future = pool.run(lambda i=i: check_status(i))
        futures.append(future)
    results = await asyncio.gather(*futures)
    print(f"All statuses checked: {len(results)} items")
    await pool.shutdown()

asyncio.run(main())
```
3. Worker-Limited Pool
Preventing resource exhaustion by limiting concurrent workers.
```python
import asyncio
from werkpool import worker_pool

async def process_file(filename):
    # Memory or I/O intensive operation
    await asyncio.sleep(0.5)
    return f"Processed {filename}"

async def main():
    # Maximum 5 concurrent tasks at any time
    async with worker_pool(size=5) as pool:
        files = [f"file_{i}.txt" for i in range(50)]
        tasks = [pool.run(lambda f=f: process_file(f)) for f in files]
        results = await asyncio.gather(*tasks)
        print(f"Processed {len(results)} files")

asyncio.run(main())
```
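Conceptually, `size` behaves like an `asyncio.Semaphore` bounding the number of in-flight tasks. The following is an illustrative sketch of that pattern in plain asyncio, not werkpool's internals; `bounded_gather` is a hypothetical helper:

```python
import asyncio

async def bounded_gather(coros, limit):
    """Run coroutines with at most `limit` executing at once,
    tracking the peak number of concurrent tasks."""
    sem = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def wrap(coro):
        nonlocal running, peak
        async with sem:  # blocks while `limit` tasks are in flight
            running += 1
            peak = max(peak, running)
            result = await coro
            running -= 1
            return result

    results = await asyncio.gather(*(wrap(c) for c in coros))
    return results, peak

async def main():
    return await bounded_gather(
        (asyncio.sleep(0.01, result=i) for i in range(20)), limit=5
    )

results, peak = asyncio.run(main())  # peak never exceeds the limit of 5
```

Results come back in submission order (as with `asyncio.gather`), while the semaphore guarantees that no more than five sleeps run concurrently.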
4. Combined Rate and Worker Limiting
In some situations, such as web scraping, you may need to respect both rate and concurrency limits.
```python
import asyncio
from werkpool import WorkerPool

async def scrape_page(url):
    # Simulated web scraping
    await asyncio.sleep(0.2)
    return f"Content from {url}"

async def main():
    # Max 5 concurrent requests, but only 10 requests per second
    pool = WorkerPool(size=5, rate=10)
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    futures = [pool.run(lambda u=u: scrape_page(u)) for u in urls]
    results = await asyncio.gather(*futures)
    print(f"Scraped {len(results)} pages successfully")
    await pool.shutdown()

asyncio.run(main())
```
5. Advanced: Retries and Timeouts
```python
import asyncio
import random
from werkpool import WorkerPool

async def unreliable_api_call(endpoint):
    # Might fail occasionally
    if random.random() < 0.3:
        raise ConnectionError("Network error")
    await asyncio.sleep(0.1)
    return f"Data from {endpoint}"

async def main():
    pool = WorkerPool(size=10, rate=50)
    future = pool.run(
        lambda: unreliable_api_call("/data"),
        timeout=5,  # Max 5 seconds per attempt
        retries=3,  # Retry up to 3 times
        retryable_exceptions=[ConnectionError, TimeoutError],
    )
    try:
        result = await future
        print(f"Success: {result}")
    except Exception as e:
        print(f"Failed after retries: {e}")
    await pool.shutdown()

asyncio.run(main())
```
API Reference
WorkerPool
```python
WorkerPool(size: int | None = None, rate: float | None = None)
```
- size: Maximum number of concurrent workers (None = unlimited)
- rate: Maximum executions per second (None = unlimited)
Methods
run() - Schedule a task for execution
```python
pool.run(
    task: Callable[[], Awaitable[T]],
    timeout: int | None = None,
    retries: int = 0,
    retryable_exceptions: List[type[Exception]] = [Exception],
    backoff: Callable[[int], float] = lambda attempts: 2**attempts + random.uniform(0, 1),
) -> asyncio.Future[T]
```
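The default backoff is exponential with jitter and unbounded. A custom `backoff` callable takes the attempt count and returns the delay in seconds before the next retry; for example, one might cap the delay (a sketch using the signature above; `capped_backoff` and the 30-second cap are illustrative, not part of the library):

```python
import random

def capped_backoff(attempts: int) -> float:
    # Exponential backoff with jitter, capped at 30 seconds
    return min(2 ** attempts + random.uniform(0, 1), 30.0)

# Passed to run(), the delay before retry n is capped_backoff(n):
# pool.run(lambda: call_api("/data"), retries=5, backoff=capped_backoff)
```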
shutdown() - Wait for all tasks to complete

```python
await pool.shutdown()
```

kill() - Cancel all pending tasks immediately

```python
await pool.kill()
```
worker_pool (Context Manager)
```python
async with worker_pool(size=10, rate=100) as pool:
    # Use pool
    pass
# Automatically calls shutdown
```
Contributing
Found a bug? Have a feature request? Please open an issue on GitHub!
We welcome:
- 🐛 Bug reports
- 💡 Feature requests
- 📖 Documentation improvements
- 🔧 Pull requests
License
MIT License - see LICENSE file for details.
File details

Details for the file werkpool-0.2.0.tar.gz.

File metadata
- Download URL: werkpool-0.2.0.tar.gz
- Upload date:
- Size: 5.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.12.11 Linux/6.11.11-linuxkit

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 2f5372886a2795f8dd1fc5b0f70d28c74b7db3bf1c502a0f8dd32ef627334932 |
| MD5 | 1a516a5a5042e8f1ab4d8cf088c19f15 |
| BLAKE2b-256 | 8fb3f52c18b18aea65b385f146c42b95feb3ea390535eb40ffc68d8b2d497c89 |
File details

Details for the file werkpool-0.2.0-py3-none-any.whl.

File metadata
- Download URL: werkpool-0.2.0-py3-none-any.whl
- Upload date:
- Size: 6.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.12.11 Linux/6.11.11-linuxkit

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | e640188db8b8a7d8705467182ad0b6631c9361de970b2793697a60c2a2ad720f |
| MD5 | 5a4c59d8d2b25bc270389c1aa4da7a80 |
| BLAKE2b-256 | 5272fe13ab9495b6351c356374fed13c6bc5d3b32e9ee54f9905cc1d2fb67107 |