Skip to main content

Pipeline library with API integration for task orchestration and execution tracking

Project description

wpipe - Python Pipeline Library for Sequential Data Processing

PyPI version Python versions License: MIT LTS Tests Documentation

Version 2.0.0-LTS: Long-Term Support release with parallel execution, pipeline composition, step decorators, resource monitoring, checkpointing, and 90% test coverage.

Project Overview

wpipe is a powerful, enterprise-grade Python library for creating and executing complex data processing pipelines. It is designed for mission-critical environments where reliability, observability, and performance are paramount. Now in its LTS (Long-Term Support) phase, WPipe guarantees a stable API and long-term maintenance.

Key Characteristics

  • No web UI required - Just clean, production-ready Python code
  • Minimal dependencies - Only requests and pyyaml
  • Production-ready - Comprehensive error handling, retry logic, and logging
  • Well-documented - Extensive docs, tutorials, and 100+ examples
  • 100% Type Hints - Excellent IDE support and better developer experience

Features

Feature Description
๐Ÿ”— Pipeline Orchestration Create pipelines with step functions and classes
๐ŸŒณ Conditional Branches Execute different paths based on data conditions
๐Ÿ”„ Retry Logic Automatic retries with configurable backoff strategies
๐ŸŒ API Integration Connect to external APIs, register workers
๐Ÿ’พ SQLite Storage Persist execution results to database
โš ๏ธ Error Handling Custom exceptions and detailed error codes
๐Ÿ“‹ YAML Configuration Load and manage configurations
๐Ÿ”€ Nested Pipelines Compose complex workflows
๐Ÿ“Š Progress Tracking Rich terminal output
๐Ÿงช Type Hints Complete type annotations
๐Ÿ”’ Memory Control Built-in memory utilities
๐Ÿงฉ Composable Reusable pipeline components
โšก Parallel Execution Execute steps in parallel (I/O or CPU bound)
๐Ÿ“‚ Pipeline Composition Use pipelines as steps in other pipelines
๐ŸŽฏ Step Decorators Define steps inline with @step decorator
๐Ÿ’พ Checkpointing Save and resume from checkpoints
โฑ๏ธ Timeouts Prevent hanging tasks with timeout support
๐Ÿ“ˆ Resource Monitoring Track RAM and CPU during execution
๐Ÿ“ค Export Export logs, metrics, and statistics to JSON/CSV

๐Ÿšถ Diagram Walkthrough

The following diagram shows the high-level execution flow of a typical wpipe pipeline:

flowchart TD
    A[๐Ÿš€ Start: User Creates Pipeline] --> B[๐Ÿ“ Define Steps]
    B --> C[โš™๏ธ Configure Pipeline]
    C --> D[โ–ถ๏ธ Execute: pipeline.run]
    D --> E{Step 1}
    E -->|Success| F{Step 2}
    E -->|Error| Z[โŒ TaskError Raised]
    F -->|Success| G{Step 3}
    F -->|Condition True| H[โœ… Branch True]
    F -->|Condition False| I[โŒ Branch False]
    H --> G
    I --> G
    G -->|Success| J[๐Ÿ“ฆ Accumulated Results]
    G -->|Error| Z
    J --> K[โœ… Pipeline Complete]
    Z --> L[๐Ÿ”ง Error Handler]
    L --> M[๐Ÿ“‹ Access Partial Results]

Flow Description

  1. Start: User creates a Pipeline instance and defines step functions
  2. Configure: Steps are registered with name and version metadata
  3. Execute: pipeline.run() starts the sequential execution
  4. Data Flow: Each step receives accumulated results from all previous steps
  5. Conditions: Optional branching based on data evaluation
  6. Complete: Final accumulated results are returned

๐Ÿ—บ๏ธ System Workflow

The following sequence diagram shows the interaction between components during pipeline execution:

