Skip to main content

Pipeline library with API integration for task orchestration and execution tracking

Project description

🚀 WPipe v2.3.2

El motor de orquestación de pipelines más rápido, resiliente y puro para Python.

WPipe es una librería profesional diseñada para automatizar flujos de trabajo complejos, garantizando que tus datos viajen seguros, tus procesos sean ultra-rápidos y tus fallos sean fáciles de diagnosticar. Incluye ahora un Tour de Aprendizaje con 140 Niveles para dominar la librería desde lo más básico hasta lo más avanzado.

PyPI version Python versions License: MIT Documentation Status


💎 ¿Por qué WPipe?

Diferénciate de los scripts lineales. WPipe te ofrece superpoderes:

Superpoder Descripción
Modo Relámpago Optimización extrema de SQLite (WAL Mode) y sincronización thread-safe para ejecuciones paralelas.
🧵 Paralelismo Nativo Ejecuta tareas en Hilos o Procesos con un solo comando. Bypass del GIL para tareas pesadas de CPU.
🛡️ Checkpoints Inteligentes Resiliencia ante objetos no serializables y referencias circulares. Si el sistema cae, WPipe reanuda exactamente donde se quedó.
🔍 Captura de Errores Forense Olvídate de los errores genéricos. Recibe notificaciones detalladas con el archivo y la línea exacta del fallo.
🧬 Contratos de Datos Valida tu "Bodega" de datos automáticamente con esquemas estrictos pero extensibles.
🔄 Paridad Síncrona/Asíncrona Elige entre Pipeline o PipelineAsync con el 100% de las mismas funcionalidades.

📊 Features (25 Features)

Feature Descripción
🔗 Pipeline Orchestration Crear pipelines con funciones y clases como pasos
🌳 Conditional Branches Ejecutar diferentes rutas basadas en condiciones de datos
🔄 Retry Logic Reintentos automáticos con estrategias configurables
🌐 API Integration Conectar a APIs externas, registrar workers
💾 SQLite Storage Persistir resultados de ejecución a base de datos
⚠️ Error Handling Excepciones personalizadas y códigos de error detallados
📋 YAML Configuration Cargar y gestionar configuraciones
🔀 Nested Pipelines Componer flujos de trabajo complejos
📊 Progress Tracking Salida rica en terminal
🧪 Type Hints Anotaciones de tipo completas
🔒 Memory Control Utilidades integradas de memoria
🧩 Composable Componentes reutilizables de pipeline
⚡ Parallel Execution Ejecutar pasos en paralelo (I/O o CPU bound)
📂 Pipeline Composition Usar pipelines como pasos de otros pipelines
🎯 Step Decorators Definir pasos en línea con @step decorator
💾 Checkpointing Guardar y resumir desde checkpoints
⏱️ Timeouts Prevenir tareas colgadas con soporte de timeout
📈 Resource Monitoring Rastrear RAM y CPU durante ejecución
📤 Export Exportar logs, métricas y estadísticas a JSON/CSV
🎪 Events & Hooks Eventos pre/post ejecución y hooks personalizados
📉 Alerts Alertas configurables basadas en métricas
🔐 Type Validation Validación de esquemas con PipelineContext
🔄 Async Pipeline Soporte completo para pipelines asíncronos
🏗️ DAG Scheduling Programación basada en grafos acíclicos dirigida
🌐 Dashboard Web Dashboard visual en tiempo real
Background Tasks Ejecutar tareas sin bloquear el pipeline (fire & forget)

🚀 Instalación

pip install wpipe

📖 Guía Completa

1. Conceptos Fundamentales

WPipe se basa en 4 pilares que puedes combinar libremente:

from wpipe import Pipeline, step, Condition, For, Parallel
from wpipe.pipe.components.logic_blocks import Background
Pilar Uso
step Decorador para definir funciones como pasos del pipeline
Pipeline Contenedor principal que orquesta la ejecución
Condition Ramificación condicional basada en expresiones
For Bucles con validación de parada
Parallel Ejecución paralela de múltiples pasos
Background Tareas en background sin bloquear el pipeline

2. Tu Primer Pipeline

from wpipe import Pipeline, step

@step(name="saludar")
def saludar(name):
    return {"mensaje": f"Hola, {name}!"}

