A Python implementation of the queuer system - a job queuing and processing system with PostgreSQL backend
queuerPy
Python port of the queuer package - a queueing system based on PostgreSQL.
💡 Goal of this package
This queuer is meant to be as easy as possible to use. Tasks need no specific function signature (they simply return results or raise exceptions for error handling), setup is easy, and processing is still fast.
The job table contains only queued, scheduled and running tasks. The ended jobs (succeeded, cancelled, failed) are moved to a job_archive table.
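The split between the two tables can be pictured with a small lookup. This is only a sketch: the helper name table_for_status and the upper-case status spellings are assumptions made for illustration, not part of the queuerPy API.

```python
# Statuses named in the paragraph above; the upper-case spelling is an assumption.
ACTIVE_STATUSES = {"QUEUED", "SCHEDULED", "RUNNING"}
ENDED_STATUSES = {"SUCCEEDED", "CANCELLED", "FAILED"}


def table_for_status(status: str) -> str:
    """Hypothetical helper: name the table that would hold a job with this status."""
    normalized = status.upper()
    if normalized in ACTIVE_STATUSES:
        return "job"
    if normalized in ENDED_STATUSES:
        return "job_archive"
    raise ValueError(f"unknown job status: {status}")
```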
🛠️ Installation
PyPI Installation (Recommended)
pip install queuerPy
To use the package you also need a running PostgreSQL database with the TimescaleDB extension. You can use the docker-compose.yml file in the example folder or start a Docker container with the timescale/timescaledb:latest-pg17 image.
🚀 Getting started
The full initialisation is (in the easiest case):
from queuer import new_queuer
# Create a new queuer instance
q = new_queuer("exampleWorker", 3)
# Add a task to the queuer
q.add_task(example_task)
# Start the queuer
q.start()
You can also add a task with a decorator so the tasks don't have to be added to some central function:
@queuer.task(name="optionalName")
def example_task():
    ...
That's easy, right? Adding a job is just as easy:
# Add a job to the queue
job = q.add_job(example_task, 5, "12")
print(f"Job added: {job.rid}")
When the queuer is initialised, it checks whether the necessary database tables exist and creates them if they don't. The database is configured with these environment variables:
export QUEUER_DB_HOST=localhost
export QUEUER_DB_PORT=5432
export QUEUER_DB_DATABASE=postgres
export QUEUER_DB_USERNAME=username
export QUEUER_DB_PASSWORD=password1234
export QUEUER_DB_SCHEMA=public
You can find a full example in the example folder.
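As a rough illustration of how these variables map to connection settings, the sketch below reads them with os.environ. The helper name load_db_settings, the layout of the returned dictionary, and the fallback values are assumptions made for this example; the queuer reads the variables on its own and may treat missing values differently.

```python
import os
from typing import Dict, Mapping, Optional

# Environment variables documented above, mapped to illustrative fallbacks.
# The fallback values are assumptions for this sketch only.
_DEFAULTS = {
    "QUEUER_DB_HOST": "localhost",
    "QUEUER_DB_PORT": "5432",
    "QUEUER_DB_DATABASE": "postgres",
    "QUEUER_DB_USERNAME": "",
    "QUEUER_DB_PASSWORD": "",
    "QUEUER_DB_SCHEMA": "public",
}


def load_db_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Hypothetical helper: collect the QUEUER_DB_* variables into a dict."""
    source = os.environ if env is None else env
    raw = {key: source.get(key, default) for key, default in _DEFAULTS.items()}
    return {
        "host": raw["QUEUER_DB_HOST"],
        "port": int(raw["QUEUER_DB_PORT"]),  # the port is stored as text in the environment
        "database": raw["QUEUER_DB_DATABASE"],
        "username": raw["QUEUER_DB_USERNAME"],
        "password": raw["QUEUER_DB_PASSWORD"],
        "schema": raw["QUEUER_DB_SCHEMA"],
    }
```

Passing an explicit mapping instead of relying on os.environ keeps the helper easy to test without touching the real environment.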
new_queuer
new_queuer is a convenience constructor that creates a new Queuer instance using the default database configuration derived from environment variables. It acts as a wrapper around new_queuer_with_db. The encryption key for the database is taken from the QUEUER_ENCRYPTION_KEY environment variable; if it is not set, results are stored unencrypted.
new_queuer_with_db is the primary constructor for creating a new Queuer instance. It allows for explicit database configuration and encryption key specification, and initializes all necessary components, including database handlers, internal event listeners, and the worker.
def new_queuer(name: str, max_concurrency: int, *options: OnError) -> Queuer
def new_queuer_with_db(
    name: str,
    max_concurrency: int,
    encryption_key: str,
    db_config: DatabaseConfiguration,
    *options: OnError
) -> Queuer
- name: A str identifier for this queuer instance.
- max_concurrency: An int specifying the maximum number of jobs this queuer can process concurrently.
- encryption_key: A str used for encrypting sensitive job data in the database. If empty, results will be stored unencrypted.
- db_config: An optional DatabaseConfiguration. If None, the configuration will be loaded from environment variables.
- options: Optional OnError configurations to apply to the worker.
This function performs the following setup:
- Initializes a logger.
- Sets up the database connection using the provided db_config or environment variables.
- Creates JobDBHandler, WorkerDBHandler instances for database interactions.
- Initializes internal notification listeners for job_insert, job_update, and job_delete events.
- Creates and inserts a new Worker into the database based on the provided name, max_concurrency, and options.
If any critical error occurs during this initialization (e.g., database connection failure, worker creation error), the function will raise an exception.
start
The start method initiates the operational lifecycle of the Queuer. It sets up the main processing loops, initializes database listeners, and begins the job processing and polling loops.
def start(self) -> None
Upon calling start:
- It performs a basic check to ensure internal listeners are initialized.
- Database listeners are created to listen to job events (inserts, updates, deletes) via PostgreSQL NOTIFY/LISTEN.
- It starts a poller to periodically poll the database for new jobs to process.
- It starts a heartbeat ticker to keep the worker status updated.
- The method returns immediately after starting all background processes.
The method includes proper error handling and will raise exceptions if the queuer is not properly initialized or if there's an error creating the database listeners.
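The combination of NOTIFY/LISTEN and a periodic poller can be pictured as a loop that wakes up either when a notification arrives or when the poll interval elapses. The sketch below models that idea with a standard-library queue standing in for the notification channel. The function wait_for_work and its return values are hypothetical and do not reflect the queuer's internal code.

```python
import queue


def wait_for_work(notifications: "queue.Queue[str]", poll_interval: float) -> str:
    """Block until a notification arrives or the poll interval elapses."""
    try:
        event = notifications.get(timeout=poll_interval)
        return f"notified:{event}"
    except queue.Empty:
        # No notification in time: fall back to polling the database.
        return "poll"


channel: "queue.Queue[str]" = queue.Queue()
channel.put("job_insert")
first = wait_for_work(channel, poll_interval=0.05)   # a notification is already pending
second = wait_for_work(channel, poll_interval=0.05)  # nothing pending, so the timeout fires
```

In this model the poller acts as a safety net: a missed notification only delays a job by at most one poll interval.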
stop
The stop method gracefully shuts down the Queuer instance, releasing resources and ensuring ongoing operations are properly concluded.
def stop(self) -> None
The stop method cancels all jobs, closes database listeners, and cleans up resources.
add_task
The add_task method registers a new job task with the queuer. A task is the actual function that will be executed when a job associated with it is processed.
def add_task(self, task: Callable) -> Task
def add_task_with_name(self, task: Callable, name: str) -> Task
- task: A Callable representing the function that will serve as the job's executable logic. The queuer will automatically derive a name for this task based on its function name (e.g., my_task_function). The derived name must be unique if no name is given.
- name: A str specifying the custom name for this task. This name must be unique within the queuer's tasks.
This method handles the registration of a task, making the worker able to pick up and execute a job of this task type. It also updates the worker's available tasks in the database. The task should be added before starting the queuer. If there's an issue during task creation or database update, an exception will be raised.
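The naming rule can be illustrated with a minimal in-memory registry. This is a sketch under stated assumptions: TaskRegistry and its methods are hypothetical stand-ins, and the real add_task additionally updates the worker's available tasks in the database.

```python
from typing import Callable, Dict, Optional


class TaskRegistry:
    """Hypothetical in-memory registry; not the queuer's implementation."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Callable] = {}

    def add(self, task: Callable, name: Optional[str] = None) -> str:
        # Derive the name from the function when no custom name is given.
        task_name = name or task.__name__
        if task_name in self._tasks:
            raise ValueError(f"task name already registered: {task_name}")
        self._tasks[task_name] = task
        return task_name

    def run(self, name: str, *args):
        # Look up the registered callable by name and execute it.
        return self._tasks[name](*args)


def add_numbers(a: int, b: int) -> int:
    return a + b


registry = TaskRegistry()
registered_name = registry.add(add_numbers)          # name derived from the function
custom_name = registry.add(add_numbers, name="sum")  # same function, custom name
```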
add_next_interval_func
The add_next_interval_func method registers a custom function that determines the next execution time for scheduled jobs. This is useful for implementing complex scheduling logic beyond simple fixed intervals.
def add_next_interval_func(self, nif: Callable) -> Worker
def add_next_interval_func_with_name(self, nif: Callable, name: str) -> Worker
- nif: A Callable defining custom logic for calculating the next interval. The queuer will automatically derive a name for this function. The derived name must be unique if no name is given.
- name: A str specifying the custom name for this NextIntervalFunc. This name must be unique within the queuer's NextIntervalFuncs.
This method adds the provided NextIntervalFunc to the queuer's available functions, making it usable for jobs with custom scheduling requirements. It updates the worker's configuration in the database.
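The exact signature expected of a NextIntervalFunc is not spelled out here. The sketch below assumes it receives the previous run time and the current run count and returns the next run time; both parameters and the function name skip_weekends are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def skip_weekends(previous_run: datetime, current_count: int) -> datetime:
    """Return the next run one day later, moved past Saturday and Sunday.

    current_count is unused here; it is kept only to match the assumed signature.
    """
    candidate = previous_run + timedelta(days=1)
    while candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate


friday = datetime(2024, 1, 5, 9, 0)
after_friday = skip_weekends(friday, 1)
```

If the assumed signature holds, such a function would be registered with add_next_interval_func and then referenced by its derived name in a Schedule's next_interval field.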
Worker Options
The OnError class defines how a worker should handle errors when processing a job. This allows for configurable retry behavior.
class OnError:
    def __init__(
        self,
        timeout: float = 30.0,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff: str = RetryBackoff.NONE
    )
- timeout: The maximum time (in seconds) allowed for a single attempt of a job. If the job exceeds this duration, it's considered to have timed out.
- max_retries: The maximum number of times a job will be retried after a failure.
- retry_delay: The initial delay (in seconds) before the first retry attempt. This delay can be modified by the retry_backoff strategy.
- retry_backoff: Specifies the strategy used to increase the delay between subsequent retries.
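How max_retries and retry_delay interact can be sketched with a plain retry loop. This is a simplified model, not the worker's actual code: run_with_retries is a hypothetical helper, the sleep function is injectable so the example runs instantly, and the timeout parameter is left out.

```python
import time
from typing import Callable, List


def run_with_retries(
    task: Callable[[], object],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> object:
    """Run task once, then retry up to max_retries times on failure."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(retry_delay)  # constant delay, i.e. no backoff


calls: List[int] = []


def flaky() -> str:
    """Fail twice, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("temporary failure")
    return "done"


waits: List[float] = []
result = run_with_retries(flaky, max_retries=3, retry_delay=0.5, sleep=waits.append)
```

Here the task runs three times in total (one initial attempt plus two retries) and the recorded waits are the two delays between attempts.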
Retry Backoff Strategies
The RetryBackoff enum defines the available strategies for increasing retry delays:
class RetryBackoff(str, Enum):
    NONE = "none"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"
- RetryBackoff.NONE: No backoff. The retry_delay remains constant for all retries.
- RetryBackoff.LINEAR: The retry delay increases linearly with each attempt (e.g., delay, 2*delay, 3*delay).
- RetryBackoff.EXPONENTIAL: The retry delay increases exponentially with each attempt (e.g., delay, delay*2, delay*2*2).
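The three strategies can be made concrete by computing the delay before each retry. The sketch redefines the enum locally so it runs on its own; retry_delays is a hypothetical helper that follows the delay sequences described above, not a function of the library.

```python
from enum import Enum
from typing import List


class RetryBackoff(str, Enum):
    # Mirrors the enum shown above so the sketch is self-contained.
    NONE = "none"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"


def retry_delays(retry_delay: float, max_retries: int, backoff: RetryBackoff) -> List[float]:
    """Hypothetical helper: delay (in seconds) before each retry attempt."""
    delays = []
    for attempt in range(max_retries):
        if backoff is RetryBackoff.LINEAR:
            delays.append(retry_delay * (attempt + 1))  # delay, 2*delay, 3*delay
        elif backoff is RetryBackoff.EXPONENTIAL:
            delays.append(retry_delay * 2 ** attempt)   # delay, 2*delay, 4*delay
        else:
            delays.append(retry_delay)                  # constant delay
    return delays
```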
Job options
The Options class allows you to define specific behaviors for individual jobs, overriding default worker settings where applicable.
@dataclass
class Options:
    on_error: Optional[OnError] = None
    schedule: Optional[Schedule] = None
- on_error: An optional OnError configuration that will override the worker's default error handling for this specific job. This allows you to define unique retry logic per job.
- schedule: An optional Schedule configuration for jobs that need to be executed at recurring intervals.
OnError for jobs
The OnError class for jobs is identical to the one used for worker options, allowing granular control over error handling for individual jobs.
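The override rule (a job's OnError wins over the worker's default) can be sketched with simplified stand-ins for the two classes. The helper effective_on_error is hypothetical, and the stand-in classes carry only the fields needed for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class OnError:
    # Simplified stand-in for the class documented above.
    timeout: float = 30.0
    max_retries: int = 3
    retry_delay: float = 1.0
    retry_backoff: str = "none"


@dataclass
class Options:
    # Stand-in without the schedule field, which is not needed here.
    on_error: Optional[OnError] = None


def effective_on_error(worker_default: OnError, job_options: Optional[Options]) -> OnError:
    """Hypothetical helper: prefer the job's OnError, else the worker's."""
    if job_options is not None and job_options.on_error is not None:
        return job_options.on_error
    return worker_default


worker_default = OnError()
job_specific = Options(on_error=OnError(max_retries=5, timeout=60.0))
```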
Schedule
The Schedule class is used to define recurring jobs.
@dataclass
class Schedule:
    start: datetime = None
    max_count: int = 1
    interval: Optional[timedelta] = None
    next_interval: Optional[str] = None
- start: The initial time at which the scheduled job should first run.
- max_count: The maximum number of times the job should be executed. A value of 0 indicates an indefinite number of repetitions (run forever).
- interval: The duration between consecutive executions of the scheduled job.
- next_interval: Function name of the NextIntervalFunc returning the time of the next execution of the scheduled job.
Either interval or next_interval has to be set if max_count is 0 or greater than 1.
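The rule about interval and next_interval, together with the effect of max_count, can be sketched as follows. The Schedule class is redefined locally as a stand-in, and validate_schedule and planned_runs are hypothetical helpers covering only fixed intervals; the library performs its own validation and scheduling.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class Schedule:
    # Stand-in mirroring the fields documented above.
    start: Optional[datetime] = None
    max_count: int = 1
    interval: Optional[timedelta] = None
    next_interval: Optional[str] = None


def validate_schedule(schedule: Schedule) -> None:
    """Raise ValueError if a repeating schedule has no interval source."""
    repeats = schedule.max_count == 0 or schedule.max_count > 1
    if repeats and schedule.interval is None and schedule.next_interval is None:
        raise ValueError("interval or next_interval is required for repeating schedules")


def planned_runs(schedule: Schedule, limit: int = 5) -> List[datetime]:
    """List run times for a fixed-interval schedule, capped at limit entries."""
    validate_schedule(schedule)
    if schedule.start is None:
        raise ValueError("this sketch needs an explicit start time")
    # max_count == 0 means "run forever", so only the cap bounds the list.
    count = limit if schedule.max_count == 0 else min(schedule.max_count, limit)
    if count > 1 and schedule.interval is None:
        raise NotImplementedError("next_interval functions are not modelled here")
    step = schedule.interval or timedelta(0)
    return [schedule.start + step * i for i in range(count)]


hourly = Schedule(start=datetime(2024, 1, 1, 9, 0), max_count=3, interval=timedelta(hours=1))
runs = planned_runs(hourly)
```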
Additional Methods
Job Management
# Add a single job
job = queuer.add_job(my_task, param1, param2)
# Add a job with custom options
from model.options import Options, OnError
options = Options(on_error=OnError(max_retries=5, timeout=60.0))
job = queuer.add_job_with_options(options, my_task, param1)
# Add multiple jobs as a batch
from model.batch_job import BatchJob
batch = [
    BatchJob(task=my_task, parameters=[1, "a"]),
    BatchJob(task=my_task, parameters=[2, "b"])
]
queuer.add_jobs(batch)
# Wait for a job to finish
finished_job = queuer.wait_for_job_finished(job.rid, timeout_seconds=30.0)
# Get job information
job_info = queuer.get_job(job.rid)
archived_job = queuer.get_job_ended(job.rid) # For completed jobs
Job Queries
# Get jobs by status
running_jobs = queuer.get_jobs(status="RUNNING")
all_jobs = queuer.get_jobs()
# Get jobs by worker
worker_jobs = queuer.get_jobs_by_worker_rid(worker.rid)
⭐ Features
- Async/Await Support: Full asyncio integration with threading fallbacks.
- PostgreSQL NOTIFY/LISTEN: Real-time job notifications without polling overhead.
- Batch Job Processing: Insert job batches efficiently using PostgreSQL's COPY FROM feature.
- Panic Recovery: Automatic recovery for all running jobs in case of unexpected failures.
- Error Handling: Comprehensive error handling by checking last output parameter for errors.
- Multiple Workers: Multiple queuer instances can run across different microservices while maintaining job start order and isolation.
- Scheduled Jobs: Support for scheduled and periodic jobs with custom intervals.
- Job Lifecycle Management: Easy functions to get jobs and workers, track job status.
- Event Listeners: Listen for job updates, completion, and deletion events (ended jobs).
- Job Completion Helpers: Helper functions to listen for specific finished jobs.
- Retry Mechanisms: Retry mechanism for ended jobs which creates a new job with the same parameters, with configurable retry logic and different backoff strategies.
- Custom Scheduling: Custom NextInterval functions to address custom needs for scheduling (e.g., scheduling with timezone offset).
- Master Worker Management: A master worker is set automatically and handles retention and other central settings. If the old master stops, a new master takes over automatically.
- Heartbeat System: Worker heartbeat monitoring and automatic stale worker detection and cancellation by the master.
- Encryption Support: Encryption support for sensitive job data stored in the database.
- Database Integration: Seamless PostgreSQL integration with automatic schema management.
- Type Safety: Full type hints and dataclass-based models for better development experience.
Note: Transactional job insert is the only feature that is not yet implemented in the Python implementation of the queuer.
🧪 Testing
# Run all tests
python -m pytest
# Run with coverage
python -m pytest --cov=. --cov-report=html
# Run specific test files
python -m pytest queuer_test.py -v
File details
Details for the file queuerpy-0.16.0.tar.gz.
File metadata
- Download URL: queuerpy-0.16.0.tar.gz
- Upload date:
- Size: 57.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.19
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1a7ae20f38f475a119cea51c383b617f3123851cae6af4a0a321eae98bc9011e |
| MD5 | 58ec64a3e9b8259de6fbed5e00599178 |
| BLAKE2b-256 | ce7828bf41aaa1753a88f09f8ed03426a2abbfcc7147e238c17c409a87150e4e |
File details
Details for the file queuerpy-0.16.0-py3-none-any.whl.
File metadata
- Download URL: queuerpy-0.16.0-py3-none-any.whl
- Upload date:
- Size: 69.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.19
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 24692e7e39765f845d5d5c195392d108c21fc9e2decafde28c297c04e6b61397 |
| MD5 | a82614a54e9a149a4c31068672256f0a |
| BLAKE2b-256 | a53ea97d0ca5b56144462b178fb604ca233b52fc121bc6b03aae07d0a9f4bbc1 |