Distry

Distributed task execution framework. Scale your Python functions across multiple workers.

Features

  • Zero-config setup - Auto-detects and installs dependencies
  • Simple API - Just client.map(func, inputs)
  • Fault-tolerant - Handles worker failures gracefully
  • Automatic job batching - Large jobs are automatically split to fit worker RAM limits
  • Package management - Installs required packages on workers
  • Global indexing - Results returned in input order
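
To make the batching feature concrete, here is a minimal local sketch of how a job could be split against a RAM limit such as the `--max-ram 2g` value used later. It is not Distry's implementation: `parse_ram` and `split_into_batches` are hypothetical helpers, the binary unit table is an assumption, and pickled size is only a rough proxy for memory use.

```python
import pickle

# Hypothetical unit table: binary multiples are an assumption, not documented behavior.
_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3}


def parse_ram(limit):
    """Convert a string such as '2g' or '512m' into a byte count."""
    text = limit.strip().lower()
    if text and text[-1] in _UNITS:
        return int(float(text[:-1]) * _UNITS[text[-1]])
    return int(text)  # a plain number is taken as bytes


def split_into_batches(inputs, max_bytes):
    """Greedily group inputs so each batch's pickled size stays within max_bytes.

    An input larger than max_bytes on its own still gets a batch to itself.
    Yields (start_index, batch) so results can be mapped back to input order.
    """
    batch, batch_bytes, start = [], 0, 0
    for index, item in enumerate(inputs):
        size = len(pickle.dumps(item))
        if batch and batch_bytes + size > max_bytes:
            yield start, batch
            batch, batch_bytes, start = [], 0, index
        batch.append(item)
        batch_bytes += size
    if batch:
        yield start, batch
```

Yielding the start index with each batch keeps enough information to place every result back at its original position, which is what the global indexing feature requires.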

Installation

# Client only (for task submission)
pip install distry-py[client]

# Worker only (for task execution)
pip install distry-py[worker]

# Full installation
pip install distry-py[all]

Quick Start

1. Start Workers

# Terminal 1 - Worker 1
distry-worker --port 8001

# Terminal 2 - Worker 2 (with RAM limit)
distry-worker --port 8002 --max-ram 2g

2. Run Tasks

from distry import Client

# Connect to workers
client = Client(["http://127.0.0.1:8001", "http://127.0.0.1:8002"])

# Define function (any Python function works!)
import numpy as np

def process_data(x):
    return np.mean([x, x**2, x**3])

# Process inputs in parallel
results = client.map(process_data, [1, 2, 3, 4, 5])

print(results)
# Approximately [1.0, 4.67, 13.0, 28.0, 51.67]

client.close()

2b. Using the Decorator (for single function calls)

For simpler cases where you want to execute a single function call on a worker, you can use the @distry decorator.

from distry import register_workers, distry
import numpy as np

# Connect to workers
register_workers(["http://127.0.0.1:8001", "http://127.0.0.1:8002"])

@distry
def process_data(x, power=2):
    return np.mean([x, x**power])

# Process a single input on a randomly selected worker
result = process_data(10)
print(result)
# 55.0

# With keyword arguments
result_power_3 = process_data(10, power=3)
print(result_power_3)
# 505.0

3. Advanced Usage

from distry import Client

worker_urls = ["http://127.0.0.1:8001", "http://127.0.0.1:8002"]
client = Client(worker_urls)

# Custom packages (optional - auto-detection works too)
def scipy_func(x):
    from scipy.special import factorial
    return float(factorial(x))

results = client.map(
    scipy_func,
    [1, 2, 3, 4],
    required_packages=['scipy'],  # Manual specification
    max_workers=2
)

# Results with error handling
def risky_func(x):
    if x == 3:
        raise ValueError("Oops!")
    return x * 2

results = client.map(risky_func, [1, 2, 3, 4])  # [2, 4, None, 8]

client.close()
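
Because a failed input comes back as None, it can be useful to separate failures from successes before retrying or logging. The helper below is a hypothetical sketch (the name `split_results` is not part of Distry) that works on the plain list returned by client.map.

```python
def split_results(inputs, results):
    """Pair each input with its result and separate successes from failures.

    Assumes a failed input is reported as None, so a function that
    legitimately returns None cannot be told apart from a failure.
    """
    succeeded = {}
    failed = []
    for index, (item, result) in enumerate(zip(inputs, results)):
        if result is None:
            failed.append((index, item))
        else:
            succeeded[index] = result
    return succeeded, failed


inputs = [1, 2, 3, 4]
results = [2, 4, None, 8]  # the list shown for risky_func in the example above
succeeded, failed = split_results(inputs, results)
print(failed)  # [(2, 3)]
```

The failed list keeps the original index next to each input, so a retry can write its result back into the right slot.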

API Reference

Client

from distry import Client

client = Client(worker_urls, max_concurrent_jobs=10)

# Map function across inputs
results = client.map(
    func,           # Any Python function
    inputs,         # List of inputs
    max_workers=4,  # Limit concurrent workers
    timeout=60,     # Timeout per input, in seconds
    required_packages=None  # Auto-detected
)

# Cluster status
status = client.get_cluster_status()

client.close()

Worker

from distry import WorkerServer

# Programmatic worker
server = WorkerServer(host="0.0.0.0", port=8000)
server.run()

# Or use CLI
# distry-worker --host 0.0.0.0 --port 8000

CLI

# Show available options
distry-worker --help

# Start a worker
distry-worker --host 0.0.0.0 --port 8000 --max-ram 4g

# The client will automatically split large jobs into batches
# to fit the worker's RAM limit.

# View worker endpoints
# GET /health
# GET /status
# GET /installed_packages
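
The endpoints above can be polled before submitting work. Here is a minimal sketch using only the standard library, assuming that /health answers with HTTP 200 when the worker is up. The response body format is not documented here, so it is ignored. `default_fetch` and `healthy_workers` are hypothetical helper names, not part of Distry.

```python
import urllib.error
import urllib.request


def default_fetch(url, timeout=2.0):
    """Return the HTTP status code of a GET request, or None if unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as exc:
        return exc.code  # the server answered, but with an error status
    except (urllib.error.URLError, OSError):
        return None  # connection refused, DNS failure, timeout, ...


def healthy_workers(worker_urls, fetch=default_fetch):
    """Keep only the workers whose /health endpoint answers with HTTP 200."""
    return [url for url in worker_urls if fetch(url.rstrip("/") + "/health") == 200]
```

For example, `Client(healthy_workers(worker_urls))` would connect only to workers that responded. The `fetch` parameter exists so the check can be exercised without a network.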

What Happens Under the Hood?

  1. Function Analysis: Auto-detects imports from your function
  2. Package Installation: Installs missing packages on workers
  3. Task Distribution: Splits inputs across available workers
  4. Result Collection: Gathers results with global indexing
  5. Error Handling: Failed inputs return None, others succeed
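
Steps 3 to 5 can be imitated locally to see how they fit together. The sketch below uses threads in place of remote workers and a round-robin split, both of which are assumptions made for illustration. `run_chunk` and `local_map` are hypothetical names and say nothing about how Distry schedules work internally.

```python
from concurrent.futures import ThreadPoolExecutor


def run_chunk(func, indexed_inputs):
    """Stand-in for one worker: apply func and turn any exception into None."""
    out = []
    for index, item in indexed_inputs:
        try:
            out.append((index, func(item)))
        except Exception:
            out.append((index, None))
    return out


def local_map(func, inputs, num_workers=2):
    """Split inputs round-robin, run the chunks concurrently, restore input order."""
    indexed = list(enumerate(inputs))  # attach a global index to every input
    chunks = [indexed[i::num_workers] for i in range(num_workers)]
    results = [None] * len(indexed)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for chunk_result in pool.map(lambda c: run_chunk(func, c), chunks):
            for index, value in chunk_result:
                results[index] = value  # the global index restores input order
    return results


def risky_func(x):
    if x == 3:
        raise ValueError("Oops!")
    return x * 2


print(local_map(risky_func, [1, 2, 3, 4]))  # [2, 4, None, 8]
```

Carrying the index alongside each input lets chunks finish in any order without disturbing the order of the final list.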

Use Cases

  • Data Processing: Apply functions to large datasets
  • ML Inference: Scale model predictions across workers
  • API Calls: Parallelize HTTP requests
  • Computational Tasks: CPU-intensive calculations
  • Batch Processing: Process files, images, or documents

Limitations

  • Single function per job (no complex workflows)
  • Default timeout of 30s per input (configurable through the timeout argument of client.map)
  • Synchronous function execution on workers
  • Basic package management (no virtual environments)
