Distributed semaphore and mutex with Redis and Sentinel support
🔒 Distributed Synchronization Primitives on Redis 🔒
Counting Semaphores • Mutexes • Fencing Tokens
Sync/Async • Sentinel • Heartbeat
Distributed semaphore and mutex on Redis with Sentinel support, a sync/async API, and an automatic heartbeat.
Features
- Counting Semaphore — limit concurrent access to a resource to at most N
- Mutex — exclusive lock (binary semaphore)
- Redis Sentinel — failover support for high availability
- Sync and Async APIs — works with both threading and asyncio
- Heartbeat — automatically extends the lock TTL
- Fencing Tokens — protection against race conditions during GC pauses
- Atomic operations — all critical operations run via Lua scripts
- Flexible wait strategies — polling with exponential backoff, or BLPOP for efficient blocking waits
- Docker for Redis — ready-to-use container for local development
Installation
pip install py-redis-semaphore
Or with uv:
uv add py-redis-semaphore
Quick start
Mutex (exclusive lock)
An exclusive lock. Use it when only one process/instance may access the resource at any point in time (e.g. database migrations, a cron job, cache recomputation).
import redis
from redis_semaphore import Mutex
client = redis.Redis()
with Mutex(client, "my-resource") as lock:
    print(f"Fencing token: {lock.fencing_token}")
    # Only one process can run this code
Counting Semaphore
Limits the number of concurrent owners of a resource to N. Use it when the resource tolerates bounded concurrency (a DB pool, an external API limit, CPU/IO task throttling).
from redis_semaphore import Semaphore, SemaphoreConfig
config = SemaphoreConfig(
    name="database-pool",
    limit=5,  # Up to 5 concurrent connections
    lock_timeout=30.0,
)
with Semaphore(client, config) as sem:
    # Work with the database
    pass
Async API
Async variants for asyncio. They behave like their sync counterparts but do not
block the event loop, and use async with, aacquire, and arelease.
import asyncio
import redis.asyncio as aioredis
from redis_semaphore import Mutex, Semaphore, SemaphoreConfig
async def main():
    client = aioredis.Redis()
    async with Mutex(client, "async-lock") as lock:
        print(f"Mutex token: {lock.fencing_token}")
    sem_cfg = SemaphoreConfig(name="async-semaphore", limit=3)
    async with Semaphore(client, sem_cfg) as sem:
        print(f"Semaphore token: {sem.fencing_token}")
    await client.aclose()
asyncio.run(main())
Redis Sentinel
Use Sentinel when Redis is deployed in a high-availability setup and you need automatic failover when the master goes down.
Example of connecting through Sentinel (Mutex and Semaphore):
from redis_semaphore import SentinelConfig, RedisConnectionFactory, Mutex, Semaphore, SemaphoreConfig
config = SentinelConfig(
    sentinels=[
        ("sentinel1.example.com", 26379),
        ("sentinel2.example.com", 26379),
        ("sentinel3.example.com", 26379),
    ],
    service_name="mymaster",
    password="secret",
)
client = RedisConnectionFactory.create_sync(config)
with Mutex(client, "ha-lock") as lock:
    # Automatic failover when the master goes down
    pass
sem_cfg = SemaphoreConfig(name="ha-semaphore", limit=5)
with Semaphore(client, sem_cfg) as sem:
    pass
Reliability model and its limits
It is important to understand which guarantees the library provides and which it does not.
What is guaranteed. All operations (acquire/release/refresh/cleanup) are atomic
on the Redis side via Lua scripts. As long as the Redis node (master) is reachable
and consistent, mutual exclusion holds: there can never be more than limit owners
at once.
What is NOT guaranteed — failover. This is not Redlock and not a multi-master algorithm. A lock lives on a single master (directly or via Sentinel). Replication in Redis is asynchronous, so when the master fails and a replica is promoted, the owner record may not have been replicated yet — and a second client can acquire the "same" slot on the new master. In other words, during a failover or network split, mutual exclusion may be briefly violated.
How to live with this — fencing tokens. Every successful acquire returns a
monotonically increasing fencing_token. Pass it to the protected resource (a DB,
a file store, an external service) and reject operations carrying a token lower
than the last one seen. This way safety is enforced on the resource side rather
than by trusting the lock itself — it is the only correct way to protect against
double ownership during failover and against GC pauses.
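The resource-side check described above can be sketched as follows. This is a minimal illustration, not part of the library: the FencedStore class and its write method are hypothetical names standing in for whatever resource you protect (a DB row, a file store, an external service).

```python
import threading


class FencedStore:
    """Hypothetical protected resource that rejects writes carrying a stale fencing token."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._last_token = 0  # highest token accepted so far
        self.value = None

    def write(self, token: int, value: str) -> bool:
        """Apply the write only if the token is not lower than the last one seen."""
        with self._lock:
            if token < self._last_token:
                return False  # stale owner: a newer acquire has already written
            self._last_token = token
            self.value = value
            return True


store = FencedStore()
first = store.write(token=1, value="from owner A")       # accepted
second = store.write(token=2, value="from owner B")      # accepted, B acquired later
late = store.write(token=1, value="late write from A")   # rejected, token 1 < 2
```

In real code the token passed to write would be the fencing_token of the acquired Mutex or Semaphore, and the comparison would ideally happen atomically inside the resource itself (for example in a conditional UPDATE).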
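Clocks. By default (use_server_time=False), TTLs are computed using client-side time, so client clocks must be kept in sync (e.g. via NTP); a clock skew of a few seconds breaks lock expiration. Setting use_server_time=True takes time from Redis instead.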
Timeliness of loss detection. Keep the client socket_timeout smaller than
lock_timeout, otherwise a single stalled refresh can delay lock-loss escalation
past the deadline.
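A back-of-the-envelope check of this timing rule, as a sketch under stated assumptions: it assumes the first refresh is issued refresh_interval seconds after acquire (80% of lock_timeout when unset, per the configuration table) and that a stalled refresh fails only after socket_timeout seconds. Both function names are hypothetical, and the library's real scheduling may differ.

```python
from typing import Optional


def effective_refresh_interval(lock_timeout: float, refresh_interval: Optional[float] = None) -> float:
    """Heartbeat interval: the explicit value, or 80% of lock_timeout when unset."""
    if refresh_interval is not None:
        return refresh_interval
    return 0.8 * lock_timeout


def stalled_refresh_detected_in_time(
    lock_timeout: float,
    socket_timeout: float,
    refresh_interval: Optional[float] = None,
) -> bool:
    """True if a refresh that hangs for socket_timeout still fails before the slot TTL ends."""
    fails_at = effective_refresh_interval(lock_timeout, refresh_interval) + socket_timeout
    return fails_at < lock_timeout


# Assumed model: refresh issued at t=24s, failure known at t=24s + socket_timeout.
ok = stalled_refresh_detected_in_time(lock_timeout=30.0, socket_timeout=5.0)     # 29s < 30s
late = stalled_refresh_detected_in_time(lock_timeout=30.0, socket_timeout=10.0)  # 34s > 30s
```

Under this model the margin is tighter than socket_timeout < lock_timeout alone suggests, so it is worth leaving headroom when picking socket_timeout.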
Bottom line: for task coordination, rate limiting, and pools where the resource verifies a fencing token, the library is production-ready. For scenarios that require strict mutual exclusion with zero tolerance for even a single violation during failover, you need a different mechanism (e.g. a consensus store).
What to choose and when
- Mutex — when you need exactly one owner: "no one but me" (migrations, report generation, background jobs that must not run in parallel).
- Semaphore — when bounded concurrency is acceptable: "no more than N" (connection pools, rate limiting external services, load throttling).
Configuration and usage
SemaphoreConfig
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Logical resource name. Used in Redis keys, e.g. semaphore:{name}:owners. |
| limit | int | required | Maximum number of concurrent owners. Once there are limit owners, a new acquire returns busy. |
| lock_timeout | float | 30.0 | Slot TTL in seconds. Stored as the score in the owners ZSET and extended by the heartbeat. |
| acquire_timeout | float | None | Maximum wait time when blocking=True. None means wait forever. |
| retry_interval | float | 0.1 | Pause between repeated acquire attempts while waiting. |
| refresh_interval | float | None | Heartbeat interval. If not set, 80% of lock_timeout is used. |
| namespace | str | "semaphore" | Redis key prefix for isolating environments/services. |
| strict_mode | bool | False | If True, raises LockLostError immediately when the slot is lost. |
| use_server_time | bool | False | If True, time is taken from Redis (TIME). Useful when clocks drift between machines: TTLs and expired-slot cleanup stay consistent. The downside is one extra network RTT per time operation. Enable use_server_time if you run several instances on different hosts with no guaranteed clock sync, or if you observe premature timeouts or "stuck" slots caused by clock skew. |
| acquire_mode | AcquireMode | BLPOP | Wait strategy: POLLING (retry loop) or BLPOP (efficient blocking wait via Redis). |
| retry_interval_max | float | None | Maximum interval for exponential backoff. None means no backoff. |
| retry_backoff_multiplier | float | 2.0 | Multiplier for exponential backoff. |
| retry_jitter | float | 0.0 | Random jitter as a fraction of the interval (0.0–1.0). Helps avoid a thundering herd. |
| blpop_timeout | float | 5.0 | BLPOP timeout before a fallback retry (only for BLPOP mode). |
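To make the roles of limit and lock_timeout concrete, here is a toy in-memory model of an owners sorted set. It is a sketch, not the library's Lua scripts: it assumes the score is the slot's expiry timestamp, and the InMemorySlots class and its methods are hypothetical names.

```python
import uuid
from typing import Dict, Optional


class InMemorySlots:
    """Toy model of the owners set: identifier -> expiry timestamp (the assumed score)."""

    def __init__(self, limit: int, lock_timeout: float) -> None:
        self.limit = limit
        self.lock_timeout = lock_timeout
        self.owners: Dict[str, float] = {}

    def _drop_expired(self, now: float) -> None:
        # Expired-slot cleanup: forget owners whose TTL has passed.
        self.owners = {i: exp for i, exp in self.owners.items() if exp > now}

    def acquire(self, now: float) -> Optional[str]:
        self._drop_expired(now)
        if len(self.owners) >= self.limit:
            return None  # busy: limit owners already hold a slot
        identifier = uuid.uuid4().hex
        self.owners[identifier] = now + self.lock_timeout
        return identifier

    def refresh(self, identifier: str, now: float) -> bool:
        self._drop_expired(now)
        if identifier not in self.owners:
            return False  # the slot was lost
        self.owners[identifier] = now + self.lock_timeout
        return True

    def release(self, identifier: str) -> bool:
        return self.owners.pop(identifier, None) is not None


slots = InMemorySlots(limit=2, lock_timeout=30.0)
a = slots.acquire(now=0.0)
b = slots.acquire(now=1.0)
c = slots.acquire(now=2.0)             # None: both slots are held
refreshed = slots.refresh(a, now=24.0)  # a now expires at 54.0
d = slots.acquire(now=40.0)            # b expired at 31.0, so a slot is free
```

Time is passed in explicitly so the behaviour is easy to follow; in the real library each of these steps runs atomically on the Redis side.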
Example configuration and how it works
from redis_semaphore import Semaphore, SemaphoreConfig
config = SemaphoreConfig(
    name="payments",
    limit=5,  # at most 5 concurrent owners
    lock_timeout=30.0,  # a slot lives for 30s without a refresh
    acquire_timeout=None,  # wait forever
    retry_interval=0.5,  # check for a free slot every 0.5s
    refresh_interval=24.0,  # heartbeat extends the TTL ahead of time
    strict_mode=False,  # don't crash when the slot is lost
)
# acquire will wait for a slot, checking every 0.5s.
# Once a slot is acquired, the heartbeat keeps the TTL alive
# so the record does not expire in Redis.
with Semaphore(client, config) as sem:
    do_work()
Manual management (without a context manager)
from redis_semaphore import Semaphore, SemaphoreConfig
config = SemaphoreConfig(name="jobs", limit=3)
sem = Semaphore(client, config)
result = sem.acquire(blocking=True)
if result.success:
    try:
        sem.refresh()  # manually extend the TTL if needed
        do_work()
    finally:
        sem.release()
else:
    print("Resource busy")
- acquire() returns the result of the acquisition attempt.
- refresh() extends the slot TTL and returns True/False.
- release() frees the slot; call it in finally so that a slot is not left held if the worker code raises.
How acquire() works
- blocking=True enables waiting for a slot, polling at retry_interval.
- acquire_timeout=None means wait forever (only when blocking=True).
- blocking=False makes a single attempt and returns immediately, with success=False if no slot is free.
- with Semaphore(...) and with Mutex(...) always use blocking=True.
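The rules above can be sketched as a small retry loop. This is an illustration of the polling case only, not the library's code: acquire_with_wait and try_once are hypothetical names, and returning False when acquire_timeout elapses is an assumption (the library may signal a timeout differently).

```python
import time
from typing import Callable, Optional


def acquire_with_wait(
    try_once: Callable[[], bool],
    blocking: bool = True,
    acquire_timeout: Optional[float] = None,
    retry_interval: float = 0.1,
) -> bool:
    """Retry try_once() until it succeeds, the timeout elapses, or return at once if non-blocking."""
    if not blocking:
        return try_once()  # single attempt, no waiting
    deadline = None if acquire_timeout is None else time.monotonic() + acquire_timeout
    while True:
        if try_once():
            return True
        if deadline is None:
            time.sleep(retry_interval)  # acquire_timeout=None: wait forever
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False  # assumed behaviour on timeout
        time.sleep(min(retry_interval, remaining))


attempts = []


def free_on_third_try() -> bool:
    """Stand-in for an atomic acquire attempt: the slot frees up on the third call."""
    attempts.append(1)
    return len(attempts) >= 3


waited = acquire_with_wait(free_on_third_try, retry_interval=0.01)
single = acquire_with_wait(lambda: False, blocking=False)
timed_out = acquire_with_wait(lambda: False, acquire_timeout=0.05, retry_interval=0.01)
```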
Example: waiting for a slot (blocking)
from redis_semaphore import Semaphore, SemaphoreConfig
config = SemaphoreConfig(
    name="jobs",
    limit=2,
    retry_interval=0.5,
    acquire_timeout=5.0,
)
sem = Semaphore(client, config)
result = sem.acquire(blocking=True)
if result.success:
    try:
        do_work()
    finally:
        sem.release()
else:
    # we only reach here if blocking=False
    print("Resource busy")
The client will wait for a slot for up to 5 seconds, checking every 0.5 seconds.
If a slot is acquired, do_work() runs and the slot is released in finally.
Had we used blocking=False, the code would have gone straight to the else branch.
Example: a single quick attempt (non-blocking)
result = mutex.acquire(blocking=False)
if result.success:
    try:
        do_work()
    finally:
        mutex.release()
else:
    print("Resource busy")
acquire makes a single attempt and returns immediately.
If the slot is taken, the code does not wait and goes straight to the else branch.
Wait strategies (AcquireMode)
When blocking=True, the semaphore waits for a slot to free up. There are two strategies:
BLPOP (default)
Uses Redis BLPOP to block until notified. On release(), a signal is published
that wakes a waiting client.
from redis_semaphore import Semaphore, SemaphoreConfig
# BLPOP is used by default
config = SemaphoreConfig(
    name="jobs",
    limit=5,
    blpop_timeout=5.0,  # fallback polling every 5 seconds
)
Advantages of BLPOP:
- Minimal load on Redis (no constant polling)
- Instant wake-up when a slot is freed
- The wait queue is not stored explicitly — BLPOP serves only as a wake-up signal
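As a rough in-process analogy of this strategy, the sketch below uses queue.Queue in place of a Redis list: waiters block on the queue, release pushes a wake-up signal, and a timeout provides the fallback retry. SignalledSlots and wake_timeout are hypothetical names; this is not how the library is implemented, only the same idea in miniature.

```python
import queue
import threading


class SignalledSlots:
    """A counter guarded by a lock, plus a queue used only as a wake-up signal."""

    def __init__(self, limit: int, wake_timeout: float = 5.0) -> None:
        self._limit = limit
        self._held = 0
        self._lock = threading.Lock()
        self._signals = queue.Queue()
        self._wake_timeout = wake_timeout

    def _try_acquire(self) -> bool:
        with self._lock:
            if self._held < self._limit:
                self._held += 1
                return True
            return False

    def acquire(self) -> None:
        while not self._try_acquire():
            try:
                # Block until a release signals, similar in spirit to BLPOP.
                self._signals.get(timeout=self._wake_timeout)
            except queue.Empty:
                pass  # fallback: retry anyway once the timeout elapses

    def release(self) -> None:
        with self._lock:
            self._held -= 1
        self._signals.put(1)  # wake one waiter; a stale signal only costs one extra retry


slots = SignalledSlots(limit=1, wake_timeout=0.5)
slots.acquire()
order = []


def waiter() -> None:
    slots.acquire()
    order.append("waiter acquired")
    slots.release()


t = threading.Thread(target=waiter)
t.start()
order.append("main releasing")
slots.release()
t.join(timeout=5)
```

The signal carries no ownership information: the woken waiter still has to run a normal acquire attempt, which is why the queue serves only as a wake-up and not as a wait queue.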
POLLING
A simple retry loop with a retry_interval pause between attempts.
from redis_semaphore import Semaphore, SemaphoreConfig, AcquireMode
config = SemaphoreConfig(
    name="jobs",
    limit=5,
    acquire_mode=AcquireMode.POLLING,
    retry_interval=0.1,  # check every 100ms
)
With exponential backoff and jitter:
config = SemaphoreConfig(
    name="jobs",
    limit=5,
    acquire_mode=AcquireMode.POLLING,
    retry_interval=0.1,  # initial interval
    retry_interval_max=2.0,  # maximum interval
    retry_backoff_multiplier=2.0,  # double every cycle
    retry_jitter=0.1,  # ±10% random jitter
)
Backoff is useful for reducing load on Redis during long waits. Jitter helps avoid a thundering herd when many clients wait at the same time.
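To see what such a schedule looks like, here is a minimal sketch that computes the pause before each retry. The backoff_delays function is hypothetical and only mirrors the documented parameters; the library's exact formula (for example, how jitter is applied) may differ.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int,
    retry_interval: float = 0.1,
    retry_interval_max: Optional[float] = None,
    retry_backoff_multiplier: float = 2.0,
    retry_jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the pause before each of the next `attempts` retries."""
    rng = rng or random.Random()
    delays = []
    base = retry_interval
    for _ in range(attempts):
        spread = base * retry_jitter  # jitter as a fraction of the current interval
        delays.append(base + rng.uniform(-spread, spread))
        if retry_interval_max is not None:  # None means no backoff
            base = min(base * retry_backoff_multiplier, retry_interval_max)
    return delays


plain = backoff_delays(6, retry_interval=0.1, retry_interval_max=2.0)
# intervals grow 0.1, 0.2, 0.4, 0.8, 1.6 and are then capped at 2.0
fixed = backoff_delays(3, retry_interval=0.5)  # no retry_interval_max: constant 0.5
jittered = backoff_delays(4, 0.1, 2.0, 2.0, retry_jitter=0.1, rng=random.Random(0))
```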
When to choose POLLING:
- Simple cases with short waits
- Compatibility with older Redis versions
Handling lock loss
def on_lock_lost(identifier: str):
    print(f"Lock {identifier} was lost!")
    # Graceful shutdown
# Mutex example
mutex = Mutex(client, "critical", on_lock_lost=on_lock_lost)
# Semaphore example
sem_cfg = SemaphoreConfig(name="critical-pool", limit=2)
semaphore = Semaphore(client, sem_cfg, on_lock_lost=on_lock_lost)
What happens in this example:
- When the slot is lost (e.g. the TTL expired), on_lock_lost is invoked.
- In the callback you can trigger a graceful shutdown.
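The interaction between a heartbeat and such a callback can be sketched with a plain thread. This is an assumption-laden illustration, not the library's heartbeat: the Heartbeat class is hypothetical, and refresh stands in for any callable that returns False once the slot is gone.

```python
import threading
from typing import Callable


class Heartbeat:
    """Calls refresh() periodically and reports the first failed refresh via on_lock_lost."""

    def __init__(
        self,
        identifier: str,
        refresh: Callable[[], bool],
        on_lock_lost: Callable[[str], None],
        interval: float,
    ) -> None:
        self._identifier = identifier
        self._refresh = refresh
        self._on_lock_lost = on_lock_lost
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop() has been called.
        while not self._stop.wait(self._interval):
            if not self._refresh():
                self._on_lock_lost(self._identifier)
                return

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        if threading.current_thread() is not self._thread:
            self._thread.join(timeout=1.0)


lost = threading.Event()
lost_ids = []
refresh_results = iter([True, True, False])  # the third refresh fails


def on_lock_lost(identifier: str) -> None:
    lost_ids.append(identifier)
    lost.set()  # e.g. tell the worker loop to shut down gracefully


hb = Heartbeat("critical", lambda: next(refresh_results), on_lock_lost, interval=0.01)
hb.start()
lost.wait(timeout=2.0)
hb.stop()
```

Note that the callback runs on the heartbeat thread here, so it should only signal the worker (as with the Event above) rather than do long-running cleanup itself.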
Async methods
Async methods use the a prefix:
- aacquire / arelease / arefresh
- __aenter__ / __aexit__ (via async with)
Example:
import redis.asyncio as aioredis
from redis_semaphore import Semaphore, SemaphoreConfig
async def main():
    client = aioredis.Redis()
    config = SemaphoreConfig(name="async", limit=2)
    async with Semaphore(client, config) as sem:
        print(sem.fencing_token)
    await client.aclose()
You can also use the explicit async methods without async with:
import redis.asyncio as aioredis
from redis_semaphore import Mutex
async def main():
    client = aioredis.Redis()
    mutex = Mutex(client, "explicit-async")
    result = await mutex.aacquire(blocking=False)
    if result.success:
        try:
            print(mutex.fencing_token)
        finally:
            await mutex.arelease()
    await client.aclose()
Logging
By default the standard logging module is used with the logger name redis_semaphore.
You can swap in your own logger (e.g. loguru or structlog):
from redis_semaphore import set_logger
# loguru
from loguru import logger as loguru_logger
set_logger(loguru_logger)
# structlog
import structlog
set_logger(structlog.get_logger("redis_semaphore"))
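If the standard logging module is enough, the redis_semaphore logger can be configured like any other named logger. A minimal sketch using only the standard library; the format string is just an example, and which levels the library actually emits at is not specified here.

```python
import logging

# The logger name comes from the text above; everything else is stock logging setup.
logger = logging.getLogger("redis_semaphore")
logger.setLevel(logging.DEBUG)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)
```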
Metrics
Prometheus metrics are optional. To install:
pip install "py-redis-semaphore[prometheus]"
Example usage:
from prometheus_client import REGISTRY
from redis_semaphore import PrometheusMetrics, set_metrics
# Register the metrics in the application's existing registry
set_metrics(PrometheusMetrics(registry=REGISTRY))
Examples
See examples/basic_usage.py (sync/async examples) and
examples/multiprocess_simulation.py (a simulation of several processes).
Run them with:
python examples/basic_usage.py
python examples/multiprocess_simulation.py
More about how the semaphore works
See ARCHITECTURE.md — a step-by-step walkthrough of the
algorithm and implementation details.
Developer documentation
See CONTRIBUTING.md.
License
MIT