Pipeline library with API integration for task orchestration and execution tracking

wpipe - Python Pipeline Library for Sequential Data Processing

Version 2.0.0-LTS: Long-Term Support release with parallel execution, pipeline composition, step decorators, resource monitoring, checkpointing, and 90% test coverage.

Project Overview

wpipe is a Python library for building and running multi-step data processing pipelines. It targets production environments where reliability, observability, and performance matter. Now in its LTS (Long-Term Support) phase, wpipe guarantees a stable API and long-term maintenance.

Key Characteristics

  • No web UI required - Just clean, production-ready Python code
  • Minimal dependencies - Only requests and pyyaml
  • Production-ready - Comprehensive error handling, retry logic, and logging
  • Well-documented - Extensive docs, tutorials, and 100+ examples
  • 100% Type Hints - Excellent IDE support and better developer experience

Features

Feature | Description
Pipeline Orchestration | Create pipelines with step functions and classes
Conditional Branches | Execute different paths based on data conditions
Retry Logic | Automatic retries with configurable backoff strategies
API Integration | Connect to external APIs, register workers
SQLite Storage | Persist execution results to database
Error Handling | Custom exceptions and detailed error codes
YAML Configuration | Load and manage configurations
Nested Pipelines | Compose complex workflows
Progress Tracking | Rich terminal output
Type Hints | Complete type annotations
Memory Control | Built-in memory utilities
Composable | Reusable pipeline components
Parallel Execution | Execute steps in parallel (I/O or CPU bound)
Pipeline Composition | Use pipelines as steps in other pipelines
Step Decorators | Define steps inline with @step decorator
Checkpointing | Save and resume from checkpoints
Timeouts | Prevent hanging tasks with timeout support
Resource Monitoring | Track RAM and CPU during execution
Export | Export logs, metrics, and statistics to JSON/CSV

Diagram Walkthrough

The following diagram shows the high-level execution flow of a typical wpipe pipeline:

flowchart TD
    A[Start: User Creates Pipeline] --> B[Define Steps]
    B --> C[Configure Pipeline]
    C --> D[Execute: pipeline.run]
    D --> E{Step 1}
    E -->|Success| F{Step 2}
    E -->|Error| Z[TaskError Raised]
    F -->|Success| G{Step 3}
    F -->|Condition True| H[Branch True]
    F -->|Condition False| I[Branch False]
    H --> G
    I --> G
    G -->|Success| J[Accumulated Results]
    G -->|Error| Z
    J --> K[Pipeline Complete]
    Z --> L[Error Handler]
    L --> M[Access Partial Results]

Flow Description

  1. Start: User creates a Pipeline instance and defines step functions
  2. Configure: Steps are registered with name and version metadata
  3. Execute: pipeline.run() starts the sequential execution
  4. Data Flow: Each step receives accumulated results from all previous steps
  5. Conditions: Optional branching based on data evaluation
  6. Complete: Final accumulated results are returned
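
The accumulation described in this list can be sketched in plain Python. This is a minimal illustration, not wpipe's implementation: `run_steps` is a hypothetical helper, and merging each step's return value with `dict.update` is an assumption about how results accumulate.

```python
from typing import Any, Callable, Dict, List, Tuple

# A step is (function, name, version), mirroring the tuples used with set_steps.
Step = Tuple[Callable[[Dict[str, Any]], Dict[str, Any]], str, str]


def run_steps(steps: List[Step], initial: Dict[str, Any]) -> Dict[str, Any]:
    """Run steps in order, merging each step's result into one dict."""
    accumulated = dict(initial)  # copy so the caller's dict is not mutated
    for func, name, version in steps:
        result = func(accumulated)  # each step sees everything produced so far
        accumulated.update(result)  # assumption: later keys overwrite earlier ones
    return accumulated


def fetch(data):
    return {"users": ["Alice", "Bob"]}


def count(data):
    return {"count": len(data["users"])}


final = run_steps([(fetch, "Fetch", "v1.0"), (count, "Count", "v1.0")], {})
print(final)  # {'users': ['Alice', 'Bob'], 'count': 2}
```

The second step can read `users` only because the first step's output was merged before it ran, which is the behaviour item 4 describes.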

System Workflow

The following sequence diagram shows the interaction between components during pipeline execution:

sequenceDiagram
    participant User
    participant Pipeline
    participant Step1
    participant Step2
    participant SQLite
    participant ExternalAPI

    User->>Pipeline: Create Pipeline(verbose=True)
    User->>Pipeline: set_steps([...])
    User->>Pipeline: run(initial_data)
    
    Pipeline->>Step1: Execute Step1(initial_data)
    Step1-->>Pipeline: Return {result1}
    
    Pipeline->>Pipeline: Accumulate data
    Pipeline->>Step2: Execute Step2(accumulated_data)
    Step2-->>Pipeline: Return {result2}
    
    alt with API enabled
        Pipeline->>ExternalAPI: worker_register()
        ExternalAPI-->>Pipeline: worker_id
        Pipeline->>ExternalAPI: register_process()
        ExternalAPI-->>Pipeline: process_id
    end
    
    alt with SQLite enabled
        Pipeline->>SQLite: write(input, output, details)
        SQLite-->>Pipeline: record_id
    end
    
    Pipeline-->>User: Return accumulated_results
    
    Note over User,Pipeline: Pipeline execution complete

Architecture Components

The following diagram illustrates the static structure and dependencies of wpipe's main modules:

graph TB
    subgraph Core["Core Layer"]
        P[Pipeline<br/>pipe/pipe.py]
        C[Condition<br/>pipe/pipe.py]
        PM[ProgressManager<br/>pipe/pipe.py]
    end
    
    subgraph Integration["Integration Layer"]
        API[APIClient<br/>api_client/]
        SQL[Sqlite<br/>sqlite/]
        WS[Wsqlite<br/>sqlite/]
    end
    
    subgraph Utilities["Utilities Layer"]
        LOG[new_logger<br/>log/]
        RAM[memory<br/>ram/]
        YAML[leer_yaml<br/>util/]
    end
    
    subgraph Exceptions["Exception Layer"]
        TE[TaskError<br/>exception/]
        AE[ApiError<br/>exception/]
        PE[ProcessError<br/>exception/]
        CO[Codes<br/>exception/]
    end
    
    P --> C
    P --> PM
    P --> API
    P --> SQL
    P --> WS
    API --> TE
    API --> AE
    SQL --> PE
    WS --> SQL
    
    User --> P
    User --> C
    User --> API
    User --> WS
    User --> YAML
    User --> RAM
    User --> LOG

Module Dependencies

graph LR
    A[wpipe/__init__.py] --> B[pipe/pipe.py]
    A --> C[api_client/]
    A --> D[sqlite/]
    A --> E[exception/]
    
    B --> F[rich<br/>ProgressManager]
    C --> G[requests<br/>HTTP calls]
    D --> H[sqlite3<br/>Database]
    E --> I[Built-in exceptions]
    
    User --> A

Container Lifecycle

Build Process

flowchart LR
    A[Source Code] --> B[Install Dependencies]
    B --> C[pip install -e .]
    C --> D[wpipe Package]
    D --> E[Ready for Development]
    
    style A fill:#e1f5fe
    style E fill:#c8e6c9

Runtime Process

flowchart TD
    A[Import wpipe] --> B[Create Pipeline]
    B --> C[Configure Steps]
    C --> D[Call pipeline.run]
    D --> E[Initialize Progress]
    
    E --> F{Step Loop}
    F -->|For each step| G[Get Step Data]
    G --> H[Execute Step Function]
    H --> I{Success?}
    
    I -->|Yes| J[Accumulate Results]
    J --> K{Next Step?}
    K -->|Yes| F
    K -->|No| L[Return Final Results]
    
    I -->|No| M{Retry Config?}
    M -->|Yes| N[Wait retry_delay]
    N --> O[Retry Attempt]
    O --> H
    
    M -->|No| P[Raise TaskError]
    P --> Q[Capture Partial Results]
    
    style L fill:#c8e6c9
    style P fill:#ffcdd2

File-by-File Guide

File/Directory | Purpose | Description
wpipe/__init__.py | Main exports | Exports Pipeline, Condition, APIClient, Wsqlite
wpipe/pipe/pipe.py | Core logic | Pipeline, Condition, ProgressManager classes
wpipe/api_client/api_client.py | HTTP client | APIClient, send_post, send_get functions
wpipe/sqlite/Sqlite.py | Database | Core SQLite operations
wpipe/sqlite/Wsqlite.py | Wrapper | Context manager for simple DB operations
wpipe/log/log.py | Logging | new_logger function (loguru)
wpipe/ram/ram.py | Memory | memory decorator, memory_limit, get_memory
wpipe/util/utils.py | Config | leer_yaml, escribir_yaml functions
wpipe/exception/api_error.py | Errors | TaskError, ApiError, ProcessError, Codes
docs/source/ | Documentation | Sphinx documentation source files
examples/ | Examples | 100+ working examples organized by topic
test/ | Tests | pytest test suite (106 tests)

Getting Started

Installation

PyPI (Recommended)

pip install wpipe

From Source

git clone https://github.com/wisrovi/wpipe
cd wpipe
pip install -e .

Development Install

pip install -e ".[dev]"

Requirements

  • Python 3.9 or higher
  • requests (for API integration)
  • pyyaml (for YAML configuration)

Verification

import wpipe
print(wpipe.__version__)  # prints the installed version string
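
If importing the package is not desirable, the installed version can also be read from the package metadata. The sketch below uses only the standard library (`importlib.metadata`, Python 3.8+); `installed_version` is a hypothetical helper name, not part of wpipe.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed distribution version, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Prints the version string if wpipe is installed, otherwise None.
print(installed_version("wpipe"))
```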

Usage/Examples

Basic Pipeline

from wpipe import Pipeline

def fetch_data(data):
    """Fetch data from a source."""
    return {"users": [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}]}

def process_data(data):
    """Process the fetched data."""
    users = data["users"]
    return {"count": len(users), "names": [u["name"] for u in users]}

def save_data(data):
    """Save results."""
    return {"status": "saved", "processed": data["count"]}

# Create and configure your pipeline
pipeline = Pipeline(verbose=True)
pipeline.set_steps([
    (fetch_data, "Fetch Data", "v1.0"),
    (process_data, "Process Data", "v1.0"),
    (save_data, "Save Data", "v1.0"),
])

# Run the pipeline
result = pipeline.run({})
# Output: {'users': [...], 'count': 3, 'names': [...], 'status': 'saved', 'processed': 3}

Conditional Pipeline

from wpipe import Pipeline, Condition

def check_value(data):
    return {"value": 75}

def process_high(data):
    return {"result": "High value"}

def process_low(data):
    return {"result": "Low value"}

condition = Condition(
    expression="value > 50",
    branch_true=[(process_high, "High", "v1.0")],
    branch_false=[(process_low, "Low", "v1.0")],
)

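pipeline = Pipeline(verbose=True)
pipeline.set_steps([
    (check_value, "Check", "v1.0"),
    condition,
])

# Run the pipeline: check_value returns 75, so "value > 50" selects branch_true
result = pipeline.run({})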
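
To make the branching idea concrete, here is a standalone sketch of how a string expression such as "value > 50" could be evaluated against the accumulated data. How wpipe evaluates expressions internally is not documented here, so `choose_branch` and its use of `eval` are assumptions made purely for illustration.

```python
from typing import Any, Callable, Dict, List

StepFn = Callable[[Dict[str, Any]], Dict[str, Any]]


def choose_branch(
    expression: str,
    data: Dict[str, Any],
    branch_true: List[StepFn],
    branch_false: List[StepFn],
) -> List[StepFn]:
    """Evaluate the expression with data keys as variable names and pick a branch."""
    # Builtins are disabled, but eval is still unsafe for untrusted expressions.
    outcome = eval(expression, {"__builtins__": {}}, dict(data))
    return branch_true if outcome else branch_false


def process_high(data):
    return {"result": "High value"}


def process_low(data):
    return {"result": "Low value"}


data = {"value": 75}
for step in choose_branch("value > 50", data, [process_high], [process_low]):
    data.update(step(data))
print(data["result"])  # High value
```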

With SQLite Storage

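from wpipe import Pipeline
from wpipe.sqlite import Wsqlite

def double(data):
    """Example step: double the input value."""
    return {"doubled": data["x"] * 2}

pipeline = Pipeline(verbose=True)
pipeline.set_steps([(double, "Double", "v1.0")])

# Record the pipeline input and output in results.db
with Wsqlite(db_name="results.db") as db:
    db.input = {"x": 10}
    result = pipeline.run({"x": 10})
    db.output = result
    print(f"Record ID: {db.id}")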
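
For readers curious what persisting a run involves, the following sketch stores an input/output pair with the standard `sqlite3` module and returns the new record ID. It is not how Wsqlite is implemented; the `runs` table, its columns, and the `save_run` helper are hypothetical.

```python
import json
import sqlite3


def save_run(conn: sqlite3.Connection, input_data: dict, output_data: dict) -> int:
    """Insert one execution record as JSON text and return its row id."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS runs ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, input TEXT, output TEXT)"
    )
    cursor = conn.execute(
        "INSERT INTO runs (input, output) VALUES (?, ?)",
        (json.dumps(input_data), json.dumps(output_data)),
    )
    conn.commit()
    return cursor.lastrowid


conn = sqlite3.connect(":memory:")  # in-memory database, nothing written to disk
record_id = save_run(conn, {"x": 10}, {"doubled": 20})
stored = conn.execute(
    "SELECT input, output FROM runs WHERE id = ?", (record_id,)
).fetchone()
print(record_id, stored)
conn.close()
```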

With Retry Logic

from wpipe import Pipeline

# Retry failed steps, but only for the listed exception types
pipeline = Pipeline(
    verbose=True,
    max_retries=3,
    retry_delay=2.0,
    retry_on_exceptions=(ConnectionError, TimeoutError),
)
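
The retry parameters above can be read as a simple loop. The sketch below is a standalone illustration rather than wpipe's code: `call_with_retries` is a hypothetical helper, it assumes `max_retries` counts attempts after the first call, and it uses a fixed delay with no backoff.

```python
import time
from typing import Any, Callable, Tuple, Type


def call_with_retries(
    func: Callable[[], Any],
    max_retries: int = 3,
    retry_delay: float = 2.0,
    retry_on_exceptions: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> Any:
    """Call func, retrying up to max_retries times on the listed exceptions."""
    attempt = 0
    while True:
        try:
            return func()
        except retry_on_exceptions:
            if attempt >= max_retries:
                raise  # retries exhausted: propagate the last error
            attempt += 1
            time.sleep(retry_delay)  # fixed delay between attempts


calls = {"count": 0}


def flaky() -> str:
    """Fail twice with ConnectionError, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = call_with_retries(flaky, max_retries=3, retry_delay=0.01)
print(result, calls["count"])  # ok 3
```

Exceptions outside `retry_on_exceptions` are not caught, so a programming error such as a `KeyError` fails immediately instead of being retried.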

Data Flow Visualization

┌──────────────────────────────────────────────────────────────────────┐
│                       PIPELINE EXECUTION FLOW                        │
└──────────────────────────────────────────────────────────────────────┘

Input        Step 1          Step 2          Step 3          Output
─────        ──────          ──────          ──────          ──────
┌──────┐     ┌─────────┐     ┌─────────┐     ┌─────────┐     ┌─────────┐
│  {}  │────▶│  Fetch  │────▶│ Process │────▶│  Save   │────▶│ Result  │
└──────┘     │  Data   │     │  Data   │     │  Data   │     └─────────┘
             └─────────┘     └─────────┘     └─────────┘
             {users: [...]}  {users, count}  {count, status}

API Reference

Pipeline

from wpipe import Pipeline

pipeline = Pipeline(
    verbose=True,           # Enable verbose output
    api_config={},         # API configuration
    max_retries=3,         # Maximum retry attempts
    retry_delay=1.0,       # Delay between retries
    retry_on_exceptions=(ConnectionError, TimeoutError),
)

Methods:

  • set_steps(steps) - Configure pipeline steps
  • run(input_data) - Execute the pipeline
  • worker_register(name, version) - Register with API
  • set_worker_id(worker_id) - Set worker ID

Condition

from wpipe import Condition

condition = Condition(
    expression="value > 50",
    branch_true=[(step_true, "True", "v1.0")],
    branch_false=[(step_false, "False", "v1.0")],
)

Exceptions

from wpipe.exception import TaskError, ApiError, ProcessError, Codes

try:
    result = pipeline.run(data)
except TaskError as e:
    print(f"Step: {e.step_name}")
    print(f"Code: {e.error_code}")

Error Codes:

  • API_ERROR (501) - API communication error
  • TASK_FAILED (502) - Task execution failed
  • UPDATE_PROCESS_OK (503) - Process completed successfully
  • UPDATE_PROCESS_ERROR (504) - Process update failed
  • UPDATE_TASK (505) - Task update failed
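
The pattern of wrapping a failing step in an exception that carries a step name and a code can be sketched as follows. `DemoCodes`, `DemoTaskError`, and `run_step` are hypothetical stand-ins that reuse the names and numbers from the list above; they are not the classes shipped in wpipe.exception.

```python
from enum import IntEnum


class DemoCodes(IntEnum):
    """Hypothetical mirror of the documented codes."""
    API_ERROR = 501
    TASK_FAILED = 502
    UPDATE_PROCESS_OK = 503
    UPDATE_PROCESS_ERROR = 504
    UPDATE_TASK = 505


class DemoTaskError(Exception):
    """Stand-in exception exposing step_name and error_code attributes."""

    def __init__(self, message: str, step_name: str, error_code: int) -> None:
        super().__init__(message)
        self.step_name = step_name
        self.error_code = error_code


def run_step(func, name, data):
    """Run one step and convert any failure into a DemoTaskError."""
    try:
        return func(data)
    except Exception as exc:
        raise DemoTaskError(str(exc), name, DemoCodes.TASK_FAILED) from exc


def broken(data):
    raise ValueError("bad input")


try:
    run_step(broken, "Broken Step", {})
except DemoTaskError as err:
    caught = (err.step_name, int(err.error_code))
    print(caught)  # ('Broken Step', 502)
```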

Code Quality

Metric | Value
Tests | 106 passing
Examples | 100+
Python Support | 3.9, 3.10, 3.11, 3.12, 3.13
Type Hints | Complete
Docstrings | Google-style

Running Tests

# Run all tests
pytest

# Run with coverage
pytest --cov=wpipe --cov-report=html
open htmlcov/index.html  # macOS; on Linux use xdg-open

# Lint
ruff check wpipe/

# Auto-fix linting
ruff check wpipe/ --fix

# Type check
mypy wpipe/

# All quality checks
ruff check wpipe/ && mypy wpipe/ && pytest
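
Because steps are plain functions that take a dict and return a dict, they can be unit-tested without constructing a pipeline. The sketch below reuses `process_data` from the Basic Pipeline example; the test names and the idea of placing them in a file such as test_steps.py are assumptions, not part of the project's test suite.

```python
def process_data(data):
    """Process the fetched data (copied from the Basic Pipeline example)."""
    users = data["users"]
    return {"count": len(users), "names": [u["name"] for u in users]}


def test_process_data_counts_users():
    data = {"users": [{"name": "Alice"}, {"name": "Bob"}]}
    assert process_data(data) == {"count": 2, "names": ["Alice", "Bob"]}


def test_process_data_handles_empty_list():
    assert process_data({"users": []}) == {"count": 0, "names": []}
```

pytest collects functions whose names start with `test_`, so running `pytest` in the directory containing such a file would execute both checks.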

Examples

Explore 100+ examples organized by functionality:

Folder | Examples | Description
01_basic_pipeline | 15 | Functions, classes, mixed steps, data flow, async
02_api_pipeline | 20 | External APIs, workers, authentication, health checks
03_error_handling | 15 | Exceptions, error codes, recovery, partial results
04_condition | 12 | Conditional branches, decision trees, boolean logic
05_retry | 12 | Automatic retries, backoff, custom exceptions
06_sqlite_integration | 14 | Persistence, CSV export, batch operations
07_nested_pipelines | 14 | Complex workflows, parallel execution, recursion
08_yaml_config | 14 | Configuration, environment variables, validation
09_microservice | 11 | Production-ready microservice patterns

Running Examples

# Basic pipeline
python examples/01_basic_pipeline/01_simple_function/example.py

# With conditions
python examples/04_condition/01_basic_condition_example/example.py

# With SQLite
python examples/06_sqlite_integration/02_wsqlite_example/example.py

# With retry
python examples/05_retry/01_basic_retry_example/example.py

Architecture

wpipe/
├── __init__.py           # Main exports (Pipeline, Condition, APIClient, Wsqlite)
├── pipe/
│   └── pipe.py           # Pipeline, Condition, ProgressManager
├── api_client/
│   └── api_client.py     # APIClient, send_post, send_get
├── sqlite/
│   ├── Sqlite.py         # Core SQLite operations
│   └── Wsqlite.py        # Simplified context manager wrapper
├── log/
│   └── log.py            # Logging utilities (loguru)
├── ram/
│   └── ram.py            # Memory control utilities
├── util/
│   └── utils.py          # YAML utilities (leer_yaml, escribir_yaml)
└── exception/
    └── api_error.py      # TaskError, ApiError, ProcessError, Codes

Documentation

Resource | URL
Documentation | https://wpipe.readthedocs.io/
Live Demo | https://wisrovi.github.io/wpipe/
PyPI Package | https://pypi.org/project/wpipe/
GitHub Repository | https://github.com/wisrovi/wpipe
Releases | https://github.com/wisrovi/wpipe/releases
Issues | https://github.com/wisrovi/wpipe/issues

Why wpipe?

Traditional Tools | wpipe
Complex setup | Simple pip install
Web UI required | Pure Python code
Heavy dependencies | Minimal requirements
YAML/JSON config | Python code
Overkill for simple tasks | Perfect for any scale

License

MIT License - See LICENSE file


Author

William Steve Rodriguez Villamizar


Star this repo if you find it useful!

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wpipe-1.5.4.tar.gz (112.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wpipe-1.5.4-py3-none-any.whl (108.5 kB view details)

Uploaded Python 3

File details

Details for the file wpipe-1.5.4.tar.gz.

File metadata

  • Download URL: wpipe-1.5.4.tar.gz
  • Upload date:
  • Size: 112.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for wpipe-1.5.4.tar.gz
Algorithm | Hash digest
SHA256 | 48e36b5f207b977537a06b679c6ee2d5afaa82b474b4f870fe554e6751c03833
MD5 | 5e8363fcdd4bbe93e787a3b981ef1401
BLAKE2b-256 | 6145916a3a4b29c7e8b386f1ba11a4bb6f019646f48cb8fb9b15c61f4758d66f
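
A downloaded archive can be checked against the SHA256 digest listed above with the standard `hashlib` module. This is a minimal sketch: the helper names are hypothetical, and the local path `wpipe-1.5.4.tar.gz` assumes the file was saved to the current directory.

```python
import hashlib
import os

# SHA256 digest published above for wpipe-1.5.4.tar.gz
EXPECTED_SHA256 = "48e36b5f207b977537a06b679c6ee2d5afaa82b474b4f870fe554e6751c03833"


def sha256_of_file(path: str, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published_hash(path: str, expected: str = EXPECTED_SHA256) -> bool:
    """Compare a file's digest with the expected hex string."""
    return sha256_of_file(path) == expected.lower()


archive = "wpipe-1.5.4.tar.gz"  # assumed local download location
if os.path.exists(archive):
    print("hash OK" if matches_published_hash(archive) else "hash MISMATCH")
```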

See more details on using hashes here.

File details

Details for the file wpipe-1.5.4-py3-none-any.whl.

File metadata

  • Download URL: wpipe-1.5.4-py3-none-any.whl
  • Upload date:
  • Size: 108.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for wpipe-1.5.4-py3-none-any.whl
Algorithm | Hash digest
SHA256 | 3778f94cacffcf185c8ac733616803dfef050fc2b8dbef4dd86b07dfffb3f510
MD5 | c867043dd9ce2b4c0fd63ad09d5f8e25
BLAKE2b-256 | b20dfacc89a1ed61a0021704f3571809fba7ec67163ecdcfdedc1749b95928f5

See more details on using hashes here.
