
Package to facilitate queueing of jobs using Redis


Atlas Consortia JobQ


Atlas Consortia JobQ is a high-performance, Redis-backed priority queue system designed for background task management.

Table of Contents

  • Installation

  • Quick Start

  • Worker Management

  • Method Reference

  • Features

Installation

Install the package via pip:

pip install atlas-consortia-jobq

Note: Requires a running Redis instance. Refer to the Redis documentation for instructions on installing and running Redis.

Quick Start

1. Initialize the Queue

from atlas_consortia_jobq import JobQueue

# Connect to your Redis instance
jq = JobQueue(
    redis_host='localhost',
    redis_port=6379,
    redis_db=0,
    redis_password=None
)

2. Enqueue a Job

Jobs require a function, an entity_id, and optional arguments.

  • job_id: A unique identifier generated for each job during the enqueuing process. It is returned so the job can be referenced later.

  • entity_id: The unique identifier of the resource being processed (e.g., a UUID). This prevents the same resource from being queued multiple times.

def my_task(arg1, keyword_arg="default"):
    print(f"Processing: {arg1}, {keyword_arg}")

job_id = jq.enqueue(
    task_func=my_task,
    entity_id="unique_id_123",
    args=["value1"],
    kwargs={"keyword_arg": "value2"},
    priority=2
)

Worker Management

To process jobs, you must start worker subprocesses. This is typically done in a dedicated entry-point script.

from atlas_consortia_jobq import JobQueue

if __name__ == "__main__":
    jq = JobQueue(redis_host='localhost')
    
    # This call spawns 4 worker subprocesses
    jq.start_workers(num_workers=4)
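Conceptually, each worker repeatedly pops the highest-priority job and executes it. The single-process sketch below models that loop with Python's heapq (illustrative only; the real workers are subprocesses coordinated through Redis, and `run_worker` is not part of the library's API):

```python
import heapq

def run_worker(queue):
    """Drain an in-memory priority queue; a lower number = higher priority."""
    results = []
    while queue:
        priority, entity_id, func, args = heapq.heappop(queue)
        results.append(func(*args))
    return results

jobs = []
heapq.heappush(jobs, (2, "id_b", str.upper, ("beta",)))
heapq.heappush(jobs, (1, "id_a", str.upper, ("alpha",)))
print(run_worker(jobs))  # ['ALPHA', 'BETA'] -- the priority-1 job runs first
```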

Method Reference

enqueue(task_func, entity_id, args=None, kwargs=None, priority=1)

Adds a job to the queue.

  • If the entity_id is already queued, the existing job's priority is updated when the new priority is higher (a numerically lower value, since 1 is the highest level).

  • If the entity_id is currently being processed, the call is rejected, preventing duplicate enqueuing.
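These two rules can be modeled with a small in-memory sketch. The real package keeps this state in Redis; `ToyQueue` and its fields are illustrative, not part of the library's API:

```python
class ToyQueue:
    """In-memory sketch of the enqueue rules (illustrative only)."""

    def __init__(self):
        self.queued = {}         # entity_id -> priority (1 = highest)
        self.processing = set()  # entity_ids currently being worked on

    def enqueue(self, entity_id, priority=1):
        if entity_id in self.processing:
            return None  # already being processed: refuse the duplicate
        current = self.queued.get(entity_id)
        # A lower number means a higher priority, so overwrite only when smaller.
        if current is None or priority < current:
            self.queued[entity_id] = priority
        return entity_id

q = ToyQueue()
q.enqueue("id-1", priority=2)
q.enqueue("id-1", priority=3)   # ignored: lower priority than the queued job
q.enqueue("id-1", priority=1)   # raises the queued job's priority
q.processing.add("id-2")
print(q.enqueue("id-2"))        # None: id-2 is already being processed
print(q.queued)                 # {'id-1': 1}
```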

update_priority(identifier, new_priority)

Updates the priority of an existing job. The identifier can be a job_id or an entity_id.

get_status(identifier)

Returns a dictionary containing the job_id, position_in_queue, and priority. The identifier can be either a job_id or an entity_id.
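To illustrate what position_in_queue conveys, the toy function below computes a job's position within a priority-ordered list. The data and function body are illustrative; only the three named fields of the returned dictionary come from the documentation:

```python
def toy_get_status(queue, identifier):
    """Toy lookup: queue is a list of (job_id, entity_id, priority),
    already ordered as jobs will be popped."""
    for position, (job_id, entity_id, priority) in enumerate(queue, start=1):
        if identifier in (job_id, entity_id):
            return {"job_id": job_id,
                    "position_in_queue": position,
                    "priority": priority}
    return None  # unknown identifier

queue = [("job-1", "entity-a", 1), ("job-2", "entity-b", 2)]
print(toy_get_status(queue, "entity-b"))
# {'job_id': 'job-2', 'position_in_queue': 2, 'priority': 2}
```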

get_queue_status()

Returns an overview of the entire queue, including total job counts and a breakdown by priority level.
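The shape of such an overview can be sketched with a per-priority count over toy data (the field names `total_jobs` and `by_priority` are illustrative, not the method's documented return format):

```python
from collections import Counter

def toy_queue_status(queue):
    """Toy overview: total job count plus a breakdown per priority level."""
    by_priority = Counter(priority for *_, priority in queue)
    return {"total_jobs": len(queue),
            "by_priority": dict(by_priority)}

queue = [("job-1", "a", 1), ("job-2", "b", 2), ("job-3", "c", 2)]
print(toy_queue_status(queue))
# {'total_jobs': 3, 'by_priority': {1: 1, 2: 2}}
```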

Features

  • Atomic Operations: Uses Lua scripting to ensure job enqueuing and popping are race-condition free.

  • entity_id Deduplication: Prevents multiple jobs for the same entity_id from cluttering the queue.

  • Priority Support: Supports three priority levels (1=Highest, 2=Medium, 3=Lowest).

  • Automatic Cleanup: Manages metadata and "processing" states automatically upon job completion.
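The atomicity point can be illustrated with a conceptual Lua script of the kind Redis evaluates as a single step: no other client can run between its commands, so two workers can never claim the same job. The script and key names below are a sketch, not the package's actual implementation:

```python
# Conceptual sketch (not the package's actual script or key schema).
# Redis runs a Lua script atomically, so "pop the best job and mark it
# as processing" cannot interleave with another worker's pop.
POP_HIGHEST_PRIORITY = """
local job = redis.call('ZPOPMIN', KEYS[1])     -- take the lowest-scored (highest-priority) job
if #job == 0 then return nil end               -- queue is empty
redis.call('HSET', KEYS[2], job[1], job[2])    -- record it as in-progress
return job[1]                                  -- hand the job id to the worker
"""

# With redis-py this would be registered once and invoked per pop, e.g.:
#   script = client.register_script(POP_HIGHEST_PRIORITY)
#   job_id = script(keys=["jobq:queue", "jobq:processing"])
print("ZPOPMIN" in POP_HIGHEST_PRIORITY)  # True
```

Storing jobs in a sorted set scored by priority makes ZPOPMIN return the priority-1 jobs first, matching the 1=Highest convention above.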
