Skip to main content

Job management for Mindtrace

Project description

PyPI version License Downloads

Mindtrace Jobs

A job queue system that works with different backends (local, Redis, RabbitMQ).

Backends

  • Local
  • Redis
  • RabbitMQ

Setup Redis:

# Local installation
redis-server

# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest

# Test connection
redis-cli ping 

RabbitMQ Backend

Setup RabbitMQ:

# Using Docker (recommended)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

Core Components:

  • Consumer - Base class for processing jobs
  • Orchestrator - Manages job queues and routing
  • Job, JobSchema - Job data structures
  • LocalClient, RedisClient, RabbitMQClient - Backend implementations

Architecture

graph TD
    Consumer["Your Consumer<br/>MathsConsumer('maths_operations')"] 
    Orchestrator["Orchestrator"]
    Schema["Job Schema<br/>(maths_operations)"]
    Job["Job Instance"]
    Queue["Queue<br/>(maths_operations)"]
    
    LocalClient["LocalClient"]
    RedisClient["RedisClient<br/>(requires Redis server)"] 
    RabbitMQClient["RabbitMQClient<br/>(requires RabbitMQ server)"]
    
    Consumer --> Orchestrator
    Schema --> Orchestrator
    Job --> Orchestrator
    Orchestrator --> Queue
    
    Orchestrator -.-> LocalClient
    Orchestrator -.-> RedisClient
    Orchestrator -.-> RabbitMQClient
    
    Queue --> Consumer

Basic Example

from mindtrace.jobs import Orchestrator, LocalClient, Consumer, JobSchema, job_from_schema
from pydantic import BaseModel

# Set up the orchestrator with local backend
orchestrator = Orchestrator(LocalClient())

# Define your job input/output models (inherit from BaseModel directly)
class MathsInput(BaseModel):
    operation: str = "add"
    a: float = 2.0
    b: float = 1.0

class MathsOutput(BaseModel):
    result: float = 0.0
    operation_performed: str = ""

schema = JobSchema(name="maths_operations", input_schema=MathsInput, output_schema=MathsOutput)
orchestrator.register(schema)

# Create a consumer
class MathsConsumer(Consumer):
    def run(self, job_dict: dict) -> dict:
        # Access input data from the dict
        input_data = job_dict.get('input_data', {})
        operation = input_data.get('operation', 'add')
        a = input_data.get('a')
        b = input_data.get('b')
        
        # Your processing logic here
        if operation == "add":
            result = a + b
        elif operation == "multiply":
            result = a * b
        elif operation == "power":
            result = a ** b
        else:
            raise ValueError(f"Unknown operation: {operation}")
        
        return {
            "result": result,
            "operation_performed": f"{operation}({a}, {b}) = {result}"
        }

# Connect and consume jobs
consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)

# Add a job to the queue
job = job_from_schema(schema, MathsInput(
    operation="multiply",
    a=7.0,
    b=3.0
))
orchestrator.publish("maths_operations", job)

# Process jobs
consumer.consume(num_messages=1)

Using Different Backends

Redis Backend

from mindtrace.jobs import RedisClient

redis_backend = RedisClient(host="localhost", port=6379, db=0)
orchestrator = Orchestrator(redis_backend)

consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()

RabbitMQ Backend

from mindtrace.jobs import RabbitMQClient

rabbitmq_backend = RabbitMQClient(
    host="localhost", 
    port=5672, 
    username="user", 
    password="password"
)
orchestrator = Orchestrator(rabbitmq_backend)

consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()

Backend Switching

The job system supports seamless switching between backends without changing your consumer code:

# Development: Use local backend
backend = LocalClient()

# Testing: Switch to Redis
backend = RedisClient(host="localhost", port=6379, db=0)

# Production: Switch to RabbitMQ  
backend = RabbitMQClient(host="localhost", port=5672, username="user", password="password")

# Same orchestrator and consumer code works with any backend
orchestrator = Orchestrator(backend)
consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()

Automatic Backend Detection

When you connect a consumer, the orchestrator automatically detects the backend type and creates the appropriate consumer backend:

consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)  # Automatically detects backend type

