A flexible Python data pipeline library using finite state machines for custom data processing workflows
Pipeweave
A flexible Python data pipeline library that makes it easy to construct and run custom data pipelines using a finite state machine approach.
Project Goal
I tried several popular Python data pipeline libraries and found them all a little hard to use for custom use cases. The goal of this project is a pipeline library that avoids some of the common pitfalls and lets users construct pipelines from custom functions and run them with a finite state machine.
Features
- 🚀 Simple, intuitive API for creating data pipelines
- 🔄 Built-in state management using finite state machines
- 📦 Easy integration of custom functions
- 💾 Multiple storage backends (SQLite included)
- 🔍 Pipeline status tracking and monitoring
- ⚡ Efficient execution with dependency management
Installation
pip install pipeweave
Quick Start
Here's a simple example that demonstrates how to create and run a pipeline:
from pipeweave.core import Pipeline
# Create a pipeline
pipeline = Pipeline(name="data_transformer")
# Define some processing functions
def clean_data(data):
    return [x.strip().lower() for x in data]
def filter_empty(data):
    return [x for x in data if x]
# Add steps to the pipeline
pipeline.add_step(
    name="clean_data",
    description="Clean the data",
    function=clean_data,
    inputs=["raw_data"],
    outputs=["cleaned_data"],
)
pipeline.add_step(
    name="filter_empty",
    description="Filter out empty strings",
    function=filter_empty,
    inputs=["cleaned_data"],
    outputs=["filtered_data"],
    dependencies=["clean_data"],
)
# Run the pipeline
data = [" Hello ", "World ", "", " Python "]
results = pipeline.run(data)
print(results)
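To see what the two step functions do to the sample input, independently of Pipeweave, they can be chained by hand. This is a plain-Python sketch; it makes no claim about the structure of the value that `pipeline.run` returns.

```python
def clean_data(data):
    # Trim surrounding whitespace and lowercase every string
    return [x.strip().lower() for x in data]


def filter_empty(data):
    # Drop strings that are empty after cleaning
    return [x for x in data if x]


data = [" Hello ", "World ", "", " Python "]
cleaned = clean_data(data)
filtered = filter_empty(cleaned)

print(cleaned)   # ['hello', 'world', '', 'python']
print(filtered)  # ['hello', 'world', 'python']
```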
Core Concepts
Steps
A Step is the basic building block of a pipeline. Each step:
- Has a unique name
- Contains a processing function
- Defines its inputs and outputs
- Can specify dependencies on other steps
- Maintains its own state (IDLE, RUNNING, COMPLETED, ERROR)
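The state handling described above can be sketched as a small finite state machine. This is an illustration only: `SketchStep`, the `TRANSITIONS` table, and the `execute` method are hypothetical names chosen for this example, and the transition rules are an assumption rather than Pipeweave's actual implementation. Only the four state names come from the list above.

```python
from enum import Enum, auto


class State(Enum):
    IDLE = auto()
    RUNNING = auto()
    COMPLETED = auto()
    ERROR = auto()


# Allowed transitions (assumed for this sketch)
TRANSITIONS = {
    State.IDLE: {State.RUNNING},
    State.RUNNING: {State.COMPLETED, State.ERROR},
    State.COMPLETED: set(),
    State.ERROR: set(),
}


class SketchStep:
    """Hypothetical stand-in for a step; not the library's own class."""

    def __init__(self, name, function):
        self.name = name
        self.function = function
        self.state = State.IDLE
        self.error = None

    def _transition(self, new_state):
        # Reject any move that the transition table does not allow
        if new_state not in TRANSITIONS[self.state]:
            raise RuntimeError(
                f"Illegal transition {self.state.name} -> {new_state.name}"
            )
        self.state = new_state

    def execute(self, data):
        self._transition(State.RUNNING)
        try:
            result = self.function(data)
        except Exception as exc:
            # Record the failure, move to ERROR, and let the caller see it
            self.error = exc
            self._transition(State.ERROR)
            raise
        self._transition(State.COMPLETED)
        return result


ok = SketchStep("upper", lambda xs: [x.upper() for x in xs])
ok_result = ok.execute(["a", "b"])

bad = SketchStep("fail", lambda xs: 1 / 0)
try:
    bad.execute([])
except ZeroDivisionError:
    pass

print(ok.state.name, bad.state.name)  # COMPLETED ERROR
```

Keeping the allowed transitions in one table makes illegal moves, such as running a step that has already completed, fail loudly instead of silently corrupting the state.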
Pipeline
A Pipeline is a collection of steps that:
- Manages the execution order based on dependencies
- Handles data flow between steps
- Tracks overall execution state
- Can be saved and loaded using storage backends
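One way to picture dependency-based ordering and data flow is a topological sort over the step names, with named values passed through a shared dictionary. The sketch below uses the standard library's `graphlib` (Python 3.9+) and plain dictionaries whose keys mirror the `add_step` arguments. The `steps`, `graph`, and `context` names are illustrative, and how Pipeweave resolves ordering internally may differ.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def clean_data(data):
    return [x.strip().lower() for x in data]


def filter_empty(data):
    return [x for x in data if x]


# Plain-dict stand-ins for steps, deliberately listed out of order
steps = {
    "filter_empty": {
        "function": filter_empty,
        "inputs": ["cleaned_data"],
        "outputs": ["filtered_data"],
        "dependencies": ["clean_data"],
    },
    "clean_data": {
        "function": clean_data,
        "inputs": ["raw_data"],
        "outputs": ["cleaned_data"],
        "dependencies": [],
    },
}

# TopologicalSorter takes a mapping of node -> set of predecessors
graph = {name: set(spec["dependencies"]) for name, spec in steps.items()}
order = list(TopologicalSorter(graph).static_order())

# Named values flow between steps through a shared dictionary
context = {"raw_data": [" Hello ", "World ", "", " Python "]}
for name in order:
    spec = steps[name]
    args = [context[key] for key in spec["inputs"]]
    result = spec["function"](*args)
    context[spec["outputs"][0]] = result  # sketch assumes one output per step

print(order)                     # ['clean_data', 'filter_empty']
print(context["filtered_data"])  # ['hello', 'world', 'python']
```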
Storage Backends
Pipeweave supports different storage backends for persisting pipelines:
- SQLite (included)
- Custom backends can be implemented using the StorageBackend base class
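As a rough sketch of what a custom backend could look like, the example below defines a stand-in abstract base class and an in-memory implementation. The abstract interface of the real StorageBackend class is not shown in this README, so `StorageBackendSketch` is a hypothetical substitute. The method names `save_pipeline` and `load_pipeline` are borrowed from the SQLite usage example, and the assumption that a pipeline exposes a `name` attribute is just that, an assumption.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace


class StorageBackendSketch(ABC):
    """Stand-in for the library's StorageBackend; the real interface may differ."""

    @abstractmethod
    def save_pipeline(self, pipeline):
        ...

    @abstractmethod
    def load_pipeline(self, name):
        ...


class InMemoryStorage(StorageBackendSketch):
    """Keeps pipelines in a dictionary keyed by name; nothing is persisted."""

    def __init__(self):
        self._pipelines = {}

    def save_pipeline(self, pipeline):
        # Assumes the pipeline object has a `name` attribute
        self._pipelines[pipeline.name] = pipeline

    def load_pipeline(self, name):
        try:
            return self._pipelines[name]
        except KeyError:
            raise KeyError(f"No pipeline named {name!r}") from None


# A simple object with a name stands in for a real Pipeline here
fake_pipeline = SimpleNamespace(name="data_transformer")

storage = InMemoryStorage()
storage.save_pipeline(fake_pipeline)
loaded = storage.load_pipeline("data_transformer")
print(loaded.name)  # data_transformer
```

An in-memory backend like this is mostly useful in tests, where a database file would only add setup and cleanup work.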
Advanced Usage
Using Storage Backends
# Initialize Storage
storage = SQLiteStorage("pipelines.db")
# Save Pipeline
storage.save_pipeline(pipeline)
# Load Pipeline
pipeline = storage.load_pipeline("data_transformer")
Error Handling
from pipeweave.steps import State
try:
    results = pipeline.run(data)
except Exception as e:
    # Check state of steps
    for step in pipeline.steps.values():
        if step.state == State.ERROR:
            print(f"Step {step.name} failed: {step.error}")
Contributing
Contributions are welcome! This is a new project, so please feel free to open issues and suggest improvements.
For major changes, please open an issue first to discuss what you would like to change.
- Fork the repository
- Create your feature branch (git checkout -b feature/AmazingFeature)
- Commit your changes (git commit -m 'Add some AmazingFeature')
- Push to the branch (git push origin feature/AmazingFeature)
- Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Project Status
Pipeweave is currently in alpha. While it's functional and tested, the API may change as we gather user feedback and add new features.
Roadmap
- Add a more robust state machine implementation
- Add more storage backends
- Add more detailed monitoring and logging
- Add more tests and a CI/CD pipeline