No project description provided
Project description
SSEC-JHU dplutils
About
This package provides python utilities to define and execute graphs of tasks that operate on and produce dataframes in a batched-streaming manner. The primary aims are as follows:
- Operate on an indefinite stream of batches of input data.
- Execute tasks in a distributed fashion using configurable execution backends (e.g. Ray).
- Schedule resources on a per-task basis.
- Configurable batching: enable co-location for high-overhead tasks or maximal spread for resource intensive tasks.
- Provide a simple interface to create tasks and insert them into a pipeline.
- Provide validations to help ensure tasks are correctly configured prior to execution (potentially with high latency).
Discovery pipelines that generate input samples on-the-fly are particularly well suited for this package, though input can be taken from any source, including on-disk tables.
This package also includes utilities for observing pipelines using metrics tools such as mlflow or aim, and provides functionality for making simple CLI interfaces from pipeline definitions.
Quick Start
Getting up and running is easy: simply install the package:
pip install dplutils
Then define the tasks, connect them into a pipeline, and then iterate over the output dataframes:
import numpy as np
from dplutils.pipeline import PipelineGraph, PipelineTask
# Definitions of task code - note all take dataframe as first argument and return a dataframe
def generate_sample(df):
return df.assign(sample = np.random.random(len(df)))
def round_sample(df, decimals=1):
return df.assign(rounded = df['sample'].round(decimals))
def calc_residual(df):
return df.assign(residual = df['rounded'] - df['sample'])
# Connect them together in an execution graph (along with execution metadata)
pipeline = PipelineGraph([
PipelineTask('generate', generate_sample),
PipelineTask('round', round_sample, batch_size=10),
PipelineTask('resid', calc_residual, num_cpus=2),
])
# Run the tasks and iterate over the outputs, here using the Ray execution framework
from dplutils.pipeline.ray import RayStreamGraphExecutor
executor = RayStreamGraphExecutor(pipeline).set_config('round.kwargs.decimals', 2)
for result_batch in executor.run():
print(result_batch)
break # otherwise it will - by design - run indefinitely!
As an alternative to iterating over batches directly as above, we can use the CLI utilities to run the given executor as a tool. The helper arranges for all options so we just need to define the desired executor:
executor = RayStreamGraphExecutor(pipeline)
if __name__ == '__main__':
cli_run(executor)
then run our module with parameters as needed. The CLI based run will write the
output to a parquet table at a specified location (below assumes code is in
ourmodule.py):
python -m ourmodule -o /path/to/outdir --set-config round.batch_size=5
The above is of course a trivial example to demonstrate the structure and simplicity of defining pipelines and how they operate on configurable-sized batches represented as dataframes. For more information on defining tasks, their inputs, seeding the input tasks, more complex graph structures and distributed execution, among other topics, see the documentation at: https://dplutils.readthedocs.io/en/stable/.
Scaling out
One of the goals of this project simplify the scaling out of connected tasks on a variety of systems. PipelineExecutors PipelineExecutor are responsible for this - this package provides a framework for adding executors based on appropriate underlying scheduling/execution systems and provides some implementations, for example using Ray RayStreamGraphExecutor. Setup required to properly configure an executor for scaling depends on the backend used, for example ray relies on having a cluster previously bootstrapped (see https://docs.ray.io/en/latest/cluster/getting-started.html), though can operate locally without any prior setup.
Resource specifications
Another primary goal is to arrange for resource dependencies to be met by the execution environment for a particular task. Resources such as number of CPUs or GPUs are natural targets and supported by many systems. We also want to support arbitrary resource requests, for example if a task requires a large local database on fast disks, this might be available only on one node in a cluster. A custom resource can be used to ensure that batches of a particular task always execute on the environment that has that resource.
This ability depends on the executor backend to support it, so executors implementations typically only make sense for such systems - of which Ray is one.
For instance, if a task required a database as described above, the task might be defined in the following manner:
PipelineTask('bigdboperation', function_needs_big_db, resources={'bigdb': 1})
And if using the Ray executor, then at least one worker in the cluster that has local fast disks with the big database resident would be started similar to:
ray start --address {head-ip} --resources '{"bigdb": 1}'
In other execution systems, the worker might be started in a different manner, but the task definition could remain as-is, enabling easy swapping of the execution environment depending on the situation.
Building and Development
If you need to make modifications to the source code, follow the steps below to get the source and run tests. The process is simple and we use Tox to manage test and build environments. We welcome any contributions, please open a pull request!
Setup
Clone this repository locally:
git clone https://github.com/ssec-jhu/dplutils.git
cd dplutils
Install dependencies:
pip install -r requirements/dev.txt
Tests
Run tox:
tox -e test, to run just the teststox, to run linting, tests and build. This should be run without errors prior to commit
Docker
From the repo directory, run
docker build -f docker/Dockerfile --tag dplutils .
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dplutils-0.8.2.tar.gz.
File metadata
- Download URL: dplutils-0.8.2.tar.gz
- Upload date:
- Size: 173.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e2cb1d0d2f20fc6f5bd8752f25626553cf7097cd83cf5735105fd490996b40d5
|
|
| MD5 |
25dce04c2cba9618ea3a770324af43cf
|
|
| BLAKE2b-256 |
0ac3a612a74ced564df4c1f3b389d99c9b86f3be962eb5ff0388e4292dd219c1
|
Provenance
The following attestation bundles were made for dplutils-0.8.2.tar.gz:
Publisher:
dist.yml on ssec-jhu/dplutils
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
dplutils-0.8.2.tar.gz -
Subject digest:
e2cb1d0d2f20fc6f5bd8752f25626553cf7097cd83cf5735105fd490996b40d5 - Sigstore transparency entry: 843338098
- Sigstore integration time:
-
Permalink:
ssec-jhu/dplutils@6209179b7af178017756d0d0ce62014381d78afc -
Branch / Tag:
refs/tags/v0.8.2 - Owner: https://github.com/ssec-jhu
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
dist.yml@6209179b7af178017756d0d0ce62014381d78afc -
Trigger Event:
release
-
Statement type:
File details
Details for the file dplutils-0.8.2-py3-none-any.whl.
File metadata
- Download URL: dplutils-0.8.2-py3-none-any.whl
- Upload date:
- Size: 37.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dc827b8c2d598bfb88c8910e6c72c11b85a844ba63e1ad9556e689e2705631b2
|
|
| MD5 |
a9cd1696bb9da1b5aecdb643d58bebcb
|
|
| BLAKE2b-256 |
04402bedb57c0f5bf839b8737f50844589c772de82466766f83367865fcd9ce9
|
Provenance
The following attestation bundles were made for dplutils-0.8.2-py3-none-any.whl:
Publisher:
dist.yml on ssec-jhu/dplutils
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
dplutils-0.8.2-py3-none-any.whl -
Subject digest:
dc827b8c2d598bfb88c8910e6c72c11b85a844ba63e1ad9556e689e2705631b2 - Sigstore transparency entry: 843338100
- Sigstore integration time:
-
Permalink:
ssec-jhu/dplutils@6209179b7af178017756d0d0ce62014381d78afc -
Branch / Tag:
refs/tags/v0.8.2 - Owner: https://github.com/ssec-jhu
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
dist.yml@6209179b7af178017756d0d0ce62014381d78afc -
Trigger Event:
release
-
Statement type: