Job management for Mindtrace

Mindtrace Jobs

The Jobs module provides Mindtrace’s backend-agnostic job queue system for publishing typed jobs, consuming them with Python workers, and switching between local, Redis, and RabbitMQ backends with minimal application changes.

Features

  • Typed job definitions with JobSchema and Pydantic models
  • Backend-agnostic orchestration through Orchestrator
  • Consumer workers built by subclassing Consumer
  • Multiple backends for local, Redis, and RabbitMQ execution
  • Queue variants including FIFO, stack, and priority queues
  • Convenient job creation with job_from_schema()

Quick Start

from pydantic import BaseModel

from mindtrace.jobs import Consumer, JobSchema, LocalClient, Orchestrator, job_from_schema


class MathsInput(BaseModel):
    operation: str = "add"
    a: float = 2.0
    b: float = 1.0


class MathsOutput(BaseModel):
    result: float = 0.0
    operation_performed: str = ""


schema = JobSchema(
    name="maths_operations",
    input_schema=MathsInput,
    output_schema=MathsOutput,
)

orchestrator = Orchestrator(LocalClient())
orchestrator.register(schema)


class MathsConsumer(Consumer):
    def run(self, job_dict: dict) -> dict:
        payload = job_dict.get("payload", {})
        operation = payload.get("operation", "add")
        a = payload.get("a")
        b = payload.get("b")

        if operation == "add":
            result = a + b
        elif operation == "multiply":
            result = a * b
        else:
            raise ValueError(f"Unknown operation: {operation}")

        return {
            "result": result,
            "operation_performed": f"{operation}({a}, {b}) = {result}",
        }


consumer = MathsConsumer()
consumer.connect_to_orchestrator(orchestrator, "maths_operations")

job = job_from_schema(schema, MathsInput(operation="multiply", a=7.0, b=3.0))
orchestrator.publish("maths_operations", job)
consumer.consume(num_messages=1)

In practice, the jobs package is built around four concepts:

  • a schema describing the job payload
  • an orchestrator that owns queues and publishing
  • a backend that stores/transports messages
  • a consumer that processes jobs from one or more queues
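
To make the division of responsibilities concrete, here is a minimal stdlib-only sketch of the four roles. Every name in it (ToySchema, ToyBackend, ToyOrchestrator, ToyConsumer) is hypothetical and exists only for illustration; it is not how Mindtrace implements these classes, and it checks only for required field names rather than doing Pydantic validation.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical schema: a queue name plus the payload fields it requires."""
    name: str
    required_fields: tuple


class ToyBackend:
    """Hypothetical in-memory backend that stores messages per queue."""

    def __init__(self):
        self._queues = {}

    def declare_queue(self, name: str) -> None:
        self._queues.setdefault(name, deque())

    def push(self, name: str, message: dict) -> None:
        self._queues[name].append(message)

    def pop(self, name: str) -> Optional[dict]:
        pending = self._queues[name]
        return pending.popleft() if pending else None


class ToyOrchestrator:
    """Hypothetical orchestrator: owns the backend, registers schemas, publishes."""

    def __init__(self, backend: ToyBackend):
        self.backend = backend
        self._schemas = {}

    def register(self, schema: ToySchema) -> str:
        self._schemas[schema.name] = schema
        self.backend.declare_queue(schema.name)
        return schema.name

    def publish(self, queue_name: str, payload: dict) -> None:
        schema = self._schemas[queue_name]
        missing = [f for f in schema.required_fields if f not in payload]
        if missing:
            raise ValueError(f"Missing fields for {queue_name}: {missing}")
        self.backend.push(queue_name, {"schema_name": queue_name, "payload": payload})


class ToyConsumer:
    """Hypothetical consumer: pulls messages from one queue and runs a handler."""

    def __init__(self, handler: Callable[[dict], dict]):
        self._handler = handler
        self._orchestrator = None
        self._queue_name = None

    def connect(self, orchestrator: ToyOrchestrator, queue_name: str) -> None:
        self._orchestrator = orchestrator
        self._queue_name = queue_name

    def consume(self, num_messages: int = 1) -> list:
        results = []
        for _ in range(num_messages):
            message = self._orchestrator.backend.pop(self._queue_name)
            if message is None:  # queue drained early
                break
            results.append(self._handler(message))
        return results


orchestrator = ToyOrchestrator(ToyBackend())
orchestrator.register(ToySchema("maths_operations", ("a", "b")))
orchestrator.publish("maths_operations", {"a": 7.0, "b": 3.0})

consumer = ToyConsumer(lambda job: {"result": job["payload"]["a"] * job["payload"]["b"]})
consumer.connect(orchestrator, "maths_operations")
results = consumer.consume(num_messages=1)
```

The point of the sketch is that only ToyBackend knows how messages are stored, which is why replacing the backend leaves the schema and consumer code untouched.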

JobSchema and Job

JobSchema is currently an alias of TaskSchema from mindtrace-core. It gives a queue/job type a name plus typed input/output models.

from pydantic import BaseModel

from mindtrace.jobs import JobSchema


class ReportInput(BaseModel):
    report_id: str
    include_charts: bool = True


class ReportOutput(BaseModel):
    path: str


schema = JobSchema(
    name="build_report",
    input_schema=ReportInput,
    output_schema=ReportOutput,
)

A Job is the executable instance that gets queued. In most cases you do not construct it by hand; you use job_from_schema().

from mindtrace.jobs import job_from_schema


job = job_from_schema(schema, {"report_id": "rpt-123", "include_charts": True})
print(job.id)
print(job.schema_name)

Orchestrator

Orchestrator is the publishing and queue-management layer. It owns a backend and handles things like:

  • registering schemas
  • declaring queues
  • publishing jobs
  • counting queue messages
  • cleaning or deleting queues

from mindtrace.jobs import LocalClient, Orchestrator


orchestrator = Orchestrator(LocalClient())
queue_name = orchestrator.register(schema)
print(queue_name)

Publishing typed input directly

If a schema has been registered for a queue, you can publish either:

  • a full Job
  • or a matching Pydantic input model

orchestrator.publish("build_report", ReportInput(report_id="rpt-001"))

This is often more convenient than constructing the Job by hand each time.

Consumer

Subclass Consumer and implement run(job_dict: dict) -> dict.

from mindtrace.jobs import Consumer


class ReportConsumer(Consumer):
    def run(self, job_dict: dict) -> dict:
        payload = job_dict.get("payload", {})
        report_id = payload.get("report_id")
        return {"path": f"/tmp/{report_id}.pdf"}

Then connect the consumer to an orchestrator and start consuming:

consumer = ReportConsumer()
consumer.connect_to_orchestrator(orchestrator, "build_report")
consumer.consume(num_messages=1)

Consuming until empty

consumer.consume_until_empty()

That is useful for local scripts, test runs, or backlog-draining workflows.
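
The drain-until-empty pattern can be sketched with the standard library alone. This is an illustration of the general idea, not the code behind consume_until_empty(); the drain helper and the sample report IDs are hypothetical.

```python
import queue


def drain(pending: queue.Queue, handler) -> list:
    """Process items until the queue reports empty, collecting handler results."""
    results = []
    while True:
        try:
            item = pending.get_nowait()  # raises queue.Empty instead of blocking
        except queue.Empty:
            break
        results.append(handler(item))
    return results


backlog = queue.Queue()
for report_id in ("rpt-001", "rpt-002", "rpt-003"):
    backlog.put({"payload": {"report_id": report_id}})

paths = drain(backlog, lambda job: f"/tmp/{job['payload']['report_id']}.pdf")
```

A loop like this stops at the first empty read, so it suits batch-style runs; a long-lived worker would normally block or poll instead.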

Backends

The package supports three backend families.

Local backend

LocalClient is the simplest backend and a good default for local development or single-process workflows.

from mindtrace.jobs import LocalClient, Orchestrator


backend = LocalClient()
orchestrator = Orchestrator(backend)

Internally, the local backend stores its queues in a registry-backed implementation and supports these local queue variants:

  • LocalQueue
  • LocalStack
  • LocalPriorityQueue

Redis backend

Use RedisClient when you want Redis-backed queues.

from mindtrace.jobs import Orchestrator, RedisClient


backend = RedisClient(host="localhost", port=6379, db=0)
orchestrator = Orchestrator(backend)

Redis is a good fit when you want a lightweight shared broker across multiple processes or machines.

RabbitMQ backend

Use RabbitMQClient when you want RabbitMQ-backed routing and queueing.

from mindtrace.jobs import Orchestrator, RabbitMQClient


backend = RabbitMQClient(
    host="localhost",
    port=5672,
    username="user",
    password="password",
)
orchestrator = Orchestrator(backend)

RabbitMQ is a better fit when you want broker-oriented messaging behavior, exchanges, and mature queue features such as max-priority support.

Switching Backends

One of the main design goals of the jobs package is that your job schema and consumer logic should not need major changes when switching backends.

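from mindtrace.jobs import LocalClient, Orchestrator, RabbitMQClient, RedisClient

# Choose exactly one backend; each assignment below replaces the previous one.

# Development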
backend = LocalClient()

# Shared test environment
backend = RedisClient(host="localhost", port=6379, db=0)

# Production / broker-oriented setup
backend = RabbitMQClient(host="localhost", port=5672, username="user", password="password")

orchestrator = Orchestrator(backend)
consumer = ReportConsumer()
consumer.connect_to_orchestrator(orchestrator, "build_report")

The core publishing and consuming flow stays largely the same.
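
One way to keep that choice out of application code is to select the backend from configuration. The sketch below is an assumption-heavy illustration: the JOBS_BACKEND environment variable and the select_backend helper are hypothetical, and the connection settings are simply the ones used in the examples above. It returns a backend name and keyword arguments rather than constructing a client, so it runs without any broker installed.

```python
import os

# Connection settings copied from the examples in this README.
# JOBS_BACKEND is a hypothetical variable name, not a Mindtrace convention.
BACKEND_SETTINGS = {
    "local": {},
    "redis": {"host": "localhost", "port": 6379, "db": 0},
    "rabbitmq": {
        "host": "localhost",
        "port": 5672,
        "username": "user",
        "password": "password",
    },
}


def select_backend(environ=None):
    """Return (backend_name, constructor_kwargs) for the configured backend."""
    environ = os.environ if environ is None else environ
    name = environ.get("JOBS_BACKEND", "local").lower()
    if name not in BACKEND_SETTINGS:
        raise ValueError(f"Unknown backend {name!r}; expected one of {sorted(BACKEND_SETTINGS)}")
    # Copy so callers cannot mutate the shared defaults.
    return name, dict(BACKEND_SETTINGS[name])


name, kwargs = select_backend({"JOBS_BACKEND": "redis"})
```

The returned keyword arguments would then be passed to LocalClient, RedisClient, or RabbitMQClient as appropriate, and the resulting backend handed to Orchestrator as shown earlier.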

Queue Types and Priority

The local and Redis backends expose queue-type selection when declaring a queue.

FIFO queue

orchestrator.register(schema, queue_type="fifo")

Stack / LIFO queue

backend.declare_queue("stack_tasks", queue_type="stack")
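
The practical difference between the "fifo" and "stack" queue types is the order in which pending jobs come back out. A minimal stdlib sketch of that difference follows; drain_order is a hypothetical helper built on collections.deque and does not reflect how LocalQueue or LocalStack are implemented.

```python
from collections import deque


def drain_order(items, queue_type="fifo"):
    """Return items in the order a toy queue of the given type would yield them."""
    pending = deque(items)
    if queue_type == "fifo":
        take = pending.popleft  # oldest item first
    elif queue_type == "stack":
        take = pending.pop  # newest item first
    else:
        raise ValueError(f"Unsupported queue_type: {queue_type!r}")
    order = []
    while pending:
        order.append(take())
    return order


jobs = ["job-1", "job-2", "job-3"]
fifo_order = drain_order(jobs, "fifo")
stack_order = drain_order(jobs, "stack")
```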

Priority queue

backend.declare_queue("priority_tasks", queue_type="priority")
orchestrator.publish("priority_tasks", priority_job, priority=100)
orchestrator.publish("priority_tasks", background_job, priority=10)
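
The variable names above suggest that a larger priority value is served first. Under that assumption, which this README does not state outright for the local and Redis backends, the ordering can be sketched with heapq. ToyPriorityQueue is hypothetical and is not LocalPriorityQueue.

```python
import heapq
import itertools


class ToyPriorityQueue:
    """Hypothetical priority queue: larger priority pops first, ties stay FIFO."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # insertion order, used as tie-breaker

    def push(self, item, priority=0):
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), item))

    def pop(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


pending = ToyPriorityQueue()
pending.push("background_job", priority=10)
pending.push("priority_job", priority=100)
pending.push("cleanup_job", priority=10)

served = [pending.pop() for _ in range(len(pending))]
```

The insertion counter guarantees that two jobs with the same priority are never compared directly and come out in the order they were published.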

RabbitMQ priority queues

RabbitMQ does not use the same queue_type argument. Instead, you declare a queue with max_priority.

backend = RabbitMQClient(host="localhost", port=5672, username="user", password="password")
backend.declare_queue("rabbitmq_priority", max_priority=255)

Then publish with a priority value:

orchestrator = Orchestrator(backend)
orchestrator.publish("rabbitmq_priority", job, priority=255)

Redis Setup

For Redis-backed jobs, start a Redis server first.

$ redis-server

Or with Docker:

$ docker run -d --name redis -p 6379:6379 redis:latest
$ redis-cli ping

RabbitMQ Setup

For RabbitMQ-backed jobs, start a RabbitMQ server first.

$ docker run -d --name rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    -e RABBITMQ_DEFAULT_USER=user \
    -e RABBITMQ_DEFAULT_PASS=password \
    rabbitmq:3-management

Examples

Related examples in the repo:

Testing

If you are working in the full Mindtrace repo, run tests for this module specifically:

$ git clone https://github.com/Mindtrace/mindtrace.git && cd mindtrace
$ uv sync --dev --all-extras
$ ds test: jobs
$ ds test: --unit jobs

Practical Notes and Caveats

  • JobSchema is currently an alias of TaskSchema, so older naming in the jobs package may reflect that transition.
  • Consumers operate on job_dict payloads, so your run() implementation should be defensive about the shape it expects.
  • Local, Redis, and RabbitMQ backends expose similar high-level workflows, but their queue semantics and operational requirements differ.
  • Redis and RabbitMQ require external services; the local backend is the simplest place to start.
  • Priority queue support exists across backends, but the declaration model differs for RabbitMQ vs. local/Redis backends.