sequenceDiagram
    participant User
    participant Pipeline
    participant Step1
    participant Step2
    participant SQLite
    participant ExternalAPI

    User->>Pipeline: Create Pipeline(verbose=True)
    User->>Pipeline: set_steps([...])
    User->>Pipeline: run(initial_data)
    
    Pipeline->>Step1: Execute Step1(initial_data)
    Step1-->>Pipeline: Return {result1}
    
    Pipeline->>Pipeline: Accumulate data
    Pipeline->>Step2: Execute Step2(accumulated_data)
    Step2-->>Pipeline: Return {result2}
    
    alt with API enabled
        Pipeline->>ExternalAPI: worker_register()
        ExternalAPI-->>Pipeline: worker_id
        Pipeline->>ExternalAPI: register_process()
        ExternalAPI-->>Pipeline: process_id
    end
    
    alt with SQLite enabled
        Pipeline->>SQLite: write(input, output, details)
        SQLite-->>Pipeline: record_id
    end
    
    Pipeline-->>User: Return accumulated_results
    
    Note over User,Pipeline: Pipeline execution complete

๐Ÿ—๏ธ Architecture Components

The following diagram illustrates the static structure and dependencies of wpipe's main modules:

graph TB
    subgraph Core["๐Ÿ“ฆ Core Layer"]
        P[Pipeline<br/>pipe/pipe.py]
        C[Condition<br/>pipe/pipe.py]
        PM[ProgressManager<br/>pipe/pipe.py]
    end
    
    subgraph Integration["๐Ÿ”Œ Integration Layer"]
        API[APIClient<br/>api_client/]
        SQL[Sqlite<br/>sqlite/]
        WS[Wsqlite<br/>sqlite/]
    end
    
    subgraph Utilities["๐Ÿ› ๏ธ Utilities Layer"]
        LOG[new_logger<br/>log/]
        RAM[memory<br/>ram/]
        YAML[leer_yaml<br/>util/]
    end
    
    subgraph Exceptions["โš ๏ธ Exception Layer"]
        TE[TaskError<br/>exception/]
        AE[ApiError<br/>exception/]
        PE[ProcessError<br/>exception/]
        CO[Codes<br/>exception/]
    end
    
    P --> C
    P --> PM
    P --> API
    P --> SQL
    P --> WS
    API --> TE
    API --> AE
    SQL --> PE
    WS --> SQL
    
    User --> P
    User --> C
    User --> API
    User --> WS
    User --> YAML
    User --> RAM
    User --> LOG

Module Dependencies

graph LR
    A[wpipe/__init__.py] --> B[pipe/pipe.py]
    A --> C[api_client/]
    A --> D[sqlite/]
    A --> E[exception/]
    
    B --> F[rich<br/>ProgressManager]
    C --> G[requests<br/>HTTP calls]
    D --> H[sqlite3<br/>Database]
    E --> I[Built-in exceptions]
    
    User --> A

โš™๏ธ Container Lifecycle

Build Process

flowchart LR
    A[๐Ÿ“ Source Code] --> B[๐Ÿ”ง Install Dependencies]
    B --> C[pip install -e .]
    C --> D[๐Ÿ“ฆ wpipe Package]
    D --> E[โœ… Ready for Development]
    
    style A fill:#e1f5fe
    style E fill:#c8e6c9

Runtime Process

flowchart TD
    A[๐Ÿš€ Import wpipe] --> B[๐Ÿ“ Create Pipeline]
    B --> C[โš™๏ธ Configure Steps]
    C --> D[โ–ถ๏ธ Call pipeline.run]
    D --> E[๐Ÿ“Š Initialize Progress]
    
    E --> F{Step Loop}
    F -->|For each step| G[๐Ÿ“ฅ Get Step Data]
    G --> H[โšก Execute Step Function]
    H --> I{Success?}
    
    I -->|Yes| J[๐Ÿ“ฆ Accumulate Results]
    J --> K{Next Step?}
    K -->|Yes| F
    K -->|No| L[โœ… Return Final Results]
    
    I -->|No| M{Retry Config?}
    M -->|Yes| N[โณ Wait retry_delay]
    N --> O[๐Ÿ”„ Retry Attempt]
    O --> H
    
    M -->|No| P[โŒ Raise TaskError]
    P --> Q[๐Ÿ“‹ Capture Partial Results]
    
    style L fill:#c8e6c9
    style P fill:#ffcdd2

๐Ÿ“‚ File-by-File Guide

