Skip to main content

Queueing data structure on top of a MongoDB collection

Project description

mongo-taskqueue

PyPI Python Versions Downloads CI Docs License

Queueing data structure on top of a MongoDB collection.

Overview

mongo-taskqueue stores tasks in a single MongoDB collection and exposes a small API for enqueueing, leasing, and completing work. It is designed for simple worker loops that need scheduling, retries, and deduplication without an extra queue service.

Features

  • Single-collection queue with priority ordering
  • Delayed enqueue and scheduled execution
  • Visibility timeouts with optional heartbeat extension
  • Retry tracking with optional exponential backoff
  • Dedupe keys (string) to avoid duplicate in-flight tasks
  • Global and per-key rate limiting
  • Dead-letter collection for discarded tasks
  • Sync and async APIs
  • CLI for common operations

Install

pip install mongo-taskqueue

Async support:

pip install "mongo-taskqueue[async]"

Quick start

from mongotq import get_task_queue

queue = get_task_queue(
    database_name="app",
    collection_name="jobs",
    host="mongodb://localhost:27017",
    ttl=-1,
)

queue.append({"job": "email", "to": "alice"})

task = queue.next()
if task:
    try:
        # do work
        queue.on_success(task)
    except Exception as exc:
        queue.on_failure(task, error_message=str(exc))

Configuration

get_task_queue(...) accepts:

  • ttl: seconds a pending task can live before being marked failed (-1 for no expiry)
  • max_retries: maximum failure count before discard
  • discard_strategy: keep or remove
  • visibility_timeout: lease duration for pending tasks
  • retry_backoff_base and retry_backoff_max: exponential backoff controls
  • dead_letter_collection: where discarded tasks are copied when using discard_strategy="remove"
  • meta_collection: metadata collection for rate limits
  • rate_limit_per_second: global and per-key dequeue limits
  • ensure_indexes: create indexes if missing

Task lifecycle

Statuses are available as constants: STATUS_NEW, STATUS_PENDING, STATUS_FAILED, STATUS_SUCCESSFUL.

Common transitions:

  • next() leases the next available task and marks it pending
  • on_success(task) marks it successful
  • on_failure(task) increments retries and may reschedule or fail
  • on_retry(task) releases a leased task back to new
  • refresh() requeues expired leases, expires pending tasks (TTL), and discards tasks over retry limits

Delayed tasks

queue.append({"job": "later"}, delay_seconds=30)
queue.append({"job": "at-time"}, scheduled_at=1710000000.0)

Dedupe keys

Dedupe keys are indexed for string values only.

queue.append({"job": "once"}, dedupe_key="job-123")

Duplicate inserts return False.

Rate limiting

Set rate_limit_per_second to throttle dequeue frequency. When a task has a rateLimitKey, the per-key limit is enforced in addition to the global limit. If a per-key limit is hit after a task is leased, the task is released and scheduled slightly in the future.

Dead-letter collection

queue = get_task_queue(
    database_name="app",
    collection_name="jobs",
    host="mongodb://localhost:27017",
    ttl=-1,
    max_retries=1,
    discard_strategy="remove",
    dead_letter_collection="jobs_dead",
)

Discarded tasks are copied to the dead-letter collection during refresh().

Async usage

from mongotq import AsyncTaskQueue

queue = AsyncTaskQueue(
    database="app",
    collection="jobs",
    host="mongodb://localhost:27017",
    ttl=-1,
)

await queue.append({"job": "async"})

task = await queue.next()
if task:
    await queue.on_success(task)

CLI

The CLI is installed as mongotq.

Environment variables:

  • MONGOTQ_HOST (or MONGO_URI)
  • MONGOTQ_DATABASE
  • MONGOTQ_COLLECTION
  • MONGOTQ_TTL

Example:

mongotq \
  --host mongodb://localhost:27017 \
  --database app \
  --collection jobs \
  append '{"job": "email"}'

Common commands:

  • append, next, pop, refresh, size, status
  • head, tail, purge, requeue-failed
  • heartbeat, dead-letter-count, resolve-anomalies

Testing

Tests are designed to run in GitHub Actions. Local runs are skipped unless GITHUB_ACTIONS=true is set.

License

MIT. See LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mongo_taskqueue-1.0.4.tar.gz (19.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mongo_taskqueue-1.0.4-py3-none-any.whl (20.4 kB view details)

Uploaded Python 3

File details

Details for the file mongo_taskqueue-1.0.4.tar.gz.

File metadata

  • Download URL: mongo_taskqueue-1.0.4.tar.gz
  • Upload date:
  • Size: 19.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.21 {"installer":{"name":"uv","version":"0.9.21","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for mongo_taskqueue-1.0.4.tar.gz
Algorithm Hash digest
SHA256 aedcbeda38e031baa361c7386d24c0974c822a0cdf588563d513283b76658d4a
MD5 4787fb25088f57bc8574daade5d4ce76
BLAKE2b-256 c4ea023ee2a61de41d4c14a1082804f050be7abd532287e59f460f210ad127c4

See more details on using hashes here.

File details

Details for the file mongo_taskqueue-1.0.4-py3-none-any.whl.

File metadata

  • Download URL: mongo_taskqueue-1.0.4-py3-none-any.whl
  • Upload date:
  • Size: 20.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.21 {"installer":{"name":"uv","version":"0.9.21","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for mongo_taskqueue-1.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 579bdca9ae154f6546f83475cab7fcef7f8ea84ac4f5e6cf0c4ffb238d2df67d
MD5 d1f7961d28c1fe807e59c83c0ded079d
BLAKE2b-256 d4ee1634522f7fca38bfd95fbfb8600f649b43372f0fcbfa3a2259b96c577bc0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page