Job management for Mindtrace

Mindtrace Jobs

A job queue system that works with different backends (local, Redis, RabbitMQ).

Backends

  • Local
  • Redis
  • RabbitMQ

Redis Backend

Setup Redis:

# Local installation
redis-server

# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest

# Test connection
redis-cli ping  # should return PONG

RabbitMQ Backend

Setup RabbitMQ:

# Using Docker (recommended)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

Core Components:

  • Consumer - Base class for processing jobs
  • Orchestrator - Manages job queues and routing
  • Job, JobSchema - Job data structures
  • LocalClient, RedisClient, RabbitMQClient - Backend implementations

Architecture

graph TD
    Consumer["Your Consumer<br/>MathsConsumer('maths_operations')"] 
    Orchestrator["Orchestrator"]
    Schema["Job Schema<br/>(maths_operations)"]
    Job["Job Instance"]
    Queue["Queue<br/>(maths_operations)"]
    
    LocalClient["LocalClient"]
    RedisClient["RedisClient<br/>(requires Redis server)"] 
    RabbitMQClient["RabbitMQClient<br/>(requires RabbitMQ server)"]
    
    Consumer --> Orchestrator
    Schema --> Orchestrator
    Job --> Orchestrator
    Orchestrator --> Queue
    
    Orchestrator -.-> LocalClient
    Orchestrator -.-> RedisClient
    Orchestrator -.-> RabbitMQClient
    
    Queue --> Consumer
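
The flow above — register a queue, publish jobs to it by name, have a consumer drain it — can be sketched with a minimal in-memory stand-in. This is illustrative only; the real Orchestrator and LocalClient do much more (schema validation, routing, consumer backends):

```python
from collections import deque

class TinyOrchestrator:
    """Toy stand-in for Orchestrator + LocalClient: one deque per queue name."""
    def __init__(self):
        self.queues: dict[str, deque] = {}

    def register(self, name: str) -> None:
        # Create the named queue if it does not exist yet
        self.queues.setdefault(name, deque())

    def publish(self, name: str, job: dict) -> None:
        self.queues[name].append(job)

    def receive_message(self, name: str):
        # FIFO: oldest job first; None when the queue is empty
        q = self.queues.get(name)
        return q.popleft() if q else None

orch = TinyOrchestrator()
orch.register("maths_operations")
orch.publish("maths_operations", {"operation": "add", "a": 2.0, "b": 1.0})
job = orch.receive_message("maths_operations")
print(job["operation"])  # add
```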

Basic Example

from mindtrace.jobs import Orchestrator, LocalClient, Consumer, JobSchema, job_from_schema
from pydantic import BaseModel

# Set up the orchestrator with local backend
orchestrator = Orchestrator(LocalClient())

# Define your job input/output models (inherit from BaseModel directly)
class MathsInput(BaseModel):
    operation: str = "add"
    a: float = 2.0
    b: float = 1.0

class MathsOutput(BaseModel):
    result: float = 0.0
    operation_performed: str = ""

schema = JobSchema(name="maths_operations", input_schema=MathsInput, output_schema=MathsOutput)
orchestrator.register(schema)

# Create a consumer
class MathsConsumer(Consumer):
    def run(self, job_dict: dict) -> dict:
        # Access input data from the dict
        input_data = job_dict.get('input_data', {})
        operation = input_data.get('operation', 'add')
        a = input_data.get('a')
        b = input_data.get('b')
        
        # Your processing logic here
        if operation == "add":
            result = a + b
        elif operation == "multiply":
            result = a * b
        elif operation == "power":
            result = a ** b
        else:
            raise ValueError(f"Unknown operation: {operation}")
        
        return {
            "result": result,
            "operation_performed": f"{operation}({a}, {b}) = {result}"
        }

# Connect and consume jobs
consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)

# Add a job to the queue
job = job_from_schema(schema, MathsInput(
    operation="multiply",
    a=7.0,
    b=3.0
))
orchestrator.publish("maths_operations", job)

# Process jobs
consumer.consume(num_messages=1)

Using Different Backends

Redis Backend

from mindtrace.jobs import RedisClient

redis_backend = RedisClient(host="localhost", port=6379, db=0)
orchestrator = Orchestrator(redis_backend)

consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()

RabbitMQ Backend

from mindtrace.jobs import RabbitMQClient

rabbitmq_backend = RabbitMQClient(
    host="localhost", 
    port=5672, 
    username="user", 
    password="password"
)
orchestrator = Orchestrator(rabbitmq_backend)

consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()

Backend Switching

The job system supports seamless switching between backends without changing your consumer code:

# Development: Use local backend
backend = LocalClient()

# Testing: Switch to Redis
backend = RedisClient(host="localhost", port=6379, db=0)

# Production: Switch to RabbitMQ  
backend = RabbitMQClient(host="localhost", port=5672, username="user", password="password")

# Same orchestrator and consumer code works with any backend
orchestrator = Orchestrator(backend)
consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)
consumer.consume()
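
A common way to exploit this is to select the backend from configuration so the same entry point runs in every environment. The helper below is hypothetical (not part of mindtrace.jobs); the stage names and `JOBS_STAGE` variable are assumptions made for this sketch:

```python
import os

# Hypothetical helper: map a deployment stage to the name of a
# mindtrace.jobs backend class. LocalClient/RedisClient/RabbitMQClient
# are real classes; everything else here is illustrative.
def backend_name_for_stage(stage: str) -> str:
    mapping = {
        "dev": "LocalClient",
        "test": "RedisClient",
        "prod": "RabbitMQClient",
    }
    # Fall back to the in-process backend for unknown stages
    return mapping.get(stage, "LocalClient")

stage = os.environ.get("JOBS_STAGE", "dev")
print(backend_name_for_stage(stage))
```

In practice you would instantiate the chosen class (with its host/port/credentials) and pass it to `Orchestrator(...)`; the consumer code stays identical.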

Automatic Backend Detection

When you connect a consumer, the orchestrator automatically detects the backend type and creates the appropriate consumer backend:

consumer = MathsConsumer("maths_operations")
consumer.connect(orchestrator)  # Automatically detects backend type

Implementation:

  1. The Orchestrator detects its backend type using type(self.backend).__name__
  2. Creates the corresponding consumer backend:
    • LocalClient → LocalConsumerBackend
    • RedisClient → RedisConsumerBackend
    • RabbitMQClient → RabbitMQConsumerBackend

def create_consumer_backend_for_schema(self, schema: JobSchema) -> ConsumerBackendBase:
    queue_name = schema.name
    backend_type = type(self.backend).__name__
    if backend_type == "LocalClient":
        return LocalConsumerBackend(queue_name, self)
    elif backend_type == "RedisClient":
        return RedisConsumerBackend(queue_name, self, poll_timeout=5)
    elif backend_type == "RabbitMQClient":
        return RabbitMQConsumerBackend(queue_name, self, prefetch_count=1)
    else:
        raise ValueError(f"Unsupported backend type: {backend_type}")

Priority Queues

Local Priority Queue

# Declare priority queue
backend = LocalClient()
orchestrator = Orchestrator(backend)
backend.declare_queue("priority_tasks", queue_type="priority")

# Publish with different priorities (higher numbers = higher priority)
orchestrator.publish("priority_tasks", urgent_job, priority=10)
orchestrator.publish("priority_tasks", normal_job, priority=5)
orchestrator.publish("priority_tasks", background_job, priority=1)
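
The "higher numbers win" ordering can be modeled with the standard library's heapq. This is a conceptual sketch, not the library's actual implementation; since heapq pops the smallest item first, priorities are negated, and a counter breaks ties so equal-priority jobs stay FIFO:

```python
import heapq
import itertools

heap = []
counter = itertools.count()  # tie-breaker: preserves FIFO within a priority level

def push(job: str, priority: int) -> None:
    # Negate priority so the largest number is popped first
    heapq.heappush(heap, (-priority, next(counter), job))

def pop() -> str:
    return heapq.heappop(heap)[2]

push("background_job", 1)
push("urgent_job", 10)
push("normal_job", 5)
print([pop() for _ in range(3)])  # ['urgent_job', 'normal_job', 'background_job']
```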

Redis Priority Queue

# REQUIRES: Redis server running
redis_backend = RedisClient(host="localhost", port=6379, db=0)
orchestrator = Orchestrator(redis_backend)
redis_backend.declare_queue("redis_priority", queue_type="priority")

# Higher priority jobs processed first
orchestrator.publish("redis_priority", critical_job, priority=100)
orchestrator.publish("redis_priority", normal_job, priority=50)

RabbitMQ Priority Queue

# REQUIRES: RabbitMQ server running
rabbitmq_backend = RabbitMQClient(host="localhost", port=5672, username="user", password="password")
orchestrator = Orchestrator(rabbitmq_backend)
# RabbitMQ supports priorities 0-255; declare the queue's ceiling via max_priority
rabbitmq_backend.declare_queue("rabbitmq_priority", max_priority=255)

orchestrator.publish("rabbitmq_priority", critical_job, priority=255)
orchestrator.publish("rabbitmq_priority", normal_job, priority=128)

API Reference

Consumer

class Consumer:
    def __init__(self, job_type_name: str)
    def connect(self, orchestrator: Orchestrator)
    def consume(self, num_messages: Optional[int] = None)
    def run(self, job_dict: dict) -> dict  # Implement this method

Orchestrator

class Orchestrator:
    def __init__(self, backend)
    def register(self, schema: JobSchema) -> str
    def publish(self, queue_name: str, job: Job, **kwargs) -> str
    def receive_message(self, queue_name: str) -> Optional[dict]
    def count_queue_messages(self, queue_name: str) -> int