pipeline = Pipeline(pipeline_name="miPrimero")
pipeline.set_steps([saludar])

result = pipeline.run({"name": "Mundo"})
# {'mensaje': 'Hola, Mundo!'}

3. Pipeline con Validación de Tipos

from wpipe import Pipeline, step, PipelineContext

class Usuario(PipelineContext):
    nombre: str
    edad: int
    email: str

@step(name="validar_usuario")
def validar(usuario: Usuario):
    if usuario.edad < 18:
        return {"validado": False, "razon": "menor de edad"}
    return {"validado": True}

pipeline = Pipeline(pipeline_name="validacion")
pipeline.set_steps([validar])

result = pipeline.run({"nombre": "Ana", "edad": 25, "email": "ana@ejemplo.com"})
# {'validado': True}

4. Ramificaciones Condicionales

from wpipe import Pipeline, step, Condition

@step(name="procesar")
def procesar(data):
    return {"resultado": "procesado"}

@step(name="alerta")
def alertar(data):
    return {"alerta": "¡Datos críticos!"}

pipeline = Pipeline(pipeline_name="condicional")
pipeline.set_steps([
    Condition(
        expression="valor > 100",
        branch_true=[procesar],
        branch_false=[alertar]
    )
])

pipeline.run({"valor": 50})  # Ejecuta alertar
pipeline.run({"valor": 150})  # Ejecuta procesar

5. Ejecución Paralela

from wpipe import Pipeline, step, Parallel

@step(name="tarea_a")
def tarea_a(data):
    return {"a": "listo"}

@step(name="tarea_b")
def tarea_b(data):
    return {"b": "listo"}

@step(name="tarea_c")
def tarea_c(data):
    return {"c": "listo"}

pipeline = Pipeline(pipeline_name="paralelo")
pipeline.set_steps([
    Parallel(
        steps=[tarea_a, tarea_b, tarea_c],
        max_workers=3
    )
])

result = pipeline.run({})
# Las 3 tareas se ejecutan simultáneamente

6. Background Tasks (Fire & Forget)

from wpipe import Pipeline, step
from wpipe.pipe.components.logic_blocks import Background

@step(name="tarea_principal")
def tarea_principal(data):
    print("Ejecutando tarea principal...")
    return {"status": "completado"}

@step(name="tarea_lenta")
def tarea_lenta(data):
    import time
    print("Enviando telemetría...")
    time.sleep(2)  # Simula operación lenta
    print("¡Telemetría enviada!")

pipeline = Pipeline(pipeline_name="con_background")
pipeline.set_steps([
    tarea_principal,
    Background(tarea_lenta),  # No bloquea el pipeline
])

result = pipeline.run({})
# El pipeline NO espera 2 segundos, continúa inmediatamente
# La tarea lenta se ejecuta en background (daemon thread)

7. Checkpoints (Resiliencia)

from wpipe import Pipeline, step, CheckpointManager

pipeline = Pipeline(pipeline_name="resiliente")

# Definir checkpoint basado en expresión lógica
pipeline.add_checkpoint(
    checkpoint_name="datos_listos",
    expression="temperatura > 0"
)

@step(name="procesar")
def procesar(data):
    return {"status": "completado"}

pipeline.set_steps([procesar])

# Si el sistema cae, WPipe reanuda automáticamente
chk = CheckpointManager("mi_db.db")
if chk.can_resume("resiliente"):
    pipeline.resume()
else:
    pipeline.run({"temperatura": 25})

7. Reintentos Automáticos

from wpipe import Pipeline, step

@step(name="conexion_api", retry_count=3, retry_delay=1)
def conexion_api(data):
    # Simulamos posible fallo
    if not data.get("disponible"):
        raise ConnectionError("API no disponible")
    return {"conectado": True}

pipeline = Pipeline(pipeline_name="retry")
pipeline.set_steps([conexion_api])
pipeline.run({"disponible": False})  # Reintenta 3 veces antes de fallar

8. Timeouts

from wpipe import Pipeline, step, timeout_sync

@timeout_sync(seconds=5)
@step(name="tarea_lenta")
def tarea_lenta(data):
    import time
    time.sleep(10)  # Simula tarea lenta
    return {"status": "ok"}

