Skip to main content

Cluster management for Mindtrace

Project description

PyPI version License Downloads

Mindtrace Cluster Module

The Mindtrace Cluster module provides a distributed computing framework for managing and orchestrating jobs across multiple worker nodes. It enables scalable, fault-tolerant job execution with support for various execution environments including Git repositories and Docker containers.

Overview

The cluster module consists of three main components:

  • ClusterManager: Central orchestrator that manages job distribution and worker coordination
  • Node: Worker node that can launch and manage workers
  • Worker: Executable units that process jobs

Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  ClusterManager │    │      Node       │    │     Worker      │
│                 │    │                 │    │                 │
│ • Job routing   │◄──►│ • Worker launch │◄──►│ • Job execution │
│ • Status tracking│    │ • Registry access│    │ • Environment mgmt│
│ • Worker mgmt   │    │ • Resource mgmt │    │ • Status reporting│
└─────────────────┘    └─────────────────┘    └─────────────────┘

Core Components

ClusterManager

The central orchestrator that:

  • Routes jobs to appropriate endpoints or workers
  • Tracks job and worker status
  • Manages worker registration and auto-connection
  • Provides a unified API for job submission and monitoring

Key Features:

  • Job schema targeting (direct endpoint routing vs orchestrator)
  • Worker type registration with Git/Docker support
  • Automatic worker-to-job schema connection
  • Real-time status monitoring

Node

A worker node that:

  • Launches workers from the registry
  • Manages worker lifecycle
  • Provides resource isolation

Key Features:

  • Worker registry integration
  • Automatic cluster registration
  • Worker process management

Worker

Executable units that:

  • Process individual jobs
  • Report status back to cluster
  • Support various execution environments

Key Features:

  • Abstract base class for custom workers
  • Built-in status tracking
  • Cluster communication
  • Environment management

Built-in Workers

EchoWorker

A simple worker that echoes messages with optional delay:

from mindtrace.cluster.workers.echo_worker import EchoWorker

# Usage
worker = EchoWorker()
result = worker._run({"message": "Hello World", "delay": 2})
# Returns: {"status": "completed", "output": {"echoed": "Hello World"}}

RunScriptWorker

A worker that executes scripts in isolated environments. For git repositories, will sync the environment using uv sync.

from mindtrace.cluster.workers.run_script_worker import RunScriptWorker

# Git environment example
job_data = {
    "environment": {
        "git": {
            "repo_url": "https://github.com/user/repo",
            "branch": "main",
            "working_dir": "scripts"
        }
    },
    "command": "python process_data.py"
}

# Docker environment example
job_data = {
    "environment": {
        "docker": {
            "image": "python:3.9",
            "working_dir": "/app",
            "volumes": {"/host/path": "/container/path"},
            "environment": {"ENV_VAR": "value"}
        }
    },
    "command": "python script.py"
}

Usage Examples

Basic Cluster Setup

from mindtrace.cluster import ClusterManager, Node
from mindtrace.jobs import JobSchema, job_from_schema

# Launch cluster manager
cluster = ClusterManager.launch(host="localhost", port=8002)

# Launch node
node = Node.launch(
    host="localhost", 
    port=8003, 
    cluster_url=str(cluster.url)
)

# Register worker type
cluster.register_worker_type(
    worker_name="myworker",
    worker_class="myapp.workers.MyWorker",
    worker_params={}
)

# Launch worker
worker_url = "http://localhost:8004"
node.launch_worker(worker_type="myworker", worker_url=worker_url)

# Submit job
job = job_from_schema(my_job_schema, input_data={"key": "value"})
result = cluster.submit_job(job)

Gateway Mode

Use ClusterManager as a gateway to route requests:

from mindtrace.cluster import ClusterManager

# Launch as gateway
gateway = ClusterManager.launch(port=8097)

# Register service
gateway.register_job_to_endpoint(
    job_type="echo_job", 
    endpoint="echo/run"
)

# Submit job (automatically routed to endpoint)
job = job_from_schema(echo_job_schema, input_data={"message": "Hello"})
result = gateway.submit_job(job)

