Full-featured job queue system with multiprocessing support for Linux
Queue Manager
A Python-based job queue system with per-job processes and signal variables.
Features
- Per-Job Processes: Each job runs in its own OS process for isolation
- Signal Variables: Jobs communicate via shared state protected by mutexes
- Registry Persistence: Job states and results are persisted to a registry
- Process Control: Start, stop, and delete jobs with graceful termination
- Progress Tracking: Real-time progress updates and status monitoring
- Error Handling: Comprehensive error handling and recovery mechanisms
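The process-isolation, shared-state, and graceful-stop ideas above can be illustrated with the standard library alone. This is a minimal sketch under stated assumptions, not queuemgr's implementation: `worker`, `run_job`, `steps`, and `stop_after` are hypothetical names, and the `fork` start method is assumed because the package targets Linux.

```python
import multiprocessing as mp
import time


# Hypothetical helpers for illustration; they do not mirror queuemgr internals.
def worker(progress, stop_event, steps=50):
    """Run in a child process: report progress until done or asked to stop."""
    for i in range(1, steps + 1):
        if stop_event.is_set():  # cooperative stop request from the parent
            return
        with progress.get_lock():  # mutex guarding the shared value
            progress.value = i * 100 // steps
        time.sleep(0.01)


def run_job(stop_after=None, steps=50):
    """Start one job in its own OS process and return its final progress."""
    ctx = mp.get_context("fork")  # assumption: Linux, fork start method
    progress = ctx.Value("i", 0)  # shared integer with a built-in lock
    stop_event = ctx.Event()
    proc = ctx.Process(target=worker, args=(progress, stop_event, steps))
    proc.start()
    if stop_after is not None:
        time.sleep(stop_after)
        stop_event.set()  # ask the job to stop first
    proc.join(timeout=5)
    if proc.is_alive():  # escalate only if the job ignored the request
        proc.terminate()
        proc.join()
    with progress.get_lock():
        return progress.value
```

Calling `run_job()` lets the job finish on its own, while `run_job(stop_after=0.05, steps=1000)` requests a stop early and returns whatever progress the child had written by then. Setting an event before falling back to `terminate()` gives the job a chance to exit cleanly.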
Installation
pip install queuemgr
Quick Start
from queuemgr.queue.job_queue import JobQueue
from queuemgr.core.registry import JsonlRegistry
from queuemgr.jobs.base import QueueJobBase
from queuemgr.core.types import JobStatus
from queuemgr.core.ipc import update_job_state

class MyJob(QueueJobBase):
    def execute(self):
        # Your job logic here
        for i in range(100):
            update_job_state(
                self._shared_state,
                progress=i,
                description=f"Processing item {i}"
            )

    def on_start(self):
        update_job_state(self._shared_state, description="Job started")

    def on_stop(self):
        update_job_state(self._shared_state, description="Job stopped")

    def on_end(self):
        update_job_state(self._shared_state, description="Job completed")

    def on_error(self, exc):
        update_job_state(self._shared_state, description=f"Job failed: {exc}")

# Create queue and job
registry = JsonlRegistry("my_registry.jsonl")
queue = JobQueue(registry)
job = MyJob("my-job-1", {"param": "value"})

# Add and start job
queue.add_job(job)
queue.start_job("my-job-1")

# Monitor progress
status = queue.get_job_status("my-job-1")
print(f"Status: {status.status}, Progress: {status.progress}%")
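The Quick Start reads the status once, right after the job starts. To follow a job over time, the status call can be wrapped in a polling loop. The helper below is a hypothetical sketch, not part of queuemgr: `wait_for_progress` and its parameters are illustrative names, and it only assumes that the status object has a `progress` attribute, as in the snippet above.

```python
import time


def wait_for_progress(get_status, target, interval=0.5, timeout=60.0):
    """Poll get_status() until progress reaches target or the timeout expires.

    get_status is any zero-argument callable returning an object with a
    `progress` attribute, for example:
        lambda: queue.get_job_status("my-job-1")
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status.progress >= target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"progress stuck at {status.progress}")
        time.sleep(interval)
```

With the `MyJob` class above, the loop in `execute` reports values from 0 to 99, so `target=99` would be the matching threshold, for instance `wait_for_progress(lambda: queue.get_job_status("my-job-1"), target=99)`.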
Examples
See the examples/ directory for complete examples:
- simple_job.py - Basic job creation and execution
- error_handling_job.py - Error handling and recovery
- progress_job.py - Long-running job with progress updates
- registry_example.py - Registry functionality and job history
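For readers who want a feel for registry-style persistence before opening registry_example.py, here is a rough standard-library sketch of an append-only JSON Lines file. It is an assumption-laden illustration: the record fields, the status strings, and the functions `append_record` and `latest_states` are hypothetical and do not describe the actual on-disk format used by `JsonlRegistry`.

```python
import json
from pathlib import Path


def append_record(path, job_id, status, progress):
    """Append one state snapshot as a single JSON line (hypothetical schema)."""
    record = {"job_id": job_id, "status": status, "progress": progress}
    with Path(path).open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")


def latest_states(path):
    """Replay the file top to bottom; the last record per job_id wins."""
    states = {}
    p = Path(path)
    if not p.exists():
        return states
    with p.open(encoding="utf-8") as fh:
        for line in fh:
            if line.strip():  # skip blank lines
                rec = json.loads(line)
                states[rec["job_id"]] = rec
    return states
```

Appending instead of rewriting keeps every earlier snapshot in the file, so the same data serves as both the current state (last record per job) and the job history (all records in order).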
Development
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run tests with coverage
pytest --cov=queuemgr
# Format code
black queuemgr tests
# Lint code
flake8 queuemgr tests
# Type check
mypy queuemgr
License
MIT License - see LICENSE file for details.
Author
Vasiliy Zdanovskiy - vasilyvz@gmail.com