File/Directory Purpose Description
wpipe/__init__.py Main exports Exports Pipeline, Condition, APIClient, Wsqlite
wpipe/pipe/pipe.py Core logic Pipeline, Condition, ProgressManager classes
wpipe/api_client/api_client.py HTTP client APIClient, send_post, send_get functions
wpipe/sqlite/Sqlite.py Database Core SQLite operations
wpipe/sqlite/Wsqlite.py Wrapper Context manager for simple DB operations
wpipe/log/log.py Logging new_logger function (loguru)
wpipe/ram/ram.py Memory memory decorator, memory_limit, get_memory
wpipe/util/utils.py Config leer_yaml, escribir_yaml functions
wpipe/exception/api_error.py Errors TaskError, ApiError, ProcessError, Codes
docs/source/ Documentation Sphinx documentation source files
examples/ Examples 100+ working examples organized by topic
test/ Tests pytest test suite (106 tests)

Getting Started

Installation

PyPI (Recommended)

pip install wpipe

From Source

git clone https://github.com/wisrovi/wpipe
cd wpipe
pip install -e .

Development Install

pip install -e ".[dev]"

Requirements

  • Python 3.9 or higher
  • requests (for API integration)
  • pyyaml (for YAML configuration)

Verification

import wpipe
print(wpipe.__version__)  # 1.0.0

Usage/Examples

Basic Pipeline

from wpipe import Pipeline

def fetch_data(data):
    """Fetch data from a source."""
    return {"users": [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}]}

def process_data(data):
    """Process the fetched data."""
    users = data["users"]
    return {"count": len(users), "names": [u["name"] for u in users]}

def save_data(data):
    """Save results."""
    return {"status": "saved", "processed": data["count"]}

# Create and configure your pipeline
pipeline = Pipeline(verbose=True)
pipeline.set_steps([
    (fetch_data, "Fetch Data", "v1.0"),
    (process_data, "Process Data", "v1.0"),
    (save_data, "Save Data", "v1.0"),
])

# Run the pipeline
result = pipeline.run({})
# Output: {'users': [...], 'count': 3, 'names': [...], 'status': 'saved', 'processed': 3}

Conditional Pipeline

from wpipe import Pipeline, Condition

def check_value(data):
    return {"value": 75}

def process_high(data):
    return {"result": "High value"}

def process_low(data):
    return {"result": "Low value"}

condition = Condition(
    expression="value > 50",
    branch_true=[(process_high, "High", "v1.0")],
    branch_false=[(process_low, "Low", "v1.0")],
)

pipeline = Pipeline(verbose=True)
pipeline.set_steps([
    (check_value, "Check", "v1.0"),
    condition,
])

With SQLite Storage

from wpipe import Pipeline
from wpipe.sqlite import Wsqlite

with Wsqlite(db_name="results.db") as db:
    db.input = {"x": 10}
    result = pipeline.run({"x": 10})
    db.output = result
    print(f"Record ID: {db.id}")

With Retry Logic

pipeline = Pipeline(
    verbose=True,
    max_retries=3,
    retry_delay=2.0,
    retry_on_exceptions=(ConnectionError, TimeoutError),
)

Advanced Usage

Parallel Execution

Execute independent steps in parallel for I/O or CPU-bound tasks:

from wpipe import Pipeline
from wpipe.parallel import ParallelExecutor, ExecutionMode

def fetch_users(data):
    import time; time.sleep(1)
    return {"users": ["Alice", "Bob"]}

def fetch_posts(data):
    import time; time.sleep(1)
    return {"posts": ["Post 1", "Post 2"]}

def aggregate(data):
    return {"total": len(data.get("users", [])) + len(data.get("posts", []))}

executor = ParallelExecutor(max_workers=4)
executor.add_step("fetch_users", fetch_users, mode=ExecutionMode.IO_BOUND)
executor.add_step("fetch_posts", fetch_posts, mode=ExecutionMode.IO_BOUND)
executor.add_step("aggregate", aggregate, depends_on=["fetch_users", "fetch_posts"])

result = executor.execute({})  # 4x faster than sequential!

For Loops

