Skip to main content

Librería común para uso en ETL

Project description

Librería Python para ETLs

Esta librería tiene funciones comunes que pueden ser útiles para uso en las ETLs.

Página de la libreria en PyPI:https://pypi.org/project/tc-etl-lib/

Empaquetado de la librería

La librería permite versionado y se puede acceder a las diferentes versiones en dist. En ese directorio se van acumulando el empaquetado de cada versión de la librería.

Para empaquetar se procede de la siguiente manera:

  • En el fichero de configuración setup.py hay una variable VERSION que indica la versión. Se debe incrementar esta versión al empaquetar.

  • Se puede empaquetar la librería en diferentes formatos, dependiendo del sistema operativo desde que se ejecuta, puede ser zip, tar.gz, etc.. Por defecto en linux, si no le indicas un formato concreto, usará el .tar.gz. El empaquetado, creará un archivo en la carpeta /dist/ del tipo tc_etl_lib-<version>.tar.gz, que se puede utilizar en los proyectos que sea necesario. Se empaqueta mediante el siguiente comando:

      python3 setup.py sdist bdist_wheel
    
  • Una vez generada la librería, se puede utilizar desde cualquier proyecto instalándola mediante pip

Subida de la libreria a repositorio público de pip

Una vez construido el paquete de la librería según el procedimiento descrito en la sección anterior, se puede realizar su subida al repositorio público de pip.

En primer lugar, instalar la herramienta twine si no se dispone previamente de ella:

    pip install twine

Para subir la libreria al repositorio público de pip ejecutar el comando:

    twine upload dist/tc_etl_lib-x.y.z.tar.gz

Es necesario estar registrado previamente en https://pypi.org y con permisos en https://pypi.org/project/tc-etl-lib/, ya que se solicitará el usuario y password para realizar la subida.

Instalación de la librería

Existen distintas alternativas

Desde el repositorio público de pip (recomendada)

Para instalar la última versión:

    pip install tc_etl_lib

Para instalar una versión concreta:

    pip install tc_etl_lib==0.1.0

También se puede añadir la depedencia a requirements.txt:

    tc_etl_lib==0.1.0

he instalar (junto con el resto de depedencias) con el habitual:

    pip install -r requirements.txt

Con el paquete .tar.gz

La librería de tipo tc_etl_lib-<version>.tar.gz, se puede instalar en cualquier entorno python con el siguiente comando:

pip install ./tc_etl_lib/dist/tc_etl_lib-0.1.0.tar.gz

En este caso, la librería está disponible en ese directorio. Si estuviera disponible en cualquier otra ruta, se debería de reemplazar por la ruta relativa o absoluta correspondiente. Si se quiere instalar sobre un entorno venv, primero has de activar el entorno venv y luego ejecutar el comando de instalación de la librería.

Pueden obtenerlse los .tar.gz correspondientes a cada versión de la libreria en este enlace.

Desde el repositorio git

La librería se puede instalar usando el repositorio remoto de github, para ello se ha de añadir la siguiente linea en el fichero requirements.txt.

Para instalar una versión concreta

tc_etl_lib @ git+https://github.com/telefonicasc/etl-framework@{tag_version}#package=tc_etl_lib&subdirectory=python-lib/tc_etl_lib

Donde:

{tag_version} : Acceso a una versión concreta de la librería.

Para instalar la última versión

tc_etl_lib @ git+https://github.com/telefonicasc/etl-framework#package=tc_etl_lib&subdirectory=python-lib/tc_etl_lib

Una vez añadida esa referencia en requeriments.txt, se puede instalar con el comando:

pip install -r requirements.txt

Uso de la librería

El uso de la librería una vez se ha instalado en el venv, es similar al resto de librerías que dispondrías en python. A continuación mostramos un ejemplo de referencia:

# import library
import tc_etl_lib as tc
import json

