Skip to main content

Proyecto que facilita el flujo de información de cliente.

Project description

centraal-client-flow

Release Status CI Status Documentation Status

centraal-client-flow es una librería de Python diseñada para facilitar la implementación de una una solución basada en eventos para la sincronización y consistencia de datos entre sistemas distribuidos de clientes utilizando Azure. Esta librería proporciona herramientas para manejar eventos, aplicar reglas de procesamiento, integrar con sistemas externos y mantener un esquema unificado de clientes.

Introducción

Centraal-Cliente-Flow facilita la implementación de arquitecturas de sincronización de datos en Azure, proporcionando una base sólida para manejar eventos en tiempo real, reglas de negocio, integración con APIs externas y mantenimiento de un log de auditoría.

Arquitectura

La arquitectura está diseñada para unificar la información de los clientes alrededor de un identificador único, asegurando la consistencia de los datos a través de los siguientes componentes clave:

  • Eventos: Gestionados por Azure Functions para manejar eventos entrantes en tiempo real y operaciones periódicas de extracción de datos.
  • Reglas: Implementan la lógica de procesamiento de eventos para actualizar un esquema unificado de clientes.
  • Reglas de Integración: Sincronizan las actualizaciones del esquema unificado con sistemas externos a través de APIs REST.
  • Esquema Unificado: Modelo de datos centralizado que asegura consistencia y escalabilidad en la información de los clientes.
  • Log de Auditoría: Registra todas las actualizaciones del esquema unificado para asegurar trazabilidad.
  • Uso de Pydantic: La libreria hace uso extensivo e incentiva que cada intercambio de datos este mediado por modelos Pydantic, ya de esta manera se asegura calidad y reglas de negocio.
                Evento
                  ^
                  |
                  v
+------------------------------+
|   Receiver/Timer Function     |
|  (Valida y envía a la cola    |
|   usando Pydantic)            |
+------------------------------+
            |
            |
         [P]-EventoBase(IdModel)
            |
            v
+------------------------------+
|   Azure Service Bus Queue     |
|  (Ordena eventos por          |
|   Session ID)                 |
+------------------------------+
            |
            v
+---------------------------------------+
|  Processor Function                   |
|  Reglas de Actualización              |
|  (Actualiza esquema y log de          |
|   auditoría usando Pydantic,          |
|   Publica actualizaciones)            |
+---------------------------------------+
            |                   |
            |                   |
         [P]-EntradaEsquemaUnificado
    [P] - AuditoriaEntry        |
            |                   |
            v                   v
+-----------------------+   +-------------------------+
|     Cosmos DB         |   |  Azure Service Bus      |
| (Esquema Unificado y  |   |         Topic           |
|  Log de Auditoría)    |   |                         |
+-----------------------+   +-------------------------+
            |                           |
            |                           |
      [P]-AuditoriaEntry                |
            |                           |
            v                           v
+-----------------------+   +-------------------------+
| Log de Auditoría en   |   | Integration Function    |
| Cosmos DB             |   | Reglas  y Estrategias   |
| (Registra cambios)    |   | de Integración          |
+-----------------------+   |                         |
                            |                         |
                            +-------------------------+
                                       |
                                 [P]-BaseModel
                                       |
                                       v
                            +-------------------------+
                            |    Sistemas Externos    |
                            | (Reciben actualizaciones|
                            |  a través de APIs REST) |
                            +-------------------------+

Componentes Clave

1. Eventos

  • Receiver Functions: Manejan eventos entrantes en tiempo real. Implementadas en el módulo receiver.py utilizando clases como EventFunctionBuilder.
  • Timer Functions: Ejecutan tareas periódicas para extraer información de sistemas externos, definidas en timer.py usando TimerFunctionBuilder.

2. Reglas de Procesamiento

Las reglas para actualizar el esquema unificado de clientes se implementan usando UpdateProcessor y RuleProcessor, que permiten procesar y aplicar reglas específicas a los eventos entrantes.

3. Reglas de Integración

Se implementan en strategy.py usando la clase RESTIntegration, que permite la sincronización de datos con APIs REST externas.

4. Esquema Unificado

Definido en schemas.py, utiliza modelos Pydantic para asegurar la validación y consistencia de datos. Los modelos incluyen IDModel, EntradaEsquemaUnificado, y otros sub-esquemas específicos.

5. Log de Auditoría

