A flexible Python data pipeline library using finite state machines for custom data processing workflows

Project description

Pipeweave

A flexible Python data pipeline library that makes it easy to construct and run custom data pipelines using a finite state machine approach.

Project Goal

I have tried several popular Python data pipeline libraries and found them all somewhat awkward to use for custom use cases. The goal of this project is to create a pipeline library that avoids some of the common pitfalls and lets users easily construct pipelines from custom functions and run them with a finite state machine.

Features

  • 🚀 Simple, intuitive API for creating data pipelines
  • 🔄 Built-in state management using finite state machines
  • 📦 Easy integration of custom functions
  • 💾 Multiple storage backends (SQLite included)
  • 🔍 Pipeline status tracking and monitoring
  • ⚡ Efficient execution with dependency management

Installation

pip install pipeweave

Quick Start

Here's a simple example that demonstrates how to create and run a pipeline:

from pipeweave.core import Pipeline

# Create a pipeline
pipeline = Pipeline(name="data_transformer")

# Define some processing functions
def clean_data(data):
    return [x.strip().lower() for x in data]

def filter_empty(data):
    return [x for x in data if x]

# Add steps to the pipeline
pipeline.add_step(
    name="clean_data",
    description="Clean the data",
    function=clean_data,
    inputs=["raw_data"],
    outputs=["cleaned_data"],
)

pipeline.add_step(
    name="filter_empty",
    description="Filter out empty strings",
    function=filter_empty,
    inputs=["cleaned_data"],
    outputs=["filtered_data"],
    dependencies=["clean_data"],
)

# Run the pipeline
data = [" Hello ", "World ", "", " Python "]

results = pipeline.run(data)

print(results)

Core Concepts

Steps

A Step is the basic building block of a pipeline; a minimal example follows the list below. Each step:

  • Has a unique name
  • Contains a processing function
  • Defines its inputs and outputs
  • Can specify dependencies on other steps
  • Maintains its own state (IDLE, RUNNING, COMPLETED, ERROR)
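
For instance, a standalone step might be defined like this (a minimal sketch reusing the Step constructor shown in the Stages example further down; the function and field names here are purely illustrative):

from pipeweave.step import Step

# A simple processing function
def strip_whitespace(values):
    return [v.strip() for v in values]

# Each step bundles a function with a description, named inputs, and named outputs
step_strip = Step(
    name="strip_whitespace",
    description="Remove leading and trailing whitespace",
    function=strip_whitespace,
    inputs=["raw_values"],
    outputs=["stripped_values"],
)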

Stages

A Stage is a collection of steps that can be executed together. Each stage:

  • Has a unique name and description
  • Contains multiple steps, which are individual processing units
  • Defines its own state (IDLE, RUNNING, COMPLETED, ERROR)
  • Can specify dependencies on other stages, so it runs only after all of its dependencies have completed

Stages allow for better organization of complex pipelines by grouping related steps together. This modular approach enhances readability and maintainability of the pipeline code.

Pipeline

A Pipeline is a collection of steps that:

  • Manages the execution order based on dependencies
  • Handles data flow between steps
  • Tracks overall execution state
  • Can be saved and loaded using storage backends

Storage Backends

Pipeweave supports different storage backends for persisting pipelines:

  • SQLite (included)
  • Custom backends can be implemented by subclassing the StorageBackend base class (see the sketch below)
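
A custom backend is essentially any class that can persist and retrieve a pipeline by name. The sketch below is an illustration only: the StorageBackend import path and the save_pipeline/load_pipeline method names are assumed from the SQLite usage shown under Advanced Usage, and PickleStorage is a hypothetical example.

import pickle

from pipeweave.storage import StorageBackend  # import path assumed

class PickleStorage(StorageBackend):
    """Hypothetical backend that persists each pipeline to a pickle file."""

    def __init__(self, directory):
        self.directory = directory

    def save_pipeline(self, pipeline):
        # Store the pipeline under its name
        with open(f"{self.directory}/{pipeline.name}.pkl", "wb") as f:
            pickle.dump(pipeline, f)

    def load_pipeline(self, name):
        # Recreate a previously saved pipeline by name
        with open(f"{self.directory}/{name}.pkl", "rb") as f:
            return pickle.load(f)

An instance of such a class could then be used in place of SQLiteStorage in the examples below.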

Advanced Usage

Using Storage Backends

# Initialize storage (import path assumed; adjust if your installed version differs)
from pipeweave.storage import SQLiteStorage

storage = SQLiteStorage("pipelines.db")

# Save Pipeline
storage.save_pipeline(pipeline)

# Load Pipeline
pipeline = storage.load_pipeline("data_transformer")

Error Handling

from pipeweave.steps import State

try:
    results = pipeline.run(data)
except Exception as e:
    # Check state of steps
    for step in pipeline.steps.values():
        if step.state == State.ERROR:
            print(f"Step {step.name} failed: {step.error}")

Stages

from pipeweave.stage import Stage
from pipeweave.step import Step
from pipeweave.core import Pipeline

# Create a pipeline
pipeline = Pipeline(name="data_transformer")

# Define step functions
def double_number(x):
    return x * 2

def add_one(x):
    return x + 1

# Create steps
step_double = Step(name="double", description="Double the input", function=double_number, inputs=["number"], outputs=["result"])
step_add_one = Step(name="add_one", description="Add one to the input", function=add_one, inputs=["result"], outputs=["final"])

# Create a stage
stage = Stage(name="data_processing", description="Process the data", steps=[step_double, step_add_one])

# Add stage to pipeline
pipeline.add_stage(stage.name, stage.description, stage.steps)

# Run the pipeline, passing an initial value for the first step's "number" input
results = pipeline.run(5)

print(results)

Contributing

Contributions are welcome! This is a new project, so please feel free to open issues and suggest improvements.

For major changes, please open an issue first to discuss what you would like to change.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/AmazingFeature)
  3. Commit your changes (git commit -m 'Add some AmazingFeature')
  4. Push to the branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project Status

Pipeweave is currently in alpha. While it's functional and tested, the API may change as we gather user feedback and add new features.

Roadmap

  • Add a stages feature
  • Add a more robust state machine implementation
  • Add more storage backends
  • Add more detailed monitoring and logging
  • Add more tests and expand the CI/CD pipeline

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pipeweave-0.2.0.tar.gz (10.5 kB)

Uploaded Source

Built Distribution

pipeweave-0.2.0-py3-none-any.whl (10.8 kB)

Uploaded Python 3

File details

Details for the file pipeweave-0.2.0.tar.gz.

File metadata

  • Download URL: pipeweave-0.2.0.tar.gz
  • Upload date:
  • Size: 10.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.12.7 Linux/6.5.0-1025-azure

File hashes

Hashes for pipeweave-0.2.0.tar.gz:

  • SHA256: a417f376f4fd06e112b9510873ee1e1f6653464ccc6006a4106e0209056d7ae8
  • MD5: 5198b4c9720702d95b2590e794d599b0
  • BLAKE2b-256: 22fc3553e7dc0ab289ba890e1bd51457861ae13ec481810c8d5d7b6c854cc4f4

See more details on using hashes here.

File details

Details for the file pipeweave-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: pipeweave-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 10.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.12.7 Linux/6.5.0-1025-azure

File hashes

Hashes for pipeweave-0.2.0-py3-none-any.whl:

  • SHA256: 3b2cf05ad9c94d250bfb5c28797715cf9ba6c3ed7344780ce3f286eb8fdf5566
  • MD5: e9d2815538cba55e59ded3eec774cdb8
  • BLAKE2b-256: ccf12140a770d52cd5319eb12626e45efd2dbee569b1a697eccc08f72bbb5355

See more details on using hashes here.
