Skip to main content

Librería que utiliza generadores de python para simular procesamiento de flujo de datos

Project description

Esta librería implementa una interfaz de programación similar a la brindada por el proyecto WebFlux de spring.

La interfaz implementada es similar a la siguiente: Flux

filter(predicate, on_mismatch)

Permite filtrar un flujo permitiendo continuar a aquellos valores que cumplen con el predicado que se indica.

Si no se cumple el predicado opcionalmente se puede indicar una función que dado el valor y el contexto actual retorna el valor que se enviará al flujo.

  • predicate: función(valor, contexto) que retorna un booleano

  • on_mismatch: función(valor, contexto) que retorna un valor alternativo al testeado originalmente

map(map_function)

Permite modificar el valor de un flujo.

Recibe una función(valor, contexto) que se evaluará para cada elemento del flujo y se substituirá en el mismo por lo obtenido de la función.

  • map_function: función(valor, contexto) desde donde se obtendrá el valor a sustituir.

map_context(map_context_function)

Permite modificar el contexto de un flujo.

Recibe una función(valor, contexto) que se evaluará para cada elemento del flujo y retornará un nuevo contexto que sustituirá al actual

  • map_context_function: función(valor, contexto) desde donde se obtendrá el contexto a subsituir.

flat_map(flatmap_function)

Permite sustituir el valor de un flujo por elementos de otro flujo.

Recibe una función(valor, contexto) que se evaluará para cada elemento del flujo original y retornará un flujo cuyos elementos se usarán como nuevo origen del flujo de datos.

  • flatmap_function: función(valor, contexto) desde donde se obtendrá el valor a sustituir.

do_on_next(on_next)

Ejecuta para cada elemento del flujo una acción de forma asincrónica. No afecta a los valores del flujo.

  • on_next: función(valor, contexto) que se ejecutará para cada valor del flujo

  • on_error: función(ex, contexto) se ejecuta en caso de error

  • exec_async: Booleano que indica si las funciones se ejecutarán de forma asíncrona

delay_ms(delay_ms, predicate=fu.default_predicate)

Retrasa en delay milisegundos la ejecución de paso del flujo. Esto marca el tiempo en que se procesarán sus elementos.

  • delay_ms: Delay en milisegundos que se retrasará el procesamiento de los elementos del flujo

  • predicate: Si es verdadero se aplica el delay establecido

take(n)

Corta la ejecución del flujo luego de procesar n elementos.

  • n: Cantidad de elementos procesados antes de cortar el flujo

take_until_seconds(n)

Corta la ejecución del flujo luego de n segundos de procesamiento.

La ejecución se cortará si al momento de evaluar este paso el tiempo transcurrido fue superior al configurado.

  • n: Cantidad de segundos que el stream procesa elementos.

log(build_log_message, build_error_message, level)

Recibe una función(valor, contexto) que retorna un mensaje de texto que será logueado con el nivel de log indicado en level.

El logueo se hace de forma asincrónica y no afecta al flujo.

  • build_log_message: Función que dado un valor y contexto retorna un mensaje a loguear

  • build_error_message: Función que dada una excepción y el contexto retorna el mensaje a loguear como ERROR

  • level: Nivel de logueo que se usará

on_error_resume(resume=fu.default_error_resume, exceptions=[Exception])

En caso de error, este paso ejecuta la función resume que debe retornar un valor a partir del error recibido por parámetro.

  • resume: función(ex, contexto) Debe retornar el valor a sustituir en el flujo.

  • exceptions: Lista de excepciones para los que se aplica la función resume.

on_error_retry(retries=3, delay_ms=lambda i: 0, exceptions=[Exception])

En caso de error, este paso intenta ejecutar nuevamente el flujo retries cantidad de veces.

Esto lo hace sólo para las excepciones indicadas por exceptions y los reintentos tienen un delay dado por la función delay_ms que recibe el número de reintento que es y espera obtener los ms de delay que se deben aplicar.

  • retries: Cantidad de reintentos a aplicar

  • delay_ms: función(int) que retorna dado el número de reintento en el que se está cuantos: ms de delay se deben aplicar.

  • exceptions: Lista de excepciones para los que se aplica el método resume.

subscribe(context, skip_error)

Crea un objeto iterable a partir del flujo. Si se itera sobre este objeto se obtendrán los valores del flujo.

  • skip_error: Ignora errores al obtener los valores desde el flujo

  • context: Contexto inicial para el flujo

  • return: Objeto iterable

foreach(on_success=fu.default_success, on_error=fu.default_error, context={})

Itera sobre los elementos del flujo e invoca a funciones on_success y on_error dependiendo el estado del flujo.

  • on_success: función(valor, contexto) se invoca si el flujo procesa correctamente un valor

  • on_error: función(ex, valor, contexto) se invoca si hay un error en el flujo. Esto no corta el procesamiento a menos que se lance una excepción en el método

  • on_finish: función(contexto) se invoca cuando el flujo finaliza

  • context: Contexto inicial para el flujo

to_list(context={}, skip_error=True)

Itera sobre los elementos del flujo y los retorna todos dentro de una lista.

  • context: contexto inicial para el flujo

  • skip_error: Ignora errores al obtener los valores desde el flujo

  • return: Lista de elementos

collect(init=lambda c: {}, reduce=lambda v, a: a, context={})

Permite resumir en un sólo objeto el procesamiento de todos los elementos del flujo.

Se inicializa un acumulador a través de la función init(contexto) y luego para cada elemento del flujo se invoca la función reduce(valor, acumulador) que procesa el valor y retorna un nuevo valor del acumulador.

  • init: función(contexto) Retorna valor inicial del acumulador

  • reduce: funcón(valor, acumulador) Dados el nuevo valor y el acumulador retorna un nuevo valor de acumulador.

  • context: Contexto inicial para el flujo

  • return: Acumulador

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

python_flux-1.0.8.tar.gz (7.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

python_flux-1.0.8-py3-none-any.whl (8.6 kB view details)

Uploaded Python 3

File details

Details for the file python_flux-1.0.8.tar.gz.

File metadata

  • Download URL: python_flux-1.0.8.tar.gz
  • Upload date:
  • Size: 7.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.4.2 CPython/3.11.3 Darwin/22.6.0

File hashes

Hashes for python_flux-1.0.8.tar.gz
Algorithm Hash digest
SHA256 913ed4421495eab374fde4b4a0f720396e7e6a611ed1adb5e2b7309c658fff3d
MD5 6e4a07d42c0b7a5d78165a6307340be4
BLAKE2b-256 61f1b7ed8ba858a07ad9db5f0d6949388602ea5d2e76af4277a78af19556a264

See more details on using hashes here.

File details

Details for the file python_flux-1.0.8-py3-none-any.whl.

File metadata

  • Download URL: python_flux-1.0.8-py3-none-any.whl
  • Upload date:
  • Size: 8.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.4.2 CPython/3.11.3 Darwin/22.6.0

File hashes

Hashes for python_flux-1.0.8-py3-none-any.whl
Algorithm Hash digest
SHA256 ef8a815f084e932bd6c16f4de7418aacbb02a6f83abb1864c74dd1792aa26a55
MD5 491507f211994d6659af295bff562038
BLAKE2b-256 0c98d4d3c226a398fcb4a175d7b41ffdf21b8f395bdef2789f95940422268127

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page