Iterate over steps with count or condition:

from wpipe import Pipeline, For

def check_status(data):
    count = data.get("count", 0) + 1
    return {"count": count, "done": count >= 3}

# Count-based loop
loop = For(steps=[(check_status, "Check", "v1.0")], iterations=3)

# Condition-based loop
loop = For(steps=[(check_status, "Check", "v1.0")], validation_expression="not data.get('done', False)")

pipeline = Pipeline(verbose=False)
pipeline.set_steps([loop])
result = pipeline.run({"count": 0})

Pipeline Composition

Use pipelines as steps in other pipelines:

from wpipe import Pipeline
from wpipe.composition import NestedPipelineStep, PipelineAsStep

# Inner pipelines
clean = Pipeline(verbose=False)
clean.set_steps([(lambda d: {"cleaned": True}, "Clean", "v1.0")])

analyze = Pipeline(verbose=False)
analyze.set_steps([(lambda d: {"analyzed": True}, "Analyze", "v1.0")])

# Compose into main pipeline
main = Pipeline(verbose=False)
main.set_steps([
    (lambda d: {"data": "raw"}, "Fetch", "v1.0"),
    (lambda d: NestedPipelineStep("clean", clean).run(d), "Clean", "v1.0"),
    (lambda d: NestedPipelineStep("analyze", analyze).run(d), "Analyze", "v1.0"),
])

result = main.run({})

@step Decorator

Define steps inline with metadata:

from wpipe import step, AutoRegister, Pipeline

@step(description="Fetch data", timeout=30, tags=["data"], retry_count=3)
def fetch_data(context):
    return {"data": [1, 2, 3]}

@step(description="Process", depends_on=["fetch_data"], tags=["transform"])
def process_data(context):
    return {"processed": context.get("data", [])}

# Auto-register all decorated steps
pipeline = Pipeline(verbose=False)
AutoRegister.register_all(pipeline)
result = pipeline.run({})

Checkpointing

Save and resume pipeline state:

from wpipe import Pipeline
from wpipe.checkpoint import CheckpointManager

checkpoint = CheckpointManager(tracking_db="tracking.db")

# Create checkpoint
checkpoint.create_checkpoint("v1", "my_pipeline", {"state": "initial"})

# Check if can resume
can_resume = checkpoint.can_resume("my_pipeline")
if can_resume:
    state = checkpoint.get_checkpoint("my_pipeline", "v1")
    print(f"Resuming from: {state}")

# Get stats
stats = checkpoint.get_checkpoint_stats("my_pipeline")
print(f"Total checkpoints: {stats['total_checkpoints']}")

Resource Monitoring

Track CPU and RAM during execution:

from wpipe import Pipeline
from wpipe.resource_monitor import ResourceMonitor, ResourceMonitorRegistry

# Single task monitoring
monitor = ResourceMonitor()
monitor.start()
# ... run your task ...
monitor.stop()
stats = monitor.get_stats()
print(f"Peak RAM: {stats['peak_ram_mb']} MB")

# Registry for multiple tasks with SQLite persistence
registry = ResourceMonitorRegistry(db_path="resources.db")
task_id = registry.register_task("my_task")
registry.record_usage(task_id, {"cpu": 45.2, "ram": 512.0})

Export to JSON/CSV

Export logs, metrics, and statistics:

from wpipe.export import PipelineExporter

exporter = PipelineExporter(db_path="tracking.db")

# Export pipeline logs to JSON
logs_json = exporter.export_pipeline_logs(format="json")

# Export to CSV with filter
logs_csv = exporter.export_pipeline_logs(pipeline_id="PIPE-123", format="csv")

# Export metrics
metrics = exporter.export_metrics(format="json")

# Export statistics
stats = exporter.export_statistics(format="json")

# Save to file
exporter.export_pipeline_logs(output_path="logs.json", format="json")

Timeout Decorators

Prevent hanging tasks:

from wpipe.timeout import timeout_sync, timeout_async, TaskTimer
import asyncio

# Sync timeout
@timeout_sync(seconds=5)
def slow_function(data):
    import time; time.sleep(10)  # Will be killed after 5s
    return {"done": True}

