wpipe - Python Pipeline Library for Sequential Data Processing
A pipeline library with API integration for task orchestration and execution tracking.
Version 2.0.0-LTS: Long-Term Support release with parallel execution, pipeline composition, step decorators, resource monitoring, checkpointing, and 90% test coverage.
Project Overview
wpipe is a powerful, enterprise-grade Python library for creating and executing complex data processing pipelines. It is designed for mission-critical environments where reliability, observability, and performance are paramount. Now in its LTS (Long-Term Support) phase, wpipe guarantees a stable API and long-term maintenance.
Key Characteristics
- No web UI required - Just clean, production-ready Python code
- Minimal dependencies - Only requests and pyyaml
- Production-ready - Comprehensive error handling, retry logic, and logging
- Well-documented - Extensive docs, tutorials, and 100+ examples
- 100% Type Hints - Excellent IDE support and better developer experience
Features
| Feature | Description |
|---|---|
| Pipeline Orchestration | Create pipelines with step functions and classes |
| Conditional Branches | Execute different paths based on data conditions |
| Retry Logic | Automatic retries with configurable backoff strategies |
| API Integration | Connect to external APIs, register workers |
| SQLite Storage | Persist execution results to database |
| Error Handling | Custom exceptions and detailed error codes |
| YAML Configuration | Load and manage configurations |
| Nested Pipelines | Compose complex workflows |
| Progress Tracking | Rich terminal output |
| Type Hints | Complete type annotations |
| Memory Control | Built-in memory utilities |
| Composable | Reusable pipeline components |
| Parallel Execution | Execute steps in parallel (I/O or CPU bound) |
| Pipeline Composition | Use pipelines as steps in other pipelines |
| Step Decorators | Define steps inline with @step decorator |
| Checkpointing | Save and resume from checkpoints |
| Timeouts | Prevent hanging tasks with timeout support |
| Resource Monitoring | Track RAM and CPU during execution |
| Export | Export logs, metrics, and statistics to JSON/CSV |
Diagram Walkthrough
The following diagram shows the high-level execution flow of a typical wpipe pipeline:
flowchart TD
    A[Start: User Creates Pipeline] --> B[Define Steps]
    B --> C[Configure Pipeline]
    C --> D[Execute: pipeline.run]
    D --> E{Step 1}
    E -->|Success| F{Step 2}
    E -->|Error| Z[TaskError Raised]
    F -->|Success| G{Step 3}
    F -->|Condition True| H[Branch True]
    F -->|Condition False| I[Branch False]
    H --> G
    I --> G
    G -->|Success| J[Accumulated Results]
    G -->|Error| Z
    J --> K[Pipeline Complete]
    Z --> L[Error Handler]
    L --> M[Access Partial Results]
Flow Description
- Start: User creates a Pipeline instance and defines step functions
- Configure: Steps are registered with name and version metadata
- Execute: pipeline.run() starts the sequential execution
- Data Flow: Each step receives accumulated results from all previous steps
- Conditions: Optional branching based on data evaluation
- Complete: Final accumulated results are returned
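The accumulate-and-forward rule described above can be sketched in plain Python. This is a simplified illustration of the semantics, not wpipe's actual implementation: each step returns a dict, which is merged into the running results, and the merged dict is what the next step receives.

```python
def run_sequential(steps, initial_data):
    """Illustrative sketch: each step sees all prior results merged together."""
    accumulated = dict(initial_data)
    for step in steps:
        result = step(accumulated)   # step receives accumulated data so far
        accumulated.update(result)   # merge this step's output into the results
    return accumulated

steps = [
    lambda d: {"users": ["Alice", "Bob"]},
    lambda d: {"count": len(d["users"])},   # can read the previous step's output
]
print(run_sequential(steps, {}))  # {'users': ['Alice', 'Bob'], 'count': 2}
```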
System Workflow
The following sequence diagram shows the interaction between components during pipeline execution:
sequenceDiagram
participant User
participant Pipeline
participant Step1
participant Step2
participant SQLite
participant ExternalAPI
User->>Pipeline: Create Pipeline(verbose=True)
User->>Pipeline: set_steps([...])
User->>Pipeline: run(initial_data)
Pipeline->>Step1: Execute Step1(initial_data)
Step1-->>Pipeline: Return {result1}
Pipeline->>Pipeline: Accumulate data
Pipeline->>Step2: Execute Step2(accumulated_data)
Step2-->>Pipeline: Return {result2}
alt with API enabled
Pipeline->>ExternalAPI: worker_register()
ExternalAPI-->>Pipeline: worker_id
Pipeline->>ExternalAPI: register_process()
ExternalAPI-->>Pipeline: process_id
end
alt with SQLite enabled
Pipeline->>SQLite: write(input, output, details)
SQLite-->>Pipeline: record_id
end
Pipeline-->>User: Return accumulated_results
Note over User,Pipeline: Pipeline execution complete
Architecture Components
The following diagram illustrates the static structure and dependencies of wpipe's main modules:
graph TB
subgraph Core["Core Layer"]
P[Pipeline<br/>pipe/pipe.py]
C[Condition<br/>pipe/pipe.py]
PM[ProgressManager<br/>pipe/pipe.py]
end
subgraph Integration["Integration Layer"]
API[APIClient<br/>api_client/]
SQL[Sqlite<br/>sqlite/]
WS[Wsqlite<br/>sqlite/]
end
subgraph Utilities["Utilities Layer"]
LOG[new_logger<br/>log/]
RAM[memory<br/>ram/]
YAML[leer_yaml<br/>util/]
end
subgraph Exceptions["Exception Layer"]
TE[TaskError<br/>exception/]
AE[ApiError<br/>exception/]
PE[ProcessError<br/>exception/]
CO[Codes<br/>exception/]
end
P --> C
P --> PM
P --> API
P --> SQL
P --> WS
API --> TE
API --> AE
SQL --> PE
WS --> SQL
User --> P
User --> C
User --> API
User --> WS
User --> YAML
User --> RAM
User --> LOG
Module Dependencies
graph LR
A[wpipe/__init__.py] --> B[pipe/pipe.py]
A --> C[api_client/]
A --> D[sqlite/]
A --> E[exception/]
B --> F[rich<br/>ProgressManager]
C --> G[requests<br/>HTTP calls]
D --> H[sqlite3<br/>Database]
E --> I[Built-in exceptions]
User --> A
Container Lifecycle
Build Process
flowchart LR
    A[Source Code] --> B[Install Dependencies]
    B --> C[pip install -e .]
    C --> D[wpipe Package]
    D --> E[Ready for Development]
    style A fill:#e1f5fe
    style E fill:#c8e6c9
Runtime Process
flowchart TD
    A[Import wpipe] --> B[Create Pipeline]
    B --> C[Configure Steps]
    C --> D[Call pipeline.run]
    D --> E[Initialize Progress]
    E --> F{Step Loop}
    F -->|For each step| G[Get Step Data]
    G --> H[Execute Step Function]
    H --> I{Success?}
    I -->|Yes| J[Accumulate Results]
    J --> K{Next Step?}
    K -->|Yes| F
    K -->|No| L[Return Final Results]
    I -->|No| M{Retry Config?}
    M -->|Yes| N[Wait retry_delay]
    N --> O[Retry Attempt]
    O --> H
    M -->|No| P[Raise TaskError]
    P --> Q[Capture Partial Results]
    style L fill:#c8e6c9
    style P fill:#ffcdd2
File-by-File Guide
| File/Directory | Purpose | Description |
|---|---|---|
| wpipe/__init__.py | Main exports | Exports Pipeline, Condition, APIClient, Wsqlite |
| wpipe/pipe/pipe.py | Core logic | Pipeline, Condition, ProgressManager classes |
| wpipe/api_client/api_client.py | HTTP client | APIClient, send_post, send_get functions |
| wpipe/sqlite/Sqlite.py | Database | Core SQLite operations |
| wpipe/sqlite/Wsqlite.py | Wrapper | Context manager for simple DB operations |
| wpipe/log/log.py | Logging | new_logger function (loguru) |
| wpipe/ram/ram.py | Memory | memory decorator, memory_limit, get_memory |
| wpipe/util/utils.py | Config | leer_yaml, escribir_yaml functions |
| wpipe/exception/api_error.py | Errors | TaskError, ApiError, ProcessError, Codes |
| docs/source/ | Documentation | Sphinx documentation source files |
| examples/ | Examples | 100+ working examples organized by topic |
| test/ | Tests | pytest test suite (106 tests) |
Getting Started
Installation
PyPI (Recommended)
pip install wpipe
From Source
git clone https://github.com/wisrovi/wpipe
cd wpipe
pip install -e .
Development Install
pip install -e ".[dev]"
Requirements
- Python 3.9 or higher
- requests (for API integration)
- pyyaml (for YAML configuration)
Verification
import wpipe
print(wpipe.__version__)  # prints the installed version
Usage/Examples
Basic Pipeline
from wpipe import Pipeline
def fetch_data(data):
"""Fetch data from a source."""
return {"users": [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}]}
def process_data(data):
"""Process the fetched data."""
users = data["users"]
return {"count": len(users), "names": [u["name"] for u in users]}
def save_data(data):
"""Save results."""
return {"status": "saved", "processed": data["count"]}
# Create and configure your pipeline
pipeline = Pipeline(verbose=True)
pipeline.set_steps([
(fetch_data, "Fetch Data", "v1.0"),
(process_data, "Process Data", "v1.0"),
(save_data, "Save Data", "v1.0"),
])
# Run the pipeline
result = pipeline.run({})
# Output: {'users': [...], 'count': 3, 'names': [...], 'status': 'saved', 'processed': 3}
Conditional Pipeline
from wpipe import Pipeline, Condition
def check_value(data):
return {"value": 75}
def process_high(data):
return {"result": "High value"}
def process_low(data):
return {"result": "Low value"}
condition = Condition(
expression="value > 50",
branch_true=[(process_high, "High", "v1.0")],
branch_false=[(process_low, "Low", "v1.0")],
)
pipeline = Pipeline(verbose=True)
pipeline.set_steps([
(check_value, "Check", "v1.0"),
condition,
])
With SQLite Storage
from wpipe import Pipeline
from wpipe.sqlite import Wsqlite

# A minimal pipeline so the example is self-contained
pipeline = Pipeline(verbose=True)
pipeline.set_steps([(lambda d: {"doubled": d["x"] * 2}, "Double", "v1.0")])

with Wsqlite(db_name="results.db") as db:
    db.input = {"x": 10}
    result = pipeline.run({"x": 10})
    db.output = result
    print(f"Record ID: {db.id}")
With Retry Logic
pipeline = Pipeline(
verbose=True,
max_retries=3,
retry_delay=2.0,
retry_on_exceptions=(ConnectionError, TimeoutError),
)
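The max_retries/retry_delay settings above describe a fixed delay between attempts; the feature table also mentions configurable backoff strategies. The general pattern can be sketched in plain Python — this is an illustration of the technique, not wpipe's internals, and the parameter names mirror the example above:

```python
import time

def run_with_retries(fn, max_retries=3, retry_delay=1.0, backoff=2.0,
                     retry_on=(ConnectionError, TimeoutError)):
    """Retry fn on listed exceptions, growing the delay; re-raise when exhausted."""
    delay = retry_delay
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_retries:
                raise                  # out of retries: propagate the error
            time.sleep(delay)
            delay *= backoff           # exponential backoff between attempts

calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(run_with_retries(flaky, retry_delay=0.01))  # "ok" after two retries
```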
Advanced Usage
Parallel Execution
Execute independent steps in parallel for I/O or CPU-bound tasks:
from wpipe import Pipeline
from wpipe.parallel import ParallelExecutor, ExecutionMode
def fetch_users(data):
import time; time.sleep(1)
return {"users": ["Alice", "Bob"]}
def fetch_posts(data):
import time; time.sleep(1)
return {"posts": ["Post 1", "Post 2"]}
def aggregate(data):
return {"total": len(data.get("users", [])) + len(data.get("posts", []))}
executor = ParallelExecutor(max_workers=4)
executor.add_step("fetch_users", fetch_users, mode=ExecutionMode.IO_BOUND)
executor.add_step("fetch_posts", fetch_posts, mode=ExecutionMode.IO_BOUND)
executor.add_step("aggregate", aggregate, depends_on=["fetch_users", "fetch_posts"])
result = executor.execute({})  # both fetches run concurrently (~2x faster here)
For Loops
Iterate over steps with count or condition:
from wpipe import Pipeline, For
def check_status(data):
count = data.get("count", 0) + 1
return {"count": count, "done": count >= 3}
# Count-based loop
loop = For(steps=[(check_status, "Check", "v1.0")], iterations=3)
# Condition-based loop
loop = For(steps=[(check_status, "Check", "v1.0")], validation_expression="not data.get('done', False)")
pipeline = Pipeline(verbose=False)
pipeline.set_steps([loop])
result = pipeline.run({"count": 0})
Pipeline Composition
Use pipelines as steps in other pipelines:
from wpipe import Pipeline
from wpipe.composition import NestedPipelineStep, PipelineAsStep
# Inner pipelines
clean = Pipeline(verbose=False)
clean.set_steps([(lambda d: {"cleaned": True}, "Clean", "v1.0")])
analyze = Pipeline(verbose=False)
analyze.set_steps([(lambda d: {"analyzed": True}, "Analyze", "v1.0")])
# Compose into main pipeline
main = Pipeline(verbose=False)
main.set_steps([
(lambda d: {"data": "raw"}, "Fetch", "v1.0"),
(lambda d: NestedPipelineStep("clean", clean).run(d), "Clean", "v1.0"),
(lambda d: NestedPipelineStep("analyze", analyze).run(d), "Analyze", "v1.0"),
])
result = main.run({})
@step Decorator
Define steps inline with metadata:
from wpipe import step, AutoRegister, Pipeline
@step(description="Fetch data", timeout=30, tags=["data"], retry_count=3)
def fetch_data(context):
return {"data": [1, 2, 3]}
@step(description="Process", depends_on=["fetch_data"], tags=["transform"])
def process_data(context):
return {"processed": context.get("data", [])}
# Auto-register all decorated steps
pipeline = Pipeline(verbose=False)
AutoRegister.register_all(pipeline)
result = pipeline.run({})
Checkpointing
Save and resume pipeline state:
from wpipe import Pipeline
from wpipe.checkpoint import CheckpointManager
checkpoint = CheckpointManager(tracking_db="tracking.db")
# Create checkpoint
checkpoint.create_checkpoint("v1", "my_pipeline", {"state": "initial"})
# Check if can resume
can_resume = checkpoint.can_resume("my_pipeline")
if can_resume:
state = checkpoint.get_checkpoint("my_pipeline", "v1")
print(f"Resuming from: {state}")
# Get stats
stats = checkpoint.get_checkpoint_stats("my_pipeline")
print(f"Total checkpoints: {stats['total_checkpoints']}")
Resource Monitoring
Track CPU and RAM during execution:
from wpipe import Pipeline
from wpipe.resource_monitor import ResourceMonitor, ResourceMonitorRegistry
# Single task monitoring
monitor = ResourceMonitor()
monitor.start()
# ... run your task ...
monitor.stop()
stats = monitor.get_stats()
print(f"Peak RAM: {stats['peak_ram_mb']} MB")
# Registry for multiple tasks with SQLite persistence
registry = ResourceMonitorRegistry(db_path="resources.db")
task_id = registry.register_task("my_task")
registry.record_usage(task_id, {"cpu": 45.2, "ram": 512.0})
Export to JSON/CSV
Export logs, metrics, and statistics:
from wpipe.export import PipelineExporter
exporter = PipelineExporter(db_path="tracking.db")
# Export pipeline logs to JSON
logs_json = exporter.export_pipeline_logs(format="json")
# Export to CSV with filter
logs_csv = exporter.export_pipeline_logs(pipeline_id="PIPE-123", format="csv")
# Export metrics
metrics = exporter.export_metrics(format="json")
# Export statistics
stats = exporter.export_statistics(format="json")
# Save to file
exporter.export_pipeline_logs(output_path="logs.json", format="json")
Timeout Decorators
Prevent hanging tasks:
from wpipe.timeout import timeout_sync, timeout_async, TaskTimer
import asyncio
# Sync timeout
@timeout_sync(seconds=5)
def slow_function(data):
import time; time.sleep(10) # Will be killed after 5s
return {"done": True}
# Async timeout
@timeout_async(seconds=5)
async def slow_async_function(data):
import asyncio; await asyncio.sleep(10) # Will be killed after 5s
return {"done": True}
# Task timer context manager
with TaskTimer("my_task") as timer:
# ... your code ...
pass
print(f"Elapsed: {timer.elapsed_ms}ms")
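The sync timeout behavior above can be approximated in plain Python with SIGALRM. This is a Unix-only sketch of the technique, not wpipe's actual implementation (`with_timeout` is a hypothetical helper defined here):

```python
import signal
import time

def with_timeout(seconds, fn, *args):
    """Unix-only sketch: raise TimeoutError if fn runs longer than `seconds`."""
    def _handler(signum, frame):
        raise TimeoutError(f"exceeded {seconds}s")
    old = signal.signal(signal.SIGALRM, _handler)
    signal.alarm(seconds)                  # schedule the alarm
    try:
        return fn(*args)
    finally:
        signal.alarm(0)                    # cancel any pending alarm
        signal.signal(signal.SIGALRM, old) # restore the previous handler

try:
    with_timeout(1, time.sleep, 5)         # sleep is interrupted after ~1s
except TimeoutError as e:
    print("timed out:", e)
```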
Async Pipeline
import asyncio
from wpipe.pipe.pipe_async import PipelineAsync
async def fetch_data(data):
await asyncio.sleep(0.1)
return {"data": "fetched"}
async def process_data(data):
await asyncio.sleep(0.1)
return {"processed": True}
async def main():
pipeline = PipelineAsync(verbose=False)
pipeline.set_steps([
(fetch_data, "Fetch", "v1.0"),
(process_data, "Process", "v1.0"),
])
result = await pipeline.run({})
print(result)
asyncio.run(main())
Data Flow Visualization
PIPELINE EXECUTION FLOW

Input        Step 1           Step 2            Step 3          Output
{}    --->   Fetch Data --->  Process Data ---> Save Data --->  Result
             {users: [...]}   {users, count}    {count, status}
API Reference
Pipeline
from wpipe import Pipeline
pipeline = Pipeline(
verbose=True, # Enable verbose output
api_config={}, # API configuration
max_retries=3, # Maximum retry attempts
retry_delay=1.0, # Delay between retries
retry_on_exceptions=(ConnectionError, TimeoutError),
)
Methods:
- set_steps(steps) - Configure pipeline steps
- run(input_data) - Execute the pipeline
- worker_register(name, version) - Register with API
- set_worker_id(worker_id) - Set worker ID
Condition
from wpipe import Condition
condition = Condition(
expression="value > 50",
branch_true=[(step_true, "True", "v1.0")],
branch_false=[(step_false, "False", "v1.0")],
)
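The expression string is presumably evaluated against the accumulated data when the condition step runs. A minimal illustration of that idea (`evaluate` is a hypothetical helper, not wpipe's code):

```python
def evaluate(expression, data):
    """Evaluate a condition string against accumulated data only (illustration)."""
    # Empty __builtins__ keeps the expression from reaching arbitrary functions.
    return bool(eval(expression, {"__builtins__": {}}, dict(data)))

print(evaluate("value > 50", {"value": 75}))  # True  -> branch_true runs
print(evaluate("value > 50", {"value": 25}))  # False -> branch_false runs
```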
Exceptions
from wpipe.exception import TaskError, ApiError, ProcessError, Codes
try:
result = pipeline.run(data)
except TaskError as e:
print(f"Step: {e.step_name}")
print(f"Code: {e.error_code}")
Error Codes:
- TASK_FAILED (502) - Task execution failed
- API_ERROR (501) - API communication error
- UPDATE_PROCESS_ERROR (504) - Process update failed
- UPDATE_TASK (505) - Task update failed
- UPDATE_PROCESS_OK (503) - Process completed successfully
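One way to use these codes is to dispatch on error_code in a handler. The TaskError class below is a stand-in that mimics the attributes used in the example above (step_name, error_code); wpipe's real class may differ:

```python
class TaskError(Exception):
    """Stand-in mirroring the attributes shown above, for illustration only."""
    def __init__(self, step_name, error_code):
        super().__init__(f"{step_name} failed with code {error_code}")
        self.step_name = step_name
        self.error_code = error_code

RETRYABLE = {501}  # e.g. treat API communication errors as worth retrying

def handle(exc):
    # Decide recovery strategy from the numeric error code.
    return "retry" if exc.error_code in RETRYABLE else "abort"

print(handle(TaskError("Fetch Data", 501)))  # retry
print(handle(TaskError("Save Data", 502)))   # abort
```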
Code Quality
| Metric | Value |
|---|---|
| Tests | 106 passing |
| Examples | 100+ |
| Python Support | 3.9, 3.10, 3.11, 3.12, 3.13 |
| Type Hints | Complete |
| Docstrings | Google-style |
Running Tests
# Run all tests
pytest
# Run with coverage
pytest --cov=wpipe --cov-report=html
open htmlcov/index.html
# Lint
ruff check wpipe/
# Auto-fix linting
ruff check wpipe/ --fix
# Type check
mypy wpipe/
# All quality checks
ruff check wpipe/ && mypy wpipe/ && pytest
Examples
Explore 100+ examples organized by functionality:
| Folder | Examples | Description |
|---|---|---|
| 01_basic_pipeline | 15 | Functions, classes, mixed steps, data flow, async |
| 02_api_pipeline | 20 | External APIs, workers, authentication, health checks |
| 03_error_handling | 15 | Exceptions, error codes, recovery, partial results |
| 04_condition | 12 | Conditional branches, decision trees, boolean logic |
| 05_retry | 12 | Automatic retries, backoff, custom exceptions |
| 06_sqlite_integration | 14 | Persistence, CSV export, batch operations |
| 07_nested_pipelines | 14 | Complex workflows, parallel execution, recursion |
| 08_yaml_config | 14 | Configuration, environment variables, validation |
| 09_microservice | 11 | Production-ready microservice patterns |
Running Examples
# Basic pipeline
python examples/01_basic_pipeline/01_simple_function/example.py
# With conditions
python examples/04_condition/01_basic_condition_example/example.py
# With SQLite
python examples/06_sqlite_integration/02_wsqlite_example/example.py
# With retry
python examples/05_retry/01_basic_retry_example/example.py
Architecture
wpipe/
├── __init__.py            # Main exports (Pipeline, Condition, APIClient, Wsqlite)
├── pipe/
│   └── pipe.py            # Pipeline, Condition, ProgressManager
├── api_client/
│   └── api_client.py      # APIClient, send_post, send_get
├── sqlite/
│   ├── Sqlite.py          # Core SQLite operations
│   └── Wsqlite.py         # Simplified context manager wrapper
├── log/
│   └── log.py             # Logging utilities (loguru)
├── ram/
│   └── ram.py             # Memory control utilities
├── util/
│   └── utils.py           # YAML utilities (leer_yaml, escribir_yaml)
└── exception/
    └── api_error.py       # TaskError, ApiError, ProcessError, Codes
Documentation
| Resource | URL |
|---|---|
| Documentation | https://wpipe.readthedocs.io/ |
| Live Demo | https://wisrovi.github.io/wpipe/ |
| PyPI Package | https://pypi.org/project/wpipe/ |
| GitHub Repository | https://github.com/wisrovi/wpipe |
| Releases | https://github.com/wisrovi/wpipe/releases |
| Issues | https://github.com/wisrovi/wpipe/issues |
Why wpipe?
| Traditional Tools | wpipe |
|---|---|
| Complex setup | Simple pip install |
| Web UI required | Pure Python code |
| Heavy dependencies | Minimal requirements |
| YAML/JSON config | Python code |
| Overkill for simple tasks | Perfect for any scale |
License
MIT License - See LICENSE file
Author
William Steve Rodriguez Villamizar
- GitHub: github.com/wisrovi
- LinkedIn: linkedin.com/in/wisrovi-rodriguez
- Portfolio: wisrovi.github.io
Star this repo if you find it useful!