Git-based Worker

Launch workers from Git repositories:

# Register worker from Git
cluster.register_worker_type(
    worker_name="gitworker",
    worker_class="myapp.worker.MyWorker",
    worker_params={},
    git_repo_url="https://github.com/user/worker-repo",
    git_branch="main",
    git_working_dir="worker"
)

# Launch worker (automatically clones repo)
node.launch_worker(worker_type="gitworker", worker_url="http://localhost:8005")

Configuration

Environment Variables

# Redis configuration
MINDTRACE_CLUSTER__DEFAULT_REDIS_URL=redis://localhost:6379
MINDTRACE_WORKER__DEFAULT_REDIS_URL=redis://localhost:6379

# MinIO configuration (for worker registry)
MINDTRACE_CLUSTER__MINIO_ENDPOINT=localhost:9000
MINDTRACE_CLUSTER__MINIO_ACCESS_KEY=minioadmin
MINDTRACE_CLUSTER__MINIO_SECRET_KEY=minioadmin
MINDTRACE_CLUSTER__MINIO_BUCKET=workers

Database Models

The cluster uses several database models for tracking:

  • JobStatus: Tracks job execution status and results
  • JobSchemaTargeting: Maps job types to endpoints
  • WorkerStatus: Tracks worker availability and current job
  • WorkerAutoConnect: Maps worker types to job schemas
  • WorkerStatusLocal: Local worker status tracking

API Reference

Since these are Services, the normal way to use them is via a ConnectionManager, but they can also be accessed dirctly.

ClusterManager Endpoints

  • POST /submit_job - Submit a job for execution
  • POST /register_job_to_endpoint - Route job type to specific endpoint
  • POST /register_job_to_worker - Connect job type to worker
  • POST /get_job_status - Get job execution status
  • POST /register_worker_type - Register new worker type
  • POST /launch_worker - Launch worker on node, automatically connecting it to the Orchestrator if it's registered to a job type
  • POST /get_worker_status - Get worker status
  • POST /query_worker_status - Query live worker status

Node Endpoints

  • POST /launch_worker - Launch worker from registry

Worker Endpoints

  • POST /run - Execute a job
  • POST /connect_to_cluster - Connect to cluster orchestrator
  • POST /get_status - Get worker status
  • POST /start - Initialize worker

Examples

See the samples/cluster/ directory for examples:

  • cluster_as_gateway.py - Using ClusterManager as a gateway
  • cluster_with_node.py - Basic cluster with node setup
  • cluster_with_node_autoregister.py - Automatic worker registration
  • run_script/ - RunScriptWorker examples
  • multiprocess/ - Multi-process cluster examples
  • separate_node/ - Distributed node examples

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mindtrace_cluster-0.4.0.tar.gz (22.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mindtrace_cluster-0.4.0-py3-none-any.whl (22.7 kB view details)

Uploaded Python 3

File details

Details for the file mindtrace_cluster-0.4.0.tar.gz.

File metadata

  • Download URL: mindtrace_cluster-0.4.0.tar.gz
  • Upload date:
  • Size: 22.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.10

File hashes

Hashes for mindtrace_cluster-0.4.0.tar.gz
Algorithm Hash digest
SHA256 72c15f2d09a7677ee29c66469e4f6fc2c487c76f6d296c1d8e4b6548fd8116ca
MD5 9fd6b4642b64f65e0e36fa43e517a914
BLAKE2b-256 9c5e8e0d0a0c193ff48629d64268dd93496f4534bfca1ff49a8205c301d64764

See more details on using hashes here.

File details

Details for the file mindtrace_cluster-0.4.0-py3-none-any.whl.

File metadata

File hashes

Hashes for mindtrace_cluster-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 30a0efb2df74904faa33f847b923a0b96b569203b559d2ba80def0e3887b47ad
MD5 7c1d2f455ab00acebb7d91bd91c302af
BLAKE2b-256 35b7abbfb1193a95ac46bc78a0e76fd228fc297ecce0fe028e91ec4016393be8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page