Async keyed leased queue with FIFO semantics and O(1) operations
leasedkeyq
Async Keyed Leased Queue - An asyncio-friendly in-memory queue combining FIFO semantics, dictionary-style keyed access, and lease-based exclusive processing.
Features
- FIFO Queue Semantics: Process items in order with get()
- Keyed Access: Target specific items with take(key)
- Lease-Based Processing: Exclusive access with explicit ack/release
- O(1) Operations: Constant-time enqueue, dequeue, and key-based removal
- Automatic Timeouts: Optional lease expiration with auto-retry
- Blocking Behavior: Async waiting for items or specific keys
- Type Safe: Full type hints with strict mypy compliance
- Zero Dependencies: Pure Python stdlib implementation
Installation
pip install leasedkeyq
Quick Start
import asyncio
from leasedkeyq import LeasedKeyQueue


async def process(value: dict) -> None:
    # Placeholder for your own work; not part of leasedkeyq
    await asyncio.sleep(0)


async def main():
    # Create queue with 30-second lease timeout
    async with LeasedKeyQueue[str, dict](default_lease_timeout=30.0) as queue:
        # Producer: add items
        await queue.put("task-1", {"action": "process", "data": 123})
        await queue.put("task-2", {"action": "send", "data": 456})

        # Consumer: FIFO consumption
        key, value, lease = await queue.get()
        print(f"Processing {key}: {value}")
        try:
            # Do work...
            await process(value)
            # Success: acknowledge
            await queue.ack(lease)
        except Exception:
            # Failure: release for retry
            await queue.release(lease, requeue_front=True)

        # Targeted consumption
        key, value, lease = await queue.take("task-2")
        print(f"Specifically got {key}: {value}")
        await queue.ack(lease)


asyncio.run(main())
Core Concepts
States
Each key is in exactly one state:
- ABSENT: Not in queue
- AVAILABLE: Ready for consumption
- IN_FLIGHT: Leased to a consumer
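The three states and the operations that move a key between them can be summarised as a small transition table. This is an illustrative sketch inferred from the descriptions in this README, not the library's internal code: `KeyState`, `TRANSITIONS`, and `next_state` are hypothetical names. Blocking behavior and the in-flight `put` policies are not modelled here.

```python
from enum import Enum, auto


class KeyState(Enum):
    ABSENT = auto()
    AVAILABLE = auto()
    IN_FLIGHT = auto()


# (current state, operation) -> next state, as described in this README.
# "lease_expired" stands for the automatic timeout, not a public method.
TRANSITIONS = {
    (KeyState.ABSENT, "put"): KeyState.AVAILABLE,
    (KeyState.AVAILABLE, "get"): KeyState.IN_FLIGHT,
    (KeyState.AVAILABLE, "take"): KeyState.IN_FLIGHT,
    (KeyState.IN_FLIGHT, "ack"): KeyState.ABSENT,
    (KeyState.IN_FLIGHT, "release"): KeyState.AVAILABLE,
    (KeyState.IN_FLIGHT, "lease_expired"): KeyState.AVAILABLE,
}


def next_state(state: KeyState, op: str) -> KeyState:
    """Return the state a key moves to, or raise if the move is not modelled."""
    try:
        return TRANSITIONS[(state, op)]
    except KeyError:
        raise ValueError(f"{op!r} is not modelled for state {state.name}") from None
```

Walking a key through `put`, `get`, and `ack` with this table takes it from ABSENT to AVAILABLE to IN_FLIGHT and back to ABSENT.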
API Overview
Producer API
await queue.put(key, value, if_in_flight="update") # update|reject|buffer
Consumer API
# FIFO consumption
key, value, lease = await queue.get(timeout=10.0, lease_timeout=30.0)
# Keyed consumption
key, value, lease = await queue.take("specific-key", timeout=10.0)
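To illustrate what waiting for a specific key can look like, here is a minimal stdlib-only sketch built on `asyncio.Condition`. It is not how leasedkeyq is implemented; the `KeyWaiter` class is a hypothetical stand-in with no leases or ordering, and the choice to surface `asyncio.TimeoutError` on timeout is an assumption.

```python
import asyncio


class KeyWaiter:
    """Toy keyed store: take(key) blocks until that key has been put."""

    def __init__(self) -> None:
        self._items: dict = {}
        self._cond = asyncio.Condition()

    async def put(self, key, value) -> None:
        async with self._cond:
            self._items[key] = value
            # Wake every waiter; each one re-checks its own key.
            self._cond.notify_all()

    async def take(self, key, timeout=None):
        async with self._cond:
            # Raises asyncio.TimeoutError if the key never arrives in time.
            await asyncio.wait_for(
                self._cond.wait_for(lambda: key in self._items), timeout
            )
            return self._items.pop(key)


async def demo():
    store = KeyWaiter()
    consumer = asyncio.create_task(store.take("job-7", timeout=1.0))
    await asyncio.sleep(0)  # let the consumer start waiting
    await store.put("job-1", "other")   # does not satisfy the waiter
    await store.put("job-7", "wanted")  # wakes the waiter
    return await consumer
```

The waiter ignores unrelated keys and only returns once its own key is present, which is the behavior `take(key, timeout=...)` is described as providing.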
Lease Control
await queue.ack(lease) # Permanent removal
await queue.release(lease, requeue_front=True) # Retry
Introspection
value = await queue.peek("key")
has_key = await queue.contains("key")
keys = await queue.available_keys()
inflight = await queue.inflight_keys()
size = await queue.qsize()
In-Flight Policies
Control behavior when putting a key that's currently leased:
- update (default): Update the in-flight value
- reject: Raise KeyAlreadyInFlightError
- buffer: Enqueue a second copy to available
await queue.put("key", new_value, if_in_flight="update")
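The three policies can be pictured as a simple dispatch on the `if_in_flight` argument. The sketch below uses a plain dict and deque as stand-ins for the queue's internal state, so `put_while_in_flight` and the local `KeyAlreadyInFlightError` class are hypothetical illustrations rather than the library's code.

```python
from collections import deque


class KeyAlreadyInFlightError(Exception):
    """Local stand-in for the library's exception of the same name."""


def put_while_in_flight(inflight: dict, available: deque, key, value,
                        if_in_flight: str = "update") -> None:
    """Apply one of the three policies to a key that is currently leased."""
    if key not in inflight:
        raise KeyError(f"{key!r} is not in flight")
    if if_in_flight == "update":
        # The lease holder keeps the lease; only the stored value changes.
        inflight[key] = value
    elif if_in_flight == "reject":
        raise KeyAlreadyInFlightError(key)
    elif if_in_flight == "buffer":
        # A second copy waits in the available queue behind existing items.
        available.append((key, value))
    else:
        raise ValueError(f"unknown policy: {if_in_flight!r}")
```

With `update` the producer overwrites what the consumer is holding, with `reject` the producer learns about the conflict immediately, and with `buffer` the new value is processed as a separate item later.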
Lease Timeouts
Prevent stuck items with automatic lease expiration:
# Queue-wide default timeout
queue = LeasedKeyQueue[str, int](default_lease_timeout=30.0)
await queue.start()
# Per-lease override
key, value, lease = await queue.get(lease_timeout=60.0)
Expired leases are automatically released to the front of the queue for retry.
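A rough way to picture lease expiry is a timer task per lease that puts the item back at the front if it is still in flight when the timer fires. This is a simplified sketch under that assumption; `expire_lease` and the dict/deque state are hypothetical, and the real library may track timeouts differently.

```python
import asyncio
from collections import deque


async def expire_lease(available: deque, inflight: dict, key, timeout: float) -> None:
    """After `timeout` seconds, return a still-leased item to the queue front."""
    await asyncio.sleep(timeout)
    if key in inflight:
        available.appendleft((key, inflight.pop(key)))


async def demo() -> list:
    available = deque([("a", 1), ("b", 2)])
    inflight: dict = {}

    # A consumer leases the first item...
    key, value = available.popleft()
    inflight[key] = value
    timer = asyncio.create_task(expire_lease(available, inflight, key, 0.01))

    # ...but never acks it, so the lease expires and the item is requeued.
    await timer
    return list(available)
```

In this sketch an acknowledged item would simply be removed from `inflight` (and its timer cancelled), so the timer would find nothing to requeue.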
Complexity Guarantees
| Operation | Time Complexity |
|---|---|
| put() | O(1) |
| get() | O(1) |
| take(key) | O(1) |
| ack() | O(1) |
| release() | O(1) |
This is achieved through an intrusive doubly-linked list with direct node references.
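The general technique can be sketched as a doubly-linked list whose nodes are also indexed by key in a dict, so that removing from the middle never requires a scan. The `Node` and `KeyedFifo` classes below are a hypothetical illustration of that idea, not the library's actual data structure.

```python
class Node:
    __slots__ = ("key", "value", "prev", "next")

    def __init__(self, key, value):
        self.key, self.value = key, value
        self.prev = self.next = None


class KeyedFifo:
    """FIFO order via a linked list; O(1) keyed removal via a key -> node dict."""

    def __init__(self) -> None:
        # Sentinel head and tail nodes avoid empty-list special cases.
        self._head, self._tail = Node(None, None), Node(None, None)
        self._head.next, self._tail.prev = self._tail, self._head
        self._nodes: dict = {}

    def push_back(self, key, value) -> None:
        node, last = Node(key, value), self._tail.prev
        last.next, node.prev = node, last
        node.next, self._tail.prev = self._tail, node
        self._nodes[key] = node

    def _unlink(self, node: Node) -> None:
        # Splice the node out using only its own neighbor references.
        node.prev.next, node.next.prev = node.next, node.prev
        node.prev = node.next = None

    def pop_front(self):
        node = self._head.next
        if node is self._tail:
            raise IndexError("pop from empty queue")
        self._unlink(node)
        del self._nodes[node.key]
        return node.key, node.value

    def remove(self, key):
        node = self._nodes.pop(key)  # dict lookup gives the node directly
        self._unlink(node)
        return node.value
```

Because each operation touches only a dict entry and a node's immediate neighbors, enqueue, dequeue, and keyed removal all run in constant time regardless of queue length.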
Use Cases
- Task Queues: FIFO processing with retry on failure
- Job Scheduling: Target specific jobs while maintaining order
- Rate Limiting: Lease-based exclusive access prevents double-processing
- Event Processing: Handle events by ID with guaranteed exclusivity
- Workflow Engines: Track in-flight work with timeout-based recovery
Development
# Install with dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Type checking
mypy src/
# Linting
ruff check src/ tests/
License
MIT License - see LICENSE file for details.
Contributing
Contributions welcome! Please open an issue or PR on GitHub.