asyncio-keyed-lock
Per-key asyncio locking with automatic cleanup and graceful-shutdown support. Zero runtime dependencies.
Internally, AsyncioKeyedLock maintains a dict that maps keys to regular asyncio.Lock instances. Locks are created on demand and evicted automatically when their reference count drops to zero, keeping memory usage proportional to the number of keys currently in contention.
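The refcount-based eviction described above can be illustrated with a short, self-contained sketch (this is not the library's actual source, just a model of the idea): each dict entry pairs an `asyncio.Lock` with the number of coroutines holding or waiting on it, and the entry is deleted the moment the last user releases it.

```python
import asyncio
from collections.abc import AsyncIterator, Hashable
from contextlib import asynccontextmanager

class KeyedLockSketch:
    """Illustrative model of a refcounted per-key lock map."""

    def __init__(self) -> None:
        # key -> (lock, number of coroutines holding or waiting on it)
        self._entries: dict[Hashable, tuple[asyncio.Lock, int]] = {}

    @asynccontextmanager
    async def __call__(self, key: Hashable) -> AsyncIterator[None]:
        lock, refs = self._entries.get(key, (asyncio.Lock(), 0))
        self._entries[key] = (lock, refs + 1)
        try:
            async with lock:
                yield
        finally:
            lock, refs = self._entries[key]
            if refs == 1:
                del self._entries[key]  # last user gone: evict the entry
            else:
                self._entries[key] = (lock, refs - 1)

async def demo() -> list[str]:
    keyed = KeyedLockSketch()
    events: list[str] = []

    async def worker(name: str) -> None:
        async with keyed("user:42"):
            events.append(f"{name} enter")
            await asyncio.sleep(0)  # yield inside the critical section
            events.append(f"{name} exit")

    await asyncio.gather(worker("a"), worker("b"))
    assert len(keyed._entries) == 0  # entry evicted once both finished
    return events

print(asyncio.run(demo()))  # ['a enter', 'a exit', 'b enter', 'b exit']
```

Note how enter/exit pairs for the same key never interleave, and the internal map is empty again as soon as both workers complete.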
Table of Contents
- Key Features
- Installation
- Quick Start
- API
- Use Cases
- Race Conditions in Single-Threaded asyncio
- Graceful Teardown
- Development
- License
Key Features
- Per-key mutual exclusion using the native `async with lock("key")` idiom — no callbacks, no manual acquire/release.
- Event-driven eviction of idle keys. Internal lock entries are removed the moment no coroutine references them, so memory never grows beyond what is actively in use.
- Graceful teardown via `wait_for_all()`. Wait for every in-flight critical section to complete before shutting down — useful for application lifecycle hooks and clean test isolation.
- Introspection for monitoring and debugging: `active_keys_count`, `active_keys`, and the Pythonic `"key" in lock` containment check.
- Zero runtime dependencies. The package uses only the Python standard library (`asyncio`, `dataclasses`, `contextlib`).
- Fully type-annotated with a `py.typed` marker (PEP 561). Works out of the box with mypy, pyright, and other type checkers.
- Tested on Python 3.10 through 3.14.
Installation
```shell
pip install asyncio-keyed-lock
```
Or with uv:
```shell
uv add asyncio-keyed-lock
```
Quick Start
```python
import asyncio

from asyncio_keyed_lock import AsyncioKeyedLock

lock = AsyncioKeyedLock()

async def process(resource_id: str) -> None:
    async with lock(resource_id):
        # Only one coroutine at a time holds the lock for this resource_id.
        # Different resource_ids proceed concurrently.
        ...

async def main() -> None:
    await asyncio.gather(
        process("user:42"),
        process("user:42"),  # waits for the first one
        process("user:99"),  # proceeds immediately
    )

asyncio.run(main())
```
API
| Member | Kind | Description |
|---|---|---|
| `lock(key)` | async context manager | Acquire the lock for `key`, yield, then release and clean up. |
| `active_keys_count` | property | Number of keys currently held or waited on. O(1). |
| `active_keys` | property | Snapshot list of keys currently held or waited on. O(k). |
| `"key" in lock` | containment check | `True` if `key` is currently held or waited on. O(1). |
| `await lock.wait_for_all()` | async method | Resolve the first time `active_keys_count` reaches zero. See Graceful Teardown. |
Use Cases
Concurrent Batch Processing
When consuming messages from a system like Apache Kafka, messages within a single partition are delivered in order. If you process them one at a time, per-key ordering is inherently preserved — two messages with the same key never overlap.
Batch processing changes the picture. Pulling multiple messages from the same partition and processing them concurrently is significantly faster, but it breaks the per-key serial guarantee: two messages that share a key can now execute in parallel, leading to the same race conditions described later in this document.
An AsyncioKeyedLock scoped to each batch restores per-key serialisation while keeping different keys fully concurrent. Combined with wait_for_all(), the batch handler can wait until every message has been processed before committing the consumer offset — ensuring no work is lost.
```python
import asyncio
from dataclasses import dataclass

from asyncio_keyed_lock import AsyncioKeyedLock

@dataclass(slots=True)
class Message:
    key: str
    payload: bytes

async def handle_message(msg: Message) -> None:
    # Business logic that must not interleave for the same key.
    ...

async def process_batch(messages: list[Message]) -> None:
    lock = AsyncioKeyedLock()

    async def _process(msg: Message) -> None:
        async with lock(msg.key):
            await handle_message(msg)

    async with asyncio.TaskGroup() as tg:
        for msg in messages:
            tg.create_task(_process(msg))

    # All tasks have completed — safe to commit the batch offset.
```
Reducing Write-Concern Failures in Database Transactions
In databases like PostgreSQL, concurrent transactions that touch the same rows can fail with serialization errors or write-concern violations — the database's mechanism for detecting conflicting writes. The standard remedy is a retry loop, but retries carry real costs: round-trip latency, connection-pool pressure, and unnecessary rollback traffic. When multiple coroutines within the same process operate on the same key (a user ID, an IP address, a device identifier), the collision probability is especially high. A keyed lock serialises those coroutines before they reach the database, eliminating intra-process conflicts entirely without sacrificing concurrency across different keys.
Example — threat-score aggregation in a SIEM pipeline. A threat-intelligence service maintains a cumulative severity score per IP address and automatically blocks the entity once a configured threshold is crossed. Events arrive concurrently (port scans, failed authentications, etc.). When two coroutines process events for the same IP simultaneously, the database transaction guarantees correctness — but one of the conflicting transactions is likely to be rejected with a write-concern or serialization error, forcing a retry. A keyed lock scoped per IP address serialises these coroutines before they reach the database, reducing the frequency of such failures for same-pod traffic without sacrificing concurrency across different IP addresses.
```python
from datetime import datetime, timezone

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from asyncio_keyed_lock import AsyncioKeyedLock

BLOCK_THRESHOLD = 100

lock = AsyncioKeyedLock()

async def record_security_event(
    ip_address: str,
    severity: int,
    session: AsyncSession,
) -> None:
    # ThreatProfile is assumed to be your declarative ORM model
    # (with ip_address, score, blocked, and blocked_at columns).
    async with lock(ip_address):
        # Only one coroutine per IP is inside this block at a time.
        # Events for different IPs are processed concurrently.
        result = await session.execute(
            select(ThreatProfile).where(ThreatProfile.ip_address == ip_address)
        )
        profile = result.scalar_one_or_none()
        if profile is None:
            profile = ThreatProfile(ip_address=ip_address, score=severity)
            session.add(profile)
        else:
            profile.score += severity
        if profile.score >= BLOCK_THRESHOLD:
            profile.blocked = True
            profile.blocked_at = datetime.now(timezone.utc)
        await session.commit()
```
Important: Using an in-memory keyed lock for this purpose is an optimisation, not a correctness guarantee. In replicated, stateless deployments — where multiple pods may concurrently handle events for the same IP or entity — cross-pod conflicts remain possible and the database transaction itself must ensure correctness. Cross-pod keyed locking is achievable (e.g., via a distributed lock backed by Redis), but it introduces additional network latency on every acquisition. An in-memory lock is therefore a practical and accepted trade-off: it eliminates intra-process contention at zero network cost, meaningfully reducing the frequency of write-concern failures and rollback traffic, as long as it is treated as an optimisation rather than a substitute for proper transaction semantics at the database level.
Race Conditions in Single-Threaded asyncio
Python's asyncio runs in a single thread, but that does not prevent race conditions. The reason is that every await expression is a yield point — a place where the event loop can suspend the current coroutine and run another one.
Synchronous code between await points executes atomically within a single event-loop iteration and cannot be interleaved. However, any operation that spans multiple await points (e.g., read-then-write against a database) can be interleaved with other coroutines doing the same thing, leading to inconsistent state.
A keyed lock serialises those multi-await operations per key, ensuring that for a given key only one coroutine is inside the critical section at a time — while still allowing unrelated keys to proceed concurrently.
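To make the hazard concrete, here is a minimal, library-free demonstration of a lost update: two coroutines read a shared counter, yield at an `await`, and both write back a stale value. The fix uses a plain `asyncio.Lock` standing in for a per-key `lock(account_id)` acquisition; the hypothetical `deposit` functions and account framing are illustrative only.

```python
import asyncio

balance = 0

async def deposit(amount: int) -> None:
    """Read-modify-write spanning an await: NOT safe under concurrency."""
    global balance
    current = balance           # read
    await asyncio.sleep(0)      # yield point: the loop may run another coroutine
    balance = current + amount  # write back a possibly stale value

async def safe_deposit(guard: asyncio.Lock, amount: int) -> None:
    """Same operation, with the whole critical section serialised."""
    global balance
    async with guard:           # stands in for `async with lock(account_id):`
        current = balance
        await asyncio.sleep(0)
        balance = current + amount

async def racy() -> int:
    global balance
    balance = 0
    await asyncio.gather(deposit(10), deposit(10))
    return balance  # 10: one update was lost

async def fixed() -> int:
    global balance
    balance = 0
    guard = asyncio.Lock()
    await asyncio.gather(safe_deposit(guard, 10), safe_deposit(guard, 10))
    return balance  # 20: updates no longer interleave

print(asyncio.run(racy()), asyncio.run(fixed()))  # 10 20
```

The unlocked version loses an update even though no threads are involved: both coroutines read the same starting value before either writes.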
Graceful Teardown
wait_for_all() resolves the first time active_keys_count reaches zero after the call. This is useful during application shutdown or between tests to ensure all in-flight work is complete.
```python
# Wait until all currently active keys have been released.
await lock.wait_for_all()
```
If new keys are acquired after the call but before the count first reaches zero, those keys must also be released before wait_for_all() resolves. In other words, the trigger is the first moment the lock holds no active keys at all — not the specific set of keys that existed at call time.
For a strict "wait until truly idle" guarantee when new work may keep arriving:
```python
while lock.active_keys_count > 0:
    await lock.wait_for_all()
```
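The trigger semantics can be modelled with an `asyncio.Event` (an illustrative sketch, not the library's implementation): every acquisition clears the event, and the release that brings the active count to zero sets it, waking every waiter at once.

```python
import asyncio

class IdleTracker:
    """Sketch of the wait_for_all() trigger: resolve when the active
    count first reaches zero, regardless of which keys produced it."""

    def __init__(self) -> None:
        self._active = 0
        self._idle = asyncio.Event()
        self._idle.set()  # no active keys yet

    def acquired(self) -> None:
        self._active += 1
        self._idle.clear()

    def released(self) -> None:
        self._active -= 1
        if self._active == 0:
            self._idle.set()  # wakes every wait_for_all() waiter

    async def wait_for_all(self) -> None:
        await self._idle.wait()

async def demo() -> bool:
    tracker = IdleTracker()
    tracker.acquired()
    waiter = asyncio.create_task(tracker.wait_for_all())
    await asyncio.sleep(0)              # waiter is now blocked on the event
    pending_before = not waiter.done()  # True: one key is still active
    tracker.released()                  # count drops to zero: event fires
    await waiter
    return pending_before

print(asyncio.run(demo()))
```

Because the event is only set when the count reaches zero, keys acquired after the call but before that moment delay resolution too, matching the behaviour described above.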
Development
```shell
git clone https://github.com/ori88c-python-packages/asyncio-keyed-lock.git
cd asyncio-keyed-lock
uv sync

# Run tests
uv run pytest

# Lint and format
uv run ruff check .
uv run ruff format .

# Type check
uv run mypy src
```
License