Pipeline con API
Project description
Project Title
A brief description of what this project does and who it's for
Introduction
Breve descripción de la clase y su propósito.
Features
Resumen de las funcionalidades clave de CustomKafka.
Requirements
Herramientas y bibliotecas necesarias para usar CustomKafka (Python, Kafka, librerías adicionales).
Installation
Instrucciones sobre cómo instalar las dependencias necesarias y preparar el entorno.
Install my-project with npm
npm install my-project
cd my-project
How begin
Guía rápida para configurar y usar CustomKafka por primera vez.
Class structure
Descripción de la arquitectura de la clase, sus métodos y atributos principales.
Consumer use
-
Cómo registrar funciones como consumidores con el decorador @consumer.
-
Ejemplos de configuración de consumidores (filtros de clave, tipos de datos, etc.).
Producer use
- Cómo enviar mensajes usando CustomKafka como productor.
- Ejemplos de envío de diferentes tipos de datos (JSON, archivos, imágenes).
Manage errors
Estrategias de manejo de errores para consumidores y productores.
Settings avanced
Personalización de consumidores y productores con configuraciones adicionales.
Paraller consumers executing
Cómo iniciar múltiples consumidores de forma asíncrona.
Examples
Casos de uso detallados con código de ejemplo.
1. Procesamiento de Mensajes JSON
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="basic")
@kafka_client.consumer(topic="json_topic", value_convert_to="json")
def process_json(data):
print("Processing JSON data:", data.value)
kafka_client.run_consumers()
Este ejemplo muestra cómo consumir mensajes en formato JSON y procesarlos. Es útil para manejar datos estructurados en aplicaciones de análisis.
2. Filtrado de Mensajes por Clave
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="key_filter")
@kafka_client.consumer(topic="filtered_topic", key_filter="important_key", value_convert_to="json")
def process_filtered(data):
print(f"Received filtered message with key {data.key}: {data.value}")
kafka_client.run_consumers()
Se filtran los mensajes por una clave específica (key_filter). Solo los mensajes que coincidan serán procesados.
3. Recepción y Visualización de Imágenes
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="image_show")
@kafka_client.consumer(topic="image_topic", value_convert_to="image")
def display_image(data):
cv2.imshow("Received Image", data.value)
cv2.waitKey(0)
cv2.destroyAllWindows()
kafka_client.run_consumers()
Consume imágenes desde Kafka y las muestra usando OpenCV, ideal para aplicaciones de visión por computadora.
4. Almacenamiento de Mensajes en un Archivo
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="save_msgs_in_file")
@kafka_client.consumer(topic="log_topic", value_convert_to="json")
def save_to_file(data):
with open("logs.txt", "a") as file:
file.write(f"{data.value}\n")
kafka_client.run_consumers()
Guarda mensajes recibidos en un archivo de texto, útil para mantener registros o auditorías.
5. Streaming de Video
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="video_show")
@kafka_client.consumer(topic="video_topic", value_convert_to="image")
def stream_video(data):
# Process each video frame as an image
cv2.imshow("Video Stream", data.value)
if cv2.waitKey(1) & 0xFF == ord('q'):
return
kafka_client.run_consumers()
Recibe y muestra un flujo de video en tiempo real, tratando cada mensaje como un fotograma de video.
6. Envío de Alertas
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="alerts_received")
@kafka_client.consumer(topic="sensor_topic", value_convert_to="json")
def alert_on_high_temp(data):
if data.value.get("temperature") > 30:
print("High temperature alert:", data.value)
kafka_client.run_consumers()
Procesa mensajes de sensores y genera alertas si la temperatura excede un umbral, útil en monitoreo de sistemas.
7. Integración con Sistemas de Monitoreo
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="basic_data_received")
@kafka_client.consumer(topic="metrics_topic", value_convert_to="json")
def process_metrics(data):
# Forward metrics to a monitoring system
print("Metrics:", data.value)
kafka_client.run_consumers()
Consume métricas y las envía a sistemas de monitoreo para la visualización en dashboards.
8. Procesamiento de Logs
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="varios")
@kafka_client.consumer(topic="service_logs", value_convert_to="json")
def process_logs(data):
print("Service Log:", data.value)
kafka_client.run_consumers()
Recibe y procesa logs de servicios, ideal para auditoría y depuración.
- Nota: otros usos pueden ser:
- Ejecución de Microservicios Asíncronos
- Procesamiento de Datos Financieros
- Recolección de Métricas de Aplicaciones
- Gestión de Transacciones Bancarias
- Sincronización de Configuraciones
- Control de Dispositivos Remotos
- Envío de Datos a Sistemas de Machine Learning
- Implementación de Workflows Distribuidos
- Recolección de Métricas de Aplicaciones
9. Carga de Datos en Tiempo Real a Bases de Datos
import sqlite3
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="insert_in_DB")
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY, data TEXT)''')
@kafka_client.consumer(topic="db_topic", value_convert_to="json")
def save_to_db(data):
cursor.execute("INSERT INTO metrics (data) VALUES (?)", (str(data.value),))
conn.commit()
kafka_client.run_consumers()
Inserta los datos recibidos en una base de datos en tiempo real para almacenamiento y análisis.
10. Simulación de Sensores IoT
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="send_IoT_data")
with kafka_client.producer() as producer:
for i in range(10):
producer.send(
topic="sensor_topic",
value={"sensor_id": i, "temperature": random.randint(20, 40)},
value_type="json",
verbose=True
)
Simula datos de sensores IoT y los envía a Kafka, útil para pruebas antes del despliegue real.
11. Envío de Archivos a Través de Kafka
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="file_send")
with kafka_client.producer() as producer:
producer.send(
topic="file_topic",
value="path/to/file.txt", # Ruta del archivo
value_type="file",
verbose=True
)
Envía un archivo a través de Kafka para ser procesado por otro sistema, como análisis de logs.
12. Procesamiento de Datos Financieros
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="financial_predict")
@kafka_client.consumer(topic="finance_topic", value_convert_to="json")
def analyze_financial_data(data):
print("Processing financial data:", data.value)
kafka_client.run_consumers()
Consume datos financieros para su análisis en tiempo real, como la detección de anomalías.
13. Distribución de Modelos de IA
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="update_models_in_production")
with kafka_client.producer() as producer:
with open("model.pt", "rb") as model_file:
producer.send(
topic="model_distribution",
value=model_file.read(),
value_type="file",
verbose=True
)
Envía un modelo de IA entrenado para su despliegue en nodos de inferencia distribuidos.
14. Preprocesamiento de Datos en Tiempo Real para Modelos ML
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="preprocess")
@kafka_client.consumer(topic="raw_data", value_convert_to="json")
def preprocess_data(data):
# Preprocess data (e.g., normalization, feature extraction)
processed_data = {
"feature1": data.value["sensor_value"] / 100,
"feature2": len(data.value["text"])
}
print("Preprocessed Data:", processed_data)
kafka_client.run_consumers()
Este ejemplo muestra cómo preprocesar datos en tiempo real antes de enviarlos a un modelo de ML, realizando tareas como normalización y extracción de características.
15. Inferencia en Tiempo Real con Modelos de IA
from sklearn.externals import joblib
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="features_inference")
# Cargar el modelo previamente entrenado
model = joblib.load("model.pkl")
@kafka_client.consumer(topic="inference_data", value_convert_to="json")
def run_inference(data):
# Realizar la predicción con el modelo cargado
prediction = model.predict([data.value["features"]])
print(f"Prediction: {prediction}")
kafka_client.run_consumers()
Consume datos de inferencia y utiliza un modelo de IA previamente entrenado para predecir resultados en tiempo real, útil en sistemas de recomendaciones o detección de anomalías.
16. Inferencia en Tiempo Real con Modelos de IA Usando Imágenes
pip install ultralytics
from ultralytics import YOLO
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="image_inference")
# Cargar el modelo YOLOv5 preentrenado
model = YOLO('yolov8s.pt')
@kafka_client.consumer(topic="image_topic", value_convert_to="image")
def image_inference(data):
image = data.value
results = model(image)
predictions = results.pandas().xyxy[0] # DataFrame con las predicciones
annotated_frame = results[0].plot()
# Muestra la imagen con las detecciones
cv2.imshow("Image Inference", annotated_frame)
cv2.waitKey(1) # Refresca la ventana de visualización
# Inicia el consumidor de Kafka
kafka_client.run_consumers()
Este ejemplo muestra cómo consumir mensajes en formato JSON y procesarlos. Es útil para manejar datos estructurados en aplicaciones de análisis.
17. Monitoreo de Rendimiento de Modelos en Producción
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="metrics_monitor")
@kafka_client.consumer(topic="model_metrics", value_convert_to="json")
def monitor_model_performance(data):
# Monitoriza las métricas de rendimiento del modelo
print(f"Model accuracy: {data.value['accuracy']} at {data.value['timestamp']}")
kafka_client.run_consumers()
Permite monitorear el rendimiento del modelo en producción, recibiendo métricas como precisión o tiempos de inferencia, lo cual es crucial para mantener la calidad del modelo.
18. Entrenamiento de Modelos de IA Distribuidos
pip install ultralytics
from ultralytics import YOLO
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="model_train")
model = YOLO('yolov8s.pt') # old_model
@kafka_client.consumer(topic="training_data", value_convert_to="json")
def distributed_training(data):
data_yaml_path = data.value['data_yaml_path'] # path (string)
dataset_hiperparemeters = data.value['dataset_hiperparemeters'] # json
results = model.train(data_yaml_path, **dataset_hiperparemeters)
# Agregar datos al set de entrenamiento
print("Training with data:", data.value)
kafka_client.run_consumers()
Facilita el entrenamiento de modelos de manera distribuida recibiendo datos de diferentes fuentes para actualizar el modelo continuamente.
19. Generación de Características en Tiempo Real
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="RT_features")
@kafka_client.consumer(topic="feature_engineering", value_convert_to="json")
def generate_features(data):
# Generar nuevas características a partir de los datos entrantes
features = {
"feature1": data.value["value1"] ** 2,
"feature2": data.value["value2"] * 0.5
}
print("Generated features:", features)
kafka_client.run_consumers()
Este ejemplo genera nuevas características de los datos entrantes en tiempo real, útil para pipelines de datos en modelos de ML.
20. Análisis de Sentimientos en Tiempo Real
from textblob import TextBlob
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="TextBlob")
@kafka_client.consumer(topic="tweets", value_convert_to="json")
def sentiment_analysis(data):
# Realizar análisis de sentimiento sobre texto recibido
sentiment = TextBlob(data.value["text"]).sentiment
print(f"Sentiment Analysis: {sentiment.polarity}, {sentiment.subjectivity}")
kafka_client.run_consumers()
Realiza un análisis de sentimientos sobre textos recibidos en tiempo real, útil para monitorear redes sociales o feedback de usuarios.
21. Despliegue de Modelos de NLP (Procesamiento de Lenguaje Natural)
from transformers import pipeline
from custom_kafka import CustomKafka
kafka_client = CustomKafka(server="localhost:9092", name="TextBlob")
# Cargar un modelo de procesamiento de texto
nlp_model = pipeline("text-classification")
@kafka_client.consumer(topic="nlp_topic", value_convert_to="json")
def nlp_processing(data):
# Procesar texto con el modelo NLP
result = nlp_model(data.value["text"])
print(f"NLP Result: {result}")
kafka_client.run_consumers()
Este ejemplo despliega un modelo de NLP para procesar texto en tiempo real, útil en clasificación de correos, chatbots o análisis de contenido.
22. Orquestación de Modelos en Inferencias Complejas
from custom_kafka import CustomKafka
model_1 = ...
model_2 = ...
model_3 = ...
kafka_client = CustomKafka(server="localhost:9092", name="TextBlob")
@kafka_client.consumer(topic="complex_inference", value_convert_to="json")
def orchestrate_models(data):
# Orquestar múltiples modelos para una inferencia compleja
step1_result = model_1.predict([data.value["step1_input"]])
step2_result = model_2.predict([step1_result])
final_result = model_3.predict([step2_result])
print(f"Final Inference Result: {final_result}")
kafka_client.run_consumers()
Orquesta una secuencia de modelos en un pipeline de inferencia complejo, combinando resultados de diferentes modelos para obtener una respuesta final.
23. Captura datos, Reentrenamiento e Inferencia del Modelo
from sklearn.ensemble import RandomForestClassifier
import pickle
import json
from custom_kafka import CustomKafka
class ModelUpdater(CustomKafka):
def __init__(self, server: str, name: str, model_path: str):
super().__init__(server, name)
self.model_path = model_path
self.new_data = []
@CustomKafka.consumer(topic="data_topic", value_convert_to="json")
def capture_data(self, data: dict):
self.new_data.append(data)
if len(self.new_data) >= 100: # Ajusta el tamaño del lote si es necesario
self.retrain_model()
def retrain_model(self):
# Prepara datos para el entrenamiento
X = [d['features'] for d in self.new_data]
y = [d['label'] for d in self.new_data]
# Entrena el modelo
model = RandomForestClassifier()
model.fit(X, y)
# Guarda el modelo
with open(self.model_path, 'wb') as f:
pickle.dump(model, f)
self.new_data = []
class ModelInferencer(CustomKafka):
def __init__(self, server: str, name: str, model_path: str):
super().__init__(server, name)
self.model_path = model_path
with open(model_path, 'rb') as f:
self.model = pickle.load(f)
self.producer = KafkaProducer(
bootstrap_servers=server,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
@CustomKafka.consumer(topic="data_topic", value_convert_to="json")
def predict_and_send(self, data: dict):
features = [data['features']]
prediction = self.model.predict(features)[0]
result = {'id': data['id'], 'prediction': prediction}
with self.producer() as producer:
producer.send(topic='prediction_topic', value=result, value_type="json")
model_update = ModelUpdater(server="localhost:9092", name="TextBlob", model_path="path/to/model.ptl")
model_update.run_consumers()
model_predict = ModelUpdater(server="localhost:9092", name="TextBlob", model_path="path/to/model.ptl")
model_predict.run_consumers()
Explicación
-
Captura y Reentrenamiento (ModelUpdater): Usa un consumidor Kafka para recibir datos desde data_topic. Los datos se acumulan y, cuando hay suficientes (100 en este caso), se reentrena el modelo y se guarda.
-
Inferencia (ModelInferencer): Usa un consumidor Kafka para recibir datos y cargar el modelo actualizado. Realiza predicciones con el modelo y envía los resultados al tópico prediction_topic.
-
Ejecución: Se crean instancias de ModelUpdater y ModelInferencer y se ejecutan para manejar datos y realizar inferencias.
Este enfoque simplificado permite la actualización y el uso de modelos en producción de manera eficiente y con un mínimo de código.
Good practics
Recomendaciones para el uso efectivo y seguro de CustomKafka.
Solución de Problemas
Preguntas frecuentes y cómo resolver errores comunes.
Contributing
Contributions are always welcome!
See contributing.md
for ways to get started.
Please adhere to this project's code of conduct
.
License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.