# Async timeout
@timeout_async(seconds=5)
async def slow_async_function(data):
    import asyncio; await asyncio.sleep(10)  # Will be killed after 5s
    return {"done": True}

# Task timer context manager
with TaskTimer("my_task") as timer:
    # ... your code ...
    pass
print(f"Elapsed: {timer.elapsed_ms}ms")

Async Pipeline

import asyncio
from wpipe.pipe.pipe_async import PipelineAsync

async def fetch_data(data):
    await asyncio.sleep(0.1)
    return {"data": "fetched"}

async def process_data(data):
    await asyncio.sleep(0.1)
    return {"processed": True}

async def main():
    pipeline = PipelineAsync(verbose=False)
    pipeline.set_steps([
        (fetch_data, "Fetch", "v1.0"),
        (process_data, "Process", "v1.0"),
    ])
    result = await pipeline.run({})
    print(result)

asyncio.run(main())

Data Flow Visualization

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                        PIPELINE EXECUTION FLOW                            โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Input          Step 1            Step 2            Step 3            Output
โ”€โ”€โ”€โ”€โ”€โ”€          โ”€โ”€โ”€โ”€โ”€โ”€            โ”€โ”€โ”€โ”€โ”€โ”€            โ”€โ”€โ”€โ”€โ”€โ”€            โ”€โ”€โ”€โ”€โ”€โ”€
โ”Œโ”€โ”€โ”€โ”€โ”€โ”      โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”      โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”      โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”      โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚  {}  โ”‚โ”€โ”€โ”€โ”€โ–ถโ”‚  Fetch  โ”‚โ”€โ”€โ”€โ”€โ–ถโ”‚ Process โ”‚โ”€โ”€โ”€โ”€โ–ถโ”‚  Save   โ”‚โ”€โ”€โ”€โ”€โ–ถโ”‚ Result  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”˜      โ”‚  Data   โ”‚      โ”‚  Data   โ”‚      โ”‚  Data   โ”‚      โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
             โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜      โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜      โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
             {users: [...]}   {users, count}   {count, status}

API Reference

Pipeline

from wpipe import Pipeline

pipeline = Pipeline(
    verbose=True,           # Enable verbose output
    api_config={},         # API configuration
    max_retries=3,         # Maximum retry attempts
    retry_delay=1.0,       # Delay between retries
    retry_on_exceptions=(ConnectionError, TimeoutError),
)

Methods:

  • set_steps(steps) - Configure pipeline steps
  • run(input_data) - Execute the pipeline
  • worker_register(name, version) - Register with API
  • set_worker_id(worker_id) - Set worker ID

Condition

from wpipe import Condition

condition = Condition(
    expression="value > 50",
    branch_true=[(step_true, "True", "v1.0")],
    branch_false=[(step_false, "False", "v1.0")],
)

Exceptions

from wpipe.exception import TaskError, ApiError, ProcessError, Codes

try:
    result = pipeline.run(data)
except TaskError as e:
    print(f"Step: {e.step_name}")
    print(f"Code: {e.error_code}")

Error Codes:

  • TASK_FAILED (502) - Task execution failed
  • API_ERROR (501) - API communication error
  • UPDATE_PROCESS_ERROR (504) - Process update failed
  • UPDATE_TASK (505) - Task update failed
  • UPDATE_PROCESS_OK (503) - Process completed successfully

Code Quality

Metric Value
Tests 106 passing
Examples 100+
Python Support 3.9, 3.10, 3.11, 3.12, 3.13
Type Hints Complete
Docstrings Google-style

Running Tests

# Run all tests
pytest

# Run with coverage
pytest --cov=wpipe --cov-report=html
open htmlcov/index.html

# Lint
ruff check wpipe/

# Auto-fix linting
ruff check wpipe/ --fix

# Type check
mypy wpipe/

# All quality checks
ruff check wpipe/ && mypy wpipe/ && pytest

Examples

Explore 100+ examples organized by functionality:

