
A Python implementation of the queuer system: a job queuing and processing system with a PostgreSQL backend


queuerPy


Python port of the queuer package, a queueing system based on PostgreSQL.

💡 Goal of this package

This queuer is meant to be as easy to use as possible. It requires no specific function signature (tasks only need to return results or raise exceptions for error handling), it is easy to set up, and it is still fast.

The job table contains only queued, scheduled and running jobs. Ended jobs (succeeded, cancelled or failed) are moved to the job_archive table.
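As a rough illustration of this split, the sketch below models which statuses keep a job in the job table and which move it to the archive. The status names follow this README; the JobStatus enum, the two in-memory lists and the end_job helper are hypothetical stand-ins, not the package's actual model or SQL.

```python
from enum import Enum
from typing import List


# Hypothetical status names; the package's real identifiers may differ
class JobStatus(str, Enum):
    QUEUED = "QUEUED"
    SCHEDULED = "SCHEDULED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"


# Jobs with these statuses stay in the job table
ACTIVE_STATUSES = {JobStatus.QUEUED, JobStatus.SCHEDULED, JobStatus.RUNNING}


def end_job(job_table: List[dict], job_archive: List[dict], job: dict, status: JobStatus) -> None:
    """Give a job an ended status and move it from job_table to job_archive."""
    if status in ACTIVE_STATUSES:
        raise ValueError(f"{status.value} is not an ended status")
    job_table.remove(job)
    job["status"] = status
    job_archive.append(job)


job_table = [{"rid": "job-1", "status": JobStatus.RUNNING}]
job_archive: List[dict] = []
end_job(job_table, job_archive, job_table[0], JobStatus.SUCCEEDED)
print(len(job_table), len(job_archive))  # 0 1
```

Keeping only active jobs in the main table keeps the table the workers poll small, while the archive preserves the history of ended jobs.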


🛠️ Installation

PyPI Installation (Recommended)

pip install queuerpy

For development dependencies:

pip install "queuerpy[dev]"

Development Installation

To install the queuer package from source for development:

# Clone the repository
git clone https://github.com/siherrmann/queuerPy.git
cd queuerPy

# Install in development mode
pip install -e .

# Or with development dependencies
pip install -e ".[dev]"

To use the package, you also need a running PostgreSQL database with the TimescaleDB extension. You can use the docker-compose.yml file in the example folder, or start a Docker container from the timescale/timescaledb:latest-pg17 image.


🚀 Getting started

In the simplest case, the full initialisation looks like this:

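from queuer import new_queuer

# Hypothetical example task: a plain function that returns a result
# (or raises an exception to signal an error)
def example_task(param1: int, param2: str) -> int:
    return param1 + int(param2)

# Create a new queuer instance named "exampleWorker" that runs at most 3 jobs concurrently
q = new_queuer("exampleWorker", 3)

# Add a task to the queuer
q.add_task(example_task)

# Start the queuer
q.start()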

That's easy, right? Adding a job is just as easy:

# Add a job to the queue
job = q.add_job(example_task, 5, "12")
print(f"Job added: {job.rid}")

During initialisation, the queuer checks whether the necessary database tables exist and creates them if they don't. The database connection is configured with these environment variables:

export QUEUER_DB_HOST=localhost
export QUEUER_DB_PORT=5432
export QUEUER_DB_DATABASE=postgres
export QUEUER_DB_USERNAME=username
export QUEUER_DB_PASSWORD=password1234
export QUEUER_DB_SCHEMA=public

You can find a full example in the example folder.
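To make the role of the QUEUER_DB_* variables concrete, the sketch below collects them into a configuration object and renders a libpq-style connection string. DbConfig and load_db_config are hypothetical names for illustration; the package ships its own DatabaseConfiguration, whose fields and loading logic may differ.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass
class DbConfig:
    """Hypothetical stand-in for the package's DatabaseConfiguration."""
    host: str
    port: int
    database: str
    username: str
    password: str
    schema: str

    def dsn(self) -> str:
        # libpq-style key/value connection string (schema is kept as a separate field)
        return (
            f"host={self.host} port={self.port} dbname={self.database} "
            f"user={self.username} password={self.password}"
        )


def load_db_config(env: Mapping[str, str] = os.environ) -> DbConfig:
    """Build a DbConfig from the QUEUER_DB_* variables; a missing one raises KeyError."""
    return DbConfig(
        host=env["QUEUER_DB_HOST"],
        port=int(env["QUEUER_DB_PORT"]),
        database=env["QUEUER_DB_DATABASE"],
        username=env["QUEUER_DB_USERNAME"],
        password=env["QUEUER_DB_PASSWORD"],
        schema=env["QUEUER_DB_SCHEMA"],
    )


cfg = load_db_config({
    "QUEUER_DB_HOST": "localhost",
    "QUEUER_DB_PORT": "5432",
    "QUEUER_DB_DATABASE": "postgres",
    "QUEUER_DB_USERNAME": "username",
    "QUEUER_DB_PASSWORD": "password1234",
    "QUEUER_DB_SCHEMA": "public",
})
print(cfg.dsn())  # host=localhost port=5432 dbname=postgres user=username password=password1234
```

Passing a plain dict instead of os.environ makes the loader easy to test without touching the process environment.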


new_queuer

new_queuer is a convenience constructor that creates a new Queuer instance with the database configuration taken from environment variables. It is a wrapper around new_queuer_with_db. The encryption key is read from the QUEUER_ENCRYPTION_KEY environment variable; if it is not set, results are stored unencrypted.

new_queuer_with_db is the primary constructor for creating a new Queuer instance. It allows for explicit database configuration and encryption key specification, and initializes all necessary components, including database handlers, internal event listeners, and the worker.

def new_queuer(name: str, max_concurrency: int, *options: OnError) -> Queuer

def new_queuer_with_db(
    name: str,
    max_concurrency: int,
    encryption_key: str,
    db_config: Optional[DatabaseConfiguration],
    *options: OnError
) -> Queuer
  • name: A str identifier for this queuer instance.
  • max_concurrency: An int specifying the maximum number of jobs this queuer can process concurrently.
  • encryption_key: A str used for encrypting sensitive job data in the database. If empty, results will be stored unencrypted.
  • db_config: An optional DatabaseConfiguration. If None, the configuration will be loaded from environment variables.
  • options: Optional OnError configurations to apply to the worker.

This function performs the following setup:

  • Initializes a logger.
  • Sets up the database connection using the provided db_config or environment variables.
  • Creates JobDBHandler and WorkerDBHandler instances for database interactions.
  • Initializes internal notification listeners for job_insert, job_update, and job_delete events.
  • Creates and inserts a new Worker into the database based on the provided name, max_concurrency, and options.
  • If any critical error occurs during this initialization (e.g., database connection failure, worker creation error), the function will raise an exception.
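The relationship between the two constructors can be sketched as a thin wrapper that resolves defaults and delegates. Everything below is a hypothetical stand-in (the "constructor" only records its inputs) meant to show the two defaults described above: an unset or empty QUEUER_ENCRYPTION_KEY means unencrypted results, and db_config=None means the configuration is loaded from the environment.

```python
import os
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class QueuerSketch:
    """Hypothetical stand-in that only records how a queuer was configured."""
    name: str
    max_concurrency: int
    encrypted: bool
    config_source: str


def new_queuer_with_db_sketch(name: str, max_concurrency: int, encryption_key: str,
                              db_config: Optional[Any]) -> QueuerSketch:
    return QueuerSketch(
        name=name,
        max_concurrency=max_concurrency,
        # An empty key means results are stored unencrypted
        encrypted=bool(encryption_key),
        # None means the configuration comes from the QUEUER_DB_* variables
        config_source="environment" if db_config is None else "explicit",
    )


def new_queuer_sketch(name: str, max_concurrency: int,
                      env: Mapping[str, str] = os.environ) -> QueuerSketch:
    # The convenience wrapper only resolves defaults and delegates
    key = env.get("QUEUER_ENCRYPTION_KEY", "")
    return new_queuer_with_db_sketch(name, max_concurrency, key, None)


plain = new_queuer_sketch("exampleWorker", 3, env={})
secret = new_queuer_sketch("exampleWorker", 3, env={"QUEUER_ENCRYPTION_KEY": "s3cret"})
print(plain.encrypted, secret.encrypted)  # False True
```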

start

The start method begins the operational lifecycle of the Queuer: it initializes the database listeners and starts the job processing and polling loops.

def start(self) -> None

Upon calling start:

  • It performs a basic check to ensure internal listeners are initialized.
  • Database listeners are created to listen to job events (inserts, updates, deletes) via PostgreSQL NOTIFY/LISTEN.
  • It starts a poller to periodically poll the database for new jobs to process.
  • It starts a heartbeat ticker to keep the worker status updated.
  • The method returns immediately after starting all background processes.

If the queuer is not properly initialized, or if creating the database listeners fails, the method raises an exception.


stop

The stop method gracefully shuts down the Queuer instance, releasing resources and ensuring ongoing operations are properly concluded.

def stop(self) -> None

The stop method cancels all jobs, closes database listeners, and cleans up resources.
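The non-blocking start and the stop described above follow a common pattern: background threads loop until a shared stop event is set. The sketch below uses threading.Event for a poller and a heartbeat ticker. The class name, intervals and callbacks are hypothetical; the real queuer additionally manages LISTEN/NOTIFY listeners and cancels running jobs.

```python
import threading
import time
from typing import Callable, List, Tuple


class BackgroundLoops:
    """Hypothetical sketch: start() spawns loops and returns, stop() ends them."""

    def __init__(self, poll: Callable[[], None], heartbeat: Callable[[], None],
                 poll_interval: float = 0.05, heartbeat_interval: float = 0.05) -> None:
        self._stop_event = threading.Event()
        self._loops: List[Tuple[Callable[[], None], float]] = [
            (poll, poll_interval),
            (heartbeat, heartbeat_interval),
        ]
        self.threads: List[threading.Thread] = []

    def _run(self, fn: Callable[[], None], interval: float) -> None:
        # Event.wait returns False on timeout and True once stop() sets the event
        while not self._stop_event.wait(interval):
            fn()

    def start(self) -> None:
        for fn, interval in self._loops:
            t = threading.Thread(target=self._run, args=(fn, interval), daemon=True)
            t.start()
            self.threads.append(t)
        # Returns right away; polling and heartbeats continue in the background

    def stop(self) -> None:
        self._stop_event.set()
        for t in self.threads:
            t.join()


polls: List[float] = []
beats: List[float] = []
loops = BackgroundLoops(lambda: polls.append(time.time()), lambda: beats.append(time.time()))
loops.start()
time.sleep(0.3)
loops.stop()
```

Waiting on the event instead of calling time.sleep lets stop() interrupt each loop immediately rather than after a full interval.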


add_task

The add_task method registers a new job task with the queuer. A task is the actual function that will be executed when a job associated with it is processed.

def add_task(self, task: Callable) -> Task

def add_task_with_name(self, task: Callable, name: str) -> Task
  • task: A Callable representing the function that will serve as the job's executable logic. The queuer will automatically derive a name for this task based on its function name (e.g., my_task_function). The derived name must be unique if no name is given.
  • name: A str specifying the custom name for this task. This name must be unique within the queuer's tasks.

Registering a task enables the worker to pick up and execute jobs of this task type, and the method also updates the worker's available tasks in the database. Tasks should be added before the queuer is started. If task creation or the database update fails, an exception is raised.
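The naming rule can be sketched with a small registry, assuming the derived name is the function's __name__ (the README only says it is based on the function name). TaskRegistry is a hypothetical stand-in; the real add_task also updates the worker's task list in the database.

```python
from typing import Callable, Dict


class TaskRegistry:
    """Hypothetical registry illustrating add_task / add_task_with_name naming."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Callable] = {}

    def add_task(self, task: Callable) -> str:
        # Derive the task name from the function name
        return self.add_task_with_name(task, task.__name__)

    def add_task_with_name(self, task: Callable, name: str) -> str:
        # Names must be unique within the registry
        if name in self.tasks:
            raise ValueError(f"task {name!r} already exists")
        self.tasks[name] = task
        return name


def my_task_function(a: int, b: int) -> int:
    return a + b


registry = TaskRegistry()
registry.add_task(my_task_function)                     # registered as "my_task_function"
registry.add_task_with_name(my_task_function, "adder")  # same function, custom name
```

add_task_with_name is what allows the same function to be registered more than once, since each registration only needs a unique name.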


add_next_interval_func

The add_next_interval_func method registers a custom function that determines the next execution time for scheduled jobs. This is useful for implementing complex scheduling logic beyond simple fixed intervals.

def add_next_interval_func(self, nif: Callable) -> Worker

def add_next_interval_func_with_name(self, nif: Callable, name: str) -> Worker
  • nif: A Callable defining custom logic for calculating the next interval. The queuer will automatically derive a name for this function. The derived name must be unique if no name is given.
  • name: A str specifying the custom name for this NextIntervalFunc. This name must be unique within the queuer's NextIntervalFuncs.

This method adds the provided NextIntervalFunc to the queuer's available functions, making it usable for jobs with custom scheduling requirements. It updates the worker's configuration in the database.
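The README does not spell out the NextIntervalFunc signature. The sketch below assumes it receives the schedule's start time and the number of executions so far, and returns the time of the next execution; verify this against the package before relying on it. The function itself is a hypothetical example that doubles the gap between runs.

```python
from datetime import datetime, timedelta


def exponential_next_interval(start: datetime, current_count: int) -> datetime:
    """Hypothetical NextIntervalFunc: gaps between runs double (1, 2, 4, ... minutes)."""
    # After n executions, the next run is 1 + 2 + ... + 2**(n-1) = 2**n - 1 minutes after start
    return start + timedelta(minutes=2 ** current_count - 1)


print(exponential_next_interval(datetime(2025, 1, 1, 12, 0), 3))  # 2025-01-01 12:07:00
```

Once registered with add_next_interval_func, such a function is referenced by name from a job's Schedule.next_interval.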


Worker Options

The OnError class defines how a worker should handle errors when processing a job. This allows for configurable retry behavior.

class OnError:
    def __init__(
        self,
        timeout: float = 30.0,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff: str = RetryBackoff.NONE
    )
  • timeout: The maximum time (in seconds) allowed for a single attempt of a job. If the job exceeds this duration, it's considered to have timed out.
  • max_retries: The maximum number of times a job will be retried after a failure.
  • retry_delay: The initial delay (in seconds) before the first retry attempt. This delay can be modified by the retry_backoff strategy.
  • retry_backoff: Specifies the strategy used to increase the delay between subsequent retries.

Retry Backoff Strategies

The RetryBackoff enum defines the available strategies for increasing retry delays:

class RetryBackoff(str, Enum):
    NONE = "none"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"
  • RetryBackoff.NONE: No backoff. The retry_delay remains constant for all retries.
  • RetryBackoff.LINEAR: The retry delay increases linearly with each attempt (e.g., delay, 2*delay, 3*delay).
  • RetryBackoff.EXPONENTIAL: The retry delay increases exponentially with each attempt (e.g., delay, delay*2, delay*2*2).
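The three strategies can be expressed as a small delay function that reproduces the example sequences above. This is a sketch under that reading, with attempts numbered from 1; the package's own computation (including any upper limit on the delay) may differ. RetryBackoff is redefined locally so the snippet runs on its own, and retry_delay_for is a hypothetical helper.

```python
from enum import Enum


class RetryBackoff(str, Enum):
    NONE = "none"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"


def retry_delay_for(attempt: int, retry_delay: float, backoff: RetryBackoff) -> float:
    """Delay in seconds before retry number `attempt` (1-based)."""
    if attempt < 1:
        raise ValueError("attempt starts at 1")
    # == also matches plain strings such as "linear", since RetryBackoff subclasses str
    if backoff == RetryBackoff.LINEAR:
        return retry_delay * attempt
    if backoff == RetryBackoff.EXPONENTIAL:
        return retry_delay * 2 ** (attempt - 1)
    return retry_delay  # RetryBackoff.NONE: constant delay


print([retry_delay_for(n, 1.0, RetryBackoff.EXPONENTIAL) for n in (1, 2, 3)])  # [1.0, 2.0, 4.0]
```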

Job options

The Options class allows you to define specific behaviors for individual jobs, overriding default worker settings where applicable.

@dataclass
class Options:
    on_error: Optional[OnError] = None
    schedule: Optional[Schedule] = None
  • on_error: An optional OnError configuration that will override the worker's default error handling for this specific job. This allows you to define unique retry logic per job.
  • schedule: An optional Schedule configuration for jobs that need to be executed at recurring intervals.

OnError for jobs

The OnError class for jobs is identical to the one used for worker options, allowing granular control over error handling for individual jobs.

Schedule

The Schedule class is used to define recurring jobs.

@dataclass
class Schedule:
    start: Optional[datetime] = None
    max_count: int = 1
    interval: Optional[timedelta] = None
    next_interval: Optional[str] = None
  • start: The initial time at which the scheduled job should first run.
  • max_count: The maximum number of times the job should be executed. A value of 0 indicates an indefinite number of repetitions (run forever).
  • interval: The duration between consecutive executions of the scheduled job.
  • next_interval: The name of a registered NextIntervalFunc that returns the time of the next execution of the scheduled job. If max_count is 0 or greater than 1, either interval or next_interval must be set.
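The rule in the last bullet can be checked before a job is submitted, and with a fixed interval the run times follow directly from start. The sketch below uses a local ScheduleSketch dataclass mirroring the fields above; validate_schedule and fixed_run_times are hypothetical helpers, not part of the package.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class ScheduleSketch:
    # Mirrors the documented Schedule fields
    start: Optional[datetime] = None
    max_count: int = 1
    interval: Optional[timedelta] = None
    next_interval: Optional[str] = None


def validate_schedule(s: ScheduleSketch) -> None:
    # Repeating schedules (max_count 0 = forever, or > 1) need interval or next_interval
    repeats = s.max_count == 0 or s.max_count > 1
    if repeats and s.interval is None and s.next_interval is None:
        raise ValueError("interval or next_interval is required for repeating schedules")


def fixed_run_times(s: ScheduleSketch, limit: int) -> List[datetime]:
    """First run times for a fixed interval: at most `limit`, and at most max_count unless 0."""
    validate_schedule(s)
    if s.start is None or s.interval is None:
        raise ValueError("start and interval are required here")
    count = limit if s.max_count == 0 else min(limit, s.max_count)
    return [s.start + i * s.interval for i in range(count)]


hourly = ScheduleSketch(start=datetime(2025, 1, 1, 9, 0), max_count=3, interval=timedelta(hours=1))
print([t.strftime("%H:%M") for t in fixed_run_times(hourly, limit=10)])  # ['09:00', '10:00', '11:00']
```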

Additional Methods

Job Management

# Add a single job
job = queuer.add_job(my_task, param1, param2)

# Add a job with custom options
from model.options import Options, OnError
options = Options(on_error=OnError(max_retries=5, timeout=60.0))
job = queuer.add_job_with_options(options, my_task, param1)

# Add multiple jobs as a batch
from model.batch_job import BatchJob
batch = [
    BatchJob(task=my_task, parameters=[1, "a"]),
    BatchJob(task=my_task, parameters=[2, "b"])
]
queuer.add_jobs(batch)

# Wait for a job to finish
finished_job = queuer.wait_for_job_finished(job.rid, timeout_seconds=30.0)

# Get job information
job_info = queuer.get_job(job.rid)
archived_job = queuer.get_job_ended(job.rid)  # For ended jobs (moved to job_archive)

Job Queries

# Get jobs by status
running_jobs = queuer.get_jobs(status="RUNNING")
all_jobs = queuer.get_jobs()

# Get jobs by worker
worker_jobs = queuer.get_jobs_by_worker_rid(worker.rid)

⭐ Features

  • Async/Await Support: Full asyncio integration with threading fallbacks.
  • PostgreSQL NOTIFY/LISTEN: Real-time job notifications, complemented by a periodic poller.
  • Batch Job Processing: Insert job batches efficiently.
  • Transaction Support: Insert jobs within transactions for rollback capability.
  • Error Recovery: Comprehensive error handling and retry mechanisms.
  • Multiple Workers: Multiple queuer instances can run across different services while maintaining job order and isolation.
  • Scheduled Jobs: Support for scheduled and periodic jobs with custom intervals.
  • Job Lifecycle Management: Simple functions to query jobs and workers and to track job status.
  • Event Listeners: Listen for job updates, completion, and deletion events.
  • Retry Mechanisms: Configurable retry logic with different backoff strategies.
  • Custom Scheduling: Custom NextInterval functions for complex scheduling needs.
  • Heartbeat System: Worker heartbeat monitoring and automatic cleanup of stale workers.
  • Database Integration: Seamless PostgreSQL integration with automatic schema management.
  • Type Safety: Full type hints and dataclass-based models for better development experience.

🧪 Testing

# Run all tests
python -m pytest

# Run with coverage
python -m pytest --cov=. --cov-report=html

# Run specific test files
python -m pytest queuer_test.py -v



Download files


Source Distribution

queuerpy-0.5.0.tar.gz (52.9 kB)

Uploaded Source

Built Distribution


queuerpy-0.5.0-py3-none-any.whl (62.1 kB)

Uploaded Python 3

File details

Details for the file queuerpy-0.5.0.tar.gz.

File metadata

  • Download URL: queuerpy-0.5.0.tar.gz
  • Upload date:
  • Size: 52.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.19

File hashes

Hashes for queuerpy-0.5.0.tar.gz
Algorithm Hash digest
SHA256 f3433549e7df2f3374d7c7fd3cdfb1672aa14c3cc0986e38e4c094d052bb4276
MD5 823570bf46214fc362640bba3c96d1e0
BLAKE2b-256 d9d14234d67386e442fb06b50c4b7fd9a66e556782db59ffbddbe6704ee8b81a


File details

Details for the file queuerpy-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: queuerpy-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 62.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.19

File hashes

Hashes for queuerpy-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a418d060a8a4af3d91d4475e7869d0f0df74a5de1dcb1fab9c74fd8a2f1aa939
MD5 e685f34cc2bb53678a4d6836c3d8fdeb
BLAKE2b-256 59a3534a86760f2c36672e2430f3fc07be9c7e8b80fc0b3e84c34be4ae5efd53

