Skip to main content

A unified, delightful Python concurrency library

Project description

Concurry

Concurry

Documentation PyPI Version Python Versions License Build Status

Parallelism for humans.

Concurry is a unified, delightful concurrency library for Python. It replaces the fragmented landscape of threading, multiprocessing, asyncio, and Ray with a single, elegant API. Write your code once, and run it on a single thread, multiple cores, or a distributed cluster—without changing a line of business logic.


🚀 Quickstart: 50x Speedup in 3 Lines of Code

Calling LLMs sequentially is painfully slow. With Concurry, you can parallelize your existing code instantly.

Prerequisites: pip install concurry litellm

from pydantic import BaseModel
import litellm
# Line 1. Import concurry
from concurry import worker, gather

# Line 2. Add the @worker decorator to an existing class
@worker(mode="thread", max_workers=50)
class LLM(BaseModel):
    model: str
    
    def call(self, prompt: str) -> str:
        # This runs in a separate thread!
        return litellm.completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        ).choices[0].message.content

# Initialize your worker (looks just like a normal class)
llm = LLM(model="gpt-3.5-turbo")

prompts = [f"What is {i} + {i}?" for i in range(100)]
results = [llm.call(prompt) for prompt in prompts]  # Returns futures instantly, runs in parallel
# Line 3. gather futures
results = gather(results, progress=True)            # Waits for all results

print(f"Processed {len(results)} prompts!")
llm.stop()

The Result:

  • Sequential: ~780 seconds
  • Concurry: ~16 seconds (50x faster)

No refactoring. No concurrent.futures. No async def virus. No ray.remote. Just your code, parallelized. We think that's delicious 🤤


📦 Installation

pip install concurry

For distributed computing support:

pip install "concurry[ray]"

For all features:

pip install "concurry[all]"

💡 Why Concurry?

The Problem: Fragmentation

Python's concurrency tools are scattered.

  • Threading: Good for I/O, bad API (concurrent.futures).
  • Multiprocessing: Good for CPU, hard to debug, pickling errors.
  • Asyncio: High throughput, but requires rewriting everything (async/await).
  • Ray: Powerful for clusters, but heavyweight for scripts.

The Solution: Unified API

Concurry abstracts all of these into a single interface.

Without Concurry (The Old Way)

You have to learn 4 different APIs to do the same thing.

# ❌ Threading API
with ThreadPoolExecutor() as executor:
    future = executor.submit(task, arg)

# ❌ Multiprocessing API (Different behavior!)
with ProcessPoolExecutor() as executor:
    future = executor.submit(task, arg)

# ❌ Asyncio API (Rewrite everything!)
async def main():
    await asyncio.create_task(async_task(arg))

# ❌ Ray API (Another new API!)
ray.get(ray_task.remote(arg))

With Concurry (The Delightful Way)

One API, any backend.

from concurry import worker, gather

@worker
class MyWorker:
    def do_work(self, x: int) -> int:
        return x * 2

# Run on threads?
w = MyWorker.options(mode="thread", max_workers=10).init()

# Run on processes? Uncomment below.
# w = MyWorker.options(mode="process", max_workers=10).init()

# Run on a ray cluster? Uncomment below.
# w = MyWorker.options(mode="ray", max_workers=10).init()

# Run on asyncio? Uncomment below.
# w = MyWorker.options(mode="asyncio").init()

# The submission code NEVER changes:
futures = [w.do_work(i) for i in range(1000)]
# The collection code NEVER changes:
results = gather(futures, progress=True)
w.stop()

✨ Key Features

🎭 Actor-Based Workers

Stateful workers that persist across calls. Perfect for database connections, model weights, or session management.

from concurry import worker

@worker(mode="thread")
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self) -> int:
        self.count += 1
        return self.count

# State is preserved!
counter = Counter()
print(counter.increment().result())  # 1
print(counter.increment().result())  # 2
counter.stop()

🚦 Rate Limiting

Built-in rate limiting for APIs. Token buckets, sliding windows, and more, enforced globally across all workers.

from concurry import worker, gather, CallLimit

@worker(
    mode="thread",
    max_workers=20,
    # Limit to 100 calls per minute across ALL 20 threads
    limits=[CallLimit(window_seconds=60, capacity=100)]
)
class APIWorker:
    def fetch(self, url: str):
        # Rate limit is automatically checked here
        return f"Fetched {url}"

pool = APIWorker()
futures = [pool.fetch(f"url_{i}") for i in range(200)]
results = gather(futures, progress=True)  # Smoothly throttled!
pool.stop()

🔁 Intelligent Retries

Don't let flaky networks break your batch jobs. Configure retries declaratively.

from concurry import worker, RetryConfig

@worker(
    mode="thread",
    retry_config=RetryConfig(
        max_retries=5,
        retry_on=(ConnectionError, TimeoutError),
        backoff_factor=2.0  # Exponential backoff: 1s, 2s, 4s, ...
    )
)
class FlakyWorker:
    def fetch(self):
        # Automatically retried on failure!
        pass

✅ Pydantic Integration

Full support for Pydantic models. Arguments are validated and coerced before they even reach the worker.

from concurry import worker
from pydantic import BaseModel, Field

@worker(mode="process")
class DataWorker(BaseModel):
    db_url: str = Field(..., pattern=r"^postgres://")
    
    def process(self, data: dict):
        return data

# Validated at initialization!
try:
    w = DataWorker(db_url="invalid-url")
except Exception as e:
    print(f"Validation failed!: {e}")  # Caught before worker starts

🎬 The @task Decorator

Just want to run a function in parallel? You don't need a class.

from concurry import task, gather
import time

@task(mode="process", max_workers=4)
def heavy_computation(x: int) -> int:
    time.sleep(1)  ## Example heavy computation
    return x

# Run 100 heavy computations in parallel
futures = [heavy_computation(i) for i in range(100)]
results = gather(futures, progress=True)
heavy_computation.stop()

📚 Documentation


🤝 Contributing

We love contributions! Check out CONTRIBUTING.md to get started.


📄 License

Apache 2.0 - See LICENSE for details.


Made with ❤️ by Amazon Scientists

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

concurry-0.13.0.tar.gz (3.0 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

concurry-0.13.0-py3-none-any.whl (161.6 kB view details)

Uploaded Python 3

File details

Details for the file concurry-0.13.0.tar.gz.

File metadata

  • Download URL: concurry-0.13.0.tar.gz
  • Upload date:
  • Size: 3.0 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for concurry-0.13.0.tar.gz
Algorithm Hash digest
SHA256 868e5b048316ce3153dd0a24536b6bf863847df984814624934533cddac8b98b
MD5 ddd2f400d1dbede6abe06f097e6481e8
BLAKE2b-256 0a94c8954cf54d03662bd12ea3c27ef346730a4893bca5e8e73016033ac6281f

See more details on using hashes here.

File details

Details for the file concurry-0.13.0-py3-none-any.whl.

File metadata

  • Download URL: concurry-0.13.0-py3-none-any.whl
  • Upload date:
  • Size: 161.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for concurry-0.13.0-py3-none-any.whl
Algorithm Hash digest
SHA256 337509d770c5a3c495b77f3a5f605d23f6f2f2f01d5309cb7dd539a9cf0351b1
MD5 4f82c347b2c397bbc77cec06d6c73a1e
BLAKE2b-256 ca13395b71e22582f6c2a7742b6bb5b3f7ac09cb8c034497eff61130ee4154dd

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page