A flexible framework for building and executing command pipelines in Python
Project description
Piperun - Command Pipeline Framework for Python
piperun is a flexible framework for building and executing command pipelines in Python. It provides tools for running shell commands, creating task pipelines, and executing operations in parallel.
Overview
piperun simplifies the process of building complex command workflows by providing:
- Command construction and execution with detailed control
- Sequential pipeline execution
- Parallel processing capabilities
- Integration with Dask for distributed computation
piperun was designed with the following principles in mind:
- Simplicity: Easy to use with intuitive interfaces
- Flexibility: Works with any callable or command
- Composability: Build complex workflows from simple pieces
- Performance: Efficient execution with parallel processing capabilities
- Control: Detailed control over execution flow
Installation
Install piperun using pip:
pip install piperun
Install from source in editable mode:
git clone https://github.com/franioli/piperun.git
cd piperun
pip install -e .
Core Components
Command
The Command class provides an intuitive interface for constructing and executing shell commands:
from piperun import Command
# Create a simple command
cmd = Command("ls -l")
cmd.run()
# Add arguments dynamically
cmd = Command("parallel_stereo")
cmd.extend("image1.tif", "image2.tif", t="rpc", max_level=2)
cmd.run()
Key features:
- Handles both positional and keyword arguments
- Converts Python arguments to command-line format
- Provides execution timing
- Captures command output
- Supports boolean flags and various parameter formats
Pipeline
The Pipeline class enables chaining multiple processing steps:
from piperun import Pipeline, Command
# Create a pipeline
pipeline = Pipeline()
# Add steps
pipeline.add_step(Command("mkdir -p output"))
pipeline.add_step(Command("convert input.jpg output/output.png"))
# Execute all steps
pipeline.run()
Features:
- Sequential execution of steps
- Control for running specific steps or ranges
- Support for any step with a
run()method - Nested pipeline capability
DelayedTask
The DelayedTask class integrates with Dask for delayed execution:
from piperun import DelayedTask
# Create a delayed task
def process_data(x):
return x * 2
task = DelayedTask(process_data, 10)
result = task.compute() # Executes when needed
Features:
- Lazy evaluation of tasks
- Execution timing measurement
- Visualization of task graphs
ParallelBlock
The ParallelBlock class enables concurrent execution of multiple steps:
from piperun import ParallelBlock, Command
# Create commands
commands = [
Command(f"process_file {i}.txt")
for i in range(10)
]
# Run in parallel
with ParallelBlock(commands, workers=4) as block:
block.run()
Features:
- Automatic Dask cluster management
- Configurable worker count
- Support for both Command and DelayedTask objects
Advanced Usage
Pipeline Composition
Pipelines can be composed of various step types, including other pipelines:
# Create nested pipelines
preprocessing = Pipeline([
Command("clean_data input.csv"),
Command("validate_data input.csv")
])
processing = Pipeline([
Command("process_data input.csv output.csv")
])
# Combine pipelines
main_pipeline = Pipeline()
main_pipeline.add_step(preprocessing)
main_pipeline.add_step(processing)
main_pipeline.run()
Parallel Execution with Custom Worker Count
# Run operations in parallel with custom worker count
parallel_tasks = ParallelBlock(workers=8)
for file in input_files:
parallel_tasks.add_step(Command(f"process {file}"))
parallel_tasks.run()
Flow Control in Pipelines
# Run specific pipeline segments
pipeline = Pipeline([...])
# Run only step 3
pipeline.run_step(3)
# Run from step 2 to the end
pipeline.run_from_step(2)
# Run up to step 4 (not including step 4)
pipeline.run_until_step(4)
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file piperun-0.1.1.tar.gz.
File metadata
- Download URL: piperun-0.1.1.tar.gz
- Upload date:
- Size: 17.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2b6523cb2e9d2c347cdb9eebec8c115f1e9a861c923301f402ac4c87225ac638
|
|
| MD5 |
6208a2cbb77cb057ac9147045755bc80
|
|
| BLAKE2b-256 |
48f67378b5f305c57f681a34f86d29d50f510ecf7b1f76434715abfbb261b1b2
|
Provenance
The following attestation bundles were made for piperun-0.1.1.tar.gz:
Publisher:
python-publish.yml on franioli/piperun
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
piperun-0.1.1.tar.gz -
Subject digest:
2b6523cb2e9d2c347cdb9eebec8c115f1e9a861c923301f402ac4c87225ac638 - Sigstore transparency entry: 176927486
- Sigstore integration time:
-
Permalink:
franioli/piperun@fa6d787e86e7e36f4283e2eefe80e21c23a0ad3e -
Branch / Tag:
refs/tags/0.1.1 - Owner: https://github.com/franioli
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@fa6d787e86e7e36f4283e2eefe80e21c23a0ad3e -
Trigger Event:
release
-
Statement type:
File details
Details for the file piperun-0.1.1-py3-none-any.whl.
File metadata
- Download URL: piperun-0.1.1-py3-none-any.whl
- Upload date:
- Size: 13.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
982679f92b3f47832497c63d0ecb96614aca5cc88df64de335cafd260540450d
|
|
| MD5 |
3d1af4621839f83e832699c138075911
|
|
| BLAKE2b-256 |
0f7f3aecd177481ae8664a13818944f3d9df9563f38f8a5c8bdbfed3d74a5149
|
Provenance
The following attestation bundles were made for piperun-0.1.1-py3-none-any.whl:
Publisher:
python-publish.yml on franioli/piperun
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
piperun-0.1.1-py3-none-any.whl -
Subject digest:
982679f92b3f47832497c63d0ecb96614aca5cc88df64de335cafd260540450d - Sigstore transparency entry: 176927488
- Sigstore integration time:
-
Permalink:
franioli/piperun@fa6d787e86e7e36f4283e2eefe80e21c23a0ad3e -
Branch / Tag:
refs/tags/0.1.1 - Owner: https://github.com/franioli
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@fa6d787e86e7e36f4283e2eefe80e21c23a0ad3e -
Trigger Event:
release
-
Statement type: