asyncio-keyed-lock

Per-key asyncio locking with automatic cleanup and graceful-shutdown support. Zero runtime dependencies.

Internally, AsyncioKeyedLock maintains a dict that maps keys to regular asyncio.Lock instances. Locks are created on demand and evicted automatically when their reference count drops to zero, keeping memory usage proportional to the number of keys currently in contention.
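The paragraph above can be illustrated with a minimal sketch. This is not the package's source: the names KeyedLockSketch and _Entry are hypothetical, and the sketch only assumes what the description states (a dict of regular asyncio.Lock instances, created on demand and evicted when the reference count drops to zero).

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import AsyncIterator, Hashable


@dataclass
class _Entry:
    # One regular asyncio.Lock per key, plus the number of coroutines
    # currently holding or waiting on it (hypothetical helper type).
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    refs: int = 0


class KeyedLockSketch:
    """Illustrative stand-in, not the package's actual implementation."""

    def __init__(self) -> None:
        self._entries: dict[Hashable, _Entry] = {}

    def __contains__(self, key: Hashable) -> bool:
        return key in self._entries

    @asynccontextmanager
    async def __call__(self, key: Hashable) -> AsyncIterator[None]:
        # Create the entry on demand and count this coroutine before it
        # awaits, so that waiters keep the entry alive.
        entry = self._entries.setdefault(key, _Entry())
        entry.refs += 1
        try:
            async with entry.lock:
                yield
        finally:
            entry.refs -= 1
            if entry.refs == 0:
                # Nobody holds or awaits this key any more: evict it.
                del self._entries[key]


async def demo() -> tuple[list[str], bool]:
    lock = KeyedLockSketch()
    events: list[str] = []

    async def worker(name: str) -> None:
        async with lock("user:42"):
            events.append(f"{name} enter")
            await asyncio.sleep(0)  # yield point inside the critical section
            events.append(f"{name} exit")

    await asyncio.gather(worker("a"), worker("b"))
    return events, "user:42" in lock


events, still_present = asyncio.run(demo())
print(events)         # ['a enter', 'a exit', 'b enter', 'b exit']
print(still_present)  # False
```

Counting a coroutine before it awaits the lock is what makes eviction safe in this sketch: an entry is deleted only when neither a holder nor a waiter references it.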

Key Features

  • Per-key mutual exclusion using the native async with lock("key") idiom — no callbacks, no manual acquire/release.
  • Event-driven eviction of idle keys. Internal lock entries are removed the moment no coroutine references them, so memory never grows beyond what is actively in use.
  • Graceful teardown via wait_for_all(). Wait for every in-flight critical section to complete before shutting down — useful for application lifecycle hooks and clean test isolation.
  • Introspection for monitoring and debugging: active_keys_count, active_keys, and the Pythonic "key" in lock containment check.
  • Zero runtime dependencies. The package uses only the Python standard library (asyncio, dataclasses, contextlib).
  • Fully type-annotated with a py.typed marker (PEP 561). Works out of the box with mypy, pyright, and other type checkers.
  • Tested on Python 3.10 through 3.14.

Installation

pip install asyncio-keyed-lock

Or with uv:

uv add asyncio-keyed-lock

Quick Start

import asyncio
from asyncio_keyed_lock import AsyncioKeyedLock

lock = AsyncioKeyedLock()

async def process(resource_id: str) -> None:
    async with lock(resource_id):
        # Only one coroutine at a time holds the lock for this resource_id.
        # Different resource_ids proceed concurrently.
        ...

async def main() -> None:
    await asyncio.gather(
        process("user:42"),
        process("user:42"),  # waits for the first one
        process("user:99"),  # proceeds immediately
    )

asyncio.run(main())

API

| Member | Kind | Description |
|---|---|---|
| lock(key) | async context manager | Acquire the lock for key, yield, then release and clean up. |
| active_keys_count | property | Number of keys currently held or waited on. O(1). |
| active_keys | property | Snapshot list of keys currently held or waited on. O(k). |
| "key" in lock | containment check | True if key is currently held or waited on. O(1). |
| await lock.wait_for_all() | async method | Resolve the first time active_keys_count reaches zero. See Graceful Teardown. |

Use Case: Concurrent Batch Processing

When consuming messages from a system like Apache Kafka, messages within a single partition are delivered in order. If you process them one at a time, per-key ordering is inherently preserved — two messages with the same key never overlap.

Batch processing changes the picture. Pulling multiple messages from the same partition and processing them concurrently is usually faster for I/O-bound handlers, but it breaks the per-key serial guarantee: the handlers of two messages that share a key can now interleave, leading to the race conditions described under "Race Conditions in Single-Threaded asyncio" below.

An AsyncioKeyedLock scoped to each batch restores per-key serialisation while keeping different keys fully concurrent. The batch handler then waits until every message has been processed before committing the consumer offset, so no work is lost. In the example below the wait comes from leaving the asyncio.TaskGroup block; wait_for_all() is an alternative when the tasks themselves are not tracked.

import asyncio
from dataclasses import dataclass

from asyncio_keyed_lock import AsyncioKeyedLock


@dataclass(slots=True)
class Message:
    key: str
    payload: bytes


async def handle_message(msg: Message) -> None:
    # Business logic that must not interleave for the same key.
    ...


async def process_batch(messages: list[Message]) -> None:
    lock = AsyncioKeyedLock()

    async def _process(msg: Message) -> None:
        async with lock(msg.key):
            await handle_message(msg)

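    # asyncio.TaskGroup requires Python 3.11+; on 3.10, use asyncio.gather instead.
    # Leaving the block waits for every task created inside it.
    async with asyncio.TaskGroup() as tg: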
        for msg in messages:
            tg.create_task(_process(msg))

    # All tasks have completed — safe to commit the batch offset.

Race Conditions in Single-Threaded asyncio

Python's asyncio runs in a single thread, but that does not prevent race conditions. Every await expression is a potential yield point: a place where the event loop can suspend the current coroutine and run another one.

Synchronous code between await points executes atomically within a single event-loop iteration and cannot be interleaved. However, any operation that spans multiple await points (e.g., read-then-write against a database) can be interleaved with other coroutines doing the same thing, leading to inconsistent state.

A keyed lock serialises those multi-await operations per key, ensuring that for a given key only one coroutine is inside the critical section at a time — while still allowing unrelated keys to proceed concurrently.
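The lost-update problem described above can be reproduced with the standard library alone. The sketch below is illustrative: the balance dict stands in for a database row, asyncio.sleep(0) stands in for I/O, and a plain asyncio.Lock guards a single key. A keyed lock applies the same idea separately to each key.

```python
import asyncio
from typing import Optional

balance = {"user:42": 0}  # stand-in for a database row


async def deposit(key: str, amount: int, lock: Optional[asyncio.Lock]) -> None:
    async def read_then_write() -> None:
        current = balance[key]           # read
        await asyncio.sleep(0)           # simulated I/O: other coroutines run here
        balance[key] = current + amount  # write based on a possibly stale read

    if lock is None:
        await read_then_write()
    else:
        async with lock:
            await read_then_write()


async def run(lock: Optional[asyncio.Lock]) -> int:
    balance["user:42"] = 0
    await asyncio.gather(*(deposit("user:42", 1, lock) for _ in range(10)))
    return balance["user:42"]


async def main() -> tuple[int, int]:
    unlocked = await run(None)          # every coroutine reads 0 before any writes
    locked = await run(asyncio.Lock())  # read-then-write pairs no longer interleave
    return unlocked, locked


unlocked_total, locked_total = asyncio.run(main())
print(unlocked_total)  # 1
print(locked_total)    # 10
```

Without the lock, all ten coroutines read the balance before any of them writes, so nine deposits are lost. With the lock, each read-then-write pair completes before the next one starts.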

Graceful Teardown

wait_for_all() resolves the first time active_keys_count reaches zero after the call. This is useful during application shutdown or between tests to ensure all in-flight work is complete.

# Wait until all currently active keys have been released.
await lock.wait_for_all()

If new keys are acquired after the call but before the count first reaches zero, those keys must also be released before wait_for_all() resolves. In other words, the trigger is the first moment the lock holds no active keys at all — not the specific set of keys that existed at call time.

For a strict "wait until truly idle" guarantee when new work may keep arriving:

while lock.active_keys_count > 0:
    await lock.wait_for_all()
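The semantics described in this section can be modelled in a few lines. The IdleSignal class below is a hypothetical model rather than the package's code. It assumes that wait_for_all() returns immediately when nothing is active, which the text above does not confirm. The demo shows that a key acquired after the call still delays resolution.

```python
import asyncio


class IdleSignal:
    """Hypothetical model of the documented behaviour, not the package's code."""

    def __init__(self) -> None:
        self.active = 0
        self._waiters: list[asyncio.Future[None]] = []

    def enter(self) -> None:
        self.active += 1

    def leave(self) -> None:
        self.active -= 1
        if self.active == 0:
            # First moment with no active keys: wake every pending waiter.
            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(None)
            self._waiters.clear()

    async def wait_for_all(self) -> None:
        if self.active == 0:  # assumption: already idle means return at once
            return
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        await fut


async def demo() -> list[str]:
    signal = IdleSignal()
    log: list[str] = []
    release_a, release_b = asyncio.Event(), asyncio.Event()

    async def section(name: str, release: asyncio.Event) -> None:
        signal.enter()
        await release.wait()
        log.append(f"{name} released")
        signal.leave()

    async def waiter() -> None:
        await signal.wait_for_all()
        log.append("wait_for_all resolved")

    a = asyncio.create_task(section("A", release_a))
    await asyncio.sleep(0)  # A is now active
    w = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # the waiter is now pending
    b = asyncio.create_task(section("B", release_b))  # arrives after the call
    await asyncio.sleep(0)
    release_a.set()
    await a                 # A is released, but B is still active
    release_b.set()
    await asyncio.gather(b, w)
    return log


log = asyncio.run(demo())
print(log)  # ['A released', 'B released', 'wait_for_all resolved']
```

In this model the waiter resolves only after B is released, even though B did not exist when wait_for_all() was called.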

Development

git clone https://github.com/ori88c-python-packages/asyncio-keyed-lock.git
cd asyncio-keyed-lock
uv sync

# Run tests
uv run pytest

# Lint and format
uv run ruff check .
uv run ruff format .

# Type check
uv run mypy src

License

Apache 2.0
