Proyecto que facilita el flujo de información de cliente.
Project description
centraal-client-flow
centraal-client-flow
es una librería de Python diseñada para facilitar la implementación de una una solución basada en eventos para la sincronización y consistencia de datos entre sistemas distribuidos de clientes utilizando Azure. Esta librería proporciona herramientas para manejar eventos, aplicar reglas de procesamiento, integrar con sistemas externos y mantener un esquema unificado de clientes.
Introducción
Centraal-Cliente-Flow
facilita la implementación de arquitecturas de sincronización de datos en Azure, proporcionando una base sólida para manejar eventos en tiempo real, reglas de negocio, integración con APIs externas y mantenimiento de un log de auditoría.
Arquitectura
La arquitectura está diseñada para unificar la información de los clientes alrededor de un identificador único, asegurando la consistencia de los datos a través de los siguientes componentes clave:
- Eventos: Gestionados por Azure Functions para manejar eventos entrantes en tiempo real y operaciones periódicas de extracción de datos.
- Reglas: Implementan la lógica de procesamiento de eventos para actualizar un esquema unificado de clientes.
- Reglas de Integración: Sincronizan las actualizaciones del esquema unificado con sistemas externos a través de APIs REST.
- Esquema Unificado: Modelo de datos centralizado que asegura consistencia y escalabilidad en la información de los clientes.
- Log de Auditoría: Registra todas las actualizaciones del esquema unificado para asegurar trazabilidad.
- Uso de Pydantic: La libreria hace uso extensivo e incentiva que cada intercambio de datos este mediado por modelos Pydantic, ya de esta manera se asegura calidad y reglas de negocio.
Evento
^
|
v
+------------------------------+
| Receiver/Timer Function |
| (Valida y envía a la cola |
| usando Pydantic) |
+------------------------------+
|
|
[P]-EventoBase(IdModel)
|
v
+------------------------------+
| Azure Service Bus Queue |
| (Ordena eventos por |
| Session ID) |
+------------------------------+
|
v
+---------------------------------------+
| Processor Function |
| Reglas de Actualización |
| (Actualiza esquema y log de |
| auditoría usando Pydantic, |
| Publica actualizaciones) |
+---------------------------------------+
| |
| |
[P]-EntradaEsquemaUnificado
[P] - AuditoriaEntry |
| |
v v
+-----------------------+ +-------------------------+
| Cosmos DB | | Azure Service Bus |
| (Esquema Unificado y | | Topic |
| Log de Auditoría) | | |
+-----------------------+ +-------------------------+
| |
| |
[P]-AuditoriaEntry |
| |
v v
+-----------------------+ +-------------------------+
| Log de Auditoría en | | Integration Function |
| Cosmos DB | | Reglas y Estrategias |
| (Registra cambios) | | de Integración |
+-----------------------+ | |
| |
+-------------------------+
|
[P]-BaseModel
|
v
+-------------------------+
| Sistemas Externos |
| (Reciben actualizaciones|
| a través de APIs REST) |
+-------------------------+
Componentes Clave
1. Eventos
- Receiver Functions: Manejan eventos entrantes en tiempo real. Implementadas en el módulo
receiver.py
utilizando clases comoEventFunctionBuilder
. - Timer Functions: Ejecutan tareas periódicas para extraer información de sistemas externos, definidas en
timer.py
usandoTimerFunctionBuilder
.
2. Reglas de Procesamiento
Las reglas para actualizar el esquema unificado de clientes se implementan usando UpdateProcessor
y RuleProcessor
, que permiten procesar y aplicar reglas específicas a los eventos entrantes.
3. Reglas de Integración
Se implementan en strategy.py
usando la clase RESTIntegration
, que permite la sincronización de datos con APIs REST externas.
4. Esquema Unificado
Definido en schemas.py
, utiliza modelos Pydantic para asegurar la validación y consistencia de datos. Los modelos incluyen IDModel
, EntradaEsquemaUnificado
, y otros sub-esquemas específicos.
5. Log de Auditoría
Para asegurar la trazabilidad de las actualizaciones, todos los cambios en los sub-esquemas se registran en una colección de auditoría en Cosmos DB.
Uso de la Librería
1. Configuración Inicial
Asegúrate de tener configuradas las variables de entorno necesarias para las conexiones a Cosmos DB y Azure Service Bus.
import os
os.environ["COSMOS_CONN"] = "<tu_cosmos_db_connection_string>"
os.environ["DATABASE"] = "<tu_database_name>"
os.environ["BUS_CONN"] = "<tu_service_bus_connection_string>"
2. Registrar Funciones de Azure
Eventos
Utiliza el siguiente ejemplo para registrar funciones receptoras y de temporización.
from azure.functions import FunctionApp
from centraal_client_flow.receiver import Recieve
from centraal_client_flow.timer import Pull
app = FunctionApp()
# Registrar función receptora
receiver = Recieve(event_source="source_name", queue_name="queue_name", service_bus_client=service_bus_client_instance)
receiver.register_function(app, processor=event_processor_instance, event_model=event_model_instance)
# Registrar función de temporización
pull = Pull(schedule="0 */5 * * * *", event_source="source_name", queue_name="queue_name", service_bus_client=service_bus_client_instance)
pull.register_function(app, processor=pull_processor_instance)
Reglas de Actualización
from azure.functions import FunctionApp
from update_rules import bp_update_rules
app = FunctionApp()
app.register_functions(bp_update_rules)
Reglas de Integración
from azure.functions import FunctionApp
from integration_rules import bp_int_rules
app = FunctionApp()
app.register_functions(bp_int_rules)
3. Definir Modelos y Procesadores
Define los modelos de datos utilizando Pydantic para asegurar la validación de datos entrantes.
from pydantic import BaseModel, EmailStr
class EventoEjemplo(BaseModel):
id: int
nombre: str
email: EmailStr
Implementa procesadores para manejar la lógica de actualización de acuerdo con las reglas de negocio.
from centraal_client_flow.rules.update import UpdateProcessor
from modelos import EventoEjemplo
class EjemploProcessor(UpdateProcessor):
def process_message(self, event: EventoEjemplo, current_registro=None):
# Lógica de procesamiento de eventos
pass
4. Ejecutar la Aplicación
Asegúrate de que todas las dependencias estén instaladas y ejecuta la aplicación utilizando un servidor de funciones de Azure.
func start
Contribuciones
Las contribuciones son bienvenidas. Por favor, abre un issue o un pull request en el repositorio para discutir cualquier cambio.
- Free software: Apache-2.0
- Documentation: https://centraal-api.github.io/centraal_client_flow/
Credits
This package was created with the ppw tool. For more information, please visit the project page.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file centraal_client_flow-0.1.12.tar.gz
.
File metadata
- Download URL: centraal_client_flow-0.1.12.tar.gz
- Upload date:
- Size: 22.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 93de62c6eda10858bdb686dd966583d7d50ff75cd63197a16d3e46d1b1959103 |
|
MD5 | 8889c1a5b33ac92c3be16579e6ca9f34 |
|
BLAKE2b-256 | 3831a2cc5bcd05970e7a5bde765fcb24f88e54a19f8e5f0b76b1747668c31845 |
File details
Details for the file centraal_client_flow-0.1.12-py3-none-any.whl
.
File metadata
- Download URL: centraal_client_flow-0.1.12-py3-none-any.whl
- Upload date:
- Size: 22.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0abdaf87f4660ba04782481a905d3bfdfe00026229a434737f404458d53dee7a |
|
MD5 | 3aefbdb6785731ea9aebdeab7d744971 |
|
BLAKE2b-256 | a097d2f36e10880dfb18750b707ef7e5eb4b8a90d44ebb3d68cdbaec0bef1215 |