Skip to main content

Librería minimalista para operaciones con InfluxDB y programación de tareas

Project description

🛠️ ctrutils

ctrutils es una librería minimalista de utilidades en Python enfocada en operaciones con InfluxDB, programación de tareas robusta (tipo Airflow), y logging centralizado.

Python Version License

📦 Módulos

⏰ Scheduler

Programación robusta de tareas tipo Airflow (mejorado en v11.0.0)

  • ✅ Ejecución continua que nunca termina (modo daemon)
  • ✅ Dependencias entre tareas (DAGs - pipelines secuenciales)
  • ✅ Reintentos automáticos con backoff exponencial
  • ✅ Callbacks y hooks (on_success, on_failure, on_retry)
  • ✅ Ejecución condicional
  • ✅ Métricas detalladas
  • ✅ Expresiones crontab completas
  • ✅ Timeouts por tarea
  • ✅ Graceful shutdown

📖 Ver documentación completa del Scheduler

🗄️ InfluxDB Operations

Operaciones avanzadas con InfluxDB

  • Validación automática de datos (NaN, infinitos, None)
  • Escritura paralela para grandes volúmenes
  • Downsampling y continuous queries
  • Backup y restore de measurements
  • Métricas de calidad de datos
  • Métodos administrativos completos

📝 Handler (Logging & Notifications)

Sistema de logging y notificaciones centralizado

  • Logging: Consola, archivos con rotación
  • Grafana Loki: Logs centralizados y escalables
  • Telegram: Notificaciones en tiempo real
  • Integración completa con Scheduler e InfluxDB

🚀 Instalación

pip install ctrutils

# Para usar Loki y Telegram (opcional):
pip install requests

💡 Uso Rápido

Scheduler - Nunca Termina

from ctrutils.scheduler import Scheduler, Task

# Crear scheduler
scheduler = Scheduler(timezone="Europe/Madrid")

# Añadir tarea que se ejecuta cada 5 minutos
scheduler.add_job(
    func=mi_funcion,
    trigger="cron",
    job_id="tarea",
    trigger_args={"minute": "*/5"},
    max_retries=3,
)

# Iniciar (NUNCA termina hasta Ctrl+C)
scheduler.start(blocking=True)

Scheduler - Pipeline ETL con Dependencias

from ctrutils.scheduler import Scheduler, Task

scheduler = Scheduler()

# Extract
extract = Task(
    task_id="extract",
    func=extract_data,
    trigger_type="cron",
    trigger_args={"minute": "*/15"},
    max_retries=3,
)

# Transform (depende de Extract)
transform = Task(
    task_id="transform",
    func=transform_data,
    trigger_type="cron",
    trigger_args={"minute": "*/15"},
    dependencies=["extract"],  # Solo ejecuta si extract OK
    max_retries=3,
)

# Load (depende de Transform)
load = Task(
    task_id="load",
    func=load_data,
    trigger_type="cron",
    trigger_args={"minute": "*/15"},
    dependencies=["transform"],  # Solo ejecuta si transform OK
    on_failure=lambda e: alert_team(e),
)

scheduler.add_task(extract)
scheduler.add_task(transform)
scheduler.add_task(load)

# Nunca termina
scheduler.start(blocking=True)

InfluxDB

from ctrutils import InfluxdbOperation

influx = InfluxdbOperation(host='localhost', port=8086)

# Escribir DataFrame con validación automática
stats = influx.write_dataframe(
    measurement='datos',
    data=df,
    validate_data=True,  # Limpia NaN automáticamente
)

print(f"Escritos: {stats['successful_points']}")

Logging

from ctrutils.handler import LoggingHandler

# Crear logger con rotación
logger = LoggingHandler(
    logger_name="mi_app",
    level=logging.INFO,
)

# Añadir handlers
logger.add_handlers([
    logger.create_stream_handler(),
    logger.create_rotating_file_handler(
        filename="app.log",
        max_bytes=10*1024*1024,  # 10MB
        backup_count=5,
    )
])

📁 Estructura del Proyecto

