
A custom pipeline library for data transformations

Project description

To make the Pipeline class more fluent, extensible, and flexible, you can implement several enhancements that improve its usability and let it fit a wider range of processing scenarios. Here are a few suggestions:

1. Parameterized Transformations

Allow transformations to accept parameters dynamically. This can be achieved by adjusting how transformations are defined and instantiated.

class ParameterizedLambda(Transformation):
    # Assumes the library's Transformation base class.
    def __init__(self, function, *args, **kwargs):
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def apply(self, data):
        # Forward the stored arguments on every invocation.
        return self.function(data, *self.args, **self.kwargs)

# A factory that returns a preconfigured transformation:
def CustomTransformation(factor):
    return ParameterizedLambda(lambda data, factor: [x * factor for x in data], factor)
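
As a quick sanity check, here is a minimal usage sketch; it assumes the Pipeline.stage()/run() API shown in the next section:

doubled = Pipeline([1, 2, 3]).stage(CustomTransformation(2)).run()
print(doubled)  # [2, 4, 6]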

2. Method Chaining with Context

Implement a method chaining mechanism that allows passing a context or state through the pipeline, which can be very useful for more complex data processing that requires state awareness between stages.

class Pipeline:
    def __init__(self, data, context=None):
        self.data = data
        self.context = context or {}
        self.operations = []

    def stage(self, transformation):
        # Plain callables are wrapped in Lambda, the library's function
        # adapter; Transformation instances are used as-is.
        if callable(transformation) and not isinstance(transformation, Transformation):
            transformation = Lambda(transformation)
        self.operations.append(transformation)
        return self

    def run(self):
        result = self.data
        for operation in self.operations:
            result = operation.apply(result)
        return result

    def set_context(self, key, value):
        self.context[key] = value
        return self

    def get_context(self, key):
        return self.context.get(key)
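
A minimal sketch of a context-aware stage. ContextLambda is a hypothetical helper, not part of the library; it passes the pipeline's context dict to the wrapped function alongside the data:

class ContextLambda(Transformation):
    # Hypothetical helper: forwards the pipeline's context to the function.
    def __init__(self, function, context):
        self.function = function
        self.context = context

    def apply(self, data):
        return self.function(data, self.context)

pipeline = Pipeline([1, 2, 3]).set_context("offset", 10)
pipeline.stage(ContextLambda(
    lambda data, ctx: [x + ctx["offset"] for x in data],
    pipeline.context,
))
print(pipeline.run())  # [11, 12, 13]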

3. Support for Asynchronous Operations

Incorporate asynchronous processing to handle long-running or I/O-bound tasks efficiently within your pipeline.

import inspect

class AsyncLambda(Transformation):
    def __init__(self, function):
        self.function = function

    async def apply(self, data):
        # Await coroutine functions; call plain functions synchronously.
        if inspect.iscoroutinefunction(self.function):
            return await self.function(data)
        return self.function(data)

# Methods added to the Pipeline class:

    def async_stage(self, transformation):
        if callable(transformation) and not isinstance(transformation, Transformation):
            transformation = AsyncLambda(transformation)
        self.operations.append(transformation)
        return self

    async def async_run(self):
        result = self.data
        for operation in self.operations:
            if isinstance(operation, AsyncLambda):
                result = await operation.apply(result)
            else:
                result = operation.apply(result)
        return result
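
A minimal usage sketch, assuming async_stage and async_run have been added to Pipeline as above (slow_double is a hypothetical stand-in for an I/O-bound step):

import asyncio

async def slow_double(data):
    await asyncio.sleep(0.1)  # simulate network or disk latency
    return [x * 2 for x in data]

pipeline = Pipeline([1, 2, 3]).async_stage(slow_double)
print(asyncio.run(pipeline.async_run()))  # [2, 4, 6]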

4. Hooks and Interceptors

Add hooks or interceptors to stages, allowing custom logic to be executed before or after each stage. This is especially useful for debugging, logging, and modifying data or operations conditionally.

class Pipeline:
    def __init__(self, data):
        self.data = data
        self.operations = []
        self.before_each_hook = None
        self.after_each_hook = None

    # stage() and the other methods remain as defined earlier.

    def before_each(self, hook):
        self.before_each_hook = hook
        return self

    def after_each(self, hook):
        self.after_each_hook = hook
        return self

    def run(self):
        result = self.data
        for operation in self.operations:
            if callable(self.before_each_hook):
                self.before_each_hook(operation)
            result = operation.apply(result)
            if callable(self.after_each_hook):
                self.after_each_hook(operation, result)
        return result
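
For example, hooks make it easy to trace a run. This sketch assumes the stage() method from section 2 and the CustomTransformation factory from section 1:

import logging

logging.basicConfig(level=logging.INFO)

result = (
    Pipeline([1, 2, 3])
    .stage(CustomTransformation(2))
    .before_each(lambda op: logging.info("running %s", type(op).__name__))
    .after_each(lambda op, out: logging.info("%s -> %r", type(op).__name__, out))
    .run()
)
print(result)  # [2, 4, 6]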

5. Flexible Data Types

Ensure that your pipeline can handle various types of data inputs (e.g., lists, dictionaries, custom objects) and not just numerical arrays or lists.
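
As a sketch of what that could look like (Scale is a hypothetical stage, and the Pipeline.stage() API from section 2 is assumed), a transformation can dispatch on the type of its input:

class Scale(Transformation):
    # Hypothetical type-flexible stage: handles dicts, sequences,
    # or single numbers, depending on what it receives.
    def __init__(self, factor):
        self.factor = factor

    def apply(self, data):
        if isinstance(data, dict):
            return {key: value * self.factor for key, value in data.items()}
        if isinstance(data, (list, tuple)):
            return type(data)(x * self.factor for x in data)
        return data * self.factor

print(Pipeline({"a": 1, "b": 2}).stage(Scale(3)).run())  # {'a': 3, 'b': 6}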

Together, these features make the Pipeline class more robust: it can handle complex data transformations, support asynchronous operations, and fit a wider range of applications, turning it from a tool for simple transformations into a component suitable for sophisticated data processing workflows.