Folder Examples Description
01_basic_pipeline 15 Functions, classes, mixed steps, data flow, async
02_api_pipeline 20 External APIs, workers, authentication, health checks
03_error_handling 15 Exceptions, error codes, recovery, partial results
04_condition 12 Conditional branches, decision trees, boolean logic
05_retry 12 Automatic retries, backoff, custom exceptions
06_sqlite_integration 14 Persistence, CSV export, batch operations
07_nested_pipelines 14 Complex workflows, parallel execution, recursion
08_yaml_config 14 Configuration, environment variables, validation
09_microservice 11 Production-ready microservice patterns

Running Examples

# Basic pipeline
python examples/01_basic_pipeline/01_simple_function/example.py

# With conditions
python examples/04_condition/01_basic_condition_example/example.py

# With SQLite
python examples/06_sqlite_integration/02_wsqlite_example/example.py

# With retry
python examples/05_retry/01_basic_retry_example/example.py

Architecture

wpipe/
โ”œโ”€โ”€ __init__.py           # Main exports (Pipeline, Condition, APIClient, Wsqlite)
โ”œโ”€โ”€ pipe/
โ”‚   โ””โ”€โ”€ pipe.py           # Pipeline, Condition, ProgressManager
โ”œโ”€โ”€ api_client/
โ”‚   โ””โ”€โ”€ api_client.py     # APIClient, send_post, send_get
โ”œโ”€โ”€ sqlite/
โ”‚   โ”œโ”€โ”€ Sqlite.py         # Core SQLite operations
โ”‚   โ””โ”€โ”€ Wsqlite.py        # Simplified context manager wrapper
โ”œโ”€โ”€ log/
โ”‚   โ””โ”€โ”€ log.py            # Logging utilities (loguru)
โ”œโ”€โ”€ ram/
โ”‚   โ””โ”€โ”€ ram.py            # Memory control utilities
โ”œโ”€โ”€ util/
โ”‚   โ””โ”€โ”€ utils.py          # YAML utilities (leer_yaml, escribir_yaml)
โ””โ”€โ”€ exception/
    โ””โ”€โ”€ api_error.py      # TaskError, ApiError, ProcessError, Codes

Documentation

Resource URL
Documentation https://wpipe.readthedocs.io/
Live Demo https://wisrovi.github.io/wpipe/
PyPI Package https://pypi.org/project/wpipe/
GitHub Repository https://github.com/wisrovi/wpipe
Releases https://github.com/wisrovi/wpipe/releases
Issues https://github.com/wisrovi/wpipe/issues

Why wpipe?

Traditional Tools wpipe
Complex setup Simple pip install
Web UI required Pure Python code
Heavy dependencies Minimal requirements
YAML/JSON config Python code
Overkill for simple tasks Perfect for any scale

License

MIT License - See LICENSE file


Author

William Steve Rodriguez Villamizar


Star โญ this repo if you find it useful!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wpipe-1.5.6.tar.gz (141.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wpipe-1.5.6-py3-none-any.whl (112.8 kB view details)

Uploaded Python 3

File details

Details for the file wpipe-1.5.6.tar.gz.

File metadata

  • Download URL: wpipe-1.5.6.tar.gz
  • Upload date:
  • Size: 141.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for wpipe-1.5.6.tar.gz
Algorithm Hash digest
SHA256 2b48143536a48078a6469e6bf59564c40d53f478eb9eb63d86f4dd8fc9e16830
MD5 4ed20631a40f972cc12705da0e4c0b59
BLAKE2b-256 b28521edb9663e19a83bbf714613bbf25b9dbc9bf701bc4fc6b17d4b7ba07b63

See more details on using hashes here.

File details

Details for the file wpipe-1.5.6-py3-none-any.whl.

File metadata

  • Download URL: wpipe-1.5.6-py3-none-any.whl
  • Upload date:
  • Size: 112.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for wpipe-1.5.6-py3-none-any.whl
Algorithm Hash digest
SHA256 609feef55cd4d7af95432e9b9ae2e6f57b77bb71605f0f8dc27b7f69a4fefb52
MD5 7c408cca397447fa4b1515cba8fddbc2
BLAKE2b-256 5e6ab28522d9abfab3b613c97a7dcdb26f92a57e483fde1f932c8a98825e8890

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page