
Queue service built on top of MongoDB.


mongo_queue

Task queue built on MongoDB with channels and unique job IDs.

Website: autobotAI Automation Platform

Inspired by kapilt/mongoqueue.

Change Log:

v0.1.0

  • Added an optional inc_attempt parameter to job.release. It lets the caller choose whether releasing a job increments its attempt count.

v0.0.9

  • Added the find_by_id method to find a job by its ID.

v0.0.7

  • Added backward compatibility with older MongoDB versions. The aggregate query used a lookup syntax that is only available in MongoDB 3.6 and later (and is not available in DocumentDB); the lookup now uses the older syntax.

v0.0.6

  • Added the sleep and state options when releasing a job. sleep keeps the job from being picked up until the given number of seconds has passed, and state stores data with the job for long-running jobs.

v0.0.5

  • Added the depends_on feature. You can create a dependency between jobs by passing depends_on a list of previously created job IDs.

v0.0.3

  • Added a unique index on job_id and channel. This makes sure that the same job is not added multiple times. If no job ID is provided, a unique ID is generated by default.
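
The rules introduced across these releases (a unique job_id per channel, a generated ID when none is given, sleep on release, and depends_on) can be pictured with a small in-memory model. This is only an illustrative sketch and not the package's implementation: SketchQueue and all of its methods are hypothetical. Raising on a duplicate is an assumption, and so is treating a dependency as satisfied once the job it points to has been completed and removed from the same channel.

```python
import time
import uuid


class SketchQueue:
    """Hypothetical in-memory stand-in for the rules in the change log."""

    def __init__(self):
        self.jobs = {}  # (channel, job_id) -> job dict

    def put(self, payload, channel="default", job_id=None, depends_on=()):
        job_id = job_id or str(uuid.uuid4())  # generated ID when none is given
        key = (channel, job_id)
        if key in self.jobs:  # plays the role of the unique index
            raise ValueError(f"duplicate job {job_id!r} in channel {channel!r}")
        self.jobs[key] = {
            "job_id": job_id,
            "payload": payload,
            "depends_on": list(depends_on),
            "run_after": 0.0,  # epoch seconds before which the job is skipped
        }
        return job_id

    def release(self, channel, job_id, sleep=0, now=None):
        # Put the job back and hide it until `sleep` seconds have passed.
        now = time.time() if now is None else now
        self.jobs[(channel, job_id)]["run_after"] = now + sleep

    def complete(self, channel, job_id):
        # Completing a job removes it, as the README describes for job.complete().
        del self.jobs[(channel, job_id)]

    def ready(self, channel="default", now=None):
        # IDs of jobs that are past their sleep time and have no pending
        # dependency left in the same channel (assumed semantics).
        now = time.time() if now is None else now
        return [
            job["job_id"]
            for (ch, _), job in self.jobs.items()
            if ch == channel
            and job["run_after"] <= now
            and not any((ch, dep) in self.jobs for dep in job["depends_on"])
        ]
```

In this model the same job_id can exist in two different channels, but adding it twice to one channel fails, which mirrors the unique index on job_id and channel.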

Usage

Install the package.

pip install mongo_queue

Usage Example:

  • Create Queue Object
from mongo_queue.queue import Queue
from pymongo import MongoClient

queue = Queue(MongoClient('localhost', 27017).task_queue, consumer_id="consumer-1", timeout=300, max_attempts=3)
  • Add task to the queue's default channel
queue.put({"task_id": 1})
  • Add task to queue with priority to default channel
queue.put({"task_id": 1}, priority=1)
  • Add task to queue in a specific channel
queue.put({"task_id": 1}, priority=1, channel="channel_1")
  • Add task to queue with unique job_id
queue.put({"task_id": 1}, priority=1, channel="channel_1", job_id="x_job")
  • Add task with dependency (each job needs its own job_id within a channel)
job1 = queue.put({"task_id": 1}, priority=1, channel="channel_1", job_id="x_job")
job2 = queue.put({"task_id": 2}, priority=1, channel="channel_1", job_id="y_job", depends_on=[job1])
  • Get the next job to be executed from the default channel
job = queue.next()
  • Get the next job to be executed from a specific channel
job = queue.next(channel="channel_1")
  • Update job progress for long-running jobs
job.progress(count=10)
  • Put the job back in the queue so that it is picked up again later. This increments the attempt count (unless the optional inc_attempt parameter is used to turn that off); once max attempts is reached, the job is not picked up again.
  • You can also set sleep and state while releasing a job:
  • sleep: time in seconds. The job is not picked up again until the sleep time expires.
  • state: data stored with the job, useful for long-running jobs.
job.release()
# or
job.release(sleep=10, state={"some": "state"})
  • Put the job back in the queue with an error message. The job is picked up again later. This increments the attempt count; once max attempts is reached, the job is not picked up again.
job.error("Some error occurred")
  • Complete the job. This deletes the job from the database.
job.complete()
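
The calls above can be combined into a small consumer step. The following is a minimal sketch, not part of the package: process_one and handler are hypothetical names, and it assumes that queue.next() returns None when no job is available and that the job exposes the dict passed to put() as job.payload. Neither assumption is stated in this README, so check them against the installed version.

```python
def process_one(queue, handler, channel=None):
    """Run handler on the next job's payload.

    Returns False if no job was available, True otherwise.
    """
    # Assumption: next() returns None when nothing is ready to run.
    job = queue.next(channel=channel) if channel is not None else queue.next()
    if job is None:
        return False
    try:
        # Assumption: the payload given to put() is available as job.payload.
        handler(job.payload)
    except Exception as exc:
        # Put the job back with the error message so it can be retried later.
        job.error(str(exc))
    else:
        # Success: complete() deletes the job from the database.
        job.complete()
    return True
```

A worker would typically call process_one(queue, handler, channel="channel_1") in a loop and pause briefly whenever it returns False.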

Build Steps

# Set up a venv with Python 3.6 or above
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
pip install wheel
pip install --upgrade twine
python setup.py sdist bdist_wheel
python -m twine upload --repository-url https://upload.pypi.org/legacy/ dist/*

Local Development and Testing

# Quote the URI so the shell does not treat "&" as a background operator
export MONGO_URI='mongodb+srv://username:pwd@mongourl/test?retryWrites=true&w=majority'
cd mongo_queue # Root directory of the package
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python -m unittest mongo_queue.test
