mongo-taskqueue
Queueing data structure on top of a MongoDB collection.
Overview
mongo-taskqueue stores tasks in a single MongoDB collection and exposes a small API for enqueueing, leasing, and completing work. It is designed for simple worker loops that need scheduling, retries, and deduplication without an extra queue service.
Features
- Single-collection queue with priority ordering
- Delayed enqueue and scheduled execution
- Visibility timeouts with optional heartbeat extension
- Retry tracking with optional exponential backoff
- Dedupe keys (string) to avoid duplicate in-flight tasks
- Global and per-key rate limiting
- Dead-letter collection for discarded tasks
- Sync and async APIs
- CLI for common operations
Install
pip install mongo-taskqueue
Async support:
pip install "mongo-taskqueue[async]"
Quick start
from mongotq import get_task_queue

queue = get_task_queue(
    database_name="app",
    collection_name="jobs",
    host="mongodb://localhost:27017",
    ttl=-1,
)
queue.append({"job": "email", "to": "alice"})
task = queue.next()
if task:
    try:
        # do work
        queue.on_success(task)
    except Exception as exc:
        queue.on_failure(task, error_message=str(exc))
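The quick start handles a single task. A polling worker built on the same three calls might look like the sketch below. run_worker, handler, poll_interval, and max_idle_polls are hypothetical names and not part of mongo-taskqueue; only next(), on_success(task), and on_failure(task, error_message=...) are taken from the example above.

```python
import time
from typing import Any, Callable, Optional


def run_worker(
    queue: Any,
    handler: Callable[[Any], None],
    poll_interval: float = 1.0,
    max_idle_polls: Optional[int] = None,
) -> int:
    """Poll `queue` and run `handler` on each leased task.

    Stops after `max_idle_polls` consecutive empty polls (never, if None)
    and returns the number of tasks that completed successfully.
    """
    completed = 0
    idle_polls = 0
    while max_idle_polls is None or idle_polls < max_idle_polls:
        task = queue.next()
        if not task:
            # Nothing available right now: back off before polling again.
            idle_polls += 1
            time.sleep(poll_interval)
            continue
        idle_polls = 0
        try:
            handler(task)
        except Exception as exc:
            # Report the failure so the queue can count the retry.
            queue.on_failure(task, error_message=str(exc))
        else:
            queue.on_success(task)
            completed += 1
    return completed
```

Unlike the quick start, on_success() sits in the else branch here, so an error raised while acknowledging a task is not reported as a task failure. Whether that is preferable depends on the workload.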
Configuration
get_task_queue(...) accepts:
- ttl: seconds a pending task can live before being marked failed (-1 for no expiry)
- max_retries: maximum failure count before discard
- discard_strategy: keep or remove
- visibility_timeout: lease duration for pending tasks
- retry_backoff_base and retry_backoff_max: exponential backoff controls
- dead_letter_collection: where discarded tasks are copied when using discard_strategy="remove"
- meta_collection: metadata collection for rate limits
- rate_limit_per_second: global and per-key dequeue limits
- ensure_indexes: create indexes if missing
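This README does not state the formula behind retry_backoff_base and retry_backoff_max. As an illustration only, a common capped exponential scheme doubles the delay on every retry and clamps it to a maximum. backoff_delay is a hypothetical helper and the formula is an assumption, not the library's documented behavior.

```python
def backoff_delay(retries: int, base: float, maximum: float) -> float:
    """Delay in seconds before retry number `retries` (1-based).

    Assumed formula: base * 2 ** (retries - 1), clamped to `maximum`.
    """
    if retries < 1:
        return 0.0
    # Cap the exponent so very large retry counts cannot overflow a float.
    exponent = min(retries - 1, 60)
    return min(base * 2 ** exponent, maximum)


delays = [backoff_delay(n, base=2.0, maximum=30.0) for n in range(1, 7)]
print(delays)  # [2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```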
Task lifecycle
Statuses are available as constants:
STATUS_NEW, STATUS_PENDING, STATUS_FAILED, STATUS_SUCCESSFUL.
Common transitions:
- next() leases the next available task and marks it pending
- on_success(task) marks it successful
- on_failure(task) increments retries and may reschedule or fail
- on_retry(task) releases a leased task back to new
- refresh() requeues expired leases, expires pending tasks (TTL), and discards tasks over retry limits
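The transitions listed above can be summarized as a small lookup table. This is a toy model for reasoning about the lifecycle, not library code: the status strings are hypothetical stand-ins for the STATUS_* constants, and treating a rescheduled task as returning to new is an assumption.

```python
# Hypothetical status strings; real code should use the STATUS_* constants.
NEW, PENDING, FAILED, SUCCESSFUL = "new", "pending", "failed", "successful"

# (method, status before) -> statuses the task may have afterwards
TRANSITIONS = {
    ("next", NEW): {PENDING},
    ("on_success", PENDING): {SUCCESSFUL},
    ("on_failure", PENDING): {NEW, FAILED},  # reschedule or fail
    ("on_retry", PENDING): {NEW},
    ("refresh", PENDING): {NEW, FAILED},     # expired lease or TTL expiry
}


def allowed(method: str, before: str, after: str) -> bool:
    """Return True if `method` may move a task from `before` to `after`."""
    return after in TRANSITIONS.get((method, before), set())


print(allowed("next", NEW, PENDING))            # True
print(allowed("on_success", NEW, SUCCESSFUL))   # False
```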
Delayed tasks
queue.append({"job": "later"}, delay_seconds=30)
queue.append({"job": "at-time"}, scheduled_at=1710000000.0)
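The scheduled_at value in the example looks like a Unix timestamp in seconds. Assuming that is what the library expects, a timezone-aware datetime converts to it directly. to_epoch_seconds is a hypothetical helper, not part of mongo-taskqueue.

```python
from datetime import datetime, timezone


def to_epoch_seconds(when: datetime) -> float:
    """Convert a timezone-aware datetime to Unix epoch seconds."""
    if when.tzinfo is None:
        # Naive datetimes are interpreted in local time, which is easy to get wrong.
        raise ValueError("naive datetime: attach a timezone first")
    return when.timestamp()


run_at = datetime(2024, 3, 9, 16, 0, tzinfo=timezone.utc)
print(to_epoch_seconds(run_at))  # 1710000000.0

# With a queue from the quick start, this would be passed on as:
# queue.append({"job": "at-time"}, scheduled_at=to_epoch_seconds(run_at))
```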
Dedupe keys
Dedupe keys are indexed for string values only.
queue.append({"job": "once"}, dedupe_key="job-123")
Duplicate inserts return False.
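When no natural identifier such as "job-123" exists, one option is to derive the key from the payload itself so that identical payloads map to the same string. The helper below is a sketch using only the standard library; payload_dedupe_key and its prefix argument are hypothetical names, and it assumes the payload is JSON-serializable.

```python
import hashlib
import json
from typing import Any, Mapping


def payload_dedupe_key(payload: Mapping[str, Any], prefix: str = "job") -> str:
    """Build a deterministic string key from a JSON-serializable payload."""
    # Sorted keys and fixed separators make the encoding independent of dict order.
    canonical = json.dumps(dict(payload), sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{prefix}-{digest[:32]}"


key_a = payload_dedupe_key({"job": "email", "to": "alice"})
key_b = payload_dedupe_key({"to": "alice", "job": "email"})
print(key_a == key_b)  # True

# With a queue from the quick start, a duplicate could then be detected as:
# if not queue.append(payload, dedupe_key=payload_dedupe_key(payload)):
#     print("already queued")
```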
Rate limiting
Set rate_limit_per_second to throttle dequeue frequency. When a task has a
rateLimitKey, the per-key limit is enforced in addition to the global limit.
If a per-key limit is hit after a task is leased, the task is released and
scheduled slightly in the future.
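To make the interaction between the two limits concrete, here is an in-memory model that allows a dequeue only when both the global scope and the task's key scope have been idle for their minimum interval. It is an illustration of the idea, not the library's implementation (which keeps its state in MongoDB); IntervalLimiter, its separate global and per-key rates, and the injectable clock are all assumptions.

```python
import time
from typing import Callable, Dict, Optional


class IntervalLimiter:
    """Minimum-interval rate limiter with one global scope and per-key scopes."""

    def __init__(
        self,
        global_per_second: float,
        key_per_second: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        if global_per_second <= 0 or key_per_second <= 0:
            raise ValueError("rates must be positive")
        self.global_interval = 1.0 / global_per_second
        self.key_interval = 1.0 / key_per_second
        self.clock = clock
        self._last: Dict[Optional[str], float] = {}  # None is the global scope

    def _ready(self, scope: Optional[str], interval: float, now: float) -> bool:
        last = self._last.get(scope)
        return last is None or now - last >= interval

    def try_acquire(self, key: Optional[str] = None) -> bool:
        """Record a dequeue and return True if every applicable limit allows it."""
        now = self.clock()
        if not self._ready(None, self.global_interval, now):
            return False
        if key is not None and not self._ready(key, self.key_interval, now):
            return False
        self._last[None] = now
        if key is not None:
            self._last[key] = now
        return True
```

A rejected call records nothing, so the caller can simply try again later, which mirrors the "release and reschedule" behavior described above.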
Dead-letter collection
queue = get_task_queue(
    database_name="app",
    collection_name="jobs",
    host="mongodb://localhost:27017",
    ttl=-1,
    max_retries=1,
    discard_strategy="remove",
    dead_letter_collection="jobs_dead",
)
Discarded tasks are copied to the dead-letter collection during refresh().
Async usage
import asyncio

from mongotq import AsyncTaskQueue


async def main():
    queue = AsyncTaskQueue(
        database="app",
        collection="jobs",
        host="mongodb://localhost:27017",
        ttl=-1,
    )
    await queue.append({"job": "async"})
    task = await queue.next()
    if task:
        await queue.on_success(task)


asyncio.run(main())
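For more than one task, the async calls can be wrapped in a loop that drains whatever is currently available. The sketch below is duck-typed against any object with awaitable next(), on_success(), and on_failure() methods. drain and handler are hypothetical names, and an awaitable on_failure(task, error_message=...) is assumed by symmetry with the sync API rather than shown in this README.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def drain(queue: Any, handler: Callable[[Any], Awaitable[None]]) -> int:
    """Process tasks until `queue.next()` returns nothing.

    Returns the number of tasks that completed successfully.
    """
    completed = 0
    while True:
        task = await queue.next()
        if not task:
            return completed
        try:
            await handler(task)
        except Exception as exc:
            # Assumed async counterpart of the sync on_failure() call.
            await queue.on_failure(task, error_message=str(exc))
        else:
            await queue.on_success(task)
            completed += 1
```

It would be started with asyncio.run(drain(queue, handler)) from synchronous code, or awaited directly inside an existing event loop.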
CLI
The CLI is installed as mongotq.
Environment variables:
- MONGOTQ_HOST (or MONGO_URI)
- MONGOTQ_DATABASE
- MONGOTQ_COLLECTION
- MONGOTQ_TTL
Example:
mongotq \
    --host mongodb://localhost:27017 \
    --database app \
    --collection jobs \
    append '{"job": "email"}'
Common commands:
- append, next, pop, refresh, size, status
- head, tail, purge, requeue-failed
- heartbeat, dead-letter-count, resolve-anomalies
Testing
Tests are designed to run in GitHub Actions. Local runs are skipped unless
GITHUB_ACTIONS=true is set.
License
MIT. See LICENSE.