An extensible and scalable MongoDB-based job queue system for Python
MongoQueue with Job Stores
This implementation of MongoQueue provides job store functionality, allowing you to organize jobs into separate stores with customizable capacity limits.
Features
- Create named job stores with specific capacity limits
- Isolate jobs in different stores for better organization and management
- Limit the number of jobs in specific stores to control resource usage
- All MongoDB queue operations available for each job store
- Store-specific job processing with dedicated workers
- Statistics for job stores and capacity management
Architecture
- JobStore: Base class for managing jobs in a specific store
- MongoQueue: A special JobStore that manages jobs with no specific store (the main queue) and maintains a registry of all job stores
Usage
Basic Usage
```python
from mq import MongoQueue

# Initialize the main MongoQueue
mq = MongoQueue()

# Create job stores with different capacities
users_store = mq.get_job_store("users", max_capacity=5)
posts_store = mq.get_job_store("posts", max_capacity=10)
analytics_store = mq.get_job_store("analytics")  # No capacity limit

# Add jobs to different stores
mq.put({"type": "system_maintenance"})  # Job in main queue (no store_name)
users_store.put({"user_id": "user_1", "action": "update_profile"})  # Job in users store
posts_store.put({"post_id": "post_1", "action": "index_content"})  # Job in posts store

# List jobs in the users store
users_jobs = users_store.list_jobs()
print("Users store jobs:", users_jobs)

# List jobs in the main queue
main_queue_jobs = mq.list_jobs()
print("Main queue jobs:", main_queue_jobs)

# Get capacity stats for the users store
users_capacity = users_store.get_capacity_stats()
print("Users store capacity:", users_capacity)

# List available job stores
stores = mq.list_job_stores()
print("Available stores:", stores)
```
Running Jobs
Option 1: Simple Job Processing
The simplest way to process jobs from a specific job store:
```python
# Define a job processing function
def process_job(job):
    print(f"Processing job {job.id} with payload: {job.payload}")
    # Process the job...
    job.complete()
    return True

# Process jobs from a specific store.
# This will run continuously, processing jobs as they become available.
users_store.run_jobs(process_job, max_workers=2, poll_interval=1)
```
Option 2: Process Jobs from All Stores
Process jobs from all job stores in parallel:
```python
# Run jobs from all stores (including the main queue) in parallel;
# this creates a worker pool for each store.
mq.run_all_job_stores(process_job, max_workers_per_store=2, poll_interval=1)
```
Option 3: Using ThreadPoolExecutor
For more control over job processing, you can use ThreadPoolExecutor:
```python
from concurrent.futures import ThreadPoolExecutor, wait

def run_store(store):
    """Run jobs from a single store."""
    store.run_jobs(process_job, max_workers=2, poll_interval=0.5)

# Process jobs using ThreadPoolExecutor. Note that a signal.alarm-based
# timeout cannot be used inside worker threads, because signal handlers
# can only be installed in the main thread; bound the wait with a
# timeout on wait() instead.
executor = ThreadPoolExecutor(max_workers=4)

# Get all stores (the main queue plus every registered job store)
stores = [mq] + [mq.get_job_store(name) for name in mq.list_job_stores()]

# Submit a long-running consumer for each store
futures = {
    executor.submit(run_store, store): store
    for store in stores
}

# run_jobs polls indefinitely, so wait with a time limit
done, not_done = wait(futures, timeout=5)
for future in done:
    try:
        future.result()
    except Exception as e:
        print(f"Error: {e}")

# Don't block on consumers that are still polling
executor.shutdown(wait=False)
```
Option 4: Independent Threads
Run job stores in separate threads:
```python
import threading

# Create a thread for each store
threads = []

# Add a thread for the main queue
main_thread = threading.Thread(
    target=lambda: mq.run_jobs(process_job, max_workers=2)
)
threads.append(main_thread)

# Add threads for each job store
for store_name in mq.list_job_stores():
    store = mq.get_job_store(store_name)
    thread = threading.Thread(
        target=lambda s=store: s.run_jobs(process_job, max_workers=2)
    )
    threads.append(thread)

# Start all threads
for thread in threads:
    thread.start()

# Wait for threads to complete (or use a timeout mechanism)
for thread in threads:
    thread.join()
```
Recommended Production Setup
For production environments, we recommend:
- Using a separate process for job processing
- Implementing proper error handling and logging
- Using a process manager like Supervisor to manage job processing services
- Setting up monitoring and alerting for job processing
Example production setup:
```python
# job_processor.py
from mq import MongoQueue
import logging
import signal
import sys

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the main MongoQueue
mq = MongoQueue()

def process_job(job):
    try:
        logger.info(f"Processing job {job.id}")
        # Process the job...
        job.complete()
        return True
    except Exception as e:
        logger.error(f"Error processing job {job.id}: {e}")
        job.failed(str(e))
        return False

def handle_shutdown(signum, frame):
    logger.info("Shutdown signal received, exiting gracefully...")
    sys.exit(0)

def main():
    # Register signal handlers
    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)

    # Get the store name from command line arguments
    store_name = sys.argv[1] if len(sys.argv) > 1 else None
    logger.info(f"Starting job processor for store: {store_name or 'main'}")

    try:
        if store_name:
            # Process jobs from a specific store
            job_store = mq.get_job_store(store_name)
            job_store.run_jobs(process_job, max_workers=2)
        else:
            # Process jobs from the main queue
            mq.run_jobs(process_job, max_workers=2)
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
```
Supervisor configuration:
```ini
[program:mq_main]
command=python job_processor.py
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/main.err.log
stdout_logfile=/var/log/mq/main.out.log

[program:mq_users]
command=python job_processor.py users
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/users.err.log
stdout_logfile=/var/log/mq/users.out.log

[program:mq_posts]
command=python job_processor.py posts
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/posts.err.log
stdout_logfile=/var/log/mq/posts.out.log
```
Capacity Management
Job stores with capacity limits will raise a ValueError when you attempt to exceed the limit:
```python
# This will raise ValueError if users_store already has 5 jobs
try:
    users_store.put({"user_id": "new_user", "action": "verify_email"})
except ValueError as e:
    print(f"Capacity error: {e}")
```
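If you prefer to skip an enqueue instead of handling the exception, you can check the store first. The sketch below uses the is_full field that get_capacity_stats() returns (shown in the Statistics section); put_if_capacity is a hypothetical helper name, not part of the library. Note the check-then-put is not atomic, so the ValueError fallback is still needed in case another producer fills the store between the two calls:

```python
def put_if_capacity(store, payload):
    """Enqueue payload only if the store reports spare capacity.

    Returns True if the job was enqueued, False otherwise. The
    capacity check and the put are two separate calls, so a
    concurrent producer can still win the race; the ValueError
    handler covers that case.
    """
    stats = store.get_capacity_stats()
    if stats.get("is_full"):
        return False
    try:
        store.put(payload)
        return True
    except ValueError:
        # Lost the race against another producer
        return False
```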
Statistics
Get information about job stores and their capacities:
```python
# Get capacity stats for a specific store
capacity_stats = users_store.get_capacity_stats()
print(capacity_stats)
# Example output:
# {
#     'store_name': 'users',
#     'current_jobs': 3,
#     'max_capacity': 5,
#     'is_full': False,
#     'available_capacity': 2
# }

# Get overall stats including job stores
stats = mq.stats()
print(stats)
# Example output:
# {
#     'jobs': {
#         'total': 15,
#         'pending': 10,
#         'processing': 2,
#         'failed': 1,
#         'completed': 2,
#         'main_queue': 3,
#     },
#     'workers': {
#         'total': 5,
#         'active': 5,
#         'inactive': 0,
#     },
#     'stores': {
#         'users': 5,
#         'posts': 7,
#         'analytics': 4,
#     }
# }
```
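The stats() output is also a convenient input for the monitoring and alerting recommended above. As a minimal sketch, the hypothetical helper below takes a stats dict of the shape shown and a capacities map, and flags stores whose job count is at or above a threshold fraction of their limit; the function name, the threshold, and the capacities argument are all assumptions, not library API:

```python
def capacity_alerts(stats, capacities, threshold=0.8):
    """Return names of stores whose job count is at or above
    threshold * capacity; stores without a limit are skipped."""
    alerts = []
    for name, count in stats.get("stores", {}).items():
        cap = capacities.get(name)
        if cap and count >= threshold * cap:
            alerts.append(name)
    return alerts
```

With the example stats above and capacities of 5 for users and 10 for posts, only the users store (5 of 5 jobs) would be flagged at the default threshold.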
Configuration
You can configure default job store settings in your environment variables:
```shell
MQ_JOB_STORES_DEFAULT_CAPACITY=100  # Default capacity for all stores
```
Or directly in your code:
```python
from mq.config import config

config.job_stores_default_capacity = 100  # Default capacity for all stores
config.job_stores_capacities = {
    "users": 5,
    "posts": 10,
    "analytics": None,  # No limit
}
```
Download files
Source Distribution
Built Distribution
File details
Details for the file mq_python-0.0.1.tar.gz.
File metadata
- Download URL: mq_python-0.0.1.tar.gz
- Upload date:
- Size: 17.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 075e84d8eaba7eb2ede7758c7ad0f6e0f74913110519afa54d8550d13e1a908b |
| MD5 | 1bf9774d1f8e8a34743792f6f1d7fbad |
| BLAKE2b-256 | 227014c736465ee3902c215e3fba11c1acad0802f882de7cdea276b995d85265 |
File details
Details for the file mq_python-0.0.1-py3-none-any.whl.
File metadata
- Download URL: mq_python-0.0.1-py3-none-any.whl
- Upload date:
- Size: 16.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f6179c10c7bb66510807e380c1e6a48283c5b42d3cd8ff67a9494f08d733969c |
| MD5 | c027c45a2a67da646e052e13ec039d4e |
| BLAKE2b-256 | a37205ff0622104c2f04b2f0ce9a00bdb99479df7290923245ced643c090bf08 |