Skip to main content

A unified, delightful Python concurrency library

Project description

Concurry

Concurry

Documentation PyPI Version Python Versions License Build Status

Parallelism for humans.

Concurry is a unified, delightful concurrency library for Python. It replaces the fragmented landscape of threading, multiprocessing, asyncio, and Ray with a single, elegant API. Write your code once, and run it on a single thread, multiple cores, or a distributed cluster—without changing a line of business logic.


🚀 Quickstart: 50x Speedup in 3 Lines of Code

Calling LLMs sequentially is painfully slow. With Concurry, you can parallelize your existing code instantly.

Prerequisites: pip install concurry litellm

from pydantic import BaseModel
import litellm
# Line 1. Import concurry
from concurry import worker, gather

# Line 2. Add the @worker decorator to an existing class
@worker(mode="thread", max_workers=50)
class LLM(BaseModel):
    model: str
    
    def call(self, prompt: str) -> str:
        # This runs in a separate thread!
        return litellm.completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        ).choices[0].message.content

# Initialize your worker (looks just like a normal class)
llm = LLM(model="gpt-3.5-turbo")

prompts = [f"What is {i} + {i}?" for i in range(100)]
results = [llm.call(prompt) for prompt in prompts]  # Returns futures instantly, runs in parallel
# Line 3. gather futures
results = gather(results, progress=True)            # Waits for all results

print(f"Processed {len(results)} prompts!")
llm.stop()

The Result:

  • Sequential: ~780 seconds
  • Concurry: ~16 seconds (50x faster)

No refactoring. No concurrent.futures. No async def virus. No ray.remote. Just your code, parallelized. We think that's delicious 🤤


📦 Installation

pip install concurry

For distributed computing support:

pip install "concurry[ray]"

For all features:

pip install "concurry[all]"

💡 Why Concurry?

The Problem: Fragmentation

Python's concurrency tools are scattered.

  • Threading: Good for I/O, bad API (concurrent.futures).
  • Multiprocessing: Good for CPU, hard to debug, pickling errors.
  • Asyncio: High throughput, but requires rewriting everything (async/await).
  • Ray: Powerful for clusters, but heavyweight for scripts.

The Solution: Unified API

Concurry abstracts all of these into a single interface.

Without Concurry (The Old Way)

You have to learn 4 different APIs to do the same thing.

# ❌ Threading API
with ThreadPoolExecutor() as executor:
    future = executor.submit(task, arg)

# ❌ Multiprocessing API (Different behavior!)
with ProcessPoolExecutor() as executor:
    future = executor.submit(task, arg)

# ❌ Asyncio API (Rewrite everything!)
async def main():
    await asyncio.create_task(async_task(arg))

# ❌ Ray API (Another new API!)
ray.get(ray_task.remote(arg))

With Concurry (The Delightful Way)

One API, any backend.

from concurry import worker, gather

@worker
class MyWorker:
    def do_work(self, x: int) -> int:
        return x * 2

# Run on threads?
w = MyWorker.options(mode="thread", max_workers=10).init()

# Run on processes? Uncomment below.
# w = MyWorker.options(mode="process", max_workers=10).init()

# Run on a ray cluster? Uncomment below.
# w = MyWorker.options(mode="ray", max_workers=10).init()

# Run on asyncio? Uncomment below.
# w = MyWorker.options(mode="asyncio").init()

# The submission code NEVER changes:
futures = [w.do_work(i) for i in range(1000)]
# The collection code NEVER changes:
results = gather(futures, progress=True)
w.stop()

✨ Key Features

🎭 Actor-Based Workers

Stateful workers that persist across calls. Perfect for database connections, model weights, or session management.

from concurry import worker

@worker(mode="thread")
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self) -> int:
        self.count += 1
        return self.count

# State is preserved!
counter = Counter()
print(counter.increment().result())  # 1
print(counter.increment().result())  # 2
counter.stop()

🚦 Rate Limiting

Built-in rate limiting for APIs. Token buckets, sliding windows, and more, enforced globally across all workers.

from concurry import worker, gather, CallLimit

@worker(
    mode="thread",
    max_workers=20,
    # Limit to 100 calls per minute across ALL 20 threads
    limits=[CallLimit(window_seconds=60, capacity=100)]
)
class APIWorker:
    def fetch(self, url: str):
        # Rate limit is automatically checked here
        return f"Fetched {url}"

pool = APIWorker()
futures = [pool.fetch(f"url_{i}") for i in range(200)]
results = gather(futures, progress=True)  # Smoothly throttled!
pool.stop()

🔁 Intelligent Retries

Don't let flaky networks break your batch jobs. Configure retries declaratively.

from concurry import worker, RetryConfig

@worker(
    mode="thread",
    retry_config=RetryConfig(
        max_retries=5,
        retry_on=(ConnectionError, TimeoutError),
        backoff_factor=2.0  # Exponential backoff: 1s, 2s, 4s, ...
    )
)
class FlakyWorker:
    def fetch(self):
        # Automatically retried on failure!
        pass

✅ Pydantic Integration

Full support for Pydantic models. Arguments are validated and coerced before they even reach the worker.

from concurry import worker
from pydantic import BaseModel, Field

@worker(mode="process")
class DataWorker(BaseModel):
    db_url: str = Field(..., pattern=r"^postgres://")
    
    def process(self, data: dict):
        return data

# Validated at initialization!
try:
    w = DataWorker(db_url="invalid-url")
except Exception as e:
    print(f"Validation failed!: {e}")  # Caught before worker starts

🎬 The @task Decorator

Just want to run a function in parallel? You don't need a class.

from concurry import task, gather
import time

@task(mode="process", max_workers=4)
def heavy_computation(x: int) -> int:
    time.sleep(1)  ## Example heavy computation
    return x

# Run 100 heavy computations in parallel
futures = [heavy_computation(i) for i in range(100)]
results = gather(futures, progress=True)
heavy_computation.stop()

📚 Documentation


🤝 Contributing

We love contributions! Check out CONTRIBUTING.md to get started.


📄 License

Apache 2.0 - See LICENSE for details.


Made with ❤️ by Amazon Scientists

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

concurry-0.12.2.tar.gz (3.0 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

concurry-0.12.2-py3-none-any.whl (159.8 kB view details)

Uploaded Python 3

File details

Details for the file concurry-0.12.2.tar.gz.

File metadata

  • Download URL: concurry-0.12.2.tar.gz
  • Upload date:
  • Size: 3.0 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for concurry-0.12.2.tar.gz
Algorithm Hash digest
SHA256 5fb65f58314761e3a14e9c25796a7665cc47c7bd0e2ae2c2f2044aa51b68b6ca
MD5 8a8aa75029c0a7e32956c87f2a57e24e
BLAKE2b-256 2502ecc28dbc9fb701cf7a9ea0535ba6f44425ba69f10566c07a562a8c2ef06b

See more details on using hashes here.

File details

Details for the file concurry-0.12.2-py3-none-any.whl.

File metadata

  • Download URL: concurry-0.12.2-py3-none-any.whl
  • Upload date:
  • Size: 159.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for concurry-0.12.2-py3-none-any.whl
Algorithm Hash digest
SHA256 5b2f21eced500b181fbaec2ce28ad5a7d9d9dfd7762024912764e4b6fae69b1b
MD5 9c68d82ae48cc50b780277604f393be5
BLAKE2b-256 c8579e58cdf6c77fe7beb928c232af88cac1ed0a15a437e299b1030e960d59b6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page