pipeline = Pipeline(pipeline_name="timeout")
pipeline.set_steps([tarea_lenta])
# Si tarda más de 5 segundos, lanza TimeoutError

9. Pipeline Asíncrono

import asyncio
from wpipe import PipelineAsync, step

@step(name="async_task")
async def async_task(data):
    await asyncio.sleep(1)
    return {"result": "async done"}

async def main():
    pipeline = PipelineAsync(pipeline_name="async_demo")
    pipeline.set_steps([async_task])
    result = await pipeline.run({"data": "test"})
    return result

asyncio.run(main())

10. Pipelines Anidados

from wpipe import Pipeline, step

# Pipeline hijo
sub_pipeline = Pipeline(pipeline_name="hijo")
sub_pipeline.set_steps([step_a, step_b])

# Pipeline padre que usa el hijo
parent_pipeline = Pipeline(pipeline_name="padre")
parent_pipeline.set_steps([
    paso_inicial,
    sub_pipeline,  # ¡Se ejecuta como un paso más!
    paso_final
])

11. Exportar Resultados

from wpipe import PipelineExporter

exporter = PipelineExporter("tracking.db")

# Exportar a JSON
json_data = exporter.export_pipeline_logs(format="json")

# Exportar a CSV
csv_data = exporter.export_pipeline_logs(format="csv")

# Exportar estadísticas
stats = exporter.export_statistics(format="json")

12. Dashboard Web

from wpipe import start_dashboard

# Inicia el dashboard en http://localhost:5000
start_dashboard(db_path="tracking.db", port=5000)

🎯 Uso Avanzado: El Viaje Resiliente

Este ejemplo combina todas las características de WPipe:

from wpipe import (
    Pipeline, For, Condition, Parallel, step, to_obj,
    PipelineContext, CheckpointManager, Metric, Severity
)

# 1. Definimos el contrato de datos
class MiContexto(PipelineContext):
    motor: str
    temperatura: float
    nivel_gasolina: str

# 2. Creamos pasos con validación automática
@step(name="VerificarMotor", retry_count=3)
@to_obj(MiContexto)
def verificar_motor(ctx: MiContexto):
    print(f"Chequeando motor: {ctx.motor}")
    return {"temperatura": 85.5}

@step(name="CargarCombustible")
def cargar_combustible(data):
    return {"nivel_gasolina": "completo"}

@step(name="Conducir")
def conducir(data):
    return {"distancia": 100}

# 3. Orquestación de Alto Nivel
viaje = Pipeline(pipeline_name="ViajeLTS", verbose=True)

# Añadimos checkpoint inteligente
viaje.add_checkpoint(
    checkpoint_name="arranque",
    expression="temperatura > 0"
)

# Añadimos alertas
viaje.tracker.add_alert_threshold(
    metric=Metric.PIPELINE_DURATION,
    expression=">5000",
    severity=Severity.WARNING,
    steps=[lambda d: print("⚠ Pipeline lento!")]
)

# 4. Configuramos los pasos
viaje.set_steps([
    verificar_motor,
    Parallel(
        steps=[cargar_combustible, revisar_neumaticos],
        max_workers=2
    ),
    For(
        iterations=10,
        validation_expression="nivel_gasolina != 'vacío'",
        steps=[conducir]
    )
])

# 5. Ejecutamos
results = viaje.run({"motor": "V8", "temperatura": 20})

📊 Observabilidad Completa

WPipe no solo ejecuta, entiende tu proceso:

# Análisis de rendimiento
analysis = pipeline.tracker.analysis
stats = analysis.get_stats()

# Estadísticas globales
print(f"Total ejecuciones: {stats['total_pipelines']}")
print(f"Tasa de éxito: {stats['success_rate']}%")
print(f"Duración media: {stats['avg_duration_ms']}ms")

# Detectar cuellos de botella
slow_steps = analysis.get_top_slow_steps(limit=5)
for step in slow_steps:
    print(f"{step['step_name']}: {step['avg_duration_ms']}ms")

# Exportar a JSON/CSV para auditorías
exporter = PipelineExporter("tracking.db")
exporter.export_pipeline_logs(format="json", output_path="reporte.json")

📋 API Reference (Resumen)

