
Distributed FIFO lock using Redis Streams with strict ordering and crash recovery


Redis Stream-based FIFO Lock

This module provides both synchronous and asynchronous lock-like classes that ensure strict FIFO ordering using Redis Streams.

Features

  • Strict FIFO ordering: Stream entry order defines who's next
  • Blocking: Waiters block on BLPOP of their personal signal key; they wake only when dispatched
  • Handoff: release() acknowledges the holder's own message and dispatches the next waiter in the same call
  • Crash safety: If a holder dies, its message stays pending. The next release() (or any other call site) uses XAUTOCLAIM to claim that stuck pending entry and re-signal the rightful owner
  • Fencing token: The stream id (e.g., "1731294030000-0") is a natural, ever-increasing fencing token you can pass downstream if needed
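
As an illustration of the fencing-token idea, the sketch below shows how a downstream resource might reject writes that carry an older stream id. It assumes the id is available as a str (redis-py returns bytes unless the client is created with decode_responses=True). The names parse_stream_id and FencedStore are hypothetical and are not part of this package.

```python
from typing import Dict, Optional, Tuple


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    """Split a Redis stream id such as "1731294030000-0" into (ms, seq)."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


class FencedStore:
    """Hypothetical downstream resource that rejects stale fencing tokens."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}
        self.highest: Optional[Tuple[int, int]] = None

    def write(self, token: str, key: str, value: str) -> bool:
        parsed = parse_stream_id(token)
        # Compare numerically: as plain strings, "9-0" would sort after "10-0".
        if self.highest is not None and parsed < self.highest:
            return False  # a holder with a newer token has already written
        self.highest = parsed
        self.data[key] = value
        return True
```

Comparing the parsed (ms, seq) pairs rather than the raw strings matters because ids of different lengths do not sort correctly as text.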

Usage

Synchronous (StreamGate)

import redis
from redis_fifo_lock import StreamGate

# Create Redis client
r = redis.Redis(host='localhost', port=6379, db=0)

# Create gate
gate = StreamGate(r)

# Using context manager (recommended)
with gate:
    # do your exclusive/critical work here
    print("I have the lock!")

# Or manual acquire/release
owner, msg_id = gate.acquire()
try:
    # do your exclusive/critical work here
    print("I have the lock!")
finally:
    gate.release(owner, msg_id)

# With timeout
try:
    owner, msg_id = gate.acquire(timeout=30)
    try:
        # do work
        pass
    finally:
        gate.release(owner, msg_id)
except TimeoutError:
    print("Timed out waiting for lock")

Asynchronous (AsyncStreamGate)

import asyncio
import redis.asyncio as redis
from redis_fifo_lock import AsyncStreamGate

async def worker(name: str, gate: AsyncStreamGate):
    async with await gate.session():   # blocks until it's your turn
        print(f"{name} running…")
        await asyncio.sleep(0.5)       # do exclusive work

async def main():
    r = redis.Redis(host="localhost", port=6379, db=0)
    gate = AsyncStreamGate(r)

    # launch a few contenders
    tasks = [asyncio.create_task(worker(f"W{i}", gate)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Manual async acquire/release

import asyncio
import redis.asyncio as redis
from redis_fifo_lock import AsyncStreamGate

async def main():
    r = redis.Redis(host="localhost", port=6379, db=0)
    gate = AsyncStreamGate(r)

    owner, msg_id = await gate.acquire()
    try:
        # do your exclusive/critical work here
        print("I have the lock!")
    finally:
        await gate.release(owner, msg_id)

    # With timeout
    try:
        owner, msg_id = await gate.acquire(timeout=30)
        try:
            # do work
            pass
        finally:
            await gate.release(owner, msg_id)
    except asyncio.TimeoutError:
        print("Timed out waiting for lock")

Configuration

Both StreamGate and AsyncStreamGate accept the following optional parameters:

  • stream (str): Stream name for the gate (default: "gate:stream")
  • group (str): Consumer group name (default: "gate:group")
  • adv_consumer (str): Dispatcher/advancer consumer identity (auto-generated if None)
  • sig_prefix (str): Prefix for per-waiter signal keys (default: "gate:sig:")
  • sig_ttl_ms (int): TTL for signal keys in milliseconds (default: 300000 = 5 minutes)
  • claim_idle_ms (int): Idle time before considering a holder dead (default: 60000 = 60 seconds)
  • last_key (str): Key to store the last dispatched message ID (default: "gate:last-dispatched")

Example with custom configuration:

gate = StreamGate(
    r,
    stream="my:custom:stream",
    group="my:custom:group",
    sig_ttl_ms=120000,  # 2 minutes
    claim_idle_ms=30000,  # 30 seconds
)
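
When several independent locks share one Redis instance, it may help to derive all key names from a single resource name. The helper below is a minimal sketch under that assumption; gate_kwargs is a hypothetical function, and whether gates that share a sig_prefix or last_key would actually interfere is not confirmed by this README, so it simply separates all four.

```python
from typing import Dict


def gate_kwargs(resource: str) -> Dict[str, str]:
    """Build per-resource key names so separate gates do not share state."""
    base = f"gate:{resource}"
    return {
        "stream": f"{base}:stream",
        "group": f"{base}:group",
        "sig_prefix": f"{base}:sig:",
        "last_key": f"{base}:last-dispatched",
    }
```

With a Redis client r, this could be used as StreamGate(r, **gate_kwargs("orders")).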

Cancellation

If you want to cancel waiting before being dispatched:

# Synchronous
owner, msg_id = gate.acquire(timeout=1)  # use a short timeout
# If the timeout expires, TimeoutError is raised and cleanup is automatic

# Or cancel manually
owner, msg_id = gate.acquire()  # this call blocks while waiting; from another context...
gate.cancel(owner, msg_id)  # ...you can cancel the wait for that ticket

# Asynchronous
await gate.cancel(owner, msg_id)

Observability

You can use Redis commands to observe the state of the gate:

# View pending messages (active holders)
XPENDING gate:stream gate:group

# View stream entries
XRANGE gate:stream - +

# View consumer group info
XINFO GROUPS gate:stream
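
The same information can be gathered from Python. The sketch below assumes a redis-py client and the default stream and group names; gate_snapshot is a hypothetical helper, not something this package provides.

```python
from typing import Any, Dict


def gate_snapshot(
    r: Any, stream: str = "gate:stream", group: str = "gate:group"
) -> Dict[str, Any]:
    """Collect the output of the commands above through a redis-py client."""
    return {
        "pending": r.xpending(stream, group),           # XPENDING summary
        "entries": r.xrange(stream, min="-", max="+"),  # XRANGE - +
        "groups": r.xinfo_groups(stream),               # XINFO GROUPS
    }
```

On a long-lived stream, XRANGE over the full range can return many entries, so passing a count to xrange may be preferable.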

How it Works

  1. Enqueue: Each waiter enqueues a ticket using XADD gate:stream * owner <uuid>
  2. Dispatch: The current holder (on release()) advances the queue by:
    • Reading the next stream entry in order using XREADGROUP
    • Signaling that specific owner to proceed via LPUSH to their personal signal key
    • Keeping that entry pending (so if the owner dies, it can be re-delivered)
  3. Wait: Waiters block on BLPOP of their personal signal key until dispatched
  4. Release: When the owner eventually calls release(), we:
    • ACK the entry that represented their turn using XACK
    • Dispatch the next ticket using XREADGROUP
  5. Crash Recovery: If a holder dies, their message stays pending. The next release() uses XAUTOCLAIM to detect and re-signal stuck holders after claim_idle_ms milliseconds

Together, these steps keep exactly one pending message, which represents the active holder.
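
The steps above can be modelled in memory to make the ticket flow concrete. This is a toy sketch, not the package's implementation: ToyGate and its methods are hypothetical, no Redis is involved, the idle-time check behind claim_idle_ms is omitted, and it assumes that a ticket enqueued while nobody holds the gate is dispatched immediately.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class ToyGate:
    """In-memory model of the ticket flow; not the package's implementation."""

    def __init__(self) -> None:
        self.next_id = 0
        self.undelivered: Deque[Tuple[int, str]] = deque()  # entries not yet read
        self.pending: Dict[int, str] = {}                   # delivered, not acked
        self.signals: List[str] = []                        # owners that were woken

    def enqueue(self, owner: str) -> int:
        """Step 1: append a ticket (stands in for XADD)."""
        self.next_id += 1
        self.undelivered.append((self.next_id, owner))
        if not self.pending:
            self.dispatch()  # assumption: an idle gate dispatches right away
        return self.next_id

    def dispatch(self) -> Optional[str]:
        """Step 2: deliver the oldest ticket and signal its owner."""
        if not self.undelivered:
            return None
        msg_id, owner = self.undelivered.popleft()
        self.pending[msg_id] = owner  # stays pending until acknowledged
        self.signals.append(owner)    # stands in for LPUSH to the signal key
        return owner

    def release(self, owner: str, msg_id: int) -> Optional[str]:
        """Step 4: acknowledge the holder's ticket, then dispatch the next."""
        if self.pending.get(msg_id) != owner:
            raise ValueError("not the current holder")
        del self.pending[msg_id]
        return self.dispatch()

    def resignal_stuck(self) -> List[str]:
        """Step 5: re-signal owners whose tickets are still pending."""
        stuck = list(self.pending.values())
        self.signals.extend(stuck)
        return stuck
```

In this model, three owners that enqueue in the order A, B, C are signalled in that same order, and pending never holds more than one ticket.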

Development

uv sync --all-extras
uv run pytest


