A unified, delightful Python concurrency library
Concurry
Parallelism for humans.
Concurry is a unified, delightful concurrency library for Python. It replaces the fragmented landscape of threading, multiprocessing, asyncio, and Ray with a single, elegant API. Write your code once, and run it on a single thread, multiple cores, or a distributed cluster—without changing a line of business logic.
🚀 Quickstart: 50x Speedup in 3 Lines of Code
Calling LLMs sequentially is painfully slow. With Concurry, you can parallelize your existing code instantly.
Prerequisites: pip install concurry litellm
from pydantic import BaseModel
import litellm

# Line 1. Import concurry
from concurry import worker, gather

# Line 2. Add the @worker decorator to an existing class
@worker(mode="thread", max_workers=50)
class LLM(BaseModel):
    model: str

    def call(self, prompt: str) -> str:
        # This runs in a separate thread!
        return litellm.completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        ).choices[0].message.content

# Initialize your worker (looks just like a normal class)
llm = LLM(model="gpt-3.5-turbo")
prompts = [f"What is {i} + {i}?" for i in range(100)]
results = [llm.call(prompt) for prompt in prompts]  # Returns futures instantly, runs in parallel

# Line 3. gather futures
results = gather(results, progress=True)  # Waits for all results
print(f"Processed {len(results)} prompts!")
llm.stop()
The Result:
- Sequential: ~780 seconds
- Concurry: ~16 seconds (50x faster)
No refactoring. No concurrent.futures. No async def virus. No ray.remote.
Just your code, parallelized.
We think that's delicious 🤤
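To see where a speedup of this kind comes from, here is a minimal stdlib-only sketch of the underlying effect. It does not use Concurry or a real LLM: `fake_llm_call` is a hypothetical stand-in that sleeps to simulate network latency, and the numbers it prints are illustrative, not a reproduction of the benchmark above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_llm_call(prompt: str) -> str:
    """Hypothetical stand-in for a network-bound LLM request."""
    time.sleep(0.05)  # simulated latency; the thread is idle while waiting
    return f"answer to: {prompt}"


prompts = [f"What is {i} + {i}?" for i in range(20)]

# Sequential: each call waits for the previous one to finish.
start = time.perf_counter()
sequential = [fake_llm_call(p) for p in prompts]
sequential_seconds = time.perf_counter() - start

# Threaded: the waits overlap, so total time is close to one call's latency.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=20) as pool:
    threaded = list(pool.map(fake_llm_call, prompts))
threaded_seconds = time.perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, threaded: {threaded_seconds:.2f}s")
```

The gain depends on the work being I/O-bound: threads overlap time spent waiting, not time spent computing.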
📦 Installation
pip install concurry
For distributed computing support:
pip install "concurry[ray]"
For all features:
pip install "concurry[all]"
💡 Why Concurry?
The Problem: Fragmentation
Python's concurrency tools are scattered.
- Threading: Good for I/O, bad API (concurrent.futures).
- Multiprocessing: Good for CPU, hard to debug, pickling errors.
- Asyncio: High throughput, but requires rewriting everything (async/await).
- Ray: Powerful for clusters, but heavyweight for scripts.
The Solution: Unified API
Concurry abstracts all of these into a single interface.
Without Concurry (The Old Way)
You have to learn four different APIs to do the same thing.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
import ray

# task, async_task, ray_task, and arg are placeholders for your own code.

# ❌ Threading API
with ThreadPoolExecutor() as executor:
    future = executor.submit(task, arg)

# ❌ Multiprocessing API (Different behavior!)
with ProcessPoolExecutor() as executor:
    future = executor.submit(task, arg)

# ❌ Asyncio API (Rewrite everything!)
async def main():
    await asyncio.create_task(async_task(arg))

# ❌ Ray API (Another new API!)
ray.get(ray_task.remote(arg))
With Concurry (The Delightful Way)
One API, any backend.
from concurry import worker, gather

@worker
class MyWorker:
    def do_work(self, x: int) -> int:
        return x * 2

# Run on threads?
w = MyWorker.options(mode="thread", max_workers=10).init()
# Run on processes? Uncomment below.
# w = MyWorker.options(mode="process", max_workers=10).init()
# Run on a ray cluster? Uncomment below.
# w = MyWorker.options(mode="ray", max_workers=10).init()
# Run on asyncio? Uncomment below.
# w = MyWorker.options(mode="asyncio").init()

# The submission code NEVER changes:
futures = [w.do_work(i) for i in range(1000)]
# The collection code NEVER changes:
results = gather(futures, progress=True)
w.stop()
✨ Key Features
🎭 Actor-Based Workers
Stateful workers that persist across calls. Perfect for database connections, model weights, or session management.
from concurry import worker

@worker(mode="thread")
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self) -> int:
        self.count += 1
        return self.count

# State is preserved!
counter = Counter()
print(counter.increment().result())  # 1
print(counter.increment().result())  # 2
counter.stop()
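For readers new to the actor pattern, the following is a rough stdlib-only sketch of the general idea: one background thread owns an object and executes calls from a queue one at a time, handing back futures. It is an illustration under assumptions, not Concurry's implementation; `ThreadActor` and its `call` method are hypothetical names, and `PlainCounter` is an undecorated copy of the counter above.

```python
import queue
import threading
from concurrent.futures import Future


class ThreadActor:
    """Hypothetical helper: runs method calls on `target` in one dedicated thread."""

    def __init__(self, target):
        self._target = target
        self._inbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._inbox.get()
            if item is None:  # sentinel posted by stop()
                break
            future, name, args, kwargs = item
            try:
                future.set_result(getattr(self._target, name)(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)

    def call(self, name, *args, **kwargs) -> Future:
        """Queue a method call and return a future for its result."""
        future = Future()
        self._inbox.put((future, name, args, kwargs))
        return future

    def stop(self):
        self._inbox.put(None)
        self._thread.join()


class PlainCounter:
    def __init__(self):
        self.count = 0

    def increment(self) -> int:
        self.count += 1
        return self.count


actor = ThreadActor(PlainCounter())
first = actor.call("increment").result()
second = actor.call("increment").result()
actor.stop()
print(first, second)
```

Because only the actor's thread touches the counter, its state needs no locking, and calls are applied in the order they were queued.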
🚦 Rate Limiting
Built-in rate limiting for APIs. Token buckets, sliding windows, and more, enforced globally across all workers.
from concurry import worker, gather, CallLimit

@worker(
    mode="thread",
    max_workers=20,
    # Limit to 100 calls per minute across ALL 20 threads
    limits=[CallLimit(window_seconds=60, capacity=100)]
)
class APIWorker:
    def fetch(self, url: str):
        # Rate limit is automatically checked here
        return f"Fetched {url}"

pool = APIWorker()
futures = [pool.fetch(f"url_{i}") for i in range(200)]
results = gather(futures, progress=True)  # Smoothly throttled!
pool.stop()
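As a conceptual aid, here is a small stdlib-only sketch of one way a sliding-window call limit shared between threads can work. It is not how Concurry implements `CallLimit`; the class `SlidingWindowLimiter`, its `try_acquire` method, and the injectable `clock` parameter are all assumptions made for illustration.

```python
import threading
import time
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical limiter: at most `capacity` calls per `window_seconds`."""

    def __init__(self, capacity: int, window_seconds: float, clock=time.monotonic):
        self.capacity = capacity
        self.window_seconds = window_seconds
        self._clock = clock          # injectable so the behaviour can be tested
        self._calls = deque()        # timestamps of calls still inside the window
        self._lock = threading.Lock()  # one lock makes the limit global across threads

    def try_acquire(self) -> bool:
        """Record a call and return True if the window has room, else return False."""
        with self._lock:
            now = self._clock()
            # Drop timestamps that have aged out of the window.
            while self._calls and now - self._calls[0] >= self.window_seconds:
                self._calls.popleft()
            if len(self._calls) < self.capacity:
                self._calls.append(now)
                return True
            return False


limiter = SlidingWindowLimiter(capacity=100, window_seconds=60)
allowed = sum(limiter.try_acquire() for _ in range(200))
print(f"{allowed} of 200 calls allowed in the first window")
```

A real limiter would typically block or sleep until a slot frees up rather than return `False`; the non-blocking form keeps the sketch short.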
🔁 Intelligent Retries
Don't let flaky networks break your batch jobs. Configure retries declaratively.
from concurry import worker, RetryConfig

@worker(
    mode="thread",
    retry_config=RetryConfig(
        max_retries=5,
        retry_on=(ConnectionError, TimeoutError),
        backoff_factor=2.0  # Exponential backoff: 1s, 2s, 4s, ...
    )
)
class FlakyWorker:
    def fetch(self):
        # Automatically retried on failure!
        pass
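The backoff schedule in the comment above (1s, 2s, 4s, ...) can be sketched with a plain decorator. This is an illustrative stdlib-only version, not Concurry's `RetryConfig` internals: the `retry` decorator, its `base_delay` parameter, and the injectable `sleep` argument are assumptions introduced here.

```python
import time
from functools import wraps


def retry(max_retries=5, retry_on=(ConnectionError, TimeoutError),
          backoff_factor=2.0, base_delay=1.0, sleep=time.sleep):
    """Hypothetical decorator: retry on selected exceptions with exponential backoff."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries:
                        raise  # retries exhausted: surface the last error
                    # Wait base_delay * factor**attempt: 1s, 2s, 4s, ...
                    sleep(base_delay * backoff_factor ** attempt)
        return wrapper
    return decorator


delays = []     # records requested waits instead of actually sleeping
attempts = []   # records each call to the flaky function


@retry(max_retries=5, sleep=delays.append)
def flaky_fetch() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("simulated network failure")
    return "ok"


outcome = flaky_fetch()
print(outcome, delays)
```

Exceptions outside `retry_on` propagate immediately, which keeps genuine bugs from being masked by retries.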
✅ Pydantic Integration
Full support for Pydantic models. Arguments are validated and coerced before they even reach the worker.
from concurry import worker
from pydantic import BaseModel, Field

@worker(mode="process")
class DataWorker(BaseModel):
    db_url: str = Field(..., pattern=r"^postgres://")

    def process(self, data: dict):
        return data

# Validated at initialization!
try:
    w = DataWorker(db_url="invalid-url")
except Exception as e:
    print(f"Validation failed!: {e}")  # Caught before worker starts
🎬 The @task Decorator
Just want to run a function in parallel? You don't need a class.
from concurry import task, gather
import time

@task(mode="process", max_workers=4)
def heavy_computation(x: int) -> int:
    time.sleep(1)  # Example heavy computation
    return x

# Run 100 heavy computations in parallel
futures = [heavy_computation(i) for i in range(100)]
results = gather(futures, progress=True)
heavy_computation.stop()
📚 Documentation
- User Guide: Tutorials and best practices.
- API Reference: Detailed API specs.
- Gallery: Production-ready examples (LLM pipelines, web scrapers).
🤝 Contributing
We love contributions! Check out CONTRIBUTING.md to get started.
📄 License
Apache 2.0 - See LICENSE for details.
Made with ❤️ by Amazon Scientists
File details
Details for the file concurry-0.12.2.tar.gz.
File metadata
- Download URL: concurry-0.12.2.tar.gz
- Upload date:
- Size: 3.0 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5fb65f58314761e3a14e9c25796a7665cc47c7bd0e2ae2c2f2044aa51b68b6ca |
| MD5 | 8a8aa75029c0a7e32956c87f2a57e24e |
| BLAKE2b-256 | 2502ecc28dbc9fb701cf7a9ea0535ba6f44425ba69f10566c07a562a8c2ef06b |
File details
Details for the file concurry-0.12.2-py3-none-any.whl.
File metadata
- Download URL: concurry-0.12.2-py3-none-any.whl
- Upload date:
- Size: 159.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5b2f21eced500b181fbaec2ce28ad5a7d9d9dfd7762024912764e4b6fae69b1b |
| MD5 | 9c68d82ae48cc50b780277604f393be5 |
| BLAKE2b-256 | c8579e58cdf6c77fe7beb928c232af88cac1ed0a15a437e299b1030e960d59b6 |