
A DuckDB-based queue manager


Queuack: lightweight DuckDB-backed job queue 🦆


Queuack (aka DuckQueue) is a pragmatic, single-node job queue that stores jobs in a DuckDB table. It’s built for dev/test and small-to-medium production workloads where you want durability without the operational overhead of Redis/RabbitMQ/Celery.

This README is developer-focused: practical examples, caveats, and the exact behavior around backpressure and testing so you don't waste time chasing surprises.


Highlights

  • ✅ Persistent queue backed by DuckDB (file or :memory:)
  • ✅ Claim / ack semantics with visibility timeout (stale-claim recovery)
  • ✅ Priorities, delayed jobs, retries, dead-letter view
  • ✅ Worker with optional thread-pool concurrency and backpressure control
  • ⚠️ Job payloads are pickled callables & args (see security notes)
  • Minimal runtime surface: duckdb + stdlib

Requirements

  • Python 3.11+ (tested on 3.12)
  • duckdb>=0.9.0
  • Optional: pytest / pytest-cov for tests

Install from PyPI:

pip install queuack

Or, for development from a source checkout:

uv sync

Quick usage

File: example.py

import time
from queuack.core import DuckQueue, Worker, job

q = DuckQueue(":memory:")

def add(a, b):
    return a + b

# Enqueue directly
jid = q.enqueue(add, args=(2, 3))
print("job id:", jid)

# Decorator => greet.delay(...)
@job(q, queue="default")
def greet(name):
    time.sleep(0.1)
    return f"hello {name}"

# Enqueue via the decorator helper
greeting_job = greet.delay("Bruno")

# Claim + run manually (simple consumer).
# Named `claimed` so it does not shadow the imported `job` decorator.
claimed = q.claim()
if claimed:
    res = claimed.execute()
    q.ack(claimed.id, result=res)

# Or run a long-running worker (threaded)
w = Worker(q, queues=["default"], concurrency=2)
# w.run()  # blocks; run it in a daemon thread or inside your service

Backpressure behavior — exact semantics you need to know

Queuack has two thresholds:

  • warning level at 1000 pending jobs (configurable in code if you want)
  • hard limit: once pending exceeds 10000 (pending > 10000) → raises BackpressureError

Important detail (this matters for tests):

  • The code computes pending = existing_pending + delayed before inserting the new job, so delayed jobs count toward the thresholds.
  • The warning check is elif pending >= 1000:, which emits a UserWarning and logs a warning. Starting from an empty queue, the 1001st enqueue attempt (pending is exactly 1000 just before its insert) is the first to trigger the UserWarning.
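To make the off-by-one concrete, here is a small pure-Python model of the check. It is not Queuack's code: check_backpressure, enqueue_n, and the local BackpressureError class are stand-ins that mirror the documented numbers (warn at pending >= 1000, raise at pending > 10000 as stated in the enqueue() reference, pending measured before the insert).

```python
import warnings

WARN_AT = 1000      # documented warning threshold
HARD_LIMIT = 10000  # documented hard limit (raise once pending exceeds it)


class BackpressureError(Exception):
    """Stand-in for Queuack's BackpressureError (illustration only)."""


def check_backpressure(pending: int) -> None:
    """Model of the pre-insert check; `pending` already includes delayed jobs."""
    if pending > HARD_LIMIT:
        raise BackpressureError(f"queue too deep: {pending} pending")
    elif pending >= WARN_AT:
        warnings.warn(f"high queue depth: {pending} pending", UserWarning)


def enqueue_n(n: int) -> list[int]:
    """Simulate n enqueues into an empty queue; return the 1-based attempts that warned."""
    pending = 0
    warned = []
    for attempt in range(1, n + 1):
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            check_backpressure(pending)  # computed before the insert
        if caught:
            warned.append(attempt)
        pending += 1  # the job is inserted only if the check did not raise
    return warned


warned = enqueue_n(1001)
print(warned)  # [1001]: only the 1001st attempt sees pending == 1000
```

In a test against the real queue, this means you need 1000 enqueues before the one you wrap in pytest.warns(UserWarning).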

API reference (core)

DuckQueue(db_path="duckqueue.db", default_queue="default")

Create/open the DuckDB-backed queue.

  • db_path: DuckDB file path or ":memory:"
  • default_queue: default name for enqueues/claims

enqueue(func, args=(), kwargs=None, queue=None, priority=50, delay_seconds=0, max_attempts=3, timeout_seconds=300, check_backpressure=True) -> str

Serialize and insert a job. Returns job id (UUID).

  • If check_backpressure=True, it will:

    • emit a UserWarning and log if pending >= 1000,
    • raise BackpressureError if pending > 10000.

enqueue_batch(jobs: List[(func, args, kwargs)], queue=None, priority=50, max_attempts=3) -> List[str]

Insert many jobs in one transaction.
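One transaction makes the batch all-or-nothing: if any row fails, none of them are committed. The sketch below shows that property with the standard-library sqlite3 module standing in for DuckDB; the jobs table, its columns, and enqueue_batch_sketch are hypothetical and not Queuack's schema or code.

```python
import operator
import pickle
import sqlite3
import uuid


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # Hypothetical minimal schema; not Queuack's real table layout.
    conn.execute(
        "CREATE TABLE jobs (id TEXT PRIMARY KEY, queue TEXT, priority INTEGER,"
        " payload BLOB NOT NULL, status TEXT)"
    )
    return conn


def enqueue_batch_sketch(conn, jobs, queue="default", priority=50):
    """Insert every (func, args, kwargs) job in one transaction; return the new ids."""
    ids = []

    def rows():
        for func, args, kwargs in jobs:
            job_id = str(uuid.uuid4())
            ids.append(job_id)
            # Serializing lazily means a bad payload fails mid-batch,
            # after earlier rows have already been inserted.
            payload = pickle.dumps((func, args, kwargs or {}))
            yield (job_id, queue, priority, payload, "pending")

    with conn:  # commit on success; roll back the whole batch on any exception
        conn.executemany("INSERT INTO jobs VALUES (?, ?, ?, ?, ?)", rows())
    return ids


conn = make_db()
enqueue_batch_sketch(conn, [(operator.add, (1, 2), None), (max, (3, 4), None)])

try:
    # The lambda cannot be pickled, so this second batch fails partway through.
    enqueue_batch_sketch(conn, [(min, (1, 2), None), (lambda: None, (), None)])
except Exception as exc:
    print("batch rejected:", type(exc).__name__)

print(conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0])  # 2
```

The min job from the rejected batch was inserted before the failure but is rolled back with the rest, so only the first batch's two jobs remain.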

claim(queue=None, worker_id=None, claim_timeout=300) -> Optional[Job]

Atomically claim the next eligible job and return a Job object (or None if nothing is eligible). Implements stale-claim recovery: a job whose claimed_at is older than claim_timeout is treated as abandoned and can be claimed again.
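The claim rule can be modeled in plain Python. This is a sketch, not Queuack's SQL: the Job dataclass, its fields (execute_after in particular), the "highest priority first, then oldest first" ordering, and time measured in seconds are assumptions for illustration. A job is eligible when it is pending and its delay has elapsed, or when it is claimed but the claim has gone stale.

```python
import threading
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Job:
    id: str
    priority: int = 50
    execute_after: float = 0.0     # hypothetical: earliest time the job may run
    created_at: float = field(default_factory=time.time)
    status: str = "pending"        # pending | claimed | done | failed
    claimed_at: Optional[float] = None
    claimed_by: Optional[str] = None
    attempts: int = 0


class ClaimModel:
    """In-memory stand-in for claim(); the lock plays the role of the DB transaction."""

    def __init__(self, jobs):
        self.jobs = list(jobs)
        self._lock = threading.Lock()

    @staticmethod
    def _eligible(job, now, claim_timeout):
        if job.status == "pending":
            return job.execute_after <= now               # delayed jobs wait
        if job.status == "claimed":
            return now - job.claimed_at > claim_timeout   # stale-claim recovery
        return False

    def claim(self, worker_id, claim_timeout=300.0, now=None):
        now = time.time() if now is None else now
        with self._lock:  # select + update happen atomically
            candidates = [j for j in self.jobs if self._eligible(j, now, claim_timeout)]
            if not candidates:
                return None
            # Assumed ordering: highest priority first, then oldest first.
            job = min(candidates, key=lambda j: (-j.priority, j.created_at))
            job.status, job.claimed_at, job.claimed_by = "claimed", now, worker_id
            job.attempts += 1
            return job


q = ClaimModel([
    Job("low", priority=10, created_at=0),
    Job("high", priority=90, created_at=1),
    Job("later", priority=100, execute_after=50, created_at=2),
])
print(q.claim("w1", now=10).id)   # high  ("later" is still delayed)
print(q.claim("w2", now=10).id)   # low
print(q.claim("w3", now=20))      # None  (nothing due, no stale claims)
print(q.claim("w3", now=400).id)  # later (now due, and it outranks the stale claims)
print(q.claim("w4", now=400).id)  # high  (claimed at 10, stale after claim_timeout=300)
```

The last claim is the recovery path: w1 never acked "high", so after claim_timeout another worker picks it up and its attempt count rises to 2.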

ack(job_id, result=None, error=None)

Acknowledge completion. Without an error, the job is marked done. With an error, it is requeued if attempts < max_attempts; otherwise it is moved to failed.

nack(job_id, requeue=True)

Negative acknowledgement — default requeues.
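The ack/nack rules amount to a small state machine. The sketch below models them on a plain dict; the helper names are not Queuack's, and it assumes attempts is incremented when a job is claimed, so with max_attempts=3 a job that always fails runs three times. Only the default nack (requeue=True) is modeled, and it is assumed not to add an extra attempt.

```python
from typing import Any, Optional


def claim(job: dict) -> None:
    """Assumed: each claim counts as one attempt."""
    job.update(status="claimed", attempts=job["attempts"] + 1)


def ack(job: dict, result: Any = None, error: Optional[str] = None) -> None:
    if error is None:
        job.update(status="done", result=result)
    elif job["attempts"] < job["max_attempts"]:
        job.update(status="pending", error=error)  # retry later
    else:
        job.update(status="failed", error=error)   # out of attempts


def nack(job: dict) -> None:
    """Default nack (requeue=True): hand the job back to the queue."""
    job.update(status="pending")


job = {"status": "pending", "attempts": 0, "max_attempts": 3}
history = []
while job["status"] == "pending":
    claim(job)
    ack(job, error="boom")  # this job fails every time
    history.append((job["attempts"], job["status"]))
print(history)  # [(1, 'pending'), (2, 'pending'), (3, 'failed')]
```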

stats(queue=None) -> dict

Return counts by status (pending, claimed, done, failed, delayed).
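A common use of stats() is a health check. The helper below is hypothetical and assumes only that stats() returns a dict keyed by the status names listed above; it sums pending and delayed the same way the backpressure check does and reuses the documented 1000 / 10000 thresholds.

```python
def queue_health(stats: dict, warn_at: int = 1000, hard_limit: int = 10000) -> str:
    """Classify a stats()-style dict; backlog counts pending + delayed, like backpressure."""
    backlog = stats.get("pending", 0) + stats.get("delayed", 0)
    if backlog > hard_limit:
        return "rejecting"  # enqueue() with check_backpressure=True would raise
    if backlog >= warn_at:
        return "warning"
    return "ok"


sample = {"pending": 900, "claimed": 4, "done": 12, "failed": 1, "delayed": 150}
print(queue_health(sample))  # warning (900 + 150 = 1050 >= 1000)
```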

get_job(job_id), get_result(job_id), list_dead_letters(limit=100), purge(...)


Worker

Worker(queue, queues=None, worker_id=None, concurrency=1, max_jobs_in_flight=None)

  • queues accepts plain names (["default"]) or (name, priority) pairs ([("emails", 100), ("reports", 50)]) that set the order in which queues are claimed from.
  • concurrency uses threads (ThreadPoolExecutor). CPU-bound jobs will be limited by the GIL — use processes externally if needed.
  • max_jobs_in_flight defaults to concurrency * 2 and is used for local backpressure (stop claiming when too many jobs in flight).
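Both Worker knobs can be sketched with the standard library: (name, priority) pairs are sorted so higher-priority queues are tried first (assumed: a larger number means earlier), and a BoundedSemaphore sized to max_jobs_in_flight keeps the claim loop from pulling more work than the thread pool can absorb. normalize_queues, run_bounded, and the claim_fn callback are hypothetical; Queuack's Worker may be structured differently.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def normalize_queues(queues):
    """Accept ["a", "b"] or [("a", 100), ("b", 50)]; return names, highest priority first."""
    # Plain names get priority 0; sorted() is stable, so their given order is kept.
    pairs = [(q, 0) if isinstance(q, str) else tuple(q) for q in queues]
    return [name for name, _prio in sorted(pairs, key=lambda p: -p[1])]


def run_bounded(claim_fn, handle, concurrency=1, max_jobs_in_flight=None):
    """Claim and run jobs until claim_fn() returns None (a real worker would poll again).

    At most `limit` jobs are claimed but unfinished at any moment.
    """
    limit = max_jobs_in_flight or concurrency * 2  # documented default
    slots = threading.BoundedSemaphore(limit)
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        while True:
            slots.acquire()  # local backpressure: block until a slot frees up
            job = claim_fn()
            if job is None:
                slots.release()
                break
            future = pool.submit(handle, job)
            future.add_done_callback(lambda _f: slots.release())
    # Leaving the with-block waits for all submitted jobs to finish.


print(normalize_queues([("reports", 50), ("emails", 100)]))  # ['emails', 'reports']

backlog = list(range(10))
done = []
lock = threading.Lock()


def claim_next():
    with lock:
        return backlog.pop(0) if backlog else None


run_bounded(claim_next, done.append, concurrency=2)
print(sorted(done))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Acquiring the slot before claiming (not after) is the important part: a job is never taken off the queue unless there is room to run it soon.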

Note: Worker.run() registers signal handlers if run from the main thread (SIGINT / SIGTERM).
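The main-thread condition comes from Python itself: signal.signal() may only be called from the main thread and raises ValueError elsewhere. A minimal sketch of the pattern, with a hypothetical StoppableLoop class standing in for Worker:

```python
import signal
import threading


class StoppableLoop:
    """Hypothetical stand-in for a worker's shutdown handling."""

    def __init__(self):
        self.stop = threading.Event()
        self._previous = {}

    def _handle(self, signum, frame):
        self.stop.set()  # ask the loop to exit after the current job

    def install_signal_handlers(self) -> bool:
        # signal.signal() raises ValueError outside the main thread,
        # so registration is skipped there (e.g. a worker started in a thread).
        if threading.current_thread() is not threading.main_thread():
            return False
        for sig in (signal.SIGINT, signal.SIGTERM):
            self._previous[sig] = signal.signal(sig, self._handle)
        return True

    def restore_signal_handlers(self) -> None:
        for sig, handler in self._previous.items():
            if handler is not None:  # None: previous handler was not set from Python
                signal.signal(sig, handler)
        self._previous.clear()


loop = StoppableLoop()
from_thread = []
t = threading.Thread(target=lambda: from_thread.append(loop.install_signal_handlers()))
t.start()
t.join()
print(from_thread)  # [False]
```

A practical consequence: if you start Worker.run() in a background thread, Ctrl-C will not reach it through these handlers, so your own code has to stop it.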


Security & portability caveats

  • Jobs serialize callables and args with pickle. This:

    • is not safe for untrusted input (pickle can execute arbitrary code on load),
    • makes job payloads non-portable across refactors (if function location or signature changes, old pickles may fail).
  • If you need cross-language portability or safe deserialization, switch to JSON-serializable payloads or an external blob store + small metadata in DuckDB.
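One way to do that, sketched with the standard library: store a registered task name plus JSON arguments instead of a pickled callable, and resolve the name through an explicit allowlist when the job runs. The task decorator, REGISTRY, and the payload shape are hypothetical; Queuack does not ship this today (a JSON payload mode is on the roadmap).

```python
import json
from typing import Any, Callable

REGISTRY: dict[str, Callable[..., Any]] = {}


def task(name: str):
    """Register a function under a stable name that survives module moves and renames."""
    def decorator(func):
        REGISTRY[name] = func
        return func
    return decorator


def dumps_job(task_name: str, /, *args: Any, **kwargs: Any) -> str:
    if task_name not in REGISTRY:
        raise KeyError(f"unknown task {task_name!r}")
    return json.dumps({"task": task_name, "args": list(args), "kwargs": kwargs})


def run_job(payload: str) -> Any:
    data = json.loads(payload)    # parsing JSON cannot execute code, unlike pickle.loads
    func = REGISTRY[data["task"]]  # only allowlisted functions can run (KeyError otherwise)
    return func(*data["args"], **data["kwargs"])


@task("math.add")
def add(a, b):
    return a + b


payload = dumps_job("math.add", 2, 3)
print(payload)           # {"task": "math.add", "args": [2, 3], "kwargs": {}}
print(run_job(payload))  # 5
```

The trade-off is that arguments are limited to JSON types, which is also what makes the payload readable from other languages.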


Performance & scaling notes

  • DuckDB is built for analytical (OLAP) workloads rather than high-rate transactional writes, but its atomic updates are sufficient for the claim/ack pattern in single-node or low-concurrency setups.
  • Not intended for high-throughput broker-style workloads (millions of messages/s). For moderate use (thousands to tens of thousands of jobs), it’s fine.
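  • For heavy CPU jobs, offload the work to a process pool, or run workers in separate Python processes (each hosting a Worker instance). Note that DuckDB allows only one process at a time to open a database file in read-write mode, so check that separate worker processes can actually share your queue file before relying on that setup.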

Tests

Use pytest. Example:

pytest -q

Roadmap / Ideas

  • Optional JSON-serializable job payload mode
  • Better instrumentation (prometheus metrics hooks)
  • Process pool worker variant for CPU-bound workloads
  • Web UI for inspections & replaying dead letters

Contributing

Pull requests welcome. Keep changes small and include tests. If you change serialization behavior, add migration guidance.

Run test suite locally:

uv venv
source .venv/bin/activate
uv sync
pytest -q --cov=queuack

License

MIT © 2025 Bruno Peixoto



Download files


Source Distribution

queuack-0.1.1.tar.gz (22.3 kB)

Built Distribution


queuack-0.1.1-py3-none-any.whl (12.6 kB)

File details

Details for the file queuack-0.1.1.tar.gz.

File metadata

  • Download URL: queuack-0.1.1.tar.gz
  • Size: 22.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.2

File hashes

Hashes for queuack-0.1.1.tar.gz
  • SHA256: 9a9571f5a6598327dbb33b8eafe820428d2eab7bce4351e9fbe5104b4aa7b1fa
  • MD5: 21bdd0a2a89ae42b055c17ebf7c66c8f
  • BLAKE2b-256: 8fe471c71f1db3c5f0dbec5ffc022b5de4b82b84299c30da48eed8ca880811ef


File details

Details for the file queuack-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: queuack-0.1.1-py3-none-any.whl
  • Size: 12.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.2

File hashes

Hashes for queuack-0.1.1-py3-none-any.whl
  • SHA256: d3a1ba1a5075627d5a5ca4e5e04398e6f77329d2b2599d0dba653ae0cd5aced7
  • MD5: 8ca4a72cbaaf7a954fa596c45c484efe
  • BLAKE2b-256: 001238b10f50d3a82bc827d3379919d57ee28c60f8b4d25dc6b443741c637dc2

