Distributed FIFO lock using Redis Streams with strict ordering and crash recovery
Redis Stream-based FIFO Lock
This module provides both synchronous and asynchronous lock-like classes that ensure strict FIFO ordering using Redis Streams.
Features
- Strict FIFO ordering: Stream entry order defines who's next
- Blocking: Waiters block on BLPOP of their personal signal key; they wake only when dispatched
- Handoff: release() acknowledges the releasing holder's message and dispatches the next waiter in a single step
- Crash safety: If a holder dies, its message stays pending. The next release() (or any call site) uses XAUTOCLAIM to claim the stuck pending entry and re-signal the rightful owner
- Fencing token: The stream id (e.g., "1731294030000-0") is a natural, ever-increasing fencing token you can pass downstream if needed
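Stream IDs have the form `<milliseconds>-<sequence>`, so comparing them as plain strings can give the wrong order (for example "9-0" against "10-0"). A downstream consumer of the fencing token would more safely compare both parts as integers. The sketch below illustrates that idea only; `parse_stream_id` and `FencedResource` are hypothetical names and are not part of this package.

```python
from typing import Optional, Tuple


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    """Split a Redis stream id '<ms>-<seq>' into a pair of integers."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


class FencedResource:
    """Hypothetical downstream resource that rejects stale fencing tokens."""

    def __init__(self) -> None:
        self.highest_seen: Optional[Tuple[int, int]] = None
        self.value: Optional[str] = None

    def write(self, token: str, value: str) -> bool:
        parsed = parse_stream_id(token)
        # Refuse any write whose token is older than one already accepted.
        if self.highest_seen is not None and parsed < self.highest_seen:
            return False
        self.highest_seen = parsed
        self.value = value
        return True
```

A holder would pass the `msg_id` it received from `acquire()` as `token`; a delayed write from an earlier holder is then refused instead of overwriting newer data.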
Usage
Synchronous (StreamGate)
import redis
from redis_fifo_lock import StreamGate

# Create Redis client
r = redis.Redis(host='localhost', port=6379, db=0)

# Create gate
gate = StreamGate(r)

# Using context manager (recommended)
with gate:
    # do your exclusive/critical work here
    print("I have the lock!")

# Or manual acquire/release
owner, msg_id = gate.acquire()
try:
    # do your exclusive/critical work here
    print("I have the lock!")
finally:
    gate.release(owner, msg_id)

# With timeout
try:
    owner, msg_id = gate.acquire(timeout=30)
    try:
        # do work
        pass
    finally:
        gate.release(owner, msg_id)
except TimeoutError:
    print("Timed out waiting for lock")
Asynchronous (AsyncStreamGate)
import asyncio
import redis.asyncio as redis
from redis_fifo_lock import AsyncStreamGate

async def worker(name: str, gate: AsyncStreamGate):
    async with await gate.session():  # blocks until it's your turn
        print(f"{name} running…")
        await asyncio.sleep(0.5)  # do exclusive work

async def main():
    r = redis.Redis(host="localhost", port=6379, db=0)
    gate = AsyncStreamGate(r)
    # launch a few contenders
    tasks = [asyncio.create_task(worker(f"W{i}", gate)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
Manual async acquire/release
import asyncio  # needed for asyncio.TimeoutError and asyncio.run
import redis.asyncio as redis
from redis_fifo_lock import AsyncStreamGate

async def main():
    r = redis.Redis(host="localhost", port=6379, db=0)
    gate = AsyncStreamGate(r)

    owner, msg_id = await gate.acquire()
    try:
        # do your exclusive/critical work here
        print("I have the lock!")
    finally:
        await gate.release(owner, msg_id)

    # With timeout
    try:
        owner, msg_id = await gate.acquire(timeout=30)
        try:
            # do work
            pass
        finally:
            await gate.release(owner, msg_id)
    except asyncio.TimeoutError:
        print("Timed out waiting for lock")

asyncio.run(main())
Configuration
Both StreamGate and AsyncStreamGate accept the following optional parameters:
- stream (str): Stream name for the gate (default: "gate:stream")
- group (str): Consumer group name (default: "gate:group")
- adv_consumer (str): Dispatcher/advancer consumer identity (auto-generated if None)
- sig_prefix (str): Prefix for per-waiter signal keys (default: "gate:sig:")
- sig_ttl_ms (int): TTL for signal keys in milliseconds (default: 300000 = 5 minutes)
- claim_idle_ms (int): Idle time before considering a holder dead (default: 60000 = 60 seconds)
- last_key (str): Key to store the last dispatched message ID (default: "gate:last-dispatched")
Example with custom configuration:
gate = StreamGate(
    r,
    stream="my:custom:stream",
    group="my:custom:group",
    sig_ttl_ms=120000,  # 2 minutes
    claim_idle_ms=30000,  # 30 seconds
)
Cancellation
If you want to cancel waiting before being dispatched:
# Synchronous
owner, msg_id = gate.acquire(timeout=1) # Use short timeout
# If timeout expires, TimeoutError is raised and cleanup is automatic
# Or manually cancel
owner, msg_id = gate.acquire() # This would block, but in another context...
gate.cancel(owner, msg_id) # ...you can cancel
# Asynchronous
await gate.cancel(owner, msg_id)
Observability
You can use Redis commands to observe the state of the gate:
# View pending messages (active holders)
XPENDING gate:stream gate:group
# View stream entries
XRANGE gate:stream - +
# View consumer group info
XINFO GROUPS gate:stream
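The same three views can be collected from Python. The helper below is a minimal sketch, not part of this package: `gate_snapshot` is a hypothetical name, and it assumes a client exposing redis-py's `xpending`, `xrange` and `xinfo_groups` methods, with the default stream and group names from the Configuration section.

```python
from typing import Any, Dict


def gate_snapshot(
    client: Any,
    stream: str = "gate:stream",
    group: str = "gate:group",
) -> Dict[str, Any]:
    """Gather pending summary, stream entries and group info in one dict."""
    return {
        # Summary of unacknowledged entries (the active holder, if any).
        "pending": client.xpending(stream, group),
        # Every entry currently in the stream, oldest first.
        "entries": client.xrange(stream, min="-", max="+"),
        # Metadata for the consumer groups attached to the stream.
        "groups": client.xinfo_groups(stream),
    }
```

With a synchronous client this would be called as `gate_snapshot(redis.Redis(host="localhost", port=6379, db=0))`; the results are returned exactly as the client produces them.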
How it Works
- Enqueue: Each waiter enqueues a ticket using XADD gate:stream * owner=<uuid>
- Dispatch: The current holder (on release()) advances the queue by:
  - Reading the next stream entry in order using XREADGROUP
  - Signaling that specific owner to proceed via LPUSH to their personal signal key
  - Keeping that entry pending (so if the owner dies, it can be re-delivered)
- Wait: Waiters block on BLPOP of their personal signal key until dispatched
- Release: When the owner eventually calls release(), we:
  - ACK the entry that represented their turn using XACK
  - Dispatch the next ticket using XREADGROUP
- Crash Recovery: If a holder dies, their message stays pending. The next release() uses XAUTOCLAIM to detect and re-signal stuck holders after claim_idle_ms milliseconds

This keeps exactly one pending message representing the active holder.
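The enqueue, dispatch and release steps above can be modelled without Redis to make the "exactly one pending message" invariant easier to see. The sketch below is a toy model, not the package's implementation: `InMemoryGate` is a hypothetical class, plain Python containers stand in for the stream, the pending list and the signal keys, and crash recovery and idle timing are left out. How the very first waiter gets dispatched is not described above, so dispatching on enqueue when nothing is pending is an assumption.

```python
import uuid
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class InMemoryGate:
    """Toy model of the handoff protocol using in-process containers."""

    def __init__(self) -> None:
        self.stream: List[Tuple[str, str]] = []    # (entry_id, owner), stands in for XADD
        self.cursor = 0                            # next undelivered entry, like XREADGROUP
        self.pending: Dict[str, str] = {}          # entry_id -> owner, delivered but not acked
        self.signals: Dict[str, Deque[str]] = {}   # owner -> signal list (LPUSH target)

    def enqueue(self) -> Tuple[str, str]:
        """Append a ticket for a new waiter and return (owner, entry_id)."""
        owner = uuid.uuid4().hex
        entry_id = f"{len(self.stream) + 1}-0"
        self.stream.append((entry_id, owner))
        self.signals[owner] = deque()
        # Assumption: with no active holder, the first ticket is dispatched at once.
        if not self.pending:
            self.dispatch_next()
        return owner, entry_id

    def dispatch_next(self) -> Optional[str]:
        """Deliver the next ticket in order and signal its owner."""
        if self.cursor >= len(self.stream):
            return None
        entry_id, owner = self.stream[self.cursor]
        self.cursor += 1
        self.pending[entry_id] = owner         # entry stays pending until acked
        self.signals[owner].append(entry_id)   # wake that specific waiter
        return owner

    def is_signaled(self, owner: str) -> bool:
        """True if a blocked waiter for this owner would wake up now."""
        return bool(self.signals[owner])

    def release(self, owner: str, entry_id: str) -> None:
        """Ack the holder's entry, then hand off to the next waiter."""
        if self.pending.get(entry_id) != owner:
            raise ValueError("release() called by a non-holder")
        del self.pending[entry_id]             # stands in for XACK
        self.dispatch_next()
```

Enqueuing three waiters and releasing them in turn shows that only one owner is signaled at a time and that `pending` never holds more than one entry.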
Development
uv sync --all-extras
uv run pytest