
A custom pipeline library for data transformations


PipelinePy - Flexible Data Transformation Library

PipelinePy streamlines data transformation in your application through a declarative approach. By providing an easy-to-use interface for setting up sequences of transformations, it lets you define what should happen to your data without managing how the data is processed. Each transformation in the pipeline can be tailored with contextual logic, supporting complex data handling strategies in a clear and concise way.

Key Features

  • Declarative Data Transformations: Specify your data processing logic declaratively, making your code more readable and maintainable (see the minimal sketch after this list).
  • Context-Aware Operations: Leverage a shared context to dynamically adjust the behavior of transformations throughout the data pipeline.
  • Pre and Post Hooks: Execute custom logic before or after transformations to extend functionality without modifying the core processing stages.
  • Flexible Configuration: Configure and extend pipelines to suit various data sources and processing requirements.
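
For a quick feel of the declarative style, here is a minimal sketch that relies only on the API demonstrated later on this page (Pipeline(context), stage(), run(), and the Transformation.apply(data, context) signature). AddIncrement is a hypothetical stage, not part of the library:

from pipelinepy import Pipeline
from pipelinepy.transformations import Transformation

class AddIncrement(Transformation):
    # Hypothetical context-aware stage: the amount to add comes from the
    # shared pipeline context rather than a hard-coded constant.
    def apply(self, data, context=None):
        increment = (context or {}).get('increment', 0)
        return [x + increment for x in data]

Pipeline({'increment': 5})\
    .stage(lambda data: [1, 2, 3])\
    .stage(AddIncrement, description="Add context['increment']")\
    .run()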

Installation

Install PipelinePy via pip:

pip install pipelinepy

Getting Started

Below is a comprehensive example showcasing how to set up and use the PipelinePy library:

Import Library Components

from pipelinepy import Pipeline
from pipelinepy.transformations import Lambda, Transformation

Define Transformations

Transformation functions manipulate data. Here's how you can define some common transformations; extra arguments can either be bound with a default argument (as in Add) or passed through to Lambda as additional positional arguments (as in Multiply and Print):

def Add(value):
    # Bind value via a default argument so the lambda closes over it safely.
    return Lambda(lambda data, value=value: [x + value for x in data])

def SubOne():
    return Lambda(lambda data: [x - 1 for x in data])

def Multiply(value):
    # Alternatively, pass value through Lambda as an extra positional argument.
    return Lambda(lambda data, value: [x * value for x in data], value)

def Print(prefix=""):
    # Print the data, then return it unchanged so the pipeline can continue.
    return Lambda(lambda data, prefix: [print(prefix, data), data][1], prefix)

Custom Transformation Classes

For more complex logic, you can define transformation classes:

class Power(Transformation):
    def __init__(self, exponent):
        self.exponent = exponent

    def apply(self, data, context=None):
        return [x ** self.exponent for x in data]

class Square(Transformation):
    def apply(self, data, context=None):
        return [x * x for x in data]

class Filter(Transformation):
    def __init__(self, condition):
        self.condition = condition

    def apply(self, data, context=None):
        return list(filter(self.condition, data))

class SortBy(Transformation):
    def __init__(self, key_function, reverse=False):
        self.key_function = key_function
        self.reverse = reverse

    def apply(self, data, context=None):
        return sorted(data, key=self.key_function, reverse=self.reverse)
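
The Example Usage section below also chains a Take stage that is not defined anywhere on this page. A minimal sketch of what it presumably looks like, following the same Transformation pattern:

class Take(Transformation):
    # Assumed helper: keep only the first `count` items.
    def __init__(self, count):
        self.count = count

    def apply(self, data, context=None):
        return data[:self.count]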

Initialize Data

You can start your pipeline with predefined or dynamically generated data:

class InitializeData(Transformation):
    def apply(self, data, context=None):
        return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
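
Note that as the first stage in a pipeline, InitializeData simply ignores whatever data argument it receives (presumably None or an empty sequence) and returns a fresh list for the downstream stages to consume.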

Example Usage

Here is how you can set up and run a pipeline with multiple stages:

def pre_hook(data, context=None):
    # Hook signatures are not documented on this page; simple logging stubs
    # are assumed here so the example can run end to end.
    print("Before stage:", data)

def post_hook(data, context=None):
    print("After stage:", data)

def main():
    context = {'increment': 1}
    pipeline = Pipeline(context)
    pipeline.add_pre_hook(pre_hook)
    pipeline.add_post_hook(post_hook)
    pipeline\
        .stage(InitializeData, description="Initial data")\
        .stage(Add, 2, description="Add 2")\
        .stage(Power, 2, description="Raise to the power of 2")\
        .stage(Square, description="Square")\
        .stage(Print, prefix="Before SubOne")\
        .stage(lambda data: [x - 1 for x in data])\
        .stage(Multiply, 2)\
        .stage(Filter, lambda x: x > 200, description="Keep values greater than 200")\
        .stage(SortBy, lambda x: x, reverse=True, description="Sort descending")\
        .stage(Take, 2)\
        .stage(Print, prefix="Final Output")\
        .run()

if __name__ == "__main__":
    main()
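
Assuming the hook stubs and the Take sketch above, the final stage should print Final Output [41470, 29280]: starting from 1 through 10, the chain adds 2, raises each value to the fourth power overall (squaring twice), subtracts 1, doubles, drops everything at or below 200, sorts descending, and keeps the top two values.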

Advanced Configuration

PipelinePy supports various configurations to tailor the behavior of data transformations according to specific needs. Here are some advanced configurations you might consider:

  • Context Customization: You can pass a context object that carries runtime-specific data throughout the pipeline execution. This can be useful for conditionally altering the behavior of transformations based on external factors (a short sketch follows this list).

  • Dynamic Data Sources: The pipeline can dynamically source its initial data from external APIs or databases at runtime, allowing for highly adaptive data processing workflows.
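
As a hypothetical illustration of context customization, the stage below switches behavior on a context flag; only Pipeline(context) and the apply(data, context) signature from the earlier sections are assumed:

class Normalize(Transformation):
    # Hypothetical stage: the 'normalize' context flag decides whether
    # the data is rescaled into the [0, 1] range or passed through.
    def apply(self, data, context=None):
        if not (context or {}).get('normalize', False):
            return data
        top = max(data)
        return [x / top for x in data]

# Flipping the flag changes the pipeline's behavior without touching stages.
Pipeline({'normalize': True}).stage(InitializeData).stage(Normalize).run()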

Example: Configuring a Dynamic Data Source

class DynamicData(Transformation):
    def apply(self, data, context=None):
        # Assume fetch_data is a function that retrieves data based on some criteria
        return fetch_data(context.get('data_source'))
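
Wiring this into a pipeline might look like the following, where 'users_endpoint' is a placeholder value and fetch_data remains an assumed helper:

pipeline = Pipeline({'data_source': 'users_endpoint'})
pipeline.stage(DynamicData, description="Fetch initial data").run()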

Contributing

We welcome contributions from the community! Here are some ways you can contribute:

  • Submit Bug Reports and Feature Requests: Use the Issues section of our GitHub repository to report problems or suggest new features.

  • Submit Pull Requests: If you've developed a fix or an enhancement, submit a pull request with your changes. Please ensure your code adheres to the existing style and includes tests covering new or changed functionality.

Pull Request Process

  1. Fork the repository and create your branch from main.
  2. If you've added code, update the documentation as necessary.
  3. Ensure your code adheres to the existing style guidelines.
  4. Issue the pull request.
