Cluster management for Mindtrace

Mindtrace Cluster Module

The Mindtrace Cluster module provides a distributed computing framework for managing and orchestrating jobs across multiple worker nodes. It enables scalable, fault-tolerant job execution with support for various execution environments including Git repositories and Docker containers.

Overview

The cluster module consists of three main components:

  • ClusterManager: Central orchestrator that manages job distribution and worker coordination
  • Node: Worker node that can launch and manage workers
  • Worker: Executable unit that processes jobs

Architecture

┌────────────────────┐    ┌────────────────────┐    ┌────────────────────┐
│   ClusterManager   │    │        Node        │    │       Worker       │
│                    │    │                    │    │                    │
│ • Job routing      │◄──►│ • Worker launch    │◄──►│ • Job execution    │
│ • Status tracking  │    │ • Registry access  │    │ • Environment mgmt │
│ • Worker mgmt      │    │ • Resource mgmt    │    │ • Status reporting │
└────────────────────┘    └────────────────────┘    └────────────────────┘

Core Components

ClusterManager

The central orchestrator that:

  • Routes jobs to appropriate endpoints or workers
  • Tracks job and worker status
  • Manages worker registration and auto-connection
  • Provides a unified API for job submission and monitoring

Key Features:

  • Job schema targeting (direct endpoint routing vs orchestrator)
  • Worker type registration with Git/Docker support
  • Automatic worker-to-job schema connection
  • Real-time status monitoring
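
To make job schema targeting more concrete, here is a minimal sketch of the routing decision in plain Python. It is not the ClusterManager implementation: the `JobRouter` class and the `"orchestrator"` fallback label are hypothetical names used only for illustration, and the real module may treat unregistered job types differently.

```python
from dataclasses import dataclass, field


@dataclass
class JobRouter:
    """Toy model of job schema targeting (hypothetical, not the mindtrace API)."""

    # Maps a job type name to the endpoint that handles it directly.
    endpoint_targets: dict = field(default_factory=dict)

    def register_job_to_endpoint(self, job_type: str, endpoint: str) -> None:
        self.endpoint_targets[job_type] = endpoint

    def route(self, job_type: str) -> str:
        # Job types with a registered endpoint are routed directly.
        # Assumption: everything else falls back to the orchestrator.
        return self.endpoint_targets.get(job_type, "orchestrator")


router = JobRouter()
router.register_job_to_endpoint("echo_job", "echo/run")
print(router.route("echo_job"))
print(router.route("other_job"))
```

The point of the sketch is only the two-way split: a job type either has a direct endpoint target or is handed to the orchestrator for a connected worker to pick up.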

Node

A worker node that:

  • Launches workers from the registry
  • Manages worker lifecycle
  • Provides resource isolation

Key Features:

  • Worker registry integration
  • Automatic cluster registration
  • Worker process management

Worker

Executable units that:

  • Process individual jobs
  • Report status back to cluster
  • Support various execution environments

Key Features:

  • Abstract base class for custom workers
  • Built-in status tracking
  • Cluster communication
  • Environment management
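
The following sketch illustrates the abstract-base-class pattern with built-in status tracking. `BaseWorker` is a self-contained stand-in written for this example, not the real mindtrace Worker class, and `UppercaseWorker` is hypothetical. Only the `_run` method name and the `{"status": ..., "output": ...}` result shape are taken from the EchoWorker example in this document.

```python
from abc import ABC, abstractmethod


class BaseWorker(ABC):
    """Stand-in for an abstract worker base class (hypothetical)."""

    def __init__(self):
        self.status = "idle"

    def run(self, job_dict: dict) -> dict:
        # Track status around the subclass-specific logic.
        self.status = "running"
        try:
            return self._run(job_dict)
        except Exception as exc:
            # Report failures in the same result shape as successes.
            return {"status": "failed", "output": {"error": str(exc)}}
        finally:
            self.status = "idle"

    @abstractmethod
    def _run(self, job_dict: dict) -> dict:
        """Process one job and return a result dict."""


class UppercaseWorker(BaseWorker):
    def _run(self, job_dict: dict) -> dict:
        return {"status": "completed", "output": {"text": job_dict["text"].upper()}}


worker = UppercaseWorker()
result = worker.run({"text": "hello"})
failed = worker.run({})  # missing "text" key triggers the failure path
print(result["status"], failed["status"])
```

A custom worker in this pattern only implements `_run`; status bookkeeping and error reporting stay in the base class.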

Built-in Workers

EchoWorker

A simple worker that echoes messages with optional delay:

from mindtrace.cluster.workers.echo_worker import EchoWorker

# Usage
worker = EchoWorker()
result = worker._run({"message": "Hello World", "delay": 2})
# Returns: {"status": "completed", "output": {"echoed": "Hello World"}}

RunScriptWorker

A worker that executes scripts in isolated environments. For Git repositories, it syncs the environment using uv sync.

from mindtrace.cluster.workers.run_script_worker import RunScriptWorker

# Git environment example
job_data = {
    "environment": {
        "git": {
            "repo_url": "https://github.com/user/repo",
            "branch": "main",
            "working_dir": "scripts"
        }
    },
    "command": "python process_data.py"
}

# Docker environment example
job_data = {
    "environment": {
        "docker": {
            "image": "python:3.9",
            "working_dir": "/app",
            "volumes": {"/host/path": "/container/path"},
            "environment": {"ENV_VAR": "value"}
        }
    },
    "command": "python script.py"
}
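
Because the two payloads share the same outer shape, a small helper can build them and catch an ambiguous environment early. This is a sketch only: `make_script_job` is a hypothetical helper, and the rule that a job carries exactly one environment type is an assumption rather than documented behavior.

```python
from typing import Optional


def make_script_job(
    command: str,
    git: Optional[dict] = None,
    docker: Optional[dict] = None,
) -> dict:
    """Build a job_data dict in the shape shown above (hypothetical helper)."""
    # Assumption: a job targets exactly one environment type.
    if (git is None) == (docker is None):
        raise ValueError("specify exactly one of 'git' or 'docker'")
    environment = {"git": git} if git is not None else {"docker": docker}
    return {"environment": environment, "command": command}


job_data = make_script_job(
    "python process_data.py",
    git={
        "repo_url": "https://github.com/user/repo",
        "branch": "main",
        "working_dir": "scripts",
    },
)
print(job_data["environment"].keys())
```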

Usage Examples

Basic Cluster Setup

from mindtrace.cluster import ClusterManager, Node
from mindtrace.jobs import JobSchema, job_from_schema

# Launch cluster manager
cluster = ClusterManager.launch(host="localhost", port=8002)

# Launch node
node = Node.launch(
    host="localhost", 
    port=8003, 
    cluster_url=str(cluster.url)
)

# Register worker type
cluster.register_worker_type(
    worker_name="myworker",
    worker_class="myapp.workers.MyWorker",
    worker_params={}
)

# Launch worker
worker_url = "http://localhost:8004"
node.launch_worker(worker_type="myworker", worker_url=worker_url)

# Submit job (my_job_schema is a JobSchema defined elsewhere in your application)
job = job_from_schema(my_job_schema, input_data={"key": "value"})
result = cluster.submit_job(job)

Gateway Mode

Use ClusterManager as a gateway to route requests:

from mindtrace.cluster import ClusterManager
from mindtrace.jobs import job_from_schema

# Launch as gateway
gateway = ClusterManager.launch(port=8097)

# Register service
gateway.register_job_to_endpoint(
    job_type="echo_job",
    endpoint="echo/run"
)

# Submit job (automatically routed to endpoint).
# echo_job_schema is a JobSchema defined elsewhere in your application.
job = job_from_schema(echo_job_schema, input_data={"message": "Hello"})
result = gateway.submit_job(job)

Git-based Worker

Launch workers from Git repositories:

# Register worker from Git
cluster.register_worker_type(
    worker_name="gitworker",
    worker_class="myapp.worker.MyWorker",
    worker_params={},
    git_repo_url="https://github.com/user/worker-repo",
    git_branch="main",
    git_working_dir="worker"
)

# Launch worker (automatically clones repo)
node.launch_worker(worker_type="gitworker", worker_url="http://localhost:8005")

Configuration

Environment Variables

# Redis configuration
MINDTRACE_CLUSTER_DEFAULT_REDIS_URL=redis://localhost:6379
MINDTRACE_WORKER_REDIS_DEFAULT_URL=redis://localhost:6379

# MinIO configuration (for worker registry)
MINDTRACE_CLUSTER_MINIO_ENDPOINT=localhost:9000
MINDTRACE_CLUSTER_MINIO_ACCESS_KEY=minioadmin
MINDTRACE_CLUSTER_MINIO_SECRET_KEY=minioadmin
MINDTRACE_CLUSTER_MINIO_BUCKET=workers
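
For scripts that need the same settings, the variables can be read with the standard library. This is a sketch of one way to do it, not how the module itself loads configuration: `ClusterSettings` and `load_cluster_settings` are hypothetical names, and using the example values above as fallbacks is an assumption.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusterSettings:
    redis_url: str
    minio_endpoint: str
    minio_access_key: str
    minio_secret_key: str
    minio_bucket: str


def load_cluster_settings(env=None) -> ClusterSettings:
    """Read cluster settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    # Assumption: the example values from this section serve as fallbacks.
    return ClusterSettings(
        redis_url=env.get("MINDTRACE_CLUSTER_DEFAULT_REDIS_URL", "redis://localhost:6379"),
        minio_endpoint=env.get("MINDTRACE_CLUSTER_MINIO_ENDPOINT", "localhost:9000"),
        minio_access_key=env.get("MINDTRACE_CLUSTER_MINIO_ACCESS_KEY", "minioadmin"),
        minio_secret_key=env.get("MINDTRACE_CLUSTER_MINIO_SECRET_KEY", "minioadmin"),
        minio_bucket=env.get("MINDTRACE_CLUSTER_MINIO_BUCKET", "workers"),
    )


settings = load_cluster_settings()
print(settings.redis_url)
```

Passing a plain dict instead of relying on `os.environ` keeps the loader easy to exercise in tests.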

Database Models

The cluster uses several database models for tracking:

  • JobStatus: Tracks job execution status and results
  • JobSchemaTargeting: Maps job types to endpoints
  • WorkerStatus: Tracks worker availability and current job
  • WorkerAutoConnect: Maps worker types to job schemas
  • WorkerStatusLocal: Local worker status tracking

API Reference

Because these components are Services, they are normally used through a ConnectionManager, but their endpoints can also be called directly.

ClusterManager Endpoints

  • POST /submit_job - Submit a job for execution
  • POST /register_job_to_endpoint - Route job type to specific endpoint
  • POST /register_job_to_worker - Connect job type to worker
  • POST /get_job_status - Get job execution status
  • POST /register_worker_type - Register new worker type
  • POST /launch_worker - Launch worker on node, automatically connecting it to the Orchestrator if it's registered to a job type
  • POST /get_worker_status - Get worker status
  • POST /query_worker_status - Query live worker status
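
As a rough illustration of calling one of these endpoints directly, the sketch below builds a JSON POST request with the standard library without sending it. `build_post` is a hypothetical helper, the base URL reuses the port from the Basic Cluster Setup example, and the `"job_id"` payload key is a placeholder: the real request schema is not documented here and may differ.

```python
import json
from urllib.request import Request


def build_post(base_url: str, endpoint: str, payload: dict) -> Request:
    """Build (but do not send) a JSON POST request for a service endpoint."""
    url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
    body = json.dumps(payload).encode("utf-8")
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumption: "job_id" is a placeholder key, not the documented schema.
request = build_post("http://localhost:8002", "/get_job_status", {"job_id": "example-id"})
print(request.get_method(), request.full_url)
```

With a ClusterManager running, the request could be sent with `urllib.request.urlopen(request)`; in most cases the ConnectionManager remains the more convenient route.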

Node Endpoints

  • POST /launch_worker - Launch worker from registry

Worker Endpoints

  • POST /run - Execute a job
  • POST /connect_to_cluster - Connect to cluster orchestrator
  • POST /get_status - Get worker status
  • POST /start - Initialize worker

Examples

See the samples/cluster/ directory for examples:

  • cluster_as_gateway.py - Using ClusterManager as a gateway
  • cluster_with_node.py - Basic cluster with node setup
  • cluster_with_node_autoregister.py - Automatic worker registration
  • run_script/ - RunScriptWorker examples
  • multiprocess/ - Multi-process cluster examples
  • separate_node/ - Distributed node examples
