Simple but effective data pipeline builder

Project description

SimplePipeline

A lightweight Python library for building and executing data pipelines with logging and error handling.


📖 Features

✅ Define data pipelines with processing steps, conditions, and outputs
✅ Support multiple ingests (data sources)
✅ Graceful failure handling with logging
✅ Customizable logging with plug-and-play loggers (FileLogger)
✅ Filter logs by level (INFO, SUCCESS, ERROR)
✅ Visualize the pipeline with an interactive graph

📥 Installation

Prerequisites

Ensure you have Python 3.9+ installed. You can check your Python version with:

python --version

Installing SimplePipeline

To install SimplePipeline, run the following command:

pip install simple-pipelines

📝 Usage

Creating a Pipeline

To create a pipeline, import the SimplePipeline class from the simple_pipelines module:

from simple_pipelines import SimplePipeline

Create a new pipeline instance:

pipeline = SimplePipeline("My Pipeline")

Adding Ingests

Ingests are functions that return data. You can add ingests to your pipeline using the create_ingest method:

import pandas as pd

def ingest_users():
    return pd.DataFrame({
        "ID": [1, 2, 3, 4, 5],
        "Age": [25, 32, 40, 23, 36],
        "Score": [80, 90, 85, 78, 88]
    })

pipeline.create_ingest(ingest_users, "Users")

The create_ingest method takes two arguments: the ingest function and a name for the ingest. The ingest function should return a pandas.DataFrame or a dictionary.
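Since the docs say an ingest may also return a plain dictionary, here is a minimal, dependency-free sketch. The name ingest_purchases and its columns are hypothetical, not part of the library:

```python
# Hypothetical second ingest: plain dictionaries are also accepted.
def ingest_purchases():
    return {
        "user_id": [1, 2, 3],
        "amount": [120.0, 45.5, 99.9],
    }

print(sorted(ingest_purchases().keys()))  # ['amount', 'user_id']
```

It would be registered the same way: `pipeline.create_ingest(ingest_purchases, "Purchases")`.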

Adding Steps

Steps are functions that process data. You can add steps to your pipeline using the pipe method:

def process_users(input, ingests):
    df = ingests["Users"].copy()
    df["B"] = df["Age"] * 2
    return df

pipeline.pipe(process_users, "Process Users")

The pipe method takes two arguments: the step function and a name for the step. The step function should take two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests. The step function should return the processed data.
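To illustrate the step signature without the pipeline, a step function can be called directly. This sketch uses dictionary-based data to stay dependency-free; flag_high_scorers and the sample ingests are hypothetical:

```python
# Hypothetical step operating on dictionary data:
# keeps users whose score is at least 85.
def flag_high_scorers(input, ingests):
    users = ingests["Users"]  # assumes a dict-based "Users" ingest
    return {
        uid: score
        for uid, score in zip(users["ID"], users["Score"])
        if score >= 85
    }

# Called directly, outside the pipeline, for illustration:
sample_ingests = {"Users": {"ID": [1, 2, 3], "Score": [80, 90, 85]}}
print(flag_high_scorers(None, sample_ingests))  # {2: 90, 3: 85}
```

Inside a pipeline, `input` would carry the previous step's output instead of `None`.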

Adding Conditions

Conditions are functions that evaluate whether a step should be executed. You can add conditions to your pipeline using the condition method:

def is_vip(input, ingests):
    return input["Score"].sum() > 150

def branch_vip(input, ingests):
    input["Classification"] = "VIP"
    return input

def branch_regular(input, ingests):
    input["Classification"] = "Regular"
    return input

pipeline.condition(
    conditions={is_vip: branch_vip},
    default_branch=branch_regular,
    name="Check VIP Status"
)

The condition method takes three arguments: a dictionary mapping condition functions to their branch functions, a default branch, and a name for the condition. Conditions, branches, and the default branch all share the same signature as steps: they take input (the output of the previous step) and ingests (a dictionary containing all ingests). Conditions return a boolean; branches return the processed data. If no condition evaluates to true, the default branch runs.
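The branching behavior can be sketched in plain Python. The dispatch helper below is an assumption about the semantics (first matching condition wins), not the library's actual code, and the condition/branch names are hypothetical:

```python
def is_high(input, ingests):
    return sum(input["Score"]) > 400

def is_low(input, ingests):
    return sum(input["Score"]) < 100

def branch_high(input, ingests):
    input["Tier"] = "High"
    return input

def branch_low(input, ingests):
    input["Tier"] = "Low"
    return input

def branch_mid(input, ingests):
    input["Tier"] = "Mid"
    return input

# Sketch of first-match-wins dispatch (an assumption, not library code):
def dispatch(conditions, default_branch, input, ingests):
    for cond, branch in conditions.items():
        if cond(input, ingests):
            return branch(input, ingests)
    return default_branch(input, ingests)

data = {"Score": [80, 90, 85, 78, 88]}
out = dispatch({is_high: branch_high, is_low: branch_low}, branch_mid, data, {})
print(out["Tier"])  # High
```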

Adding Outputs

Outputs are functions that do not return data. You can add outputs to your pipeline using the output method:

def final_output(input):
    print("Final Processed Data:")
    print(input)

pipeline.output(final_output, "Final Output")

The output method takes two arguments: the output function and a name for the output. The output function should take one argument: input, which is the output of the previous step.
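Outputs typically persist or display results. This sketch writes dict-shaped data to a CSV file using only the standard library; save_csv and the output path are hypothetical:

```python
import csv
import os
import tempfile

# Hypothetical output step: writes the final data to a CSV file.
# (Outputs return nothing; they only consume the last step's result.)
def save_csv(input):
    path = os.path.join(tempfile.gettempdir(), "pipeline_output.csv")
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(input.keys())           # header row
        writer.writerows(zip(*input.values()))  # one row per record

save_csv({"ID": [1, 2], "Score": [80, 90]})
```

It would be registered with `pipeline.output(save_csv, "Save CSV")`.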

Executing the Pipeline

To execute the pipeline, call the execute method:

pipeline.execute()

The execute method returns the output of the last step, or None if the pipeline fails.

Visualizing the Pipeline

To visualize the pipeline, call the visualize method:

pipeline.visualize()

This will generate an interactive left-to-right visualization of the pipeline using Plotly.


📝 Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository.
  2. Create a new branch for your changes.
  3. Make your changes and commit them.
  4. Push your changes to your forked repository.
  5. Create a pull request to the main repository.

📝 License

This project is licensed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

simple_pipelines-0.1.4.tar.gz (5.8 kB)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

simple_pipelines-0.1.4-py3-none-any.whl (7.0 kB)

Uploaded Python 3

File details

Details for the file simple_pipelines-0.1.4.tar.gz.

File metadata

  • Download URL: simple_pipelines-0.1.4.tar.gz
  • Upload date:
  • Size: 5.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.13.2 Linux/6.8.0-1021-azure

File hashes

Hashes for simple_pipelines-0.1.4.tar.gz

  • SHA256: 9b26f76d2abb8de165a5dbbefe3294445cf2c0b6903e842cdee6b60daaff3f1d
  • MD5: f61226d1256b52d008cfa7dd0095856b
  • BLAKE2b-256: de5e37cd1ae05895215961ab056f4db760fc501d084ef7fedb35c4960c52c41b

See more details on using hashes here.

File details

Details for the file simple_pipelines-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: simple_pipelines-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 7.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.13.2 Linux/6.8.0-1021-azure

File hashes

Hashes for simple_pipelines-0.1.4-py3-none-any.whl

  • SHA256: c16109d64df1eaf94f962bb201db39461c2b1536dfd2998b3348fd433ccdba22
  • MD5: bdb7799b1eb2f59880d94499ecdcca96
  • BLAKE2b-256: 5a4bf5d81d964219c2c13d90a18d2bc02afc6c1e033f59bce9adb1df3a57f8fc

See more details on using hashes here.