Clase/Función Descripción
Pipeline Pipeline síncrono principal
PipelineAsync Pipeline asíncrono
@step(name, version, retry_count, ...) Decorador para definir pasos
Condition(expression, branch_true, branch_false) Ramificación condicional
For(iterations, validation_expression, steps) Bucle con validación
Parallel(steps, max_workers, use_processes) Ejecución paralela
CheckpointManager Gestor de checkpoints
PipelineExporter Exportador de logs/métricas
start_dashboard(port) Dashboard web
ResourceMonitor Monitor de RAM/CPU
PipelineContext TypedDict para validación de tipos

🛡️ Calidad y Soporte

Aspecto Detalle
LTS WPipe v2.1+ cuenta con soporte a largo plazo
Test Coverage 95%+ pruebas en entornos síncronos y asíncronos
Arquitectura Unificación bajo wsqlite, sin SQL crudo en el núcleo
Python Compatible con Python 3.9+

📢 Marketing & Community

Hemos expandido nuestra presencia con +40 nuevos activos de marketing diseñados para educar y ayudar a los desarrolladores a elegir la mejor arquitectura de orquestación.

  • Dev.to: Guías técnicas profundas sobre Green-IT y ahorro de RAM.
  • DZone: Análisis arquitectónicos sobre resiliencia industrial y estados persistentes.
  • Reddit: Historias reales y debates técnicos en comunidades de Python y DevOps.
  • Indie Hackers: Estrategias para escalar startups con infraestructura mínima y bajo coste.

Puedes encontrar todos estos materiales en la carpeta posters/, numerados del 79 al 121, cubriendo comparativas con Airflow, n8n, Zapier y más.


DASHBOARD

Tutorial

image

Principal menus

Timeline:

image

Analitics:

image

Alerts:

image

Events:

image

states:

image

Pipelines:

image

Graph pipeline

image

states data transaction

image

🛠️ Extensiones para Editores

Visual Studio Code

WPipe cuenta con una extensión oficial para mejorar la experiencia de desarrollo:

  • Snippets: Autocompletado para @step, Pipeline, Parallel y más.
  • Validación YAML: Soporte para esquemas de configuración de pipelines.
  • Comandos: Acceso rápido a herramientas de WPipe.

Puedes encontrar la extensión y las instrucciones de instalación en la carpeta editors/vscode/.


📄 Licencia

MIT License - Libre para usar, modificar y distribuir.


🏢 ¿Usas WPipe? (Opcional)

¡Nos encantaría saberlo! Si usas WPipe en producción, nos motiva mucho saberlo.

** badge opcional:**

[![Built with WPipe](https://img.shields.io/badge/Built%20with-WPipe-blue)](https://github.com/wisrovi/wpipe)

📧 Contáctanos: wisrovi.rodriguez@gmail.com

Consulta USERS.md para ver la lista completa de usuarios reconocidos.


Diseñado con ❤️ por William Rodriguez (wisrovi) para ingenieros que no aceptan menos que la excelencia.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wpipe-2.3.5.tar.gz (135.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wpipe-2.3.5-py3-none-any.whl (131.6 kB view details)

Uploaded Python 3

File details

Details for the file wpipe-2.3.5.tar.gz.

File metadata

  • Download URL: wpipe-2.3.5.tar.gz
  • Upload date:
  • Size: 135.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for wpipe-2.3.5.tar.gz
Algorithm Hash digest
SHA256 13b913d19e7edb3202a6e1407536b7d23994afb5563c4ec39c118da1285e7716
MD5 9bc04d9fb56442ba4581a1222784da40
BLAKE2b-256 189db99bf04e14e7467ea6413919b7afa7e9488d3bbcc01d2c9077a8b2387fd3

See more details on using hashes here.

File details

Details for the file wpipe-2.3.5-py3-none-any.whl.

File metadata

  • Download URL: wpipe-2.3.5-py3-none-any.whl
  • Upload date:
  • Size: 131.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for wpipe-2.3.5-py3-none-any.whl
Algorithm Hash digest
SHA256 15acad221f1f37027f4db9bf927d7a35deab733cf5ff97b7da6f9c1e1a906e22
MD5 6f8e7b9ba4f8eaa858d4a23808524c8b
BLAKE2b-256 5ef07d69a6805f8d8826ac62cbf69fd710e22054d50ae5d0fa44e1a36a27bc0d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page