Skip to main content

Redis-based distributed synchronization primitives for Python

Project description

redsync

PyPI version Python 3.10+ License: MIT

Redis-based distributed synchronization primitives for Python. Async API using redis.asyncio.

Features

Core

  • Blocking, no polling – Uses Redis BLPOP: the connection blocks on the server until a permit or signal is available. No busy-waiting, no lock + pub/sub overhead.
  • Async-first – Built on redis.asyncio; use with async/await.
  • Python 3.10+ – Modern Python support.

Semaphore

  • N permits – Semaphore count from 1 to 4096 for limiting concurrency across processes.
  • Configurable init – LUA (atomic, default) or OPTIMISTIC_LOCKING strategy for creating the permit pool.
  • Automatic lifecycle – Built-in watchdog keeps semaphores alive while in use; Redis auto-deletes keys when all clients disconnect or crash.

Event

  • Distributed Signal – Simple one-to-one signaling between processes.

TODO

  • Automatic leaked permit recovery – Implement a mechanism (e.g. via heartbeats in metadata) to detect and reclaim permits that were leaked because a worker crashed while holding them.
  • Other sync primitives – Add more primitives.

Installation

pip install redsync

Or with uv:

uv add redsync

Requirements: Redis server, redis>=5.0.0 (async support).

Semaphore

Usage

import asyncio
from redis.asyncio import Redis
from redsync import RedisSemaphore, RedisSemaphoreTimeoutError

async def main():
    r = Redis()
    sem = await RedisSemaphore.create(r, "my_resource", count=1)

    # acquire() raises RedisSemaphoreTimeoutError on timeout
    try:
        await sem.acquire(timeout=10)
        try:
            # do work
            pass
        finally:
            await sem.release()
    except RedisSemaphoreTimeoutError:
        pass  # handle timeout

    # or use context manager (raises on timeout)
    async with sem:
        # do work
        pass

asyncio.run(main())

N permits and attaching

Use count > 1 to allow N concurrent holders. count must be between 1 and 4096.

from redsync import SemaphoreInitStrategy

# Creator initializes the pool
sem = await RedisSemaphore.create(r, "pool", count=5, semaphore_init_strategy=SemaphoreInitStrategy.LUA)

# Other workers can attach without knowing the count
worker_sem = await RedisSemaphore.attach(r, "pool", timeout=60.0)
print(f"Total permits: {await worker_sem.get_count()}")

await worker_sem.acquire()
# ...
await worker_sem.release()

create vs attach

In a distributed environment, you have two options for connecting to a semaphore:

  1. Call create() everywhere (Idempotent): If your application consists of multiple identical worker nodes running the exact same codebase, they can all safely call RedisSemaphore.create(..., count=5). The first worker to execute it will atomically initialize the semaphore, and the rest will instantly validate that their requested count matches the existing one.
  2. Call create() once, and attach() elsewhere (Consumer): If your architecture has a central "manager" process that dictates concurrency limits, the manager calls create(..., count=5). The worker processes then call RedisSemaphore.attach(..., timeout=60.0). attach() does not require a count, never initializes the pool, and simply polls until the creator sets it up.

Semantic Intent: While using create() everywhere works perfectly, using attach() cleanly separates your Control Plane (the entity that decides the concurrency limits and creates the resources) from your Data Plane (the workers that just consume the resources). It simplifies worker code because workers do not need to hardcode or know the count beforehand—they just say "Give me a permit for the pool the manager set up."

Init strategies

The semaphore uses a Redis list as a permit pool. The list must be created and filled with count elements before anyone can BLPOP. Two strategies are supported:

Lua Optimistic Locking
Idea Run a script that atomically ensures the list has N elements (if LLEN == 0 then RPUSH N times). Uses a Redis transaction (WATCH + MULTI/EXEC) to atomically check if the metadata exists, and if not, creates the list and metadata.
Pros Single atomic op; no extra key; idempotent. No Lua; perfectly atomic; crash-proof.
Cons Requires Lua (standard in Redis). Transaction retry loop in Python code.

Default is SemaphoreInitStrategy.LUA. Use SemaphoreInitStrategy.OPTIMISTIC_LOCKING to avoid Lua.

Event

A distributed version of a one-to-one signal. set() pushes a signal to Redis, and wait() blocks until a signal is available.

Usage

from redsync import RedisEvent, RedisEventTimeoutError

async def worker():
    event = RedisEvent(r, "task_done")

    print("Waiting for signal...")
    await event.wait(timeout=60)
    print("Signal received, continuing work!")

async def trigger():
    event = RedisEvent(r, "task_done")
    await event.set()  # Wakes up exactly one waiter