Implementation:

  1. The Orchestrator detects its backend type using type(self.backend).__name__
  2. Creates the corresponding consumer backend:
    • LocalClientLocalConsumerBackend
    • RedisClientRedisConsumerBackend
    • RabbitMQClientRabbitMQConsumerBackend
def create_consumer_backend_for_schema(self, schema: JobSchema) -> ConsumerBackendBase:
    backend_type = type(self.backend).__name__
    if backend_type == "LocalClient":
        return LocalConsumerBackend(queue_name, self)
    elif backend_type == "RedisClient":
        return RedisConsumerBackend(queue_name, self, poll_timeout=5)
    elif backend_type == "RabbitMQClient":
        return RabbitMQConsumerBackend(queue_name, self, prefetch_count=1)

Priority Queues

Local Priority Queue

# Declare priority queue
backend = LocalClient()
orchestrator = Orchestrator(backend)
backend.declare_queue("priority_tasks", queue_type="priority")

# Publish with different priorities (higher numbers = higher priority)
orchestrator.publish("priority_tasks", urgent_job, priority=10)
orchestrator.publish("priority_tasks", normal_job, priority=5)
orchestrator.publish("priority_tasks", background_job, priority=1)

Redis Priority Queue

# REQUIRES: Redis server running
redis_backend = RedisClient(host="localhost", port=6379, db=0)
orchestrator = Orchestrator(redis_backend)
redis_backend.declare_queue("redis_priority", queue_type="priority")

# Higher priority jobs processed first
orchestrator.publish("redis_priority", critical_job, priority=100)
orchestrator.publish("redis_priority", normal_job, priority=50)

RabbitMQ Priority Queue

# REQUIRES: RabbitMQ server running
rabbitmq_backend = RabbitMQClient(host="localhost", port=5672, username="user", password="password")
orchestrator = Orchestrator(rabbitmq_backend)
# RabbitMQ supports max priority 0-255
rabbitmq_backend.declare_queue("rabbitmq_priority", max_priority=255)

orchestrator.publish("rabbitmq_priority", critical_job, priority=255)
orchestrator.publish("rabbitmq_priority", normal_job, priority=128)

API Reference

Consumer

class Consumer:
    def __init__(self, job_type_name: str)
    def connect(self, orchestrator: Orchestrator)
    def consume(self, num_messages: Optional[int] = None)
    def run(self, job_dict: dict) -> dict  # Implement this method

Orchestrator

class Orchestrator:
    def __init__(self, backend)
    def register(self, schema: JobSchema) -> str
    def publish(self, queue_name: str, job: Job, **kwargs) -> str
    def receive_message(self, queue_name: str) -> Optional[dict]
    def count_queue_messages(self, queue_name: str) -> int

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mindtrace_jobs-0.10.0.tar.gz (28.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mindtrace_jobs-0.10.0-py3-none-any.whl (37.9 kB view details)

Uploaded Python 3

File details

Details for the file mindtrace_jobs-0.10.0.tar.gz.

File metadata

  • Download URL: mindtrace_jobs-0.10.0.tar.gz
  • Upload date:
  • Size: 28.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.2 {"installer":{"name":"uv","version":"0.11.2","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for mindtrace_jobs-0.10.0.tar.gz
Algorithm Hash digest
SHA256 daf89366422cd84cad2ca884bb980cec4e8685b8c8ee0e3b77029bdc699d6037
MD5 9ebeba9ee03385b90a3fa31f8b538300
BLAKE2b-256 f03b3b2d9076f74ae3e1a8f191ee3f81b065b09feafa02c70100f64451214fca

See more details on using hashes here.

File details

Details for the file mindtrace_jobs-0.10.0-py3-none-any.whl.

File metadata

  • Download URL: mindtrace_jobs-0.10.0-py3-none-any.whl
  • Upload date:
  • Size: 37.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.2 {"installer":{"name":"uv","version":"0.11.2","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for mindtrace_jobs-0.10.0-py3-none-any.whl
Algorithm Hash digest
SHA256 182c12df419acb166cc8c61787130927f097f94644aa8b6e4cbaca23d3e1c3db
MD5 b8705c17839cda54e44ab5a76ac3d959
BLAKE2b-256 271b9ece468a85003f11e5be1825fda8f737f1c921e7f1dab1a38fe9b88ff6c6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page