Skip to main content

Simple but effective data pipeline builder

Project description

SimplePipeline

A lightweight Python library for building and executing data pipelines with logging and error handling.

Python License

📖 Features

✅ Define data pipelines with processing steps, conditions, and outputs
✅ Support multiple ingests (data sources)
Graceful failure handling with logging
✅ Customizable logging with plug-and-play loggers (FileLogger)
✅ Filter logs by level (INFO, SUCCESS, ERROR)
Visualize the pipeline with an interactive graph

📥 Installation

Prerequisites

Ensure you have Python 3.9+ installed. You can check your Python version with:

python --version

Installing SimplePipeline

To install SimplePipeline, run the following command:

pip install simple-pipelines

📝 Usage

Creating a Pipeline

To create a pipeline, import the SimplePipeline class from the simple_pipelines module:

from simple_pipelines import SimplePipeline

Create a new pipeline instance:

pipeline = SimplePipeline("My Pipeline")

Adding Ingests

Ingests are functions that return data. You can add ingests to your pipeline using the create_ingest method:

def ingest_users():
    return pd.DataFrame({
        "ID": [1, 2, 3, 4, 5],
        "Age": [25, 32, 40, 23, 36],
        "Score": [80, 90, 85, 78, 88]
    })

pipeline.create_ingest(ingest_users, "Users")

The create_ingest method takes two arguments: the ingest function and a name for the ingest. The ingest function should return a pandas.DataFrame or a dictionary.

Adding Steps

Steps are functions that process data. You can add steps to your pipeline using the pipe method:

def process_users(input, ingests):
    df = ingests["Users"].copy()
    df["B"] = df["Age"] * 2
    return df

pipeline.pipe(process_users, "Process Users")

The pipe method takes two arguments: the step function and a name for the step. The step function should take two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests. The step function should return the processed data.

Adding Conditions

Conditions are functions that evaluate whether a step should be executed. You can add conditions to your pipeline using the condition method:

def is_vip(input, ingests):
    return input["Purchase_Amount"].sum() > 150

def branch_vip(input, ingests):
    input["Classification"] = "VIP"
    return input

def branch_regular(input, ingests):
    input["Classification"] = "Regular"
    return input

pipeline.condition(
    conditions={is_vip: branch_vip},
    default_branch=branch_regular,
    name="Check VIP Status"
)

The condition method takes three arguments: a dictionary of conditions and their corresponding branches, a default branch, and a name for the condition. The conditions should be functions that take two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests. The branches should be functions that take two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests. The default branch should be a function that takes two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests.

Adding Outputs

Outputs are functions that do not return data. You can add outputs to your pipeline using the output method:

def final_output(input):
    print("Final Processed Data:")
    print(input)

pipeline.output(final_output, "Final Output")

The output method takes two arguments: the output function and a name for the output. The output function should take one argument: input, which is the output of the previous step.

Executing the Pipeline

To execute the pipeline, call the execute method:

pipeline.execute()

The execute method returns the output of the last step, or None if the pipeline fails.

Visualizing the Pipeline

To visualize the pipeline, call the visualize method:

pipeline.visualize()

This will generate an interactive left-to-right visualization of the pipeline using Plotly.

Visualization

📝 Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository.
  2. Create a new branch for your changes.
  3. Make your changes and commit them.
  4. Push your changes to your forked repository.
  5. Create a pull request to the main repository.

📝 License

This project is licensed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

simple_pipelines-0.1.3.tar.gz (5.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

simple_pipelines-0.1.3-py3-none-any.whl (7.0 kB view details)

Uploaded Python 3

File details

Details for the file simple_pipelines-0.1.3.tar.gz.

File metadata

  • Download URL: simple_pipelines-0.1.3.tar.gz
  • Upload date:
  • Size: 5.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.13.2 Linux/6.8.0-1021-azure

File hashes

Hashes for simple_pipelines-0.1.3.tar.gz
Algorithm Hash digest
SHA256 5670f8021f473b276a7376887e185560a8bdbe228413f3c581c0058d41ffe5de
MD5 d29ee1533261868ad471e1e394be69f7
BLAKE2b-256 901016f7cbaca56fd73b8d0b03005f1f01e447738b057315d37a522c31717378

See more details on using hashes here.

File details

Details for the file simple_pipelines-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: simple_pipelines-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 7.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.13.2 Linux/6.8.0-1021-azure

File hashes

Hashes for simple_pipelines-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 aaddb149ac5cdaa8929681c82ea2b408c0c1302e2f083bdc10cbf7444450c384
MD5 cda10bdd8f0da09d66199edf87b5f857
BLAKE2b-256 68dd786602905f6b7771ba81b9e473d39531dcddb3d7cf80cca4d1c790b0f0ab

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page