# create a auth manager and use it
auth: tc.auth.authManager = tc.auth.authManager(endpoint='http://<auth_endpoint>:<port>', 
                                                service = 'alcobendas', 
                                                user = 'usuario_alcobendas', 
                                                password = 'xxx')

auth.get_auth_token_subservice(subservice = '/energia')

# create a cb manager and use it
cbm: tc.cb.cbManager = tc.cb.cbManager(endpoint = 'http://<cb_endpoint>:<port>')
entities = cbm.get_entities_page(subservice='/energia', auth=auth, type='myType')

# have a look to the retrieved entities
print (json.dumps(entities))

Ejemplo de uso de Recogida de todos los datos de tipo SupplyPoint con y sin paginación:

# import library
import tc_etl_lib as tc

# declare authManager
auth: tc.auth.authManager = tc.auth.authManager(endpoint='http://<auth_endpoint>:<port>', 
                                                service = 'alcobendas', 
                                                subservice = '/energia',
                                                user = 'usuario_alcobendas', 
                                                password = 'xxx')
# get token
try:
    token = auth.get_auth_token_subservice()
    logger.info(f'token: {token}')
except Exception as err:
    logger.error('get_auth_token_subservice problems!!')
    logger.error(err)

# get all SupplyPoint entities with pg
cbm: tc.cb.cbManager = tc.cb.cbManager(endpoint = 'http://<cb_endpoint>:<port>')

pg = 1
limit = 5
entities = None
while entities != []:
    entities = cbm.get_entities_page(auth = auth, offset = (pg-1)*limit, limit = limit, type='SupplyPoint')
    for i, item in enumerate(entities):
        logger.info(f'(pg:{pg})[{i + (pg-1)*limit}]--> {item["id"]} ({item["type"]})')
    pg += 1

# get all SupplyPoint entities without pg
entities = cbm.get_entities(auth = auth, type='SupplyPoint')
for i, item in enumerate(entities):
    logger.info(f'[{i}]--> {item["id"]} ({item["type"]})')

Ejemplo de Envío de datos en ráfaga

# import library
import tc_etl_lib as tc

# declare authManager
auth: tc.auth.authManager = tc.auth.authManager(endpoint='http://<auth_endpoint>:<port>', 
                                                service = 'alcobendas', 
                                                subservice = '/energia',
                                                user = 'usuario_alcobendas', 
                                                password = 'xxx')

# get token
try:
    auth.get_auth_token_subservice()
except Exception as err:
    logger.error('get_auth_token_subservice problems!!')
    logger.error(err)

# send entities
cbm: tc.cb.cbManager = tc.cb.cbManager(endpoint = 'http://<cb_endpoint>:<port>')

entities = [
            {
                "id": "myEntity1",
                "type": "myType",
                "description": {
                    "value": "My first happy entity",
                    "type": "Text"
                },
                "online": {
                    "value": 'true',
                    "type": "Boolean"
                },
                "temperature": {
                    "value": '42.3',
                    "type": "Number"
                }
            },
            {
                "id": "myEntity2",
                "type": "myType",
                "description": {
                    "value": "My second happy entity",
                    "type": "Text"
                },
                "online": {
                    "value": 'false',
                    "type": "Boolean"
                },
                "temperature": {
                    "value": '12.3',
                    "type": "Number"
                }
            }
	    ]
 
cbm.send_batch(auth=auth, subservice='/energia', entities=entities)

Funciones disponibles en la librería

