A custom pipeline library for data transformations
PipelinePy - Flexible Data Transformation Library
PipelinePy is a flexible library designed to streamline the process of data transformation within an application using a declarative approach. By providing an easy-to-use interface for setting up sequences of data transformations, it allows users to define what should happen to their data without needing to manage how the data is processed explicitly. Each transformation in the pipeline can be tailored with contextual logic, supporting complex data handling strategies in a clear and concise manner.
Key Features
- Declarative Data Transformations: Specify your data processing logic declaratively, making your code more readable and maintainable.
- Context-Aware Operations: Leverage a shared context to dynamically adjust the behavior of transformations throughout the data pipeline.
- Pre and Post Hooks: Execute custom logic before or after transformations to extend functionality without modifying the core processing stages.
- Flexible Configuration: Configure and extend pipelines to suit various data sources and processing requirements.
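To make these features concrete, here is a minimal, self-contained model of the declarative style, shared context, and hooks. This is a stand-in sketch for illustration only, not PipelinePy's actual API (which is shown under Getting Started below):

```python
# Minimal stand-in model of a declarative pipeline (illustration only).
class MiniPipeline:
    def __init__(self, context=None):
        self.context = context or {}
        self.stages = []
        self.pre_hooks = []
        self.post_hooks = []

    def stage(self, func):
        # Declare a transformation; nothing executes until run() is called.
        self.stages.append(func)
        return self

    def add_pre_hook(self, hook):
        self.pre_hooks.append(hook)
        return self

    def add_post_hook(self, hook):
        self.post_hooks.append(hook)
        return self

    def run(self, data):
        for hook in self.pre_hooks:
            hook(self.context)
        for func in self.stages:
            # Every stage receives the shared context alongside the data.
            data = func(data, self.context)
        for hook in self.post_hooks:
            hook(self.context)
        return data

result = (
    MiniPipeline({'increment': 2})
    .stage(lambda data, ctx: [x + ctx['increment'] for x in data])
    .stage(lambda data, ctx: [x * x for x in data])
    .run([1, 2, 3])
)
# result == [9, 16, 25]
```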
Installation
Install PipelinePy via pip:
```bash
pip install pipelinepy
```
Getting Started
Below is a comprehensive example showcasing how to set up and use the PipelinePy library:
Import Library Components
```python
from pipelinepy import Pipeline
from pipelinepy.transformations import Lambda, Transformation
```
Define Transformations
Transformation functions manipulate data. Here's how you can define some common transformations:
```python
def Add(value):
    return Lambda(lambda data, value=value: [x + value for x in data])

def SubOne():
    return Lambda(lambda data: [x - 1 for x in data])

def Multiply(value):
    return Lambda(lambda data, value=value: [x * value for x in data])

def Print(prefix=""):
    return Lambda(lambda data, prefix=prefix: [print(prefix, data), data][1])
```

Note that each parameter is bound as a default argument (`value=value`) so the lambda captures its value at definition time.
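The `value=value` default-argument binding matters because Python closures capture variables by reference, not by value. A quick self-contained demonstration:

```python
# Closures read the variable when the lambda is *called*, so every lambda
# built in this loop sees the final value of i.
late = [lambda x: x + i for i in range(3)]
print([f(10) for f in late])   # [12, 12, 12]

# Binding through a default argument captures the value at definition time.
early = [lambda x, i=i: x + i for i in range(3)]
print([f(10) for f in early])  # [10, 11, 12]
```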
Custom Transformation Classes
For more complex logic, you can define transformation classes:
```python
class Power(Transformation):
    def __init__(self, exponent):
        self.exponent = exponent

    def apply(self, data, context=None):
        return [x ** self.exponent for x in data]

class Square(Transformation):
    def apply(self, data, context=None):
        return [x * x for x in data]

class Filter(Transformation):
    def __init__(self, condition):
        self.condition = condition

    def apply(self, data, context=None):
        return list(filter(self.condition, data))

class SortBy(Transformation):
    def __init__(self, key_function, reverse=False):
        self.key_function = key_function
        self.reverse = reverse

    def apply(self, data, context=None):
        return sorted(data, key=self.key_function, reverse=self.reverse)
```
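Because each class only needs an `apply` method, a transformation can also be exercised on its own, outside a pipeline. A self-contained sketch, using a minimal stand-in base class since the real `Transformation` lives in `pipelinepy.transformations`:

```python
class Transformation:
    # Minimal stand-in for pipelinepy.transformations.Transformation.
    def apply(self, data, context=None):
        raise NotImplementedError

class Power(Transformation):
    def __init__(self, exponent):
        self.exponent = exponent

    def apply(self, data, context=None):
        return [x ** self.exponent for x in data]

class Filter(Transformation):
    def __init__(self, condition):
        self.condition = condition

    def apply(self, data, context=None):
        return list(filter(self.condition, data))

data = [1, 2, 3, 4]
data = Power(2).apply(data)                 # [1, 4, 9, 16]
data = Filter(lambda x: x > 5).apply(data)  # [9, 16]
print(data)
```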
Initialize Data
You can start your pipeline with predefined or dynamically generated data:
```python
class InitializeData(Transformation):
    def apply(self, data, context=None):
        return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```
Example Usage
Here is how you can set up and run a pipeline with multiple stages:
```python
def pre_hook(context):
    # Hook signature assumed for illustration.
    print("Starting pipeline with context:", context)

def post_hook(context):
    print("Pipeline finished")

def main():
    context = {'increment': 1}
    pipeline = Pipeline(context)
    pipeline.add_pre_hook(pre_hook)
    pipeline.add_post_hook(post_hook)
    pipeline\
        .stage(InitializeData, description="Initial data")\
        .stage(Add, 2, description="Add 2")\
        .stage(Power, 2, description="Raise to 2")\
        .stage(Square, description="Square")\
        .stage(Print, prefix="Before SubOne")\
        .stage(lambda data: [x - 1 for x in data])\
        .stage(Multiply, 2)\
        .stage(Filter, lambda x: x > 200, description="Keep values greater than 200")\
        .stage(SortBy, lambda x: x, reverse=True, description="Sort descending")\
        .stage(lambda data: data[:2], description="Take first 2")\
        .stage(Print, prefix="Final Output")\
        .run()

if __name__ == "__main__":
    main()
```
Advanced Configuration
PipelinePy supports various configurations to tailor the behavior of data transformations according to specific needs. Here are some advanced configurations you might consider:
- Context Customization: You can pass a context object that carries runtime-specific data throughout the pipeline execution. This can be useful for conditionally altering the behavior of transformations based on external factors.
- Dynamic Data Sources: The pipeline can dynamically source its initial data from external APIs or databases at runtime, allowing for highly adaptive data processing workflows.
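As a sketch of context customization, the transformation below changes its behavior based on a context entry. This is a stand-alone illustration (the class and `scale_factor` key are made up for the example; in PipelinePy the context is supplied by the `Pipeline` itself):

```python
class Scale:
    def apply(self, data, context=None):
        # Read a factor from the shared context, falling back to 1.0
        # when no entry is present.
        factor = (context or {}).get('scale_factor', 1.0)
        return [x * factor for x in data]

scaled = Scale().apply([1, 2, 3], {'scale_factor': 10})
print(scaled)  # [10, 20, 30]
```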
Example: Configuring a Dynamic Data Source
```python
class DynamicData(Transformation):
    def apply(self, data, context=None):
        # Assume fetch_data is a function that retrieves data based on some criteria
        return fetch_data(context.get('data_source'))
```
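The `fetch_data` helper above is assumed rather than provided by the library. A runnable sketch with an in-memory stub in its place might look like this:

```python
# 'fetch_data' is hypothetical; here it is stubbed with a dictionary lookup.
SOURCES = {
    'numbers': [1, 2, 3],
    'squares': [1, 4, 9],
}

def fetch_data(source_name):
    # Real code might query an API or database here.
    return SOURCES.get(source_name, [])

class DynamicData:
    def apply(self, data, context=None):
        # Ignore incoming data and load from the configured source instead.
        return fetch_data((context or {}).get('data_source'))

loaded = DynamicData().apply(None, {'data_source': 'squares'})
print(loaded)  # [1, 4, 9]
```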
Contributing
We welcome contributions from the community! Here are some ways you can contribute:
- Submit Bug Reports and Feature Requests: Use the Issues section of our GitHub repository to report problems or suggest new features.
- Submit Pull Requests: If you've developed a fix or an enhancement, submit a pull request with your changes. Please ensure your code adheres to the existing style and includes tests covering new or changed functionality.
Pull Request Process
- Fork the repository and create your branch from main.
- If you've added code, update the documentation as necessary.
- Ensure your code adheres to the existing style guidelines.
- Issue the pull request.
Project details
Download files
Source Distribution
File details
Details for the file pieplinepy-0.2.tar.gz.
File metadata
- Download URL: pieplinepy-0.2.tar.gz
- Upload date:
- Size: 6.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.11.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4b1ee46a1bc769d6e19035d64c0f87d248096a85d6103060d889ad1b3c57ead0
MD5 | 99244dc35aceffeeb1b4200a27ec15d8
BLAKE2b-256 | cd1aa50e05901f916ebbb00d7fb6a37c95fc289511ddc652eed3fa63a0a857b3