Characteristics

  • One-to-One: Each set() call wakes up exactly one wait() call.
  • Persistence: If set() is called when no one is waiting, the signal is stored in Redis until a waiter arrives.
  • No Polling: Uses BLPOP for efficient blocking.

Lifecycle (Watchdog)

Every RedisSemaphore instance runs a background watchdog task that periodically extends the TTL of the underlying Redis keys. This ensures:

  • Keys stay alive as long as at least one client holds a reference to the semaphore.
  • Automatic cleanup when all clients disconnect or crash — Redis expires the keys after lease_ttl seconds with no renewals.
  • No thundering herd — each watchdog adds random jitter to its renewal interval so multiple clients don't all hit Redis at the same instant.
Parameter Default Description
lease_ttl 300.0 (5 min) TTL set on Redis keys. The watchdog renews every lease_ttl / 3 seconds (±20% jitter).
# Custom lease TTL
sem = await RedisSemaphore.create(r, "my_resource", count=3, lease_ttl=120.0)

# Always close when done to stop the watchdog immediately
await sem.close()

If close() is not called (e.g. process crash), the keys will naturally expire after lease_ttl seconds.

Exceptions

  • RedsyncError - Base exception
  • RedisSemaphoreError, RedisEventError - Component base exceptions
  • RedisSemaphoreTimeoutError, RedisEventTimeoutError – Timeout occurred
  • RedisSemaphoreNotAcquiredErrorrelease() called without acquiring
  • RedisSemaphoreCountErrorcount not in 1–4096
  • RedisSemaphoreCountMismatchErrorcreate() count mismatch

API Reference

RedisSemaphore

class RedisSemaphore:
    @classmethod
    async def create(cls, redis_client, name: str, *, count: int = 1,
                    lease_ttl: float = 300.0,
                    semaphore_init_strategy: SemaphoreInitStrategy = SemaphoreInitStrategy.LUA,
                    key_prefix: str = "redsync:semaphore") -> RedisSemaphore

    @classmethod
    async def attach(cls, redis_client, name: str, *, timeout: float | None = 60.0,
                    lease_ttl: float = 300.0,
                    key_prefix: str = "redsync:semaphore") -> RedisSemaphore

    async def get_count(self) -> int | None

    async def acquire(self, timeout: float | None = None) -> None  # None = block until available
    async def release(self) -> None
    async def close(self) -> None  # Stop the watchdog task
    async def __aenter__(self) -> RedisSemaphore
    async def __aexit__(...) -> None
  • name – Semaphore identifier (shared across processes).
  • count – Number of permits (1–4096).
  • lease_ttl – TTL in seconds for Redis keys; the watchdog renews every lease_ttl / 3 seconds.
  • timeout – For acquire(): seconds to wait; None blocks indefinitely. Raises RedisSemaphoreTimeoutError on timeout.

RedisEvent

class RedisEvent:
    def __init__(
        self, 
        redis_client, name: str, *,
        key_prefix: str = "redsync:event"
    ) -> None
    async def set(self) -> None
    async def wait(self, timeout: float | None = None) -> None
    async def clear(self) -> None
    async def is_set(self) -> bool

Running tests

pytest
# or
uv run pytest

Set REDIS_URL if Redis is not on localhost:6379.

License

MIT License – see LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

redsync-2.0.0.tar.gz (64.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

redsync-2.0.0-py3-none-any.whl (9.9 kB view details)

Uploaded Python 3

File details

Details for the file redsync-2.0.0.tar.gz.

File metadata

  • Download URL: redsync-2.0.0.tar.gz
  • Upload date:
  • Size: 64.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for redsync-2.0.0.tar.gz
Algorithm Hash digest
SHA256 bc5353a8fea472271153b4d439fae9fc089072faceb29d36e12c1569a193bcc4
MD5 a1e32f8755e248564cf4598456d3bb24
BLAKE2b-256 02f2892948e357697394a59a2f7ab66a5765fb6170c97200c4a9cd3b6e346da2

See more details on using hashes here.

Provenance

The following attestation bundles were made for redsync-2.0.0.tar.gz:

Publisher: release.yml on martinmkhitaryan/redsync

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file redsync-2.0.0-py3-none-any.whl.

File metadata

  • Download URL: redsync-2.0.0-py3-none-any.whl
  • Upload date:
  • Size: 9.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for redsync-2.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 27f88c8c748851ecaea03786548f17248aac13ee7fc7cba9b16f756deae4177e
MD5 2c04615502c3299abab2a48573becdba
BLAKE2b-256 fe5a38691dbb1033e6386b3b5354e5c4a495f95d77b6d69e4fd79ba30dc75bb7

See more details on using hashes here.

Provenance

The following attestation bundles were made for redsync-2.0.0-py3-none-any.whl:

Publisher: release.yml on martinmkhitaryan/redsync

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page