ctrutils/
├── .coveragerc              # Coverage configuration
├── .isort.cfg               # Import sorting
├── .pylintrc                # Linting rules
├── mypy.ini                 # Type checking
├── pytest.ini               # Testing configuration
├── ctrutils/                # Código fuente principal
│   ├── database/           # Módulos de base de datos
│   │   └── influxdb/       # InfluxDB operations
│   ├── handler/            # Logging y notificaciones
│   │   ├── logging/        # Handlers de logging
│   │   └── notification/   # Loki, Telegram
│   └── scheduler/          # Scheduler robusto
├── docs/                    # Documentación
│   ├── scheduler/          # Docs específicas del scheduler
│   ├── PROJECT_STRUCTURE.md # Arquitectura del proyecto
│   ├── QUICK_START.md      # Guía rápida
│   └── TEST_SUMMARY.md     # Documentación de tests
├── examples/                # Ejemplos de uso
│   ├── scheduler_simple.py
│   └── scheduler_advanced_demo.py
├── tests/                   # Suite de tests
│   ├── unit/               # Tests unitarios
│   └── integration/        # Tests de integración
├── makefiles/              # Makefiles modulares
├── CHANGELOG.md            # Historial de cambios
├── README.md               # Este archivo
└── pyproject.toml          # Configuración del proyecto

🧪 Testing

El proyecto incluye una suite completa de tests:

# Ejecutar todos los tests
make test

# Solo tests unitarios (rápido, sin dependencias)
make test-unit

# Tests de integración (requiere InfluxDB)
make test-integration

# Tests con coverage
make test-coverage

# Ver todos los comandos disponibles
make help

📚 Documentación

🔧 Desarrollo

# Instalar dependencias de desarrollo
poetry install

# Ejecutar linters
make lint

# Formatear código
make format

# Verificar tipos
make type-check

# Verificación completa antes de commit
make check

📋 Requisitos

  • Python 3.8+
  • APScheduler 3.10+
  • InfluxDB-Python 5.3+
  • Pandas (para operaciones con DataFrames)

🤝 Contribución

Ver CONTRIBUTING.md para guías de contribución.

📄 Licencia

Este proyecto está licenciado bajo la Licencia MIT - ver el archivo LICENSE para más detalles.

🔗 Enlaces

📝 Changelog

Ver CHANGELOG.md para el historial completo de cambios.

⭐ Características Destacadas v11.0.0

Scheduler Mejorado

El scheduler ha sido completamente refactorizado para ser una solución production-ready:

  • Nunca termina: Modo daemon con blocking=True
  • DAGs completos: Dependencias entre tareas tipo Airflow
  • Recuperación automática: Reintentos con backoff exponencial
  • Monitoreo: Métricas detalladas de ejecución
  • Robusto: Thread-safe, graceful shutdown, timeouts

Ver mejoras completas


Desarrollado por: Cristian Tacoronte Rivero
Versión: 11.0.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ctrutils-11.0.0.tar.gz (40.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ctrutils-11.0.0-py3-none-any.whl (45.0 kB view details)

Uploaded Python 3

File details

Details for the file ctrutils-11.0.0.tar.gz.

File metadata

  • Download URL: ctrutils-11.0.0.tar.gz
  • Upload date:
  • Size: 40.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.4 CPython/3.10.12 Linux/6.6.87.2-microsoft-standard-WSL2

File hashes

Hashes for ctrutils-11.0.0.tar.gz
Algorithm Hash digest
SHA256 c4d5863ee8aaf7baf22eb2e7ac02342b66988bb74ecd3d094bc8667d6f07d6e1
MD5 23e0259690299cfff4930b94f45a3c26
BLAKE2b-256 9e4f36b593b1b8f3d208fe3af4c9b2db75bd0942dd1fbc0d053be152d9e9ba82

See more details on using hashes here.

File details

Details for the file ctrutils-11.0.0-py3-none-any.whl.

File metadata

  • Download URL: ctrutils-11.0.0-py3-none-any.whl
  • Upload date:
  • Size: 45.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.4 CPython/3.10.12 Linux/6.6.87.2-microsoft-standard-WSL2

File hashes

Hashes for ctrutils-11.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 aaa6eb06862c8e3bafb3e5a2116c63adc593f5980cfba5f225fe8ce84dc4904f
MD5 3bdd0e4dd99acb021920e8dfba666483
BLAKE2b-256 4852bfaf92bff69ae46c60fb663abc7a0b0d9d6d1e17d527492d5102b92518f6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page