Skip to main content

Pipeline con API

Project description

Project Title

A brief description of what this project does and who it's for

Introduction

Breve descripción de la clase y su propósito.

Features

Resumen de las funcionalidades clave de CustomKafka.

Requirements

Herramientas y bibliotecas necesarias para usar CustomKafka (Python, Kafka, librerías adicionales).

Installation

Instrucciones sobre cómo instalar las dependencias necesarias y preparar el entorno.

Install my-project with npm

  npm install my-project
  cd my-project

How begin

Guía rápida para configurar y usar CustomKafka por primera vez.

Class structure

Descripción de la arquitectura de la clase, sus métodos y atributos principales.

Consumer use

  • Cómo registrar funciones como consumidores con el decorador @consumer.

  • Ejemplos de configuración de consumidores (filtros de clave, tipos de datos, etc.).

Producer use

  • Cómo enviar mensajes usando CustomKafka como productor.
  • Ejemplos de envío de diferentes tipos de datos (JSON, archivos, imágenes).

Manage errors

Estrategias de manejo de errores para consumidores y productores.

Settings avanced

Personalización de consumidores y productores con configuraciones adicionales.

Paraller consumers executing

Cómo iniciar múltiples consumidores de forma asíncrona.

Examples

Casos de uso detallados con código de ejemplo.

1. Procesamiento de Mensajes JSON

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="basic")

@kafka_client.consumer(topic="json_topic", value_convert_to="json")
def process_json(data):
    print("Processing JSON data:", data.value)

kafka_client.run_consumers()

Este ejemplo muestra cómo consumir mensajes en formato JSON y procesarlos. Es útil para manejar datos estructurados en aplicaciones de análisis.

2. Filtrado de Mensajes por Clave

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="key_filter")

@kafka_client.consumer(topic="filtered_topic", key_filter="important_key", value_convert_to="json")
def process_filtered(data):
    print(f"Received filtered message with key {data.key}: {data.value}")

kafka_client.run_consumers()

Se filtran los mensajes por una clave específica (key_filter). Solo los mensajes que coincidan serán procesados.

3. Recepción y Visualización de Imágenes

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="image_show")

@kafka_client.consumer(topic="image_topic", value_convert_to="image")
def display_image(data):
    cv2.imshow("Received Image", data.value)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

kafka_client.run_consumers()

Consume imágenes desde Kafka y las muestra usando OpenCV, ideal para aplicaciones de visión por computadora.

4. Almacenamiento de Mensajes en un Archivo

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="save_msgs_in_file")

@kafka_client.consumer(topic="log_topic", value_convert_to="json")
def save_to_file(data):
    with open("logs.txt", "a") as file:
        file.write(f"{data.value}\n")

kafka_client.run_consumers()

Guarda mensajes recibidos en un archivo de texto, útil para mantener registros o auditorías.

5. Streaming de Video

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="video_show")

@kafka_client.consumer(topic="video_topic", value_convert_to="image")
def stream_video(data):
    # Process each video frame as an image
    cv2.imshow("Video Stream", data.value)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        return

kafka_client.run_consumers()

Recibe y muestra un flujo de video en tiempo real, tratando cada mensaje como un fotograma de video.

6. Envío de Alertas

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="alerts_received")

@kafka_client.consumer(topic="sensor_topic", value_convert_to="json")
def alert_on_high_temp(data):
    if data.value.get("temperature") > 30:
        print("High temperature alert:", data.value)

kafka_client.run_consumers()

Procesa mensajes de sensores y genera alertas si la temperatura excede un umbral, útil en monitoreo de sistemas.

7. Integración con Sistemas de Monitoreo

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="basic_data_received")

@kafka_client.consumer(topic="metrics_topic", value_convert_to="json")
def process_metrics(data):
    # Forward metrics to a monitoring system
    print("Metrics:", data.value)

kafka_client.run_consumers()

Consume métricas y las envía a sistemas de monitoreo para la visualización en dashboards.

