A DuckDB-based queue manager
Queuack — lightweight DuckDB-backed job queue 🦆
Queuack (aka DuckQueue) is a pragmatic, single-node job queue that stores jobs in a DuckDB table. It’s built for dev/test and small-to-medium production workloads where you want durability without the operational overhead of Redis/RabbitMQ/Celery.
This README is developer-focused: practical examples, caveats, and the exact behavior around backpressure and testing so you don't waste time chasing surprises.
Highlights
- ✅ Persistent queue backed by DuckDB (file or :memory:)
- ✅ Claim / ack semantics with visibility timeout (stale-claim recovery)
- ✅ Priorities, delayed jobs, retries, dead-letter view
- ✅ Worker with optional thread-pool concurrency and backpressure control
- ⚠️ Job payloads are pickled callables & args (see security notes)
- Minimal runtime surface: duckdb + stdlib
Requirements
- Python 3.11+ (tested on 3.12)
- duckdb>=0.9.0
- Optional: pytest / pytest-cov for tests
Install:
uv sync
Quick usage
File: example.py
import time

from queuack.core import DuckQueue, Worker, job

q = DuckQueue(":memory:")


def add(a, b):
    return a + b


# Enqueue directly
jid = q.enqueue(add, args=(2, 3))
print("job id:", jid)


# Decorator => greet.delay()
@job(q, queue="default")
def greet(name):
    time.sleep(0.1)
    return f"hello {name}"


# Enqueue via decorator helper
greeting_job = greet.delay("Bruno")

# Claim + run manually (simple consumer).
# Named `claimed` so it does not shadow the imported `job` decorator.
claimed = q.claim()
if claimed:
    res = claimed.execute()
    q.ack(claimed.id, result=res)

# Or run a long-running worker (threaded)
w = Worker(q, queues=["default"], concurrency=2)
# w.run()  # blocks, use in a daemon/thread or run in your service
Backpressure behavior — exact semantics you need to know
Queuack has two thresholds:
- warning level at 1000 pending jobs (configurable in code if you want)
- hard limit at 10000 pending jobs → raises BackpressureError
Important detail (this matters for tests):
- The code computes pending = existing_pending + delayed before inserting the job.
- The check uses elif pending >= 1000: to emit a UserWarning and log a warning. That means the 1001st enqueue attempt (when pending is 1000 right before inserting) will trigger the UserWarning.
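The check described above can be sketched in isolation. This is a stdlib-only illustration, not the library's source: `check_backpressure`, `WARN_THRESHOLD`, and `HARD_LIMIT` are hypothetical names, the `BackpressureError` class here is a stand-in, and the hard-limit comparison is assumed to be `pending > 10000` as stated in the API reference.

```python
import warnings

WARN_THRESHOLD = 1000   # warning level from the list above
HARD_LIMIT = 10000      # hard limit from the list above


class BackpressureError(Exception):
    """Stand-in for the library's exception of the same name."""


def check_backpressure(existing_pending: int, delayed: int) -> int:
    """Run the pre-insert check and return the pending count it used."""
    pending = existing_pending + delayed  # computed before the insert
    if pending > HARD_LIMIT:              # assumed comparison, see lead-in
        raise BackpressureError(f"{pending} jobs pending")
    elif pending >= WARN_THRESHOLD:
        warnings.warn(f"{pending} jobs pending", UserWarning)
    return pending
```

In a test, this means a queue that already holds 1000 pending jobs should warn on the next enqueue, so that call is the one to wrap in `pytest.warns(UserWarning)`.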
API reference (core)
DuckQueue(db_path="duckqueue.db", default_queue="default")
Create/open the DuckDB-backed queue.
- db_path: DuckDB file path or ":memory:"
- default_queue: default name for enqueues/claims
enqueue(func, args=(), kwargs=None, queue=None, priority=50, delay_seconds=0, max_attempts=3, timeout_seconds=300, check_backpressure=True) -> str
Serialize and insert a job. Returns job id (UUID).
- If check_backpressure=True, enqueue will:
  - emit a UserWarning and log if pending >= 1000,
  - raise BackpressureError if pending > 10000.
enqueue_batch(jobs: List[(func, args, kwargs)], queue=None, priority=50, max_attempts=3) -> List[str]
Insert many jobs in one transaction.
claim(queue=None, worker_id=None, claim_timeout=300) -> Optional[Job]
Atomically claim the next eligible job and return a Job object. Also recovers stale claims: a job whose claimed_at is older than claim_timeout becomes claimable again.
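The eligibility rule behind stale-claim recovery can be sketched as a small predicate. This is an illustration only: `JobRow` and `is_claimable` are hypothetical names, the real table columns may differ, and `claim_timeout` is assumed to be in seconds.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobRow:
    """Hypothetical subset of a job row; real column names may differ."""
    status: str                  # e.g. "pending" or "claimed"
    claimed_at: Optional[float]  # epoch seconds, None if never claimed


def is_claimable(row: JobRow, now: float, claim_timeout: float = 300) -> bool:
    """Return True if a worker may claim this row at time `now`."""
    if row.status == "pending":
        return True
    if row.status == "claimed" and row.claimed_at is not None:
        # Stale claim: the previous worker has held it longer than the timeout.
        return now - row.claimed_at > claim_timeout
    return False
```

The practical consequence is that a job running longer than claim_timeout can be claimed a second time, so jobs should be idempotent or the timeout should exceed the longest expected run.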
ack(job_id, result=None, error=None)
Acknowledge completion. If error is provided and attempts < max_attempts, the job is requeued; otherwise it is moved to failed.
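The retry decision described for ack can be written as a pure function. This is a sketch under assumptions: `status_after_ack` is a hypothetical helper, `attempts` is assumed to count the attempt that just finished, and the status strings are taken from the names listed under stats().

```python
from typing import Optional


def status_after_ack(attempts: int, max_attempts: int,
                     error: Optional[str] = None) -> str:
    """Return the status a job ends up in after ack()."""
    if error is None:
        return "done"
    if attempts < max_attempts:
        return "pending"  # requeued for another attempt
    return "failed"       # out of retries
```

With the default max_attempts=3, a job that keeps failing is requeued after its first and second attempts and marked failed after the third.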
nack(job_id, requeue=True)
Negative acknowledgement — default requeues.
stats(queue=None) -> dict
Return counts by status (pending, claimed, done, failed, delayed).
get_job(job_id), get_result(job_id), list_dead_letters(limit=100), purge(...)
Worker
Worker(queue, queues=None, worker_id=None, concurrency=1, max_jobs_in_flight=None)
- queues accepts ["default"] or [("emails", 100), ("reports", 50)] to set claim order by priority.
- concurrency uses threads (ThreadPoolExecutor). CPU-bound jobs will be limited by the GIL; use processes externally if needed.
- max_jobs_in_flight defaults to concurrency * 2 and is used for local backpressure (stop claiming when too many jobs are in flight).
Note: Worker.run() registers signal handlers if run from the main thread (SIGINT / SIGTERM).
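To make the max_jobs_in_flight idea concrete, here is a stdlib-only sketch of a bounded submit loop on a ThreadPoolExecutor. It is not the Worker implementation: `run_bounded` is a hypothetical helper, and it takes plain callables where the real worker would claim jobs from the queue.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_bounded(jobs, concurrency=2, max_jobs_in_flight=None):
    """Run callables on a thread pool while keeping at most
    max_jobs_in_flight submitted-but-unfinished jobs at any time.
    Returns (results in submission order, peak in-flight count)."""
    limit = max_jobs_in_flight or concurrency * 2
    slots = threading.BoundedSemaphore(limit)
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def wrapped(fn):
        nonlocal in_flight
        try:
            return fn()
        finally:
            with lock:
                in_flight -= 1
            slots.release()  # free a slot so the loop can take more work

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = []
        for fn in jobs:
            slots.acquire()  # blocks when full: this is the "stop claiming" step
            with lock:
                in_flight += 1
                peak = max(peak, in_flight)
            futures.append(pool.submit(wrapped, fn))
        results = [f.result() for f in futures]
    return results, peak
```

Keeping the limit above concurrency (the default is twice as large) lets a few jobs wait in the pool's queue so threads do not idle between claims, while still capping how much work one worker holds.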
Security & portability caveats
- Jobs serialize callables and args with pickle. This:
  - is not safe for untrusted input (pickle can execute arbitrary code on load),
  - makes job payloads non-portable across refactors (if function location or signature changes, old pickles may fail).
- If you need cross-language portability or safe deserialization, switch to JSON-serializable payloads or an external blob store + small metadata in DuckDB.
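One way a JSON-serializable payload could look is a function name plus plain arguments, resolved through an explicit registry at run time. This is a sketch of that idea, not a feature of the library: `REGISTRY`, `register`, `dumps_job`, and `run_job` are all hypothetical names.

```python
import json

REGISTRY = {}  # hypothetical name -> callable table


def register(fn):
    """Make a function addressable by name in stored payloads."""
    REGISTRY[fn.__name__] = fn
    return fn


def dumps_job(name, args=(), kwargs=None) -> str:
    """Serialize a job as JSON: data only, no code."""
    return json.dumps({"func": name, "args": list(args), "kwargs": kwargs or {}})


def run_job(payload: str):
    """Look the function up by name and call it; unknown names raise KeyError."""
    spec = json.loads(payload)
    fn = REGISTRY[spec["func"]]
    return fn(*spec["args"], **spec["kwargs"])


@register
def add(a, b):
    return a + b
```

Because loading the payload only parses data, a tampered row can at worst name a registered function with unexpected arguments. The trade-off is that arguments and results must be JSON-representable.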
Performance & scaling notes
- DuckDB is designed for read-heavy analytical workloads, but it handles the queue's atomic updates fine for single-node or low-concurrency setups.
- Not intended for high-throughput broker-style workloads (millions of messages/s). For moderate use (thousands to tens of thousands of jobs), it’s fine.
- For heavy CPU jobs, run workers in processes (spawn separate Python processes each hosting a Worker instance) or offload to a process pool.
Tests
Use pytest. Example:
pytest -q
Roadmap / Ideas
- Optional JSON-serializable job payload mode
- Better instrumentation (prometheus metrics hooks)
- Process pool worker variant for CPU-bound workloads
- Web UI for inspections & replaying dead letters
Contributing
Pull requests welcome. Keep changes small and include tests. If you change serialization behavior, add migration guidance.
Run test suite locally:
uv venv
source .venv/bin/activate
uv sync
pytest -q --cov=queuack
License
MIT © 2025 Bruno Peixoto