
SimplePipeline

A lightweight Python library for building and executing data pipelines with logging and error handling.


📖 Features

✅ Define data pipelines with processing steps, conditions, and outputs
✅ Support multiple ingests (data sources)
✅ Graceful failure handling with logging
✅ Customizable logging with plug-and-play loggers (FileLogger)
✅ Filter logs by level (INFO, SUCCESS, ERROR)
✅ Visualize the pipeline with an interactive graph

📥 Installation

Prerequisites

Ensure you have Python 3.9+ installed. You can check your Python version with:

```shell
python --version
```

Installing SimplePipeline

To install SimplePipeline, run the following command:

```shell
pip install simple-pipelines
```

📝 Usage

Creating a Pipeline

To create a pipeline, import the SimplePipeline class from the simple_pipelines module:

```python
from simple_pipelines import SimplePipeline
```

Create a new pipeline instance:

```python
pipeline = SimplePipeline("My Pipeline")
```

Adding Ingests

Ingests are functions that return data. You can add ingests to your pipeline using the create_ingest method:

```python
import pandas as pd

def ingest_users():
    return pd.DataFrame({
        "ID": [1, 2, 3, 4, 5],
        "Age": [25, 32, 40, 23, 36],
        "Score": [80, 90, 85, 78, 88]
    })

pipeline.create_ingest(ingest_users, "Users")
```

The create_ingest method takes two arguments: the ingest function and a name for the ingest. The ingest function should return a pandas.DataFrame or a dictionary.
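Since create_ingest accepts any callable that returns a DataFrame or a dictionary, a dictionary-backed ingest works the same way. The names below are illustrative:

```python
def ingest_purchases():
    # An ingest can return a plain dictionary instead of a DataFrame.
    # (ingest_purchases and "Purchases" are illustrative names.)
    return {"Purchase_Amount": [120, 45, 300]}

# Registered the same way as a DataFrame ingest:
# pipeline.create_ingest(ingest_purchases, "Purchases")
```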

Adding Steps

Steps are functions that process data. You can add steps to your pipeline using the pipe method:

```python
def process_users(input, ingests):
    df = ingests["Users"].copy()
    df["B"] = df["Age"] * 2
    return df

pipeline.pipe(process_users, "Process Users")
```

The pipe method takes two arguments: the step function and a name for the step. The step function itself takes two arguments: input, the output of the previous step, and ingests, a dictionary containing all ingests. It should return the processed data.
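Because a step is a plain Python callable, it can be unit-tested outside the pipeline by calling it directly with a hand-built ingests dictionary. The test data below is illustrative:

```python
import pandas as pd

def process_users(input, ingests):
    # Same step as above: derive a new column from the "Users" ingest.
    df = ingests["Users"].copy()
    df["B"] = df["Age"] * 2
    return df

# Call the step directly with a hand-built ingests dict:
users = pd.DataFrame({"ID": [1, 2], "Age": [25, 32]})
result = process_users(None, {"Users": users})
print(result["B"].tolist())  # [50, 64]
```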

Adding Conditions

Conditions are functions that evaluate whether a step should be executed. You can add conditions to your pipeline using the condition method:

```python
def is_vip(input, ingests):
    return input["Purchase_Amount"].sum() > 150

def branch_vip(input, ingests):
    input["Classification"] = "VIP"
    return input

def branch_regular(input, ingests):
    input["Classification"] = "Regular"
    return input

pipeline.condition(
    conditions={is_vip: branch_vip},
    default_branch=branch_regular,
    name="Check VIP Status"
)
```

The condition method takes three arguments: a dictionary mapping condition functions to their branch functions, a default branch, and a name for the condition. Conditions, branches, and the default branch all share the step signature: they take input, the output of the previous step, and ingests, a dictionary containing all ingests. A condition returns a boolean; if it evaluates to True its branch is executed, and if no condition holds the default branch runs.
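The dispatch can be sketched in plain Python. The run_condition helper below is an assumption about the behavior, not the library's actual implementation, and uses plain lists so it runs standalone:

```python
def is_vip(input, ingests):
    return sum(input["Purchase_Amount"]) > 150

def branch_vip(input, ingests):
    input["Classification"] = "VIP"
    return input

def branch_regular(input, ingests):
    input["Classification"] = "Regular"
    return input

def run_condition(conditions, default_branch, input, ingests):
    # Sketch of the dispatch: run the branch of a condition that holds,
    # otherwise fall back to the default branch.
    for cond, branch in conditions.items():
        if cond(input, ingests):
            return branch(input, ingests)
    return default_branch(input, ingests)

data = {"Purchase_Amount": [120, 45]}
result = run_condition({is_vip: branch_vip}, branch_regular, data, {})
print(result["Classification"])  # "VIP", since 120 + 45 = 165 > 150
```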

Adding Outputs

Outputs are functions that do not return data. You can add outputs to your pipeline using the output method:

```python
def final_output(input):
    print("Final Processed Data:")
    print(input)

pipeline.output(final_output, "Final Output")
```

The output method takes two arguments: the output function and a name for the output. The output function should take one argument: input, which is the output of the previous step.
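An output need not print; it can, for instance, persist the final DataFrame to disk. The function name save_output and the file path below are illustrative, not part of the library:

```python
def save_output(input):
    # Write the final DataFrame to disk; outputs return nothing.
    # ("result.csv" is an illustrative path.)
    input.to_csv("result.csv", index=False)

# Registered like any other output:
# pipeline.output(save_output, "Save Output")
```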

Executing the Pipeline

To execute the pipeline, call the execute method:

```python
pipeline.execute()
```

The execute method returns the output of the last step, or None if the pipeline fails.

Visualizing the Pipeline

To visualize the pipeline, call the visualize method:

```python
pipeline.visualize()
```

This will generate an interactive left-to-right visualization of the pipeline using Plotly.


📝 Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository.
  2. Create a new branch for your changes.
  3. Make your changes and commit them.
  4. Push your changes to your forked repository.
  5. Create a pull request to the main repository.

📝 License

This project is licensed under the MIT License.

