Cluster management for Mindtrace

Mindtrace Cluster Module

The Mindtrace Cluster module provides a distributed computing framework for managing and orchestrating jobs across multiple worker nodes. It enables scalable, fault-tolerant job execution with support for various execution environments including Git repositories and Docker containers.

Overview

The cluster module consists of three main components:

  • ClusterManager: Central orchestrator that manages job distribution and worker coordination
  • Node: Worker node that can launch and manage workers
  • Worker: Executable units that process jobs

Architecture

┌────────────────────┐    ┌────────────────────┐    ┌────────────────────┐
│   ClusterManager   │    │        Node        │    │       Worker       │
│                    │    │                    │    │                    │
│ • Job routing      │◄──►│ • Worker launch    │◄──►│ • Job execution    │
│ • Status tracking  │    │ • Registry access  │    │ • Environment mgmt │
│ • Worker mgmt      │    │ • Resource mgmt    │    │ • Status reporting │
└────────────────────┘    └────────────────────┘    └────────────────────┘

Core Components

ClusterManager

The central orchestrator that:

  • Routes jobs to appropriate endpoints or workers
  • Tracks job and worker status
  • Manages worker registration and auto-connection
  • Provides a unified API for job submission and monitoring

Key Features:

  • Job schema targeting (route a job type directly to an endpoint or through the orchestrator)
  • Worker type registration with Git/Docker support
  • Automatic worker-to-job schema connection
  • Real-time status monitoring
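Job schema targeting can be pictured as a lookup table consulted for each submitted job. The sketch below is a hypothetical in-memory model, not the ClusterManager implementation: `RoutingTableSketch`, its `route` method, and the rule that unregistered job types fall through to the orchestrator are assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class RoutingTableSketch:
    """Hypothetical model of job schema targeting (not the mindtrace API).

    Each job type is routed either directly to a service endpoint or,
    if no endpoint is registered, to the orchestrator.
    """

    endpoints: dict[str, str] = field(default_factory=dict)

    def register_job_to_endpoint(self, job_type: str, endpoint: str) -> None:
        self.endpoints[job_type] = endpoint

    def route(self, job_type: str) -> tuple[str, str]:
        # Direct endpoint routing wins when a target is registered.
        if job_type in self.endpoints:
            return ("endpoint", self.endpoints[job_type])
        # Assumption: otherwise the job goes to an orchestrator queue
        # named after the job type.
        return ("orchestrator", job_type)


table = RoutingTableSketch()
table.register_job_to_endpoint("echo_job", "echo/run")
print(table.route("echo_job"))   # ('endpoint', 'echo/run')
print(table.route("train_job"))  # ('orchestrator', 'train_job')
```

Keeping the fallback implicit means a job type only needs explicit registration when it should bypass the orchestrator.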

Node

A worker node that:

  • Launches workers from the registry
  • Manages worker lifecycle
  • Provides resource isolation

Key Features:

  • Worker registry integration
  • Automatic cluster registration
  • Worker process management

Worker

Executable units that:

  • Process individual jobs
  • Report status back to cluster
  • Support various execution environments

Key Features:

  • Abstract base class for custom workers
  • Built-in status tracking
  • Cluster communication
  • Environment management
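The features above suggest a familiar pattern: an abstract `_run` method that subclasses implement, wrapped by code that records status. The sketch below assumes that pattern. `WorkerSketch`, `EchoWorkerSketch`, `run`, the `"idle"`/`"running"` status strings and the `"failed"` result are hypothetical and not the mindtrace.cluster API; only the `_run` name and the input/output shape come from the EchoWorker example in this document.

```python
import time
from abc import ABC, abstractmethod
from typing import Optional


class WorkerSketch(ABC):
    """Hypothetical stand-in for a worker base class (not the mindtrace API).

    Subclasses implement _run(); run() wraps it with status tracking.
    """

    def __init__(self) -> None:
        self.status = "idle"
        self.current_job: Optional[str] = None

    @abstractmethod
    def _run(self, job_dict: dict) -> dict:
        """Process one job's input and return a result dict."""

    def run(self, job_id: str, job_dict: dict) -> dict:
        # Mark the worker busy while the job is being processed.
        self.status = "running"
        self.current_job = job_id
        try:
            return self._run(job_dict)
        except Exception as exc:
            # Hypothetical convention: report failures as a result dict.
            return {"status": "failed", "output": {"error": str(exc)}}
        finally:
            # Always return to idle, whether the job succeeded or failed.
            self.status = "idle"
            self.current_job = None


class EchoWorkerSketch(WorkerSketch):
    """Mimics the documented EchoWorker input/output shape."""

    def _run(self, job_dict: dict) -> dict:
        time.sleep(job_dict.get("delay", 0))
        return {"status": "completed", "output": {"echoed": job_dict["message"]}}


worker = EchoWorkerSketch()
print(worker.run("job-1", {"message": "Hello World"}))
# {'status': 'completed', 'output': {'echoed': 'Hello World'}}
print(worker.status)  # idle
```

Putting status bookkeeping in the base class keeps custom workers down to a single `_run` method.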

Built-in Workers

EchoWorker

A simple worker that echoes messages, with an optional delay:

from mindtrace.cluster.workers.echo_worker import EchoWorker

# Usage
worker = EchoWorker()
result = worker._run({"message": "Hello World", "delay": 2})
# Returns: {"status": "completed", "output": {"echoed": "Hello World"}}

RunScriptWorker

A worker that executes scripts in isolated environments. For Git repositories, it syncs the environment using uv sync.

from mindtrace.cluster.workers.run_script_worker import RunScriptWorker

# Git environment example: the repository is cloned and synced with uv sync
git_job_data = {
    "environment": {
        "git": {
            "repo_url": "https://github.com/user/repo",
            "branch": "main",
            "working_dir": "scripts"
        }
    },
    "command": "python process_data.py"
}

# Docker environment example
docker_job_data = {
    "environment": {
        "docker": {
            "image": "python:3.9",
            "working_dir": "/app",
            "volumes": {"/host/path": "/container/path"},
            "environment": {"ENV_VAR": "value"}
        }
    },
    "command": "python script.py"
}
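To make the two environment shapes concrete, the sketch below shows one plausible way to turn each job dictionary into command lines: git clone plus uv sync for Git, docker run for Docker. It is an illustration under stated assumptions, not RunScriptWorker's implementation: `build_steps`, the `repo` checkout directory, the `main` branch default, and running the command through `uv run` are all assumptions.

```python
import shlex
from typing import Optional

Step = tuple[list[str], Optional[str]]  # (argv, working directory or None)


def build_steps(job_data: dict, checkout_dir: str = "repo") -> list[Step]:
    """Hypothetical translation of a RunScriptWorker job into shell steps.

    Nothing is executed here; each argv could be passed to
    subprocess.run(argv, cwd=cwd, check=True).
    """
    env = job_data["environment"]
    command = shlex.split(job_data["command"])

    if "git" in env:
        git = env["git"]
        workdir = f"{checkout_dir}/{git.get('working_dir', '')}".rstrip("/")
        return [
            # Clone the requested branch into checkout_dir.
            (["git", "clone", "--branch", git.get("branch", "main"),
              git["repo_url"], checkout_dir], None),
            # Sync the project environment, as described above.
            (["uv", "sync"], workdir),
            # Assumption: the command runs inside the synced environment.
            (["uv", "run", *command], workdir),
        ]

    if "docker" in env:
        docker = env["docker"]
        argv = ["docker", "run", "--rm"]
        if "working_dir" in docker:
            argv += ["-w", docker["working_dir"]]
        for host_path, container_path in docker.get("volumes", {}).items():
            argv += ["-v", f"{host_path}:{container_path}"]
        for key, value in docker.get("environment", {}).items():
            argv += ["-e", f"{key}={value}"]
        # Image first, then the command to run inside the container.
        return [(argv + [docker["image"], *command], None)]

    raise ValueError("environment must contain a 'git' or 'docker' entry")
```

Building argument lists instead of shell strings avoids quoting problems when paths or environment values contain spaces.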

Usage Examples

Basic Cluster Setup

from mindtrace.cluster import ClusterManager, Node
from mindtrace.jobs import JobSchema, job_from_schema

# Launch the cluster manager
cluster = ClusterManager.launch(host="localhost", port=8002)

# Launch a node and register it with the cluster
node = Node.launch(
    host="localhost",
    port=8003,
    cluster_url=str(cluster.url)
)

# Register a worker type; worker_class is the dotted import path of the worker
cluster.register_worker_type(
    worker_name="myworker",
    worker_class="myapp.workers.MyWorker",
    worker_params={}
)

# Launch a worker of that type on the node
worker_url = "http://localhost:8004"
node.launch_worker(worker_type="myworker", worker_url=worker_url)

# Submit a job; my_job_schema is a JobSchema defined in your application
job = job_from_schema(my_job_schema, input_data={"key": "value"})
result = cluster.submit_job(job)

Gateway Mode

Use ClusterManager as a gateway to route requests:

from mindtrace.cluster import ClusterManager
from mindtrace.jobs import job_from_schema

# Launch as gateway
gateway = ClusterManager.launch(port=8097)

# Route the "echo_job" job type to the echo/run endpoint
gateway.register_job_to_endpoint(
    job_type="echo_job",
    endpoint="echo/run"
)

# Submit a job (automatically routed to the endpoint);
# echo_job_schema is a JobSchema defined in your application
job = job_from_schema(echo_job_schema, input_data={"message": "Hello"})
result = gateway.submit_job(job)

Git-based Worker

Launch workers from Git repositories:

# Register a worker type from Git (reuses cluster and node from Basic Cluster Setup)
cluster.register_worker_type(
    worker_name="gitworker",
    worker_class="myapp.worker.MyWorker",
    worker_params={},
    git_repo_url="https://github.com/user/worker-repo",
    git_branch="main",
    git_working_dir="worker"
)

# Launch worker (automatically clones repo)
node.launch_worker(worker_type="gitworker", worker_url="http://localhost:8005")
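In both examples `worker_class` is a dotted import path such as `myapp.worker.MyWorker`. A common way to turn such a string into a class is `importlib.import_module` plus `getattr`; whether Node does exactly this is not stated here. The `resolve_class` helper below is hypothetical and uses a standard-library class as a stand-in, because `myapp.worker.MyWorker` only becomes importable once the cloned repository is on the import path.

```python
import importlib


def resolve_class(dotted_path: str) -> type:
    """Hypothetical helper: turn 'package.module.ClassName' into a class."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    # Import the module, then look the class up as an attribute.
    module = importlib.import_module(module_path)
    cls = getattr(module, class_name)
    if not isinstance(cls, type):
        raise TypeError(f"{dotted_path!r} does not name a class")
    return cls


# Stand-in for "myapp.worker.MyWorker".
cls = resolve_class("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```

Passing a string rather than a class object lets the cluster describe a worker type before the code that defines it has been cloned or installed.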

Configuration

Environment Variables

# Redis configuration
MINDTRACE_CLUSTER__DEFAULT_REDIS_URL=redis://localhost:6379
MINDTRACE_WORKER__DEFAULT_REDIS_URL=redis://localhost:6379

# MinIO configuration (for worker registry)
MINDTRACE_CLUSTER__MINIO_ENDPOINT=localhost:9000
MINDTRACE_CLUSTER__MINIO_ACCESS_KEY=minioadmin
MINDTRACE_CLUSTER__MINIO_SECRET_KEY=minioadmin
MINDTRACE_CLUSTER__MINIO_BUCKET=workers
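The variable names follow a MINDTRACE_<SECTION>__<KEY> pattern, where the double underscore appears to separate a config section (CLUSTER, WORKER) from a key. Assuming that reading, the sketch below groups such variables into a nested dictionary. It is not Mindtrace's own config loader; `parse_env` is a hypothetical helper that only illustrates the naming scheme.

```python
from collections.abc import Mapping


def parse_env(environ: Mapping[str, str], prefix: str = "MINDTRACE_") -> dict:
    """Group PREFIX<SECTION>__<KEY>=value variables as {section: {key: value}}."""
    config: dict[str, dict[str, str]] = {}
    for name, value in environ.items():
        # Skip unrelated variables and names without a section separator.
        if not name.startswith(prefix) or "__" not in name:
            continue
        section, _, key = name[len(prefix):].partition("__")
        config.setdefault(section.lower(), {})[key.lower()] = value
    return config


example = {
    "MINDTRACE_CLUSTER__DEFAULT_REDIS_URL": "redis://localhost:6379",
    "MINDTRACE_WORKER__DEFAULT_REDIS_URL": "redis://localhost:6379",
    "MINDTRACE_CLUSTER__MINIO_BUCKET": "workers",
    "PATH": "/usr/bin",
}
print(parse_env(example))
# {'cluster': {'default_redis_url': 'redis://localhost:6379', 'minio_bucket': 'workers'},
#  'worker': {'default_redis_url': 'redis://localhost:6379'}}
```

In a real process the same function could be called with os.environ, which is a mapping.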

Database Models

The cluster uses several database models for tracking:

  • JobStatus: Tracks job execution status and results
  • JobSchemaTargeting: Maps job types to endpoints
  • WorkerStatus: Tracks worker availability and current job
  • WorkerAutoConnect: Maps worker types to job schemas
  • WorkerStatusLocal: Local worker status tracking
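The tables above can be pictured as plain records. The dataclasses below are hypothetical: their fields are guessed from the one-line descriptions (for example, WorkerStatus tracking availability and the current job) and are not the actual Mindtrace schemas. The `on_worker_launched` function illustrates the auto-connect idea described for /launch_worker: a worker whose type is mapped to a job schema learns which job type to consume when it is launched.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobStatusSketch:
    """Hypothetical JobStatus-like record."""
    job_id: str
    job_type: str
    status: str = "queued"
    output: Optional[dict] = None


@dataclass
class WorkerStatusSketch:
    """Hypothetical WorkerStatus-like record: availability and current job."""
    worker_url: str
    worker_type: str
    status: str = "idle"
    job_id: Optional[str] = None


# WorkerAutoConnect-style table: worker type -> job schema name (hypothetical).
auto_connect: dict[str, str] = {"myworker": "my_job"}
workers: dict[str, WorkerStatusSketch] = {}


def on_worker_launched(worker_url: str, worker_type: str) -> Optional[str]:
    """Record the new worker; return the job type it should consume, if any."""
    workers[worker_url] = WorkerStatusSketch(worker_url, worker_type)
    return auto_connect.get(worker_type)


print(on_worker_launched("http://localhost:8004", "myworker"))   # my_job
print(on_worker_launched("http://localhost:8005", "gitworker"))  # None
```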

API Reference

Because these components are Mindtrace Services, they are normally used through a ConnectionManager, but their endpoints can also be accessed directly.

ClusterManager Endpoints

  • POST /submit_job - Submit a job for execution
  • POST /register_job_to_endpoint - Route job type to specific endpoint
  • POST /register_job_to_worker - Connect job type to worker
  • POST /get_job_status - Get job execution status
  • POST /register_worker_type - Register new worker type
  • POST /launch_worker - Launch a worker on a node; if its worker type is registered to a job type, the worker is automatically connected to the orchestrator
  • POST /get_worker_status - Get worker status
  • POST /query_worker_status - Query live worker status

Node Endpoints

  • POST /launch_worker - Launch worker from registry

Worker Endpoints

  • POST /run - Execute a job
  • POST /connect_to_cluster - Connect to cluster orchestrator
  • POST /get_status - Get worker status
  • POST /start - Initialize worker
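Since every endpoint listed above is a POST, direct access amounts to sending a request body over HTTP. The sketch below uses only the standard library and assumes JSON bodies and a ClusterManager listening on http://localhost:8002, as in the Basic Cluster Setup example. `build_post` is a hypothetical helper, and the `job_id` field is a placeholder because the request schemas are not documented here.

```python
import json
import urllib.request


def build_post(base_url: str, endpoint: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request to a service endpoint."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical payload; check the service's schema for the real field names.
req = build_post("http://localhost:8002", "/get_job_status", {"job_id": "1234"})
print(req.get_method(), req.full_url)  # POST http://localhost:8002/get_job_status

# To send it against a running cluster:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```

Going through a ConnectionManager remains the normal route; raw requests like this are mainly useful for debugging or calling the services from non-Python clients.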

Examples

See the samples/cluster/ directory for examples:

  • cluster_as_gateway.py - Using ClusterManager as a gateway
  • cluster_with_node.py - Basic cluster with node setup
  • cluster_with_node_autoregister.py - Automatic worker registration
  • run_script/ - RunScriptWorker examples
  • multiprocess/ - Multi-process cluster examples
  • separate_node/ - Distributed node examples

Download files

Source Distribution

mindtrace_cluster-0.7.0.tar.gz (24.0 kB view details)

Uploaded Source

Built Distribution

mindtrace_cluster-0.7.0-py3-none-any.whl (24.1 kB view details)

Uploaded Python 3

File details

Details for the file mindtrace_cluster-0.7.0.tar.gz.

File metadata

  • Download URL: mindtrace_cluster-0.7.0.tar.gz
  • Size: 24.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.10

File hashes

Hashes for mindtrace_cluster-0.7.0.tar.gz
Algorithm Hash digest
SHA256 e7bf108e4e8c32885b2b3bd6d710788696ae27ef6405c6431fd958c4f950a471
MD5 05df0ed345cbe5142943c7fded471465
BLAKE2b-256 f76e4bc51f919440661279fd6b3968974bcdf2fb90c45cc86de8e85c4b8e808d

File details

Details for the file mindtrace_cluster-0.7.0-py3-none-any.whl.

File hashes

Hashes for mindtrace_cluster-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d872f9d8e4b9d8c69aa0a9647e8a08df64cee95c8bfcd6b81ad0f166b297f9ad
MD5 3c23094e70b08936f1f979ac8761bfb5
BLAKE2b-256 92ba982cc2cfccbb0efd88b66bb54322eddc438db3d80699385f14b5d01f24c8