La librería está creada con diferentes clases dependiendo de la funcionalidad deseada.

  • Clase authManager: En esta clase están las funciones relacionadas con la Autenticación (IDM).
    • __init()__: constructor de objetos de la clase
      • :param obligatorio endpoint: define el endpoint de identificación (ejemplo: https://<service>:<port>). Se debe especificar en el constructor del objeto de tipo authManager, sino avisará con una excepción ValueError.
      • :param obligatorio user: define el usuario para identificarse. Se debe especificar en el constructor del objeto de tipo authManager, sino avisará con una excepción ValueError.
      • :param obligatorio password: define la contraseña para identificarse. Se debe especificar en el constructor del objeto de tipo authManager, sino avisará con una excepción ValueError.
      • :param obligatorio service: define el servicio donde identificarse. Se debe especificar en el constructor del objeto del tipo authManager, sino avisará con una excepción ValueError.
      • :param opcional subservice: define el subservicio donde identificarse. Se puede especificar en el constructor o a posteriori cuando se llamen a las funciones del cbManager.
      • :raises ValueError: Se lanza cuando le falta alguno de los argumentos obligatorios
    • get_auth_token_subservice: Realiza una petición de token al IDM, identificándose con los creenciales de usuario contraseña, servicio y subservicio. El token obtenido se almacena internamente en un caché dentro del propio objeto, de forma que otras funciones de la librería que hagan uso del authManager (pe. las del cbManager) intentará utilizar siempre primero este caché antes que solicitar un nuevo token via API del IDM.
      • :param opcional subservice: se define el subservicio al que identificarse. Este parámetro es opcional. Si no se le indica, cogerá el subservicio inicializado en el objeto de tipo authManager que esté llamando la función. Si no está inicializado en el objeto ni se pasa por parámetro, se lanzará un ValueError.
      • :raises ValueError: Se lanza cuando le falta algún argumento o inicializar alguna variable del objeto authManager, para poder realizar la autenticación.
      • :raises Exception: Se lanza cuando el servicio de identificación, responde con un error concreto.
      • :return: El token que te asigna el servicio de identificación, a parte internamente authManager almacena en caché el token que ha recibido de la autenticación para cada servicio, por si lo necesita a postetiori para realizar acciones con ese token.
  • Clase cbManager: En esta clase están funciones para la interacción con el Context Broker.
    • __init()__: constructor de objetos de la clase
      • :param obligatorio endpoint: define el endpoint del context broker (ejemplo: https://<service>:<port>). Se debe especificar en el constructor del objeto de tipo cbManager, sino avisará con una excepción ValueError.
      • :param opcional timeout: timeout definido en segundos (default: 10).
      • :param opcional post_retry_connect: Número de reintentos a la hora de realizar un envío de datos (default: 3)
      • :param opcional post_retry_backoff_factor: Factor que se usa, para esperar varios segundos tras enviar una ráfaga de datos. (default: 0)
      • :param opcional sleep_send_batch: Pausa en segundos, que se realiza cada vez que se envia una ráfaga de datos. (default: 0).
      • :param opcional cb_flowcontrol: Opción del Context Broker, que permite un mejor rendimiento en caso de envío masivo de datos (batch updates). Este mecanismo, requiere arrancar el Context Broker con un flag concreto y en las peticiones de envío de datos, añadir esa opción. Referencia en documentación de Orion (default: False)
      • :param opcional block_size: Cuando se realiza el envío de datos al Context Broker mediante la función de send_batch, se realiza envíos en tramos que no excedan el block_size indicado (default: 800000). Se permite modificar el valor de block_size, pero sin superar la limitación de 800000. En caso de indicar un valor que supere ese límite, se lanzará una excepción ValueError indicando que se ha excedido el límite del valor permitido.
      • :raises ValueError: Se lanza cuando le falta alguno de los argumentos obligatorios
    • send_batch: Función que envía un lote de entidades al Context Broker. Se le pasa el authManager, que ya dispone de un listado con todos los tokens por subservicio y usa el correspondiente para realizar la llamada al Context Broker. Si no se dispone de token o ha caducado, se solicita nuevo y luego envía los datos.
      • :param obligatorio auth: Se le proporciona el authManager, que tiene las credenciales por si ha de solicitar un token y dispone del listado de tokens asociado. Si no se define en la llamada a la función, avisará con una excepción ValueError.
      • :param obligatorio entities: Listado de entidades a enviar, Si no se define en la llamada a la función, avisará con una excepción ValueError.
      • :param opcional subservice: Se puede indicar al subservicio al que se le quiere enviar los datos. Sino se indica, usará el que haya inicializado en el objeto authManager. Sino dispone ninguno de los dos definidos, lanzará un ValueError indicando que necesita definirse el subservicio.
      • :raises ValueError: Se lanza cuando le falta algún argumento o inicializar alguna varibale del objeto cbManager, para poder realizar la autenticación o envío de datos.
      • :raises Exception: Se lanza cuando el servicio de Context Broker, responde con un error concreto.
      • :return: True si la operación es correcta (i.e. el CB devolió un code http 204).
    • get_entities_page: Función que recoge datos del Context Broker. Se le pasa el authManager, que ya dispone de un listado con todos los tokens por subservicio y usa el correspondiente para realizar la llamada al Context Broker. Si no se dispone de token o ha caducado, se solicita nuevo y luego recoge los datos.
      • :param obligatorio auth: Se le proporciona el authManager, que tiene las credenciales por si ha de solicitar un token y dispone del listado de tokens asociado. Si no se define en la llamada a la función, avisará con una excepción ValueError.
      • :param opcional subservice: Se puede indicar al subservicio al que se le quiere enviar los datos. Sino se indica, usará el que haya inicializado en el objeto authManager. Sino dispone ninguno de los dos definidos, lanzará un ValueError indicando que necesita definirse el subservicio.
      • :param opcional offset: Se establece el offset cuando se recogen los datos. Si no se especifica, enviará la petición al Context Broker sin offset definido. Por lo tanto se aplicará el valor offset por defecto que tiene el Context Broker que es 0.
      • :param opcional limit: Se establece un límite de recogida de datos. Si no se especifica, enviará la petición al Context Broker sin el limit definido, por lo tanto se aplirá el valor de limit por defecto que tiene el Context Broker, que es 20.
      • :param opcional type: Se establece la recogida de un tipo de datos. Si no se especifica, enviará la petición al Context Broker sin el type definido, por lo tanto los datos no serán filtrados por el tipo de entidad.
      • :param opcional orderBy: Se establece un orden en la recogida de datos. Si no se especifica, enviará la petición al Context Broker sin el orderBy definido, por lo tanto aplicará el orden por defecto en el Context Broker (i.e. orden de fecha de creación de la entidades)
      • :param opcional q: Se establece un filtro de datos en función del valor de los atributos especificados. Si no se especifica, enviará la petición al Context Broker sin el q definido, por lo tanto los datos no serán filtrados por estos atributos.
      • :param opcional mq: Se establece un filtro de datos en función de los metadatos definidos en este parámetro. Si no se especifica, enviará la petición al Context Broker sin el mq definido, por lo tanto los datos no serán filtrados por los metadatos.
      • :param opcional georel: Cuando se define un filtro de datos por geolocalización, se ha de especificar en georel la relación espacial que se va a utilizar en el filtrado. Se pueden consultar los diferentes valores de georel en NGSIv2 API
      • :param opcional geometry: Cuando se define un filtro de datos por geolocalización, se ha de especificar un tipo de dibujo que se utiliza para resolver el filtrado. Se pueden consultar los diferentes valores de geometry en NGSIv2 API
      • :param opcional coords: Cuando se define un filtro de datos por geolocalización, se ha de especificar una lista de coordenadas geograficas separadas por coma. Se pueden consultar los diferentes valores de coords en NGSIv2 API
      • :param opcional id: Se establece un filtro por Identificador. Si no se especifica, enviará la petición al Context Broker sin el id definido, por lo tanto los datos no serán filtrados por identificador.
      • :raises ValueError: Se lanza cuando le falta algún argumento o inicializar alguna varibale del objeto cbManager, para poder realizar la autenticación o envío de datos.
      • :raises FetchError: Se lanza cuando el servicio de Context Broker, responde con un error concreto.
      • :return: array de datos cuyos elementos son objeto que representan entidades, según el formato descrito en la sección "JSON Entity Representation" de la NGSIv2 API
    • get_entities: Función que recoge datos del Context Broker. Se le pasa el authManager, que ya dispone de un listado con todos los tokens por subservicio y usa el correspondiente para realizar la llamada al Context Broker. Si no se dispone de token o ha caducado, se solicita nuevo y luego recoge los datos. Esta función con diferencia a get_entities_page usa un mecanismo de paginación interna para retornar todas las entidades que concuerden con los filtros definidos, para ello va realizando solicitudes necesarias al Context Broker para recoger todos los datos en tramos y retornarlos acumulados en un array.
      • :param obligatorio auth: Se le proporciona el authManager, que tiene las credenciales por si ha de solicitar un token y dispone del listado de tokens asociado. Si no se define en la llamada a la función, avisará con una excepción ValueError.
      • :param opcional subservice: Se puede indicar al subservicio al que se le quiere enviar los datos. Sino se indica, usará el que haya inicializado en el objeto authManager. Sino dispone ninguno de los dos definidos, lanzará un ValueError indicando que necesita definirse el subservicio.
      • :param opcional limit: Se establece un límite de recogida de datos en cada tramo. Internamente se usa un mecanismo de paginación y se van solicitando al context broker los datos en tramos. Si no se especifica, enviará la petición al Context Broker sin el limit definido, por lo tanto se aplirá el valor de limit por defecto, tiene un valor de 100.
      • :param opcional type: Se establece la recogida de un tipo de datos. Si no se especifica, enviará la petición al Context Broker sin el type definido, por lo tanto los datos no serán filtrados por el tipo de entidad.
      • :param opcional orderBy: Se establece un orden en la recogida de datos. Si no se especifica, enviará la petición al Context Broker sin el orderBy definido, por lo tanto aplicará el orden por defecto en el Context Broker (i.e. orden de fecha de creación de la entidades)
      • :param opcional q: Se establece un filtro de datos en función del valor de los atributos especificados. Si no se especifica, enviará la petición al Context Broker sin el q definido, por lo tanto los datos no serán filtrados por estos atributos.
      • :param opcional mq: Se establece un filtro de datos en función de los metadatos definidos en este parámetro. Si no se especifica, enviará la petición al Context Broker sin el mq definido, por lo tanto los datos no serán filtrados por los metadatos.
      • :param opcional georel: Cuando se define un filtro de datos por geolocalización, se ha de especificar en georel la relación espacial que se va a utilizar en el filtrado. Se pueden consultar los diferentes valores de georel en NGSIv2 API
      • :param opcional geometry: Cuando se define un filtro de datos por geolocalización, se ha de especificar un tipo de dibujo que se utiliza para resolver el filtrado. Se pueden consultar los diferentes valores de geometry en NGSIv2 API
      • :param opcional coords: Cuando se define un filtro de datos por geolocalización, se ha de especificar una lista de coordenadas geograficas separadas por coma. Se pueden consultar los diferentes valores de coords en NGSIv2 API
      • :param opcional id: Se establece un filtro por Identificador. Si no se especifica, enviará la petición al Context Broker sin el id definido, por lo tanto los datos no serán filtrados por identificador.
      • :raises ValueError: Se lanza cuando le falta algún argumento o inicializar alguna varibale del objeto cbManager, para poder realizar la autenticación o envío de datos.
      • :raises FetchError: Se lanza cuando el servicio de Context Broker, responde con un error concreto.
      • :return: array de datos cuyos elementos son objeto que representan entidades, según el formato descrito en la sección "JSON Entity Representation" de la NGSIv2 API

Changelog

0.2.0 (July 13th, 2022)

  • Add: get_entities function to get entities with internal pagination (#5)
  • Add: block control to send_batch function to overcome CB limitation (new cbManager constructor optional param block_size) (#6)
  • Add: more filters (q, mq, georel, geometry, coords, id) to get_entities_page function (#13)

0.1.0 (April 26th, 2022)

  • Initial version

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tc_etl_lib-0.2.0.tar.gz (16.7 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page