Para asegurar la trazabilidad de las actualizaciones, todos los cambios en los sub-esquemas se registran en una colección de auditoría en Cosmos DB.

Uso de la Librería

1. Configuración Inicial

Asegúrate de tener configuradas las variables de entorno necesarias para las conexiones a Cosmos DB y Azure Service Bus.

import os

os.environ["COSMOS_CONN"] = "<tu_cosmos_db_connection_string>"
os.environ["DATABASE"] = "<tu_database_name>"
os.environ["BUS_CONN"] = "<tu_service_bus_connection_string>"

2. Registrar Funciones de Azure

Eventos

Utiliza el siguiente ejemplo para registrar funciones receptoras y de temporización.

from azure.functions import FunctionApp
from centraal_client_flow.receiver import Recieve
from centraal_client_flow.timer import Pull

app = FunctionApp()

# Registrar función receptora
receiver = Recieve(event_source="source_name", queue_name="queue_name", service_bus_client=service_bus_client_instance)
receiver.register_function(app, processor=event_processor_instance, event_model=event_model_instance)

# Registrar función de temporización
pull = Pull(schedule="0 */5 * * * *", event_source="source_name", queue_name="queue_name", service_bus_client=service_bus_client_instance)
pull.register_function(app, processor=pull_processor_instance)

Reglas de Actualización

from azure.functions import FunctionApp
from update_rules import bp_update_rules

app = FunctionApp()
app.register_functions(bp_update_rules)

Reglas de Integración

from azure.functions import FunctionApp
from integration_rules import bp_int_rules

app = FunctionApp()
app.register_functions(bp_int_rules)

3. Definir Modelos y Procesadores

Define los modelos de datos utilizando Pydantic para asegurar la validación de datos entrantes.

from pydantic import BaseModel, EmailStr

class EventoEjemplo(BaseModel):
    id: int
    nombre: str
    email: EmailStr

Implementa procesadores para manejar la lógica de actualización de acuerdo con las reglas de negocio.

from centraal_client_flow.rules.update import UpdateProcessor
from modelos import EventoEjemplo

class EjemploProcessor(UpdateProcessor):
    def process_message(self, event: EventoEjemplo, current_registro=None):
        # Lógica de procesamiento de eventos
        pass

4. Ejecutar la Aplicación

Asegúrate de que todas las dependencias estén instaladas y ejecuta la aplicación utilizando un servidor de funciones de Azure.

func start

Contribuciones

Las contribuciones son bienvenidas. Por favor, abre un issue o un pull request en el repositorio para discutir cualquier cambio.

Credits

This package was created with the ppw tool. For more information, please visit the project page.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

centraal_client_flow-0.1.13.tar.gz (26.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

centraal_client_flow-0.1.13-py3-none-any.whl (27.5 kB view details)

Uploaded Python 3

File details

Details for the file centraal_client_flow-0.1.13.tar.gz.

File metadata

  • Download URL: centraal_client_flow-0.1.13.tar.gz
  • Upload date:
  • Size: 26.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for centraal_client_flow-0.1.13.tar.gz
Algorithm Hash digest
SHA256 cac74faff28a0705e0a9451c706529c3746fb1040f95cce6d8422c484e85bc05
MD5 4f6b83a6beffc6f5f46b6fc5438e18b5
BLAKE2b-256 2084996abe5e66e2bc0091b3ea4b1e4c2a64a238c60df2536e78d22812b658b4

See more details on using hashes here.

Provenance

The following attestation bundles were made for centraal_client_flow-0.1.13.tar.gz:

Publisher: release.yml on centraal-api/centraal-client-flow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file centraal_client_flow-0.1.13-py3-none-any.whl.

File metadata

File hashes

Hashes for centraal_client_flow-0.1.13-py3-none-any.whl
Algorithm Hash digest
SHA256 416f3260d806bd222ae5e6ec0fbc22090da4af3f5565509b767118f10bcf2779
MD5 6b7ed49e7509a9daf7cedc0afcbc147f
BLAKE2b-256 3ae9fac63d6237a1ef329a4ca751cdcc176d68ab3f5bbe3db6faf6bff6b2028c

See more details on using hashes here.

Provenance

The following attestation bundles were made for centraal_client_flow-0.1.13-py3-none-any.whl:

Publisher: release.yml on centraal-api/centraal-client-flow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page