# Mindtrace Jobs

Job management for Mindtrace: a job queue system that works with interchangeable backends (local, Redis, RabbitMQ).
## Backends
- Local
- Redis
- RabbitMQ
## Redis Backend

Set up Redis:

```bash
# Local installation
redis-server

# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest

# Test connection
redis-cli ping
```
## RabbitMQ Backend

Set up RabbitMQ:

```bash
# Using Docker (recommended)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management
```
## Core Components

- `Consumer` - Base class for processing jobs
- `Orchestrator` - Manages job queues and routing
- `Job`, `JobSchema` - Job data structures
- `LocalClient`, `RedisClient`, `RabbitMQClient` - Backend implementations
## Architecture

```mermaid
graph TD
    Consumer["Your Consumer<br/>MathsConsumer('maths_operations')"]
    Orchestrator["Orchestrator"]
    Schema["Job Schema<br/>(maths_operations)"]
    Job["Job Instance"]
    Queue["Queue<br/>(maths_operations)"]
    LocalClient["LocalClient"]
    RedisClient["RedisClient<br/>(requires Redis server)"]
    RabbitMQClient["RabbitMQClient<br/>(requires RabbitMQ server)"]

    Consumer --> Orchestrator
    Schema --> Orchestrator
    Job --> Orchestrator
    Orchestrator --> Queue
    Orchestrator -.-> LocalClient
    Orchestrator -.-> RedisClient
    Orchestrator -.-> RabbitMQClient
    Queue --> Consumer
```
## Basic Example

```python
from pydantic import BaseModel

from mindtrace.jobs import Consumer, JobSchema, LocalClient, Orchestrator, job_from_schema

# Set up the orchestrator with the local backend
orchestrator = Orchestrator(LocalClient())

# Define your job input/output models (inherit from BaseModel directly)
class MathsInput(BaseModel):
    operation: str = "add"
    a: float = 2.0
    b: float = 1.0

class MathsOutput(BaseModel):
    result: float = 0.0
    operation_performed: str = ""

schema = JobSchema(name="maths_operations", input_schema=MathsInput, output_schema=MathsOutput)
orchestrator.register(schema)

# Create a consumer
class MathsConsumer(Consumer):
    def run(self, job_dict: dict) -> dict:
        # Access input data from the dict
        input_data = job_dict.get("input_data", {})
        operation = input_data.get("operation", "add")
        a = input_data.get("a")
        b = input_data.get("b")

        # Your processing logic here
        if operation == "add":
            result = a + b
        elif operation == "multiply":
            result = a * b
        elif operation == "power":
            result = a ** b
        else:
            raise ValueError(f"Unknown operation: {operation}")

        return {
            "result": result,
            "operation_performed": f"{operation}({a}, {b}) = {result}",
        }

# Connect and consume jobs
consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)

# Add a job to the queue
job = job_from_schema(schema, MathsInput(operation="multiply", a=7.0, b=3.0))
orchestrator.publish("maths_operations", job)

# Process jobs
consumer.consume(num_messages=1)
```
## Using Different Backends

### Redis Backend

```python
from mindtrace.jobs import RedisClient

redis_backend = RedisClient(host="localhost", port=6379, db=0)
orchestrator = Orchestrator(redis_backend)

consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()
```
### RabbitMQ Backend

```python
from mindtrace.jobs import RabbitMQClient

rabbitmq_backend = RabbitMQClient(
    host="localhost",
    port=5672,
    username="user",
    password="password",
)
orchestrator = Orchestrator(rabbitmq_backend)

consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()
```
## Backend Switching

The job system supports seamless switching between backends without changing your consumer code:

```python
# Development: use the local backend
backend = LocalClient()

# Testing: switch to Redis
backend = RedisClient(host="localhost", port=6379, db=0)

# Production: switch to RabbitMQ
backend = RabbitMQClient(host="localhost", port=5672, username="user", password="password")

# The same orchestrator and consumer code works with any backend
orchestrator = Orchestrator(backend)
consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()
```
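A common way to exploit this interchangeability is to pick the backend from configuration at startup. The sketch below is a hypothetical helper: the `JOBS_BACKEND` environment variable and the stub classes are our own invention for illustration, not part of mindtrace — with mindtrace installed you would import the real client classes instead:

```python
import os

# Stub backend classes so the sketch runs standalone (assumption); with
# mindtrace installed, import LocalClient, RedisClient, and RabbitMQClient
# from mindtrace.jobs instead.
class LocalClient:
    pass

class RedisClient:
    def __init__(self, host="localhost", port=6379, db=0):
        self.host, self.port, self.db = host, port, db

class RabbitMQClient:
    def __init__(self, host="localhost", port=5672, username="", password=""):
        self.host, self.port = host, port

def backend_from_env():
    """Pick a backend based on the (hypothetical) JOBS_BACKEND env variable."""
    kind = os.environ.get("JOBS_BACKEND", "local")
    if kind == "redis":
        return RedisClient(host=os.environ.get("REDIS_HOST", "localhost"))
    if kind == "rabbitmq":
        return RabbitMQClient(host=os.environ.get("RABBITMQ_HOST", "localhost"))
    return LocalClient()

os.environ["JOBS_BACKEND"] = "redis"
print(type(backend_from_env()).__name__)  # RedisClient
```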
## Automatic Backend Detection

When you connect a consumer, the orchestrator automatically detects the backend type and creates the appropriate consumer backend:

```python
consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)  # Automatically detects the backend type
```

Implementation:

- The `Orchestrator` detects its backend type using `type(self.backend).__name__`
- It then creates the corresponding consumer backend:
  - `LocalClient` → `LocalConsumerBackend`
  - `RedisClient` → `RedisConsumerBackend`
  - `RabbitMQClient` → `RabbitMQConsumerBackend`
```python
def create_consumer_backend_for_schema(self, schema: JobSchema) -> ConsumerBackendBase:
    queue_name = schema.name  # assumed: the queue is named after the schema
    backend_type = type(self.backend).__name__
    if backend_type == "LocalClient":
        return LocalConsumerBackend(queue_name, self)
    elif backend_type == "RedisClient":
        return RedisConsumerBackend(queue_name, self, poll_timeout=5)
    elif backend_type == "RabbitMQClient":
        return RabbitMQConsumerBackend(queue_name, self, prefetch_count=1)
    raise ValueError(f"Unsupported backend type: {backend_type}")
```
## Priority Queues

### Local Priority Queue

```python
# Declare a priority queue
backend = LocalClient()
orchestrator = Orchestrator(backend)
backend.declare_queue("priority_tasks", queue_type="priority")

# Publish with different priorities (higher numbers = higher priority)
orchestrator.publish("priority_tasks", urgent_job, priority=10)
orchestrator.publish("priority_tasks", normal_job, priority=5)
orchestrator.publish("priority_tasks", background_job, priority=1)
```
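For intuition, an in-memory priority queue of this kind can be modeled with Python's `heapq`. The sketch below is illustrative only (it is not mindtrace's implementation); it shows why higher-numbered jobs dequeue first and how a counter keeps FIFO order among equal priorities:

```python
import heapq
import itertools

class PriorityQueueSketch:
    """Illustrative in-memory priority queue: higher priority dequeues first."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker: FIFO within a priority

    def publish(self, job, priority=0):
        # Negate priority so the largest value surfaces first in the min-heap
        heapq.heappush(self._heap, (-priority, next(self._counter), job))

    def receive(self):
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]

q = PriorityQueueSketch()
q.publish("background_job", priority=1)
q.publish("urgent_job", priority=10)
q.publish("normal_job", priority=5)
print([q.receive(), q.receive(), q.receive()])
# ['urgent_job', 'normal_job', 'background_job']
```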
### Redis Priority Queue

```python
# Requires a running Redis server
redis_backend = RedisClient(host="localhost", port=6379, db=0)
orchestrator = Orchestrator(redis_backend)
redis_backend.declare_queue("redis_priority", queue_type="priority")

# Higher-priority jobs are processed first
orchestrator.publish("redis_priority", critical_job, priority=100)
orchestrator.publish("redis_priority", normal_job, priority=50)
```
### RabbitMQ Priority Queue

```python
# Requires a running RabbitMQ server
rabbitmq_backend = RabbitMQClient(host="localhost", port=5672, username="user", password="password")
orchestrator = Orchestrator(rabbitmq_backend)

# RabbitMQ supports priorities 0-255
rabbitmq_backend.declare_queue("rabbitmq_priority", max_priority=255)
orchestrator.publish("rabbitmq_priority", critical_job, priority=255)
orchestrator.publish("rabbitmq_priority", normal_job, priority=128)
```
## API Reference

### Consumer

```python
class Consumer:
    def __init__(self, job_type_name: str): ...
    def connect(self, orchestrator: Orchestrator): ...
    def consume(self, num_messages: Optional[int] = None): ...
    def run(self, job_dict: dict) -> dict: ...  # Implement this method
```

### Orchestrator

```python
class Orchestrator:
    def __init__(self, backend): ...
    def register(self, schema: JobSchema) -> str: ...
    def publish(self, queue_name: str, job: Job, **kwargs) -> str: ...
    def receive_message(self, queue_name: str) -> Optional[dict]: ...
    def count_queue_messages(self, queue_name: str) -> int: ...
```