No project description provided
Project description
SimplePipeline
A lightweight Python library for building and executing data pipelines with logging and error handling.
📖 Features
✅ Define data pipelines with processing steps, conditions, and outputs
✅ Support multiple ingests (data sources)
✅ Graceful failure handling with logging
✅ Customizable logging with plug-and-play loggers (FileLogger)
✅ Filter logs by level (INFO, SUCCESS, ERROR)
✅ Visualize the pipeline with an interactive graph
📥 Installation
Prerequisites
Ensure you have Python 3.9+ installed. You can check your Python version with:
python --version
Installing SimplePipeline
To install SimplePipeline, run the following command:
pip install simple-pipelines
📝 Usage
Creating a Pipeline
To create a pipeline, import the SimplePipeline class from the simple_pipelines module:
from simple_pipelines import SimplePipeline
Create a new pipeline instance:
pipeline = SimplePipeline("My Pipeline")
Adding Ingests
Ingests are functions that return data. You can add ingests to your pipeline using the create_ingest method:
def ingest_users():
return pd.DataFrame({
"ID": [1, 2, 3, 4, 5],
"Age": [25, 32, 40, 23, 36],
"Score": [80, 90, 85, 78, 88]
})
pipeline.create_ingest(ingest_users, "Users")
The create_ingest method takes two arguments: the ingest function and a name for the ingest. The ingest function should return a pandas.DataFrame or a dictionary.
Adding Steps
Steps are functions that process data. You can add steps to your pipeline using the pipe method:
def process_users(input, ingests):
df = ingests["Users"].copy()
df["B"] = df["Age"] * 2
return df
pipeline.pipe(process_users, "Process Users")
The pipe method takes two arguments: the step function and a name for the step. The step function should take two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests. The step function should return the processed data.
Adding Conditions
Conditions are functions that evaluate whether a step should be executed. You can add conditions to your pipeline using the condition method:
def is_vip(input, ingests):
return input["Purchase_Amount"].sum() > 150
def branch_vip(input, ingests):
input["Classification"] = "VIP"
return input
def branch_regular(input, ingests):
input["Classification"] = "Regular"
return input
pipeline.condition(
conditions={is_vip: branch_vip},
default_branch=branch_regular,
name="Check VIP Status"
)
The condition method takes three arguments: a dictionary of conditions and their corresponding branches, a default branch, and a name for the condition. The conditions should be functions that take two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests. The branches should be functions that take two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests. The default branch should be a function that takes two arguments: input, which is the output of the previous step, and ingests, which is a dictionary containing all ingests.
Adding Outputs
Outputs are functions that do not return data. You can add outputs to your pipeline using the output method:
def final_output(input):
print("Final Processed Data:")
print(input)
pipeline.output(final_output, "Final Output")
The output method takes two arguments: the output function and a name for the output. The output function should take one argument: input, which is the output of the previous step.
Executing the Pipeline
To execute the pipeline, call the execute method:
pipeline.execute()
The execute method returns the output of the last step, or None if the pipeline fails.
Visualizing the Pipeline
To visualize the pipeline, call the visualize method:
pipeline.visualize()
This will generate an interactive left-to-right visualization of the pipeline using Plotly without Graphviz.
📝 Contributing
Contributions are welcome! Please follow these steps:
- Fork the repository.
- Create a new branch for your changes.
- Make your changes and commit them.
- Push your changes to your forked repository.
- Create a pull request to the main repository.
📝 License
This project is licensed under the MIT License.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file simple_pipelines-0.1.0.tar.gz.
File metadata
- Download URL: simple_pipelines-0.1.0.tar.gz
- Upload date:
- Size: 5.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.1 CPython/3.13.2 Linux/6.8.0-1021-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8ac0472bb95534c34fdd8e3d5c72f3a3f59ad04b1fa0a62a28854a71ad2c5423
|
|
| MD5 |
2affe100cca76275edb1d6d4bb715d6f
|
|
| BLAKE2b-256 |
43126a9b000f0913954326fbe5792ca44410f632febee9dc56caf97dc42f9dce
|
File details
Details for the file simple_pipelines-0.1.0-py3-none-any.whl.
File metadata
- Download URL: simple_pipelines-0.1.0-py3-none-any.whl
- Upload date:
- Size: 7.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.1 CPython/3.13.2 Linux/6.8.0-1021-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
943f9c543b88ce4f54730ed174ff2440cf85e391c444e49c41a0833b22f2805a
|
|
| MD5 |
59109f6f002becd5121e054df62c995a
|
|
| BLAKE2b-256 |
cf950588bc4f0b304a771e5e6e7e017f40c2cc8b7954fd3c720b34275182dc87
|