
Project description

SimplePipeline

A lightweight Python library for building and executing data pipelines with logging and error handling.


📖 Features

✅ Define data pipelines with processing steps, conditions, and outputs
✅ Support multiple ingests (data sources)
✅ Graceful failure handling with logging
✅ Customizable logging with plug-and-play loggers (FileLogger)
✅ Filter logs by level (INFO, SUCCESS, ERROR)
✅ Visualize the pipeline with an interactive graph

📥 Installation

Prerequisites

Ensure you have Python 3.9+ installed. You can check your Python version with:

python --version

Installing SimplePipeline

To install SimplePipeline, run the following command:

pip install simple-pipelines

📝 Usage

Creating a Pipeline

To create a pipeline, import the SimplePipeline class from the simple_pipelines module:

from simple_pipelines import SimplePipeline

Create a new pipeline instance:

pipeline = SimplePipeline("My Pipeline")

Adding Ingests

Ingests are functions that return data. You can add ingests to your pipeline using the create_ingest method:

import pandas as pd

def ingest_users():
    return pd.DataFrame({
        "ID": [1, 2, 3, 4, 5],
        "Age": [25, 32, 40, 23, 36],
        "Score": [80, 90, 85, 78, 88]
    })

pipeline.create_ingest(ingest_users, "Users")

The create_ingest method takes two arguments: the ingest function and a name for the ingest. The ingest function should return a pandas.DataFrame or a dictionary.
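The name given to create_ingest becomes the key under which later steps look the data up. As a rough sketch of that contract (plain dictionaries standing in for DataFrames; the registration loop is illustrative, not the library's internals):

```python
def ingest_users():
    # An ingest takes no arguments and returns data (a DataFrame or dict).
    return {"ID": [1, 2, 3], "Age": [25, 32, 40]}

def ingest_orders():
    return {"Order_ID": [10, 11], "User_ID": [1, 3]}

# Conceptually, the pipeline stores each ingest's result under its given name:
registered = [(ingest_users, "Users"), (ingest_orders, "Orders")]
ingests = {name: fn() for fn, name in registered}

print(sorted(ingests))  # ['Orders', 'Users'] -- the keys steps receive
```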

Adding Steps

Steps are functions that process data. You can add steps to your pipeline using the pipe method:

def process_users(input, ingests):
    df = ingests["Users"].copy()
    df["B"] = df["Age"] * 2
    return df

pipeline.pipe(process_users, "Process Users")

The pipe method takes two arguments: the step function and a name for the step. The step function should take two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests. The step function should return the processed data.
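To make that contract concrete, here is a minimal sketch (not the library's internals) of how a chain of such step functions composes: each step receives the previous step's output plus the full ingest dictionary.

```python
def add_bonus(input, ingests):
    # input: output of the previous step; ingests: dict of all ingests
    return {name: score + 10 for name, score in input.items()}

def keep_passing(input, ingests):
    return {name: score for name, score in input.items() if score >= 90}

ingests = {"Scores": {"alice": 80, "bob": 95}}
steps = [add_bonus, keep_passing]

data = ingests["Scores"]        # the first step receives the initial data
for step in steps:
    data = step(data, ingests)  # each step's output feeds the next

print(data)  # {'alice': 90, 'bob': 105}
```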

Adding Conditions

Conditions are functions that evaluate whether a step should be executed. You can add conditions to your pipeline using the condition method:

def is_vip(input, ingests):
    return input["Purchase_Amount"].sum() > 150

def branch_vip(input, ingests):
    input["Classification"] = "VIP"
    return input

def branch_regular(input, ingests):
    input["Classification"] = "Regular"
    return input

pipeline.condition(
    conditions={is_vip: branch_vip},
    default_branch=branch_regular,
    name="Check VIP Status"
)

The condition method takes three arguments: a dictionary mapping condition functions to their corresponding branch functions, a default branch, and a name for the condition. Conditions, branches, and the default branch all use the same signature as steps: two arguments, input (the output of the previous step) and ingests (the dictionary of all ingests). A condition returns a boolean; when a condition evaluates to True its branch is executed, and if no condition matches, the default branch runs.
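The dispatch logic can be pictured as follows (run_condition is a hypothetical helper written for illustration, not the library's code; plain dicts stand in for DataFrames):

```python
def is_vip(input, ingests):
    return sum(input["Purchase_Amount"]) > 150

def branch_vip(input, ingests):
    input["Classification"] = "VIP"
    return input

def branch_regular(input, ingests):
    input["Classification"] = "Regular"
    return input

def run_condition(conditions, default_branch, input, ingests):
    # Illustrative dispatcher: the first condition returning True
    # selects its branch; otherwise the default branch runs.
    for cond, branch in conditions.items():
        if cond(input, ingests):
            return branch(input, ingests)
    return default_branch(input, ingests)

data = {"Purchase_Amount": [100, 80]}  # sums to 180 > 150
result = run_condition({is_vip: branch_vip}, branch_regular, data, {})
print(result["Classification"])  # VIP
```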

Adding Outputs

Outputs are functions that do not return data. You can add outputs to your pipeline using the output method:

def final_output(input):
    print("Final Processed Data:")
    print(input)

pipeline.output(final_output, "Final Output")

The output method takes two arguments: the output function and a name for the output. The output function should take one argument: input, which is the output of the previous step.

Executing the Pipeline

To execute the pipeline, call the execute method:

pipeline.execute()

The execute method returns the output of the last step, or None if the pipeline fails.
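The return-None-on-failure behavior can be pictured as a try/except wrapped around the step chain. This is a simplified sketch, not the library's actual implementation (which also routes messages through its loggers):

```python
def run_steps(steps, data, ingests):
    # Graceful failure: any exception in a step aborts the run
    # and yields None instead of propagating.
    try:
        for step in steps:
            data = step(data, ingests)
        return data
    except Exception as exc:
        print(f"ERROR: step failed: {exc}")
        return None

ok = run_steps([lambda d, i: d + 1], 41, {})
bad = run_steps([lambda d, i: d / 0], 41, {})  # ZeroDivisionError is caught
print(ok, bad)  # 42 None
```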

Visualizing the Pipeline

To visualize the pipeline, call the visualize method:

pipeline.visualize()

This generates an interactive left-to-right visualization of the pipeline using Plotly (no Graphviz installation required).

📝 Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository.
  2. Create a new branch for your changes.
  3. Make your changes and commit them.
  4. Push your changes to your forked repository.
  5. Create a pull request to the main repository.

📝 License

This project is licensed under the MIT License.

Project details


Download files

Download the file for your platform.

Source Distribution

simple_pipelines-0.1.0.tar.gz (5.7 kB)

Uploaded Source

Built Distribution


simple_pipelines-0.1.0-py3-none-any.whl (7.0 kB)

Uploaded Python 3

File details

Details for the file simple_pipelines-0.1.0.tar.gz.

File metadata

  • Download URL: simple_pipelines-0.1.0.tar.gz
  • Upload date:
  • Size: 5.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.13.2 Linux/6.8.0-1021-azure

File hashes

Hashes for simple_pipelines-0.1.0.tar.gz
Algorithm Hash digest
SHA256 8ac0472bb95534c34fdd8e3d5c72f3a3f59ad04b1fa0a62a28854a71ad2c5423
MD5 2affe100cca76275edb1d6d4bb715d6f
BLAKE2b-256 43126a9b000f0913954326fbe5792ca44410f632febee9dc56caf97dc42f9dce


File details

Details for the file simple_pipelines-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: simple_pipelines-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 7.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.13.2 Linux/6.8.0-1021-azure

File hashes

Hashes for simple_pipelines-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 943f9c543b88ce4f54730ed174ff2440cf85e391c444e49c41a0833b22f2805a
MD5 59109f6f002becd5121e054df62c995a
BLAKE2b-256 cf950588bc4f0b304a771e5e6e7e017f40c2cc8b7954fd3c720b34275182dc87

