Distributed Runner
This library allows for the easy construction and management of ephemeral Dask clusters on AWS via a simple context manager. This allows you to perform distributed and parallelized computations with ease.
Installation
pip install distrunner
Usage
In your scheduler (Airflow etc.) use something like this:
from distrunner import DistRunner
import whatismyip


def main():
    with DistRunner(workers=10) as cldr:
        results = cldr.client.map(print_ip, range(10))
        outcome = cldr.client.gather(results)
        print(outcome)


def print_ip(x):
    return f"My IP address is {whatismyip.whatismyip()}"
The "local" flag determines whether a remote cluster is created. For example, the following runs entirely locally instead of spinning up infrastructure:
from distrunner import DistRunner
import whatismyip


def main():
    with DistRunner(workers=10, local=True) as cldr:
        results = cldr.client.map(print_ip, range(10))
        outcome = cldr.client.gather(results)
        print(outcome)


def print_ip(x):
    return f"My IP address is {whatismyip.whatismyip()}"
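One way to switch between local and remote runs without editing code is to derive the "local" flag from the environment. The sketch below is only an illustration: the DISTRUNNER_LOCAL variable name and the runner_kwargs helper are hypothetical and are not part of distrunner.

```python
import os

# Hypothetical variable name; distrunner itself does not read it.
LOCAL_ENV_VAR = "DISTRUNNER_LOCAL"


def runner_kwargs(workers, environ=None):
    """Build keyword arguments for DistRunner from the environment."""
    environ = os.environ if environ is None else environ
    flag = environ.get(LOCAL_ENV_VAR, "").strip().lower()
    # Treat "1", "true" and "yes" as a request to run locally.
    return {"workers": workers, "local": flag in {"1", "true", "yes"}}


if __name__ == "__main__":
    # Shows the arguments that would be passed as DistRunner(**kwargs).
    print(runner_kwargs(10))
```

The result could then be used as DistRunner(**runner_kwargs(10)), so that setting DISTRUNNER_LOCAL=1 on a development machine keeps everything local.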
You will need to set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables to use the Fargate clusters (or run from an environment with an authorised IAM role).
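A small pre-flight check can make a missing credential easier to spot before a cluster is requested. This is a minimal sketch using only the standard library; the missing_aws_vars helper is hypothetical and not part of distrunner. If you rely on an IAM role, these variables may legitimately be absent, so treat the result as a hint rather than an error.

```python
import os

REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_vars(environ=None):
    """Return the names of required AWS variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_aws_vars()
    if missing:
        # Only a warning: an authorised IAM role may supply credentials instead.
        print("Not set:", ", ".join(missing))
```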
Running in Managed Workflows for Apache Airflow (MWAA)
Running inside an AWS MWAA environment requires a little more setup than running locally. This is because of the way that requirements are handled in Airflow. You also need to ensure that any called functions are nested underneath the task in question. The above examples, refactored for Airflow, could read:
from datetime import datetime, timedelta

from airflow.decorators import dag, task

REQUIREMENTS = [
    "distrunner>=1.3.0",
    "whatismyip",
    "coiled",
    "dask[complete]",
]


# DEFAULT_ARGS is assumed to be defined elsewhere in your DAG file.
@dag(
    default_args=DEFAULT_ARGS,
    schedule_interval="@daily",
    catchup=False,
    dagrun_timeout=timedelta(hours=16),
    start_date=datetime(2023, 4, 16),
    tags=["api"],
)
def main_task():
    @task.virtualenv(
        task_id="main_task",
        requirements=REQUIREMENTS,
        system_site_packages=True,
    )
    def entry_point(requirements):
        # Helper functions and imports are nested inside the task so they
        # are available in the virtualenv that Airflow creates.
        def print_ip(x):
            import whatismyip

            return f"My IP address is {whatismyip.whatismyip()}"

        from distrunner.distrunner import DistRunner

        with DistRunner(
            workers=1,
            requirements=requirements,
            application_name="test_application",
        ) as cldr:
            results = cldr.client.map(print_ip, range(1))
            cldr.client.gather(results)

    entry_point(REQUIREMENTS)


main_task()
Features
- Context manager handling of Dask Fargate clusters with scale-to-zero on completion
- Easy ability to switch between local and distributed/remote development
What it Does
This library allows you to easily run functions across a Dask cluster.
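As a rough illustration of the map-then-gather pattern used with cldr.client in the examples above, the sketch below uses the standard library's ThreadPoolExecutor as a local stand-in. It is not Dask and not distrunner's implementation; the square and map_and_gather names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def map_and_gather(func, items, workers=4):
    """Submit func for every item, then collect results in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # "map" step: one future per input item.
        futures = [pool.submit(func, item) for item in items]
        # "gather" step: block until each future has a result.
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(map_and_gather(square, range(5)))
```

With a Dask client the two steps are separate calls (map returns futures, gather resolves them), which is what lets the work be spread across the cluster's workers.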
Credits
Copyright © Crossref 2023