Distry

Distributed task execution framework. Scale your Python functions across multiple workers.

Features

  • Zero-config setup - Auto-detects and installs dependencies
  • Simple API - Just client.map(func, inputs)
  • Fault-tolerant - Handles worker failures gracefully
  • Automatic job batching - Large jobs are split into batches that fit worker RAM limits
  • Package management - Installs required packages on workers
  • Global indexing - Results are returned in input order
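Automatic job batching can be pictured with a small sketch: estimate each input's size, then cut the input list into consecutive batches that fit a byte budget derived from a limit such as `--max-ram 2g`. This is not distry's implementation. `parse_ram` and `batch_by_size` are hypothetical helpers, 1024-based units are an assumption, and pickled size is only a rough proxy for the memory a batch actually uses on a worker.

```python
import pickle

# Hypothetical sketch of RAM-bounded batching; the helper names, the binary
# (1024-based) units, and the pickle-size estimate are all assumptions.
_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3}


def parse_ram(limit: str) -> int:
    """Convert a size string such as '2g' or '512m' to a byte count."""
    limit = limit.strip().lower()
    if limit and limit[-1] in _UNITS:
        return int(float(limit[:-1]) * _UNITS[limit[-1]])
    return int(limit)  # no suffix: treat the value as bytes


def batch_by_size(inputs, max_bytes):
    """Group inputs into consecutive batches whose pickled size fits max_bytes.

    Each item keeps its global index so results can be put back in input order.
    An item larger than max_bytes on its own still gets a batch of its own.
    """
    batches, current, current_size = [], [], 0
    for index, item in enumerate(inputs):
        size = len(pickle.dumps(item))
        # Close the current batch if adding this item would exceed the budget.
        if current and current_size + size > max_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append((index, item))
        current_size += size
    if current:
        batches.append(current)
    return batches


# Usage: a 2 GB worker limit, and a tiny budget that forces several batches.
print(parse_ram("2g"))  # 2147483648
small = batch_by_size(["a" * 100] * 5, max_bytes=250)
print([len(batch) for batch in small])
```

Keeping the global index next to each item is what allows the batches to be processed anywhere while the final results are still reassembled in input order.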

Installation

# Client only (for task submission)
pip install "distry-py[client]"

# Worker only (for task execution)
pip install "distry-py[worker]"

# Full installation
pip install "distry-py[all]"

# The quotes stop shells such as zsh from treating the brackets as a glob pattern.

Quick Start

1. Start Workers

# Terminal 1 - Worker 1
distry-worker --port 8001

# Terminal 2 - Worker 2 (with RAM limit)
distry-worker --port 8002 --max-ram 2g

2. Run Tasks

from distry import Client
import numpy as np

# Connect to workers
client = Client(["http://127.0.0.1:8001", "http://127.0.0.1:8002"])

# Define function (any Python function works!)
def process_data(x):
    return np.mean([x, x**2, x**3])

try:
    # Process inputs in parallel; results come back in input order
    results = client.map(process_data, [1, 2, 3, 4, 5])
    print(results)
    # approximately [1.0, 4.67, 13.0, 28.0, 51.67]
finally:
    # Close the client even if map() raises
    client.close()
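Because results come back in input order and failed inputs become None, it can help to check a function's behaviour locally before sending it to workers. The sketch below reproduces those two documented behaviours with the standard library only. `local_map` is a hypothetical helper, not a distry API, and it uses threads, so it is meant for checking correctness rather than measuring speed. `process_data` is rewritten in pure Python so the sketch does not depend on numpy.

```python
from concurrent.futures import ThreadPoolExecutor


def local_map(func, inputs, max_workers=4):
    """Hypothetical local stand-in for client.map (not part of distry).

    Results come back in input order, and an input whose call raises
    yields None instead of aborting the whole run.
    """
    def safe_call(x):
        try:
            return func(x)
        except Exception:
            return None  # the failure only affects this input's slot

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map returns results in the same order as `inputs`.
        return list(pool.map(safe_call, inputs))


# Pure-Python equivalent of process_data from the example above
def process_data(x):
    return (x + x**2 + x**3) / 3


print(local_map(process_data, [1, 2, 3, 4, 5]))
print(local_map(lambda x: 10 // x, [1, 2, 0, 5]))  # [10, 5, None, 2]
```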

2b. Using the Decorator (for single function calls)

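For simpler cases where you want to run a single function call on a worker, use the @distry decorator. Once register_workers() has been given the worker URLs, each call to the decorated function runs on a randomly selected worker and returns its result.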

from distry import register_workers, distry
import numpy as np

# Connect to workers
register_workers(["http://127.0.0.1:8001", "http://127.0.0.1:8002"])

@distry
def process_data(x, power=2):
    return np.mean([x, x**power])

# Process a single input on a randomly selected worker
result = process_data(10)
print(result)
# 55.0

# With keyword arguments
result_power_3 = process_data(10, power=3)
print(result_power_3)
# 505.0

3. Advanced Usage

from distry import Client

worker_urls = ["http://127.0.0.1:8001", "http://127.0.0.1:8002"]
client = Client(worker_urls)

# Specify packages manually (optional; auto-detection also works)
def scipy_func(x):
    from scipy.special import factorial
    return float(factorial(x))

results = client.map(
    scipy_func,
    [1, 2, 3, 4],
    required_packages=['scipy'],  # Manual specification
    max_workers=2
)

# Error handling: inputs that raise return None
def risky_func(x):
    if x == 3:
        raise ValueError("Oops!")
    return x * 2

results = client.map(risky_func, [1, 2, 3, 4])  # [2, 4, None, 8]

client.close()
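Since a failed input shows up as None in the result list, a caller can separate successes from failures by position. `split_results` below is a hypothetical helper, not part of distry, and it assumes None only ever means failure, so it cannot tell a failure apart from a function that legitimately returns None.

```python
def split_results(inputs, results):
    """Separate successful results from inputs that failed (hypothetical helper).

    Assumes failures are reported as None, so a function that can legitimately
    return None is indistinguishable from a failed input.
    """
    succeeded = {i: r for i, r in enumerate(results) if r is not None}
    failed = [x for x, r in zip(inputs, results) if r is None]
    return succeeded, failed


inputs = [1, 2, 3, 4]
results = [2, 4, None, 8]  # the output shown for risky_func above
succeeded, failed = split_results(inputs, results)
print(succeeded)  # {0: 2, 1: 4, 3: 8}
print(failed)     # [3]
```

The `failed` list could then be passed to `client.map` again, for example after fixing the function.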

API Reference

Client

from distry import Client

client = Client(worker_urls, max_concurrent_jobs=10)

# Map function across inputs
results = client.map(
    func,                    # Any Python function
    inputs,                  # List of inputs
    max_workers=4,           # Maximum number of workers used concurrently
    timeout=60,              # Timeout per input, in seconds
    required_packages=None,  # None = auto-detect from the function's imports
)

# Cluster status
status = client.get_cluster_status()

client.close()

Worker

from distry import WorkerServer

# Programmatic worker
server = WorkerServer(host="0.0.0.0", port=8000)
server.run()

# Or use CLI
# distry-worker --host 0.0.0.0 --port 8000

CLI

# Start worker
distry-worker --help
distry-worker --host 0.0.0.0 --port 8000 --max-ram 4g

# The client will automatically split large jobs into batches
# to fit the worker's RAM limit.

# HTTP endpoints exposed by each worker
# GET /health
# GET /status
# GET /installed_packages
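Since each worker exposes GET /health, a client script could skip unreachable workers before creating a Client. This sketch assumes a healthy worker answers GET /health with HTTP 200 and does not rely on the response body, whose format is not documented here. `healthy_workers` is a hypothetical helper built on `urllib.request`, not part of distry.

```python
import http.client
import urllib.request


def healthy_workers(worker_urls, timeout=2.0):
    """Return the worker URLs whose GET /health answers with HTTP 200.

    Hypothetical helper: only the status code is checked, because the
    body format of /health is not documented here.
    """
    alive = []
    for url in worker_urls:
        health_url = url.rstrip("/") + "/health"
        try:
            with urllib.request.urlopen(health_url, timeout=timeout) as resp:
                if resp.status == 200:
                    alive.append(url)
        except (OSError, http.client.HTTPException, ValueError):
            # Refused, unreachable, timed out, non-2xx (HTTPError), or a bad URL
            continue
    return alive


print(healthy_workers(["http://127.0.0.1:8001", "http://127.0.0.1:8002"]))
```

The filtered list could then be passed to `Client(...)` in place of the full list of worker URLs.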

What Happens Under the Hood?

  1. Function Analysis: Auto-detects imports from your function
  2. Package Installation: Installs missing packages on workers
  3. Task Distribution: Splits inputs across available workers
  4. Result Collection: Gathers results with global indexing
  5. Error Handling: Failed inputs return None; the remaining inputs still complete normally

Use Cases

  • Data Processing: Apply functions to large datasets
  • ML Inference: Scale model predictions across workers
  • API Calls: Parallelize HTTP requests
  • Computational Tasks: CPU-intensive calculations
  • Batch Processing: Process files, images, or documents

Limitations

  • Single function per job (no complex workflows)
  • 30 s default timeout per input (configurable via the timeout argument of client.map)
  • Synchronous function execution on workers
  • Basic package management (no virtual environments)

Download files

Source Distribution

distry_py-0.1.2.tar.gz (29.5 kB)

Built Distribution

distry_py-0.1.2-py3-none-any.whl (12.6 kB)

File details

Details for the file distry_py-0.1.2.tar.gz.

File metadata

  • Download URL: distry_py-0.1.2.tar.gz
  • Size: 29.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.11

File hashes

Hashes for distry_py-0.1.2.tar.gz
Algorithm Hash digest
SHA256 fa7f366146cc5e3edb52fed684890cb93b476e3d93cde2c2ff4415ecaa1383e6
MD5 2509e7daa76f41b6e5266679d7b8fd25
BLAKE2b-256 76511d0b21994464b3364c96a2bb871bf06bd28da57562239d10fd6c133c3aa8

File details

Details for the file distry_py-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: distry_py-0.1.2-py3-none-any.whl
  • Size: 12.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.11

File hashes

Hashes for distry_py-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 7bb89020a4e7fc4b1df39acb3d03ddcc6961d1a6ef9c9ae4f4114e2147592d98
MD5 919c99e0dc5d18934fae82062faf9ff1
BLAKE2b-256 9adc59ee3fd43ad222fca099b53266fb8b81279b6b1d36f1bd45e21f61109a8a
