Skip to main content

No project description provided

Project description

SSEC-JHU dplutils

CI Status Documentation Status codecov Security Status PyPI

SSEC-JHU Logo

About

This package provides python utilities to define and execute graphs of tasks that operate on and produce dataframes in a batched-streaming manner. The primary aims are as follows:

  • Operate on an indefinite stream of batches of input data.
  • Execute tasks in a distributed fashion using configurable execution backends (e.g. Ray).
  • Schedule resources on a per-task basis.
  • Configurable batching: enable co-location for high-overhead tasks or maximal spread for resource intensive tasks.
  • Provide a simple interface to create tasks and insert them into a pipeline.
  • Provide validations to help ensure tasks are correctly configured prior to execution (potentially with high latency).

Discovery pipelines that generate input samples on-the-fly are particularly well suited for this package, though input can be taken from any source, including on-disk tables.

This package also includes utilities for observing pipelines using metrics tools such as mlflow or aim, and provides functionality for making simple CLI interfaces from pipeline definitions.

Quick Start

Getting up and running is easy: simply install the package:

pip install dplutils

Then define the tasks, connect them into a pipeline, and then iterate over the output dataframes:

import numpy as np
from dplutils.pipeline import PipelineGraph, PipelineTask

# Definitions of task code - note all take dataframe as first argument and return a dataframe
def generate_sample(df):
  return df.assign(sample = np.random.random(len(df)))

def round_sample(df, decimals=1):
  return df.assign(rounded = df['sample'].round(decimals))

def calc_residual(df):
  return df.assign(residual = df['rounded'] - df['sample'])

# Connect them together in an execution graph (along with execution metadata)
pipeline = PipelineGraph([
  PipelineTask('generate', generate_sample),
  PipelineTask('round', round_sample, batch_size=10),
  PipelineTask('resid', calc_residual, num_cpus=2),
])

# Run the tasks and iterate over the outputs, here using the Ray execution framework
from dplutils.pipeline.ray import RayStreamGraphExecutor

executor = RayStreamGraphExecutor(pipeline).set_config('round.kwargs.decimals', 2)
for result_batch in executor.run():
  print(result_batch)
  break  # otherwise it will - by design - run indefinitely!

As an alternative to iterating over batches directly as above, we can use the CLI utilities to run the given executor as a tool. The helper arranges for all options so we just need to define the desired executor:

executor = RayStreamGraphExecutor(pipeline)

if __name__ == '__main__':
  cli_run(executor)

then run our module with parameters as needed. The CLI based run will write the output to a parquet table at a specified location (below assumes code is in ourmodule.py):

python -m ourmodule -o /path/to/outdir --set-config round.batch_size=5

The above is of course a trivial example to demonstrate the structure and simplicity of defining pipelines and how they operate on configurable-sized batches represented as dataframes. For more information on defining tasks, their inputs, seeding the input tasks, more complex graph structures and distributed execution, among other topics, see the documentation at: https://dplutils.readthedocs.io/en/stable/.

Scaling out

One of the goals of this project simplify the scaling out of connected tasks on a variety of systems. PipelineExecutors PipelineExecutor are responsible for this - this package provides a framework for adding executors based on appropriate underlying scheduling/execution systems and provides some implementations, for example using Ray RayStreamGraphExecutor. Setup required to properly configure an executor for scaling depends on the backend used, for example ray relies on having a cluster previously bootstrapped (see https://docs.ray.io/en/latest/cluster/getting-started.html), though can operate locally without any prior setup.

Resource specifications

Another primary goal is to arrange for resource dependencies to be met by the execution environment for a particular task. Resources such as number of CPUs or GPUs are natural targets and supported by many systems. We also want to support arbitrary resource requests, for example if a task requires a large local database on fast disks, this might be available only on one node in a cluster. A custom resource can be used to ensure that batches of a particular task always execute on the environment that has that resource.

This ability depends on the executor backend to support it, so executors implementations typically only make sense for such systems - of which Ray is one.

For instance, if a task required a database as described above, the task might be defined in the following manner:

PipelineTask('bigdboperation', function_needs_big_db, resources={'bigdb': 1})

And if using the Ray executor, then at least one worker in the cluster that has local fast disks with the big database resident would be started similar to:

ray start --address {head-ip} --resources '{"bigdb": 1}'

In other execution systems, the worker might be started in a different manner, but the task definition could remain as-is, enabling easy swapping of the execution environment depending on the situation.

Building and Development

If you need to make modifications to the source code, follow the steps below to get the source and run tests. The process is simple and we use Tox to manage test and build environments. We welcome any contributions, please open a pull request!

Setup

Clone this repository locally:

git clone https://github.com/ssec-jhu/dplutils.git
cd dplutils

Install dependencies:

pip install -r requirements/dev.txt

Tests

Run tox:

  • tox -e test, to run just the tests
  • tox, to run linting, tests and build. This should be run without errors prior to commit

Docker

From the repo directory, run

docker build -f docker/Dockerfile --tag dplutils .

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dplutils-0.8.2.tar.gz (173.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dplutils-0.8.2-py3-none-any.whl (37.5 kB view details)

Uploaded Python 3

File details

Details for the file dplutils-0.8.2.tar.gz.

File metadata

  • Download URL: dplutils-0.8.2.tar.gz
  • Upload date:
  • Size: 173.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dplutils-0.8.2.tar.gz
Algorithm Hash digest
SHA256 e2cb1d0d2f20fc6f5bd8752f25626553cf7097cd83cf5735105fd490996b40d5
MD5 25dce04c2cba9618ea3a770324af43cf
BLAKE2b-256 0ac3a612a74ced564df4c1f3b389d99c9b86f3be962eb5ff0388e4292dd219c1

See more details on using hashes here.

Provenance

The following attestation bundles were made for dplutils-0.8.2.tar.gz:

Publisher: dist.yml on ssec-jhu/dplutils

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dplutils-0.8.2-py3-none-any.whl.

File metadata

  • Download URL: dplutils-0.8.2-py3-none-any.whl
  • Upload date:
  • Size: 37.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dplutils-0.8.2-py3-none-any.whl
Algorithm Hash digest
SHA256 dc827b8c2d598bfb88c8910e6c72c11b85a844ba63e1ad9556e689e2705631b2
MD5 a9cd1696bb9da1b5aecdb643d58bebcb
BLAKE2b-256 04402bedb57c0f5bf839b8737f50844589c772de82466766f83367865fcd9ce9

See more details on using hashes here.

Provenance

The following attestation bundles were made for dplutils-0.8.2-py3-none-any.whl:

Publisher: dist.yml on ssec-jhu/dplutils

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page