
Simple library to start a Dask cluster on Kubernetes


dask_k8

Create Dask clusters in Kubernetes easily.

This package lets you start a Dask client outside a Kubernetes cluster and connect it to a Dask scheduler and workers running inside that cluster.

The dashboard of the Dask scheduler running inside Kubernetes is accessible as well; its URL is printed after the cluster is created.

First, make sure you have working Kubernetes access, for instance by running kubectl get pods.
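The access check can also be scripted before any cluster is created. The following is a minimal sketch and not part of dask_k8: has_kubernetes_access is a hypothetical helper that runs the kubectl get pods command mentioned above and reports whether it succeeded.

```python
import subprocess


def has_kubernetes_access(command=("kubectl", "get", "pods"), timeout=30):
    """Return True if the command exits with status 0, False otherwise.

    Hypothetical helper, not part of dask_k8. The default command is the
    `kubectl get pods` check suggested above.
    """
    try:
        result = subprocess.run(
            list(command),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired):
        # kubectl is missing, not executable, or did not answer in time.
        return False
    return result.returncode == 0
```

Calling has_kubernetes_access() before DaskCluster(...) gives an early, readable failure instead of an error in the middle of cluster creation.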

Installation

pip install dask_k8

Example usage

from dask_k8 import DaskCluster

# Use a Kubernetes namespace where you have the proper rights; cluster_id distinguishes between different clusters
cluster = DaskCluster(namespace="dhlab", cluster_id="seguin-0")

# Initialize cluster
cluster.create()
# Get a dask.distributed.Client
dask_client = cluster.make_dask_client()
# Increase/decrease the number of workers
cluster.scale(40, blocking=True)  # Blocks until all the workers are connected to the scheduler

# Do the computation
dask_client.compute(...)

# IMPORTANT: Release the Kubernetes resources; this is not done automatically
cluster.close()
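
Because the resources are not released automatically, an exception raised during the computation would skip the call to close(). A minimal sketch of a guard with try/finally follows. It assumes only the create(), make_dask_client(), scale() and close() methods shown above; run_on_cluster and job are hypothetical names introduced for illustration.

```python
def run_on_cluster(cluster, n_workers, job):
    """Run job(client) on the cluster and always release its resources.

    Hypothetical helper, not part of dask_k8. `cluster` is expected to
    expose create(), make_dask_client(), scale() and close().
    """
    cluster.create()
    try:
        client = cluster.make_dask_client()
        # Block until the requested workers are connected.
        cluster.scale(n_workers, blocking=True)
        return job(client)
    finally:
        # Runs whether job() returned normally or raised.
        cluster.close()
```

Here job is any callable that takes the Dask client and returns a result, for example a function that submits the computation and gathers it.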

To make sure the resources are always released, use the cluster as a context manager:

from dask_k8 import DaskCluster
from dask.distributed import progress, wait  # progress() for futures lives in dask.distributed

cluster = DaskCluster(namespace="dhlab", cluster_id="seguin-0")

with cluster:
    dask_client = cluster.make_dask_client()  # Waits for the scheduler to be started
    cluster.scale(40)  # Waits for the workers to be started
    # Compute
    dask_client.compute(..., sync=True)
    # Or
    future = dask_client.compute(...)
    progress(future)
    wait(future)

The corresponding output is:

Scheduler: tcp://10.90.47.7:31791
Dashboard: http://10.90.47.7:7062
Could not connect to scheduler, retrying...
Could not connect to scheduler, retrying...
Currently 0 workers out of the 40 required, waiting...
Currently 13 workers out of the 40 required, waiting...
Currently 21 workers out of the 40 required, waiting...
Currently 32 workers out of the 40 required, waiting...
Currently 33 workers out of the 40 required, waiting...
Currently 33 workers out of the 40 required, waiting...
Currently 34 workers out of the 40 required, waiting...
Reached the desired 40 workers!
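
The waiting messages above suggest a polling loop. The following is a minimal sketch of such a loop under that assumption, not the package's actual implementation: wait_for_workers, count_workers, poll_interval, timeout and log are hypothetical names. The caller supplies count_workers, a function returning the number of workers currently connected.

```python
import time


def wait_for_workers(count_workers, required, poll_interval=1.0,
                     timeout=300.0, log=print):
    """Poll count_workers() until at least `required` workers are present.

    Hypothetical sketch, not dask_k8 code. Raises TimeoutError if the
    target is not reached within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        current = count_workers()
        if current >= required:
            log(f"Reached the desired {required} workers!")
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Only {current} workers out of the {required} required"
            )
        log(f"Currently {current} workers out of the {required} required, waiting...")
        time.sleep(poll_interval)
```

dask.distributed.Client also provides wait_for_workers(n_workers), which blocks in a similar way when only a client is at hand.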

Specifying the worker/scheduler pod specifications

An arbitrary pod specification can be given for both the scheduler and the workers.

from dask_k8 import DaskCluster

cluster = DaskCluster(namespace="dhlab", cluster_id="seguin-0", worker_pod_spec="""
  containers:
    - image: daskdev/dask:latest
      args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '1', --no-bokeh, --memory-limit, 4GB, --death-timeout, '60']
      imagePullPolicy: Always
      name: dask-worker
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: EXTRA_PIP_PACKAGES
          value: s3fs
        - name: EXTRA_CONDA_PACKAGES
          value:
      resources:
        requests:
          cpu: 1
          memory: "4G"
        limits:
          cpu: 1
          memory: "4G"
""")

How does it work?

  • Kubernetes services are created to expose the Dask scheduler and its dashboard outside the Kubernetes cluster. They can be listed with kubectl get svc while the DaskCluster is running (after calling .create()).
  • Two Kubernetes deployments are created, one for the scheduler and one for the workers. They can be listed with kubectl get deployments.
  • The corresponding pods are managed automatically by Kubernetes, and their states can be checked with kubectl get pods.
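
The three kubectl commands above can be run together from Python while a cluster is up. This is a small sketch, not part of dask_k8: kubectl_get_command and show_cluster_resources are hypothetical helpers, and passing --namespace is an assumption that the resources live in the namespace given to DaskCluster.

```python
import subprocess

# Resource kinds mentioned above: services, deployments and pods.
RESOURCES = ("svc", "deployments", "pods")


def kubectl_get_command(resource, namespace):
    """Build the `kubectl get <resource>` command for one namespace."""
    return ["kubectl", "get", resource, "--namespace", namespace]


def show_cluster_resources(namespace, run=subprocess.run):
    """Print services, deployments and pods of the namespace in turn.

    `run` is injectable so the function can be exercised without kubectl.
    """
    for resource in RESOURCES:
        run(kubectl_get_command(resource, namespace), check=False)
```

For the examples in this document, show_cluster_resources("dhlab") would list everything in the dhlab namespace, including resources that do not belong to the Dask cluster.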

Project

The 'impresso - Media Monitoring of the Past' project is funded by the Swiss National Science Foundation (SNSF) under grant number CRSII5_173719 (Sinergia program). The project aims at developing tools to process and explore large-scale collections of historical newspapers, and at studying the impact of this new tooling on historical research practices. More information at https://impresso-project.ch.

License

Copyright (C) 2020 The impresso team. Contributors to this program include: Benoit Seguin.

This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but without any warranty; without even the implied warranty of merchantability or fitness for a particular purpose. See the GNU Affero General Public License for more details.

