Skip to main content

Async keyed leased queue with FIFO semantics and O(1) operations

Project description

leasedkeyq

Async Keyed Leased Queue - An asyncio-friendly in-memory queue combining FIFO semantics, dictionary-style keyed access, and lease-based exclusive processing.

Python Version PyPI Version License

Features

  • FIFO Queue Semantics: Process items in order with get()
  • Keyed Access: Target specific items with take(key)
  • Lease-Based Processing: Exclusive access with explicit ack/release
  • O(1) Operations: Constant-time enqueue, dequeue, and key-based removal
  • Automatic Timeouts: Optional lease expiration with auto-retry
  • Blocking Behavior: Async waiting for items or specific keys
  • Type Safe: Full type hints with strict mypy compliance
  • Zero Dependencies: Pure Python stdlib implementation

Installation

pip install leasedkeyq

Quick Start

import asyncio
from leasedkeyq import LeasedKeyQueue

async def main():
    # Create queue with 30-second lease timeout
    async with LeasedKeyQueue[str, dict](default_lease_timeout=30.0) as queue:
        # Producer: add items
        await queue.put("task-1", {"action": "process", "data": 123})
        await queue.put("task-2", {"action": "send", "data": 456})

        # Consumer: FIFO consumption
        key, value, lease = await queue.get()
        print(f"Processing {key}: {value}")

        try:
            # Do work...
            await process(value)
            # Success: acknowledge
            await queue.ack(lease)
        except Exception:
            # Failure: release for retry
            await queue.release(lease, requeue_front=True)

        # Targeted consumption
        key, value, lease = await queue.take("task-2")
        print(f"Specifically got {key}: {value}")
        await queue.ack(lease)

asyncio.run(main())

Core Concepts

States

Each key is in exactly one state:

  • ABSENT: Not in queue
  • AVAILABLE: Ready for consumption
  • IN_FLIGHT: Leased to a consumer

API Overview

Producer API

await queue.put(key, value, if_in_flight="update")  # update|reject|buffer

Consumer API

# FIFO consumption
key, value, lease = await queue.get(timeout=10.0, lease_timeout=30.0)

# Keyed consumption
key, value, lease = await queue.take("specific-key", timeout=10.0)

Lease Control

await queue.ack(lease)                              # Permanent removal
await queue.release(lease, requeue_front=True)      # Retry

Introspection

value = await queue.peek("key")
has_key = await queue.contains("key")
keys = await queue.available_keys()
inflight = await queue.inflight_keys()
size = await queue.qsize()

In-Flight Policies

Control behavior when putting a key that's currently leased:

  • update (default): Update the in-flight value
  • reject: Raise KeyAlreadyInFlightError
  • buffer: Enqueue a second copy to available
await queue.put("key", new_value, if_in_flight="update")

Lease Timeouts

Prevent stuck items with automatic lease expiration:

# Queue-wide default timeout
queue = LeasedKeyQueue[str, int](default_lease_timeout=30.0)
await queue.start()

# Per-lease override
key, value, lease = await queue.get(lease_timeout=60.0)

Expired leases are automatically released to the front of the queue for retry.

Complexity Guarantees

Operation Time Complexity
put() O(1)
get() O(1)
take(key) O(1)
ack() O(1)
release() O(1)

Achieved through intrusive doubly-linked list with direct node references.

Use Cases

  • Task Queues: FIFO processing with retry on failure
  • Job Scheduling: Target specific jobs while maintaining order
  • Rate Limiting: Lease-based exclusive access prevents double-processing
  • Event Processing: Handle events by ID with guaranteed exclusivity
  • Workflow Engines: Track in-flight work with timeout-based recovery

Documentation

Development

# Install with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Type checking
mypy src/

# Linting
ruff check src/ tests/

License

MIT License - see LICENSE file for details.

Contributing

Contributions welcome! Please open an issue or PR on GitHub.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

leasedkeyq-0.0.4.tar.gz (68.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

leasedkeyq-0.0.4-py3-none-any.whl (10.3 kB view details)

Uploaded Python 3

File details

Details for the file leasedkeyq-0.0.4.tar.gz.

File metadata

  • Download URL: leasedkeyq-0.0.4.tar.gz
  • Upload date:
  • Size: 68.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for leasedkeyq-0.0.4.tar.gz
Algorithm Hash digest
SHA256 9b5469a1c99e83438dcc70d8a3a03a7f25afab22db3097d5123c36f96f03357d
MD5 59990b8ea208ab9d3882050f227174be
BLAKE2b-256 b16d88e873ee5afcaf11bf3c85964065f23de72a8dc6f0be597ffcea116441c0

See more details on using hashes here.

File details

Details for the file leasedkeyq-0.0.4-py3-none-any.whl.

File metadata

  • Download URL: leasedkeyq-0.0.4-py3-none-any.whl
  • Upload date:
  • Size: 10.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for leasedkeyq-0.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 424e0950e2dde09440abc31c633c966850a5740215eec559105a5be9a1a13276
MD5 2b3616c1e7d48634ee7351eacabf39e5
BLAKE2b-256 449a5172e9a3adaef9784301f3182c48b8ab969aeeb424c8e5e5f303003bd68a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page