Skip to main content

A lightweight pipeline framework

Project description

PipeLayer

PipeLayer is a lightweight Python pipeline framework. Define a series of filters, and chain them together to create modular applications.

Table of Contents

Installation

From the command line:

pip install pipelayer

Getting Started

Step 1: Application Settings

Create a class called AppSettings that inherits from pipelayer.Settings:

app_settings.py

from pipelayer import Settings


class AppSettings(Settings):
    """
    Complete this by adding constants, key/value data from AWS Parameter Store, etc

    The pipelayer.Settings class inherits from pydantic.BaseModel, so fields must be typed appropriately
    """
    ...

NOTE: You do not have to use the Settings base class in your application. Any class can be provided.

Step 2: Application Context

Create a class called AppContext that inherits from the pipelayer.Context class:

app_context.py

from logging import Logger

from app_settings import AppSettings
from pipelayer import Context


class AppContext(Context):
    def __init__(self, settings: AppSettings, log: Logger = Nones):
        self.__settings = settings
        self.__log = log

    @property
    def settings(self) -> AppSettings:
        return self.__settings

    @property
    def log(self) -> Logger:
        return self.__log

Step 3: Create Pipeline Filters

Filters can be defined in the following ways:

  • Classes that inherits from pipelayer.Filter and implement the run method
  • Functions (instance/class/static/module) that have the following signature func(context: Any, data: Any)
  • Anonymous functions (lambda) with two arguments that follow the same pattern for regular functions: my_func = lambda context, data: data

hello_world_filters.py

from pipelayer import Filter


class HelloFilter(Filter):
    def run(self, context) -> str:
        return "Hello"


class WorldFilter(Filter):
    def run(self, context, data) -> str:
        return f"{data},  World!"

functions.py

def create_message_dict(context, data: str) -> dict:
    return {"message": data}

Step 4: Create a Pipeline

Create a module to run the pipeline:

app.py

from logging import Logger
from pipelayer import Pipeline

from app_context import AppContext
from app_settings import AppSettings
from functions import create_message
from hello_world_filters import HelloFilter, WorldFilter

app_settings = AppSettings()
app_context = AppContext(app_settings, Logger("Logger"))
hello_world_pipeline = Pipeline.create(app_context, "Hello World Pipeline")

output = hello_world_pipeline.run([
    HelloFilter,                           # pipeline.Filter type
    WorldFilter(),                         # pipeline.Filter instance
    create_message_dict                    # function type
    lambda context, data: json.dumps(data) # anonymous function
])

# output = '{"message": "Hello, World!"}'

print(f"Pipeline Output: {output}")
print(hello_world_pipeline.manifest.__dict__)

Step 5: Run the Pipeline

from the command line:

run app.py

The Framework

pipelayer.Pipeline

Properties

name (str)
An optional name. It's used in the Manifest.

context (Context)
An instance of pipelayer.Context.

manifest (Manifest)
An instance of pipelayer.Manifest that is created at runtime.

Methods

create(context: Union[Context, Any], name: str = "") -> Pipeline
The factory method to create a pipeline

run(filters: List[Filter], data: Any = None) -> Any
The pipeline runner that iterates through the filters and pipes filter output to the next filter.

pipelayer.Filter

A functional unit that implements the run method, and the optional pre_process and post_process methods.

Properties

name (str)
Optional. Used by the Manifest

pre_process (callable)
Optional.

post_process (callable)
Optional.

Methods

run(context: Union[Context, Any], data: Any) -> Any
The type of the data argument in the abstract class is Any, but you can use the correct type for the data when implementing method. The same is true for the return type.

pipelayer.Context

The Context object is used to pass application-level information to each filter.

Properties

settings (Any)
An optional abstract property for storing application-level data.

log (Any)
An optional abstract property for a common logger that can be used within pipeline filters.

manifest (Manifest)
A model that keeps a record of start/stop times for a pipeline, as well as each filter.

pipelayer.Settings

The settings class is an optional base class for applications settings that inherits from pydantic.BaseModel.

pipelayer.Manifest

The Manifest keeps a record of Pipeline and Filter activity.

Utilities

pipelayer.util.render_manifest(manifest: Manifest, indent: int = 2) -> str

Static function that renders formatted JSON data

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pipelayer-0.2.0.tar.gz (9.1 kB view hashes)

Uploaded Source

Built Distribution

pipelayer-0.2.0-py3-none-any.whl (8.3 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page