A DuckDB-based queue manager

Queuack - A lightweight job queue

Queuack (aka DuckQueue) is a pragmatic, single-node job queue that stores jobs in a DuckDB table. It’s built for dev/test and small-to-medium production workloads where you want durability without the operational overhead of Redis/RabbitMQ/Celery.

This README is developer-focused: practical examples, caveats, and the exact behavior around backpressure and testing so you don't waste time chasing surprises.


Highlights

  • ✅ Persistent queue backed by DuckDB (file or :memory:)
  • ✅ Claim / ack semantics with visibility timeout (stale-claim recovery)
  • ✅ Priorities, delayed jobs, retries, dead-letter view
  • ✅ Worker with optional thread-pool concurrency and backpressure control
  • ⚠️ Job payloads are pickled callables & args (see security notes)
  • Minimal runtime surface: duckdb + stdlib

Requirements

  • Python 3.11+ (tested on 3.12)
  • duckdb>=0.9.0
  • Optional: pytest / pytest-cov for tests

Install:

uv sync

Quick usage

File: example.py

import time
from queuack.core import DuckQueue, Worker, job

q = DuckQueue(":memory:")

def add(a, b):
    return a + b

# Enqueue directly
jid = q.enqueue(add, args=(2, 3))
print("job id:", jid)

# Decorator => greet.delay()
@job(q, queue="default")
def greet(name):
    time.sleep(0.1)
    return f"hello {name}"

# Enqueue via decorator helper
greeting_job = greet.delay("Bruno")

# Claim + run manually (simple consumer)
# Named "claimed" so it does not shadow the imported @job decorator
claimed = q.claim()
if claimed:
    res = claimed.execute()
    q.ack(claimed.id, result=res)

# Or run a long-running worker (threaded)
w = Worker(q, queues=["default"], concurrency=2)
# w.run()  # blocks, use in a daemon/thread or run in your service

Backpressure behavior — exact semantics you need to know

Queuack has two thresholds:

  • warning level at 1000 pending jobs (configurable in code if you want)
  • hard limit at 10000 pending jobs → enqueue raises BackpressureError once pending > 10000

Important detail (this matters for tests):

  • The code computes pending = existing_pending + delayed before inserting the job.
  • The check uses elif pending >= 1000: to emit a UserWarning and log a warning. That means the 1001st enqueue attempt (when pending is 1000 right before inserting) will trigger the UserWarning.
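The threshold logic described above can be sketched in plain Python. This is a minimal stand-in, not Queuack's source: `check_backpressure`, `warns`, and the local `BackpressureError` class are hypothetical names, and the `> 10000` comparison follows the `enqueue` reference in this README.

```python
import warnings

WARN_THRESHOLD = 1000   # warning level stated in this README
HARD_LIMIT = 10000      # hard limit stated in this README


class BackpressureError(Exception):
    """Local stand-in for Queuack's BackpressureError (assumption)."""


def check_backpressure(pending: int) -> None:
    """Apply both thresholds to the count taken before the insert."""
    if pending > HARD_LIMIT:
        raise BackpressureError(f"{pending} pending jobs exceeds {HARD_LIMIT}")
    elif pending >= WARN_THRESHOLD:
        warnings.warn(f"{pending} pending jobs", UserWarning)


def warns(pending: int) -> bool:
    """Return True if check_backpressure(pending) emits a UserWarning."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        check_backpressure(pending)
    return any(issubclass(w.category, UserWarning) for w in caught)


print(warns(999))    # 1000th enqueue attempt: pending is 999 before the insert
print(warns(1000))   # 1001st enqueue attempt: pending is 1000 before the insert
```

In a test, this means filling the queue with exactly 1000 jobs first and asserting the warning on the next enqueue, not on the 1000th.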

API reference (core)

DuckQueue(db_path="duckqueue.db", default_queue="default")

Create/open the DuckDB-backed queue.

  • db_path: DuckDB file path or ":memory:"
  • default_queue: default name for enqueues/claims

enqueue(func, args=(), kwargs=None, queue=None, priority=50, delay_seconds=0, max_attempts=3, timeout_seconds=300, check_backpressure=True) -> str

Serialize and insert a job. Returns job id (UUID).

  • If check_backpressure=True, enqueue will:

    • emit a UserWarning and log if pending >= 1000,
    • raise BackpressureError if pending > 10000.

enqueue_batch(jobs: List[(func, args, kwargs)], queue=None, priority=50, max_attempts=3) -> List[str]

Insert many jobs in one transaction.

claim(queue=None, worker_id=None, claim_timeout=300) -> Optional[Job]

Atomically claim the next eligible job and return a Job object, or None if no job is eligible. Stale claims are recovered: a job whose claimed_at is older than claim_timeout becomes claimable again.
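As a rough illustration of the stale-claim rule, the eligibility check might look like the sketch below. It is not Queuack's implementation: `is_claimable` is a hypothetical helper, the status names are taken from the `stats` description, and treating `claim_timeout` as seconds is an assumption. Delayed jobs are ignored here.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_claimable(status: str,
                 claimed_at: Optional[datetime],
                 now: datetime,
                 claim_timeout: int = 300) -> bool:
    """Decide whether a job row may be handed to a worker."""
    if status == "pending":
        return True
    if status == "claimed" and claimed_at is not None:
        # The previous worker is presumed dead once the claim is too old.
        return now - claimed_at > timedelta(seconds=claim_timeout)
    return False


now = datetime(2025, 1, 1, 12, 0, 0)
print(is_claimable("claimed", now - timedelta(seconds=60), now))   # fresh claim
print(is_claimable("claimed", now - timedelta(seconds=301), now))  # stale claim
```

Pick claim_timeout comfortably above your longest job runtime; otherwise a slow but healthy job can be claimed a second time.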

ack(job_id, result=None, error=None)

Acknowledge completion. If error is provided and attempts < max_attempts, the job is requeued; if error is provided and the attempts are exhausted, it is moved to failed.
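The retry decision can be written down as a small pure function. This is only a sketch of the rule as stated here: `status_after_ack` is a hypothetical name, the status strings come from the `stats` description, and whether `attempts` already counts the run that just finished is an assumption.

```python
from typing import Optional


def status_after_ack(attempts: int,
                     max_attempts: int,
                     error: Optional[str] = None) -> str:
    """Return the status a job would move to after ack()."""
    if error is None:
        return "done"       # successful completion
    if attempts < max_attempts:
        return "pending"    # requeued for another attempt
    return "failed"         # retries exhausted, shows up as a dead letter


print(status_after_ack(1, 3))                  # success
print(status_after_ack(1, 3, error="boom"))    # retry
print(status_after_ack(3, 3, error="boom"))    # give up
```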

nack(job_id, requeue=True)

Negative acknowledgement; requeues the job by default.

stats(queue=None) -> dict

Return counts by status (pending, claimed, done, failed, delayed).

get_job(job_id), get_result(job_id), list_dead_letters(limit=100), purge(...)


Worker

Worker(queue, queues=None, worker_id=None, concurrency=1, max_jobs_in_flight=None)

  • queues accepts ["default"] or [("emails", 100), ("reports", 50)] to set claim order by priority.
  • concurrency uses threads (ThreadPoolExecutor). CPU-bound jobs will be limited by the GIL — use processes externally if needed.
  • max_jobs_in_flight defaults to concurrency * 2 and is used for local backpressure (stop claiming when too many jobs in flight).
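To make the local backpressure idea concrete, here is a minimal stdlib sketch of "stop claiming when too many jobs are in flight". It does not use Queuack: `run_bounded` is a hypothetical helper, and a semaphore is just one way to get this behavior; the Worker may do it differently.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_bounded(tasks, concurrency=2, max_in_flight=None):
    """Run callables on a thread pool with a cap on submitted-but-unfinished work."""
    limit = max_in_flight if max_in_flight is not None else concurrency * 2
    slots = threading.BoundedSemaphore(limit)
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def wrapped(task):
        nonlocal in_flight
        try:
            return task()
        finally:
            with lock:
                in_flight -= 1
            slots.release()  # frees a slot so the loop below can "claim" again

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = []
        for task in tasks:
            slots.acquire()  # blocks here while the in-flight limit is reached
            with lock:
                in_flight += 1
                peak = max(peak, in_flight)
            futures.append(pool.submit(wrapped, task))
        results = [f.result() for f in futures]
    return results, peak


def slow_square(n):
    time.sleep(0.01)
    return n * n


tasks = [lambda n=n: slow_square(n) for n in range(10)]
results, peak = run_bounded(tasks, concurrency=2)
print(results, "peak in flight:", peak)
```

With concurrency=2 the default limit is 4, so at most two jobs run while two more wait in the pool's queue. The rest stay in the database where other workers can still claim them.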

Note: Worker.run() registers signal handlers if run from the main thread (SIGINT / SIGTERM).


Security & portability caveats

  • Jobs serialize callables and args with pickle. This:

    • is not safe for untrusted input (pickle can execute arbitrary code on load),
    • makes job payloads non-portable across refactors (if function location or signature changes, old pickles may fail).
  • If you need cross-language portability or safe deserialization, switch to JSON-serializable payloads or an external blob store + small metadata in DuckDB.
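One way to approach the JSON option is a name-based task registry, sketched below. Nothing here is part of Queuack: `TASKS`, `task`, `dumps_job`, and `run_job` are hypothetical names for illustration. The payload stores a task name and plain arguments, so loading it cannot execute anything that was not registered.

```python
import json

TASKS = {}  # task name -> callable; only these can ever be executed


def task(func):
    """Register a function under its name so payloads can refer to it."""
    TASKS[func.__name__] = func
    return func


@task
def add(a, b):
    return a + b


def dumps_job(name, args=(), kwargs=None) -> str:
    """Serialize a job as JSON; fails early for unregistered task names."""
    if name not in TASKS:
        raise KeyError(f"unknown task: {name}")
    return json.dumps({"task": name, "args": list(args), "kwargs": kwargs or {}})


def run_job(payload: str):
    """Look the task up by name and call it with the stored arguments."""
    spec = json.loads(payload)
    func = TASKS[spec["task"]]  # KeyError for anything not registered
    return func(*spec["args"], **spec["kwargs"])


payload = dumps_job("add", args=(2, 3))
print(payload)
print(run_job(payload))
```

The trade-off is that arguments must be JSON-serializable and every task needs a stable name, but payloads survive refactors as long as the registry key stays the same.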


Performance & scaling notes

  • DuckDB is optimized for read-heavy analytical workloads, but it handles the queue's atomic updates fine in single-node or low-concurrency setups.
  • Not intended for high-throughput broker-style workloads (millions of messages/s). For moderate use (thousands to tens of thousands of jobs), it’s fine.
  • For heavy CPU jobs, run workers in processes (spawn separate Python processes each hosting a Worker instance) or offload to a process pool.

Tests

Use pytest. Example:

pytest -q

Roadmap / Ideas

  • Optional JSON-serializable job payload mode
  • Better instrumentation (prometheus metrics hooks)
  • Process pool worker variant for CPU-bound workloads
  • Web UI for inspections & replaying dead letters

Contributing

Pull requests welcome. Keep changes small and include tests. If you change serialization behavior, add migration guidance.

Run test suite locally:

uv venv
source .venv/bin/activate
uv sync
pytest -q --cov=queuack

License

MIT © 2025 Bruno Peixoto
