Skip to main content

Add your description here

Project description

asyncio-advanced-semaphores

Production-ready asyncio semaphores with TTL, heartbeat, fair-queueing, thread-safety, and distributed support (redis).

[!WARNING] This project is a work in progress. Do not use it in production environments yet.

Why Use This?

Traditional asyncio.Semaphore works great until it doesn't:

  • Distributed systems with multiple threads/machines? Good luck coordinating rate limits across multiple instances.
  • TTL support? If you want to limit the time a task can hold a slot, you need to implement it yourself (and it's not easy!).
  • Heartbeat support? In distributed mode, you don't want to leak slots for long when a machine is brutally killed.
  • Zero visibility? No idea which semaphores are congested or why.

This library solves all of that.

Features

  • ⏱️ TTL (Time To Live) — Automatic slot expiration prevents deadlocks from crashed tasks
  • 💓 Heartbeat system — Distributed semaphores stay alive with automatic keep-alive pings
  • ⚖️ Fair queueing — First-come, first-served acquisition prevents starvation
  • 🌐 Distributed coordination (or not) — Redis-backed semaphores work across processes and machines, Memory-backed semaphores work only locally
  • 🔍 Built-in observability — Query acquisition statistics to monitor congestion
  • 🛡️ Thread-safety — You can acquire semaphores from multiple threads

Limitations

  • We only support an async interface
  • We don't support "Redis Cluster" for the moment (for the distributed semaphore implementation)
  • All clients must be reasonably time synchronized (NTP)

Installation

pip install asyncio-advanced-semaphores

Quickstart

In-Memory Semaphore

Perfect for single-process rate limiting, connection pooling, or controlling concurrent operations.

import asyncio
from asyncio_advanced_semaphores import MemorySemaphore

# Limit to 10 concurrent operations
sem = MemorySemaphore(name="my-semaphore", value=10)

async def limited_operation():
    # note: cm() means "context manager"
    async with sem.cm() as acquired_result:
        print(f"Acquired slot id={acquired_result.acquisition_id}, slot_number={acquired_result.slot_number}!")
        await asyncio.sleep(1)  # Do some work
    print("Released slot!")

async def main():
    # Run 50 tasks, but only 10 at a time
    await asyncio.gather(*[limited_operation() for _ in range(50)])

asyncio.run(main())

Distributed Semaphore (Redis)

Coordinate rate limits and resource access across multiple services, processes, or machines.

import asyncio
from asyncio_advanced_semaphores import RedisSemaphore, RedisConfig

# Configure Redis connection
config = RedisConfig(
    url="redis://localhost:6379",
    namespace="my-app",
)

# Create a distributed semaphore (same name = shared across all instances)
sem = RedisSemaphore(
    name="external-api-rate-limit",
    value=100,  # Max 100 concurrent API calls across all services
    config=config,
)

async def call_external_api():
    # note: cm() means "context manager"
    async with sem.cm() as acquired_result:
        print(f"Acquired slot id={acquired_result.acquisition_id}, slot_number={acquired_result.slot_number}!")
        await make_api_request()
    print("Released distributed slot!")

asyncio.run(call_external_api())

Manual Acquire/Release

If you need more control, you can manually acquire and release slots using the acquire() and release() methods:

from asyncio_advanced_semaphores import MemorySemaphore

sem = MemorySemaphore(name="my-semaphore", value=2)

async def manual_usage():
    result = await sem.acquire()
    try:
        # critical section
        print(f"Acquired slot {result.slot_number}")
    finally:
        await sem.release(result.acquisition_id)

Observability

Monitor your semaphores to identify bottlenecks:

from asyncio_advanced_semaphores import MemorySemaphore

# Get statistics for all semaphores
stats = await MemorySemaphore.get_acquired_stats()
for name, stat in stats.items():
    print(f"{name}: {stat.acquired_slots}/{stat.max_slots} ({stat.acquired_percent:.1f}%)")

Advanced Usage

Different semaphore objects with the same name

You can create different semaphore objects with the same name attribute. They will share the same slots.

from asyncio_advanced_semaphores import MemorySemaphore

sem1 = MemorySemaphore(name="my-semaphore", value=1)
sem2 = MemorySemaphore(name="my-semaphore", value=1)

# Let's acquire my-semaphore with the first semaphore object
sem1_acquired_result = await sem1.acquire()

print(await sem2.locked()) # True, because sem1 and sem2 share the same slots (same name)

await sem1.release(sem1_acquired_result.acquisition_id)

print(await sem2.locked()) # False, because sem1 and sem2 share the same slots (same name)

Thread-safety

You can use the semaphore objects from multiple threads. Each thread will have its own event loop. You can share the same semaphore objects across threads or using distinct semaphore objects.

If you use distinct semaphore objects but with the same name attribute, they will share the same slots.

TTL (Time To Live)

You can set the ttl attribute to the number of seconds after which the slot will be released automatically.

from asyncio_advanced_semaphores import MemorySemaphore

sem = MemorySemaphore(name="my-semaphore", value=1, ttl=1)

# Let's acquire my-semaphore with the first semaphore object
result = await sem.acquire()

time.sleep(2)
# NOTE: the semaphore will be released automatically after 1 second (TTL)

Max Acquire Time

You can set the max_acquire_time attribute to the maximum number of seconds to wait for the slot to be acquired. If the slot is not acquired within the timeout, an TimeoutError is raised.

from asyncio_advanced_semaphores import MemorySemaphore

sem = MemorySemaphore(name="my-semaphore", value=1, max_acquire_time=1)

# Let's acquire my-semaphore with the first semaphore object
result1 = await sem.acquire()

try:
    result2 = await sem.acquire()
except TimeoutError:
    # TimeoutError will be raised after 1 second (max_acquire_time)
    pass

await sem.release(result1.acquisition_id)

Heartbeat

With RedisSemaphore, if you set a long ttl value (or no TTL at all), you can set a heartbeat_max_interval (default to 180 seconds) value to keep the slot alive.

from asyncio_advanced_semaphores import RedisSemaphore, RedisConfig

# Configure Redis connection
config = RedisConfig(
    url="redis://localhost:6379",
    namespace="my-app",
)

sem = RedisSemaphore(name="my-semaphore", value=1, ttl=86400, heartbeat_max_interval=180, config=config)

# Let's acquire the semaphore
result = await sem.acquire()

# do your async work...
# (a heartbeat will be automatically and regularly sent to the Redis server to keep the slot alive)

# If the program or the machine is brutally killed, the semaphore won't be released (no time to do that!)
# But the heartbeat task will disappear also and the semaphore will be released automatically after the
# `heartbeat_max_interval` value (180 seconds) which is a lot lower than the `ttl` value (86400 seconds).

[!WARNING] The heartbeat task is completly automatic (you don't have to do anything to keep the slot alive) but this is an asynchronous task. So don't block the event loop for too long to avoid blocking the heartbeat task and automatically releasing the slot.

DEV

This library is managed by UV and a Makefile.

make help to see the available commands.

Some architecture notes are available:

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio_advanced_semaphores-0.0.6.tar.gz (24.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

asyncio_advanced_semaphores-0.0.6-py3-none-any.whl (34.6 kB view details)

Uploaded Python 3

File details

Details for the file asyncio_advanced_semaphores-0.0.6.tar.gz.

File metadata

  • Download URL: asyncio_advanced_semaphores-0.0.6.tar.gz
  • Upload date:
  • Size: 24.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.27 {"installer":{"name":"uv","version":"0.9.27","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for asyncio_advanced_semaphores-0.0.6.tar.gz
Algorithm Hash digest
SHA256 1e6ab452fc08af279adb0eab06b5b2dcad2be82465c13f76b3f6c9170cba3518
MD5 dfcdc9cc85e7b2b3532c111a49b88953
BLAKE2b-256 0d288d6073bd2d7dd9a9b62118db9657ae3d4dbe3027869de0adf6dd26a34a32

See more details on using hashes here.

File details

Details for the file asyncio_advanced_semaphores-0.0.6-py3-none-any.whl.

File metadata

  • Download URL: asyncio_advanced_semaphores-0.0.6-py3-none-any.whl
  • Upload date:
  • Size: 34.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.27 {"installer":{"name":"uv","version":"0.9.27","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for asyncio_advanced_semaphores-0.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 6c16ec9fb4826d5ebd9bab2bb3a8352404e41f8d6ceff4d090099d41a8a5fd83
MD5 c60ab140994c6ab6667d166b8bf12e9c
BLAKE2b-256 27ba82485055c7bc3cef0329f2a327815755be7c204f13557c781d2087dbe411

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page