8. Procesamiento de Logs

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="varios")

@kafka_client.consumer(topic="service_logs", value_convert_to="json")
def process_logs(data):
    print("Service Log:", data.value)

kafka_client.run_consumers()

Recibe y procesa logs de servicios, ideal para auditoría y depuración.

  • Nota: otros usos pueden ser:
    • Ejecución de Microservicios Asíncronos
    • Procesamiento de Datos Financieros
    • Recolección de Métricas de Aplicaciones
    • Gestión de Transacciones Bancarias
    • Sincronización de Configuraciones
    • Control de Dispositivos Remotos
    • Envío de Datos a Sistemas de Machine Learning
    • Implementación de Workflows Distribuidos
    • Recolección de Métricas de Aplicaciones

9. Carga de Datos en Tiempo Real a Bases de Datos

import sqlite3
from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="insert_in_DB")

conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY, data TEXT)''')

@kafka_client.consumer(topic="db_topic", value_convert_to="json")
def save_to_db(data):
    cursor.execute("INSERT INTO metrics (data) VALUES (?)", (str(data.value),))
    conn.commit()

kafka_client.run_consumers()

Inserta los datos recibidos en una base de datos en tiempo real para almacenamiento y análisis.

10. Simulación de Sensores IoT

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="send_IoT_data")

with kafka_client.producer() as producer:
    for i in range(10):
        producer.send(
            topic="sensor_topic",
            value={"sensor_id": i, "temperature": random.randint(20, 40)},
            value_type="json",
            verbose=True
        )

Simula datos de sensores IoT y los envía a Kafka, útil para pruebas antes del despliegue real.

11. Envío de Archivos a Través de Kafka

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="file_send")

with kafka_client.producer() as producer:
    producer.send(
        topic="file_topic",
        value="path/to/file.txt",  # Ruta del archivo
        value_type="file",
        verbose=True
    )

Envía un archivo a través de Kafka para ser procesado por otro sistema, como análisis de logs.

12. Procesamiento de Datos Financieros

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="financial_predict")

@kafka_client.consumer(topic="finance_topic", value_convert_to="json")
def analyze_financial_data(data):
    print("Processing financial data:", data.value)

kafka_client.run_consumers()

Consume datos financieros para su análisis en tiempo real, como la detección de anomalías.

13. Distribución de Modelos de IA

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="update_models_in_production")

with kafka_client.producer() as producer:
    with open("model.pt", "rb") as model_file:
        producer.send(
            topic="model_distribution",
            value=model_file.read(),
            value_type="file",
            verbose=True
        )

Envía un modelo de IA entrenado para su despliegue en nodos de inferencia distribuidos.

14. Preprocesamiento de Datos en Tiempo Real para Modelos ML

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="preprocess")

@kafka_client.consumer(topic="raw_data", value_convert_to="json")
def preprocess_data(data):
    # Preprocess data (e.g., normalization, feature extraction)
    processed_data = {
        "feature1": data.value["sensor_value"] / 100,
        "feature2": len(data.value["text"])
    }
    print("Preprocessed Data:", processed_data)

kafka_client.run_consumers()

Este ejemplo muestra cómo preprocesar datos en tiempo real antes de enviarlos a un modelo de ML, realizando tareas como normalización y extracción de características.

15. Inferencia en Tiempo Real con Modelos de IA

from sklearn.externals import joblib
from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="features_inference")

# Cargar el modelo previamente entrenado
model = joblib.load("model.pkl")

@kafka_client.consumer(topic="inference_data", value_convert_to="json")
def run_inference(data):
    # Realizar la predicción con el modelo cargado
    prediction = model.predict([data.value["features"]])
    print(f"Prediction: {prediction}")

kafka_client.run_consumers()

Consume datos de inferencia y utiliza un modelo de IA previamente entrenado para predecir resultados en tiempo real, útil en sistemas de recomendaciones o detección de anomalías.

16. Inferencia en Tiempo Real con Modelos de IA Usando Imágenes

pip install ultralytics
from ultralytics import YOLO
from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="image_inference")

# Cargar el modelo YOLOv5 preentrenado
model = YOLO('yolov8s.pt') 

@kafka_client.consumer(topic="image_topic", value_convert_to="image")
def image_inference(data):
    image = data.value

    results = model(image)

    predictions = results.pandas().xyxy[0]  # DataFrame con las predicciones

    annotated_frame = results[0].plot()

    # Muestra la imagen con las detecciones
    cv2.imshow("Image Inference", annotated_frame)
    cv2.waitKey(1)  # Refresca la ventana de visualización

# Inicia el consumidor de Kafka
kafka_client.run_consumers()

Este ejemplo muestra cómo consumir mensajes en formato JSON y procesarlos. Es útil para manejar datos estructurados en aplicaciones de análisis.

17. Monitoreo de Rendimiento de Modelos en Producción

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="metrics_monitor")

@kafka_client.consumer(topic="model_metrics", value_convert_to="json")
def monitor_model_performance(data):
    # Monitoriza las métricas de rendimiento del modelo
    print(f"Model accuracy: {data.value['accuracy']} at {data.value['timestamp']}")

kafka_client.run_consumers()

Permite monitorear el rendimiento del modelo en producción, recibiendo métricas como precisión o tiempos de inferencia, lo cual es crucial para mantener la calidad del modelo.

18. Entrenamiento de Modelos de IA Distribuidos

pip install ultralytics
from ultralytics import YOLO
from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="model_train")

model = YOLO('yolov8s.pt')  # old_model

@kafka_client.consumer(topic="training_data", value_convert_to="json")
def distributed_training(data):

    data_yaml_path = data.value['data_yaml_path']  # path (string)
    dataset_hiperparemeters = data.value['dataset_hiperparemeters'] # json

    results = model.train(data_yaml_path, **dataset_hiperparemeters)

    # Agregar datos al set de entrenamiento
    print("Training with data:", data.value)

kafka_client.run_consumers()

Facilita el entrenamiento de modelos de manera distribuida recibiendo datos de diferentes fuentes para actualizar el modelo continuamente.

19. Generación de Características en Tiempo Real

from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="RT_features")

@kafka_client.consumer(topic="feature_engineering", value_convert_to="json")
def generate_features(data):
    # Generar nuevas características a partir de los datos entrantes
    features = {
        "feature1": data.value["value1"] ** 2,
        "feature2": data.value["value2"] * 0.5
    }
    print("Generated features:", features)

kafka_client.run_consumers()

Este ejemplo genera nuevas características de los datos entrantes en tiempo real, útil para pipelines de datos en modelos de ML.

20. Análisis de Sentimientos en Tiempo Real

from textblob import TextBlob
from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="TextBlob")

@kafka_client.consumer(topic="tweets", value_convert_to="json")
def sentiment_analysis(data):
    # Realizar análisis de sentimiento sobre texto recibido
    sentiment = TextBlob(data.value["text"]).sentiment
    print(f"Sentiment Analysis: {sentiment.polarity}, {sentiment.subjectivity}")

kafka_client.run_consumers()

Realiza un análisis de sentimientos sobre textos recibidos en tiempo real, útil para monitorear redes sociales o feedback de usuarios.

21. Despliegue de Modelos de NLP (Procesamiento de Lenguaje Natural)

from transformers import pipeline
from custom_kafka import CustomKafka

kafka_client = CustomKafka(server="localhost:9092", name="TextBlob")

# Cargar un modelo de procesamiento de texto
nlp_model = pipeline("text-classification")

@kafka_client.consumer(topic="nlp_topic", value_convert_to="json")
def nlp_processing(data):
    # Procesar texto con el modelo NLP
    result = nlp_model(data.value["text"])
    print(f"NLP Result: {result}")

kafka_client.run_consumers()

Este ejemplo despliega un modelo de NLP para procesar texto en tiempo real, útil en clasificación de correos, chatbots o análisis de contenido.

22. Orquestación de Modelos en Inferencias Complejas

from custom_kafka import CustomKafka



model_1 = ...
model_2 = ...
model_3 = ...


kafka_client = CustomKafka(server="localhost:9092", name="TextBlob")

@kafka_client.consumer(topic="complex_inference", value_convert_to="json")
def orchestrate_models(data):
    # Orquestar múltiples modelos para una inferencia compleja
    step1_result = model_1.predict([data.value["step1_input"]])
    step2_result = model_2.predict([step1_result])
    final_result = model_3.predict([step2_result])
    print(f"Final Inference Result: {final_result}")
    

kafka_client.run_consumers()

Orquesta una secuencia de modelos en un pipeline de inferencia complejo, combinando resultados de diferentes modelos para obtener una respuesta final.

23. Captura datos, Reentrenamiento e Inferencia del Modelo

from sklearn.ensemble import RandomForestClassifier
import pickle
import json
from custom_kafka import CustomKafka



class ModelUpdater(CustomKafka):
    def __init__(self, server: str, name: str, model_path: str):
        super().__init__(server, name)
        self.model_path = model_path
        self.new_data = []

    @CustomKafka.consumer(topic="data_topic", value_convert_to="json")
    def capture_data(self, data: dict):
        self.new_data.append(data)
        if len(self.new_data) >= 100:  # Ajusta el tamaño del lote si es necesario
            self.retrain_model()

    def retrain_model(self):
        # Prepara datos para el entrenamiento
        X = [d['features'] for d in self.new_data]
        y = [d['label'] for d in self.new_data]

        # Entrena el modelo
        model = RandomForestClassifier()
        model.fit(X, y)

        # Guarda el modelo
        with open(self.model_path, 'wb') as f:
            pickle.dump(model, f)

        self.new_data = []


class ModelInferencer(CustomKafka):
    def __init__(self, server: str, name: str, model_path: str):
        super().__init__(server, name)
        self.model_path = model_path
        with open(model_path, 'rb') as f:
            self.model = pickle.load(f)
        self.producer = KafkaProducer(
            bootstrap_servers=server,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    @CustomKafka.consumer(topic="data_topic", value_convert_to="json")
    def predict_and_send(self, data: dict):
        features = [data['features']]
        prediction = self.model.predict(features)[0]
        result = {'id': data['id'], 'prediction': prediction}

        with self.producer() as producer:
            producer.send(topic='prediction_topic', value=result, value_type="json")


model_update = ModelUpdater(server="localhost:9092", name="TextBlob", model_path="path/to/model.ptl")
model_update.run_consumers()

model_predict = ModelUpdater(server="localhost:9092", name="TextBlob", model_path="path/to/model.ptl")
model_predict.run_consumers()

Explicación

  • Captura y Reentrenamiento (ModelUpdater): Usa un consumidor Kafka para recibir datos desde data_topic. Los datos se acumulan y, cuando hay suficientes (100 en este caso), se reentrena el modelo y se guarda.

  • Inferencia (ModelInferencer): Usa un consumidor Kafka para recibir datos y cargar el modelo actualizado. Realiza predicciones con el modelo y envía los resultados al tópico prediction_topic.

  • Ejecución: Se crean instancias de ModelUpdater y ModelInferencer y se ejecutan para manejar datos y realizar inferencias.

Este enfoque simplificado permite la actualización y el uso de modelos en producción de manera eficiente y con un mínimo de código.

Good practics

Recomendaciones para el uso efectivo y seguro de CustomKafka.

Solución de Problemas

Preguntas frecuentes y cómo resolver errores comunes.

Contributing

Contributions are always welcome!

See contributing.md for ways to get started.

Please adhere to this project's code of conduct.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

wkafka-0.0.1-py3-none-any.whl (11.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page