Skip to main content

The Pipeline Design Pattern realization in python.

Project description

The Pipeline Design Pattern

tpdp it is the Pipeline Design Pattern realization in Python.

Quickstart

Install with pip

pip install tpdp

and creating your first pipeline

from tpdp import Pipeline, State, Step

class SimpleState(State):
    pipeline_name: str
    step_1_count: int = 0
    step_2_count: int = 0

init_state = SimpleState(pipeline_name="simple_pipeline")

pipeline = Pipeline(name="simple_pipeline")

class Step1(Step):
    def run(self, state: SimpleState, **kwargs) -> SimpleState:
        state.step_1_count += 1
        self.logger.info("step_1_count increased by 1")
        return state

class Step2(Step):
    def run(self, state: SimpleState, **kwargs) -> SimpleState:
        state.step_2_count += 1
        self.logger.info("step_2_count increased by 1")
        return state

step_1 = Step1(step_name="step_1")
step_2 = Step2(step_name="step_2")

pipeline.registry_step(step_1)
pipeline.registry_step(step_2)
pipeline.registry_step(step_1)
pipeline.registry_step(step_2)
pipeline.registry_step(step_1)
pipeline.registry_step(step_2)
pipeline.registry_step(step_1)

result = pipeline.run(init_state=init_state)
final_state = pipeline.get_state()

print(f"final_state.step_1_count = {final_state.step_1_count}")
print(f"final_state.step_2_count = {final_state.step_2_count}")

You will see the following logs

[...: INFO] Pipeline: Step registered: step_name="step_1"
[...: INFO] Pipeline: Step registered: step_name="step_2"
[...: INFO] Pipeline: Step registered: step_name="step_1"
[...: INFO] Pipeline: Step registered: step_name="step_2"
[...: INFO] Pipeline: Step registered: step_name="step_1"
[...: INFO] Pipeline: Step registered: step_name="step_2"
[...: INFO] Pipeline: Step registered: step_name="step_1"
[...: INFO] Pipeline: Pipeline start: pipeline_name="simple_pipeline", start_at="..."
[...: INFO] Pipeline: Step start: step_name="step_1"
[...: INFO] Step1: step_1_count increased by 1
[...: INFO] Pipeline: Step finish: step_name="step_1", step_duration="0.000024"
[...: INFO] Pipeline: Step start: step_name="step_2"
[...: INFO] Step2: step_2_count increased by 1
[...: INFO] Pipeline: Step finish: step_name="step_2", step_duration="0.000020"
[...: INFO] Pipeline: Step start: step_name="step_1"
[...: INFO] Step1: step_1_count increased by 1
[...: INFO] Pipeline: Step finish: step_name="step_1", step_duration="0.000016"
[...: INFO] Pipeline: Step start: step_name="step_2"
[...: INFO] Step2: step_2_count increased by 1
[...: INFO] Pipeline: Step finish: step_name="step_2", step_duration="0.000016"
[...: INFO] Pipeline: Step start: step_name="step_1"
[...: INFO] Step1: step_1_count increased by 1
[...: INFO] Pipeline: Step finish: step_name="step_1", step_duration="0.000016"
[...: INFO] Pipeline: Step start: step_name="step_2"
[...: INFO] Step2: step_2_count increased by 1
[...: INFO] Pipeline: Step finish: step_name="step_2", step_duration="0.000016"
[...: INFO] Pipeline: Step start: step_name="step_1"
[...: INFO] Step1: step_1_count increased by 1
[...: INFO] Pipeline: Step finish: step_name="step_1", step_duration="0.000023"
[...: INFO] Pipeline: Pipeline finish: pipeline_name="simple_pipeline", finish_at="...", duration="0.0005033016204833984"
final_state.step_1_count = 4
final_state.step_2_count = 3

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tpdp-0.1.0.tar.gz (9.8 kB view hashes)

Uploaded Source

Built Distribution

tpdp-0.1.0-py3-none-any.whl (5.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page