Skip to main content

Persistent WebSocket engine for Binance market data (Spot, USD-M, COIN-M)

Project description

binflow

Motor de WebSockets persistente para consumo de datos de mercado en tiempo real desde Binance.

Soporta Spot, USD-M Futures y COIN-M Futures. Componente de infraestructura reutilizable que mantiene un cache de mercado vivo en memoria, con reconexion automatica, observabilidad integrada y API sincrona simple.

Filosofia

binflow es una libreria de infraestructura de datos de mercado. Su responsabilidad es servir datos fiables, continuos y disponibles.

Lo que SI hace binflow:

  • Mantener conexiones WebSocket vivas y reconectar automaticamente
  • Garantizar continuidad del dato (sin gaps en trade IDs)
  • Servir datos normalizados, listos para consumir
  • Escalar a cientos de simbolos con eficiencia (combined streams)
  • Order books correctos e integros en memoria

Lo que NO hace binflow (responsabilidad del consumidor):

  • Persistir datos en base de datos (TimescaleDB, Parquet, etc.)
  • Callbacks o event-driven processing
  • Metricas y monitoring (Prometheus, Grafana)
  • Logica de trading o senales

Otra libreria o servicio debe consumir de binflow via polling (.get(), .get_recent()) y encargarse del resto.

Caracteristicas

  • Multi-mercado -- Spot, USD-M Futures y COIN-M Futures con un solo parametro
  • 8 tipos de stream -- trades, order book, aggTrades, klines, ticker 24h, miniTicker, bookTicker, liquidaciones (forceOrder)
  • Streams combinados -- multiplexa cientos de simbolos en pocas conexiones WebSocket
  • Gestion dinamica -- subscribe/unsubscribe de simbolos en combined streams sin recrear la conexion
  • Order books sincronizados -- REST snapshot + diffs por WebSocket, con campo is_synced que indica integridad verificada
  • Order books profundos -- hasta 5000 niveles (Spot) o 1000 (Futures) por lado
  • Rate limiting automatico -- delegado a panzer, respeta cabeceras X-MBX-USED-WEIGHT de Binance
  • Anti-regresion -- el cache rechaza snapshots con lastUpdateId anterior al existente, protegiendo contra datos stale
  • Cache en memoria thread-safe con acceso O(1)
  • Reconexion automatica con backoff exponencial y jitter
  • API sincrona -- se usa desde codigo Python normal, sin async/await
  • Observabilidad -- estado de streams, metricas, deteccion de degradacion
  • Logging persistente -- rotacion por tamano, archivo de errores separado, limites de disco
  • Shutdown limpio -- sin tareas pendientes ni warnings
  • Context manager -- soporte with para manejo automatico del ciclo de vida
  • GC-safe -- si se pierde la referencia sin llamar a stop(), el garbage collector limpia automaticamente (ideal para notebooks)
  • Probado bajo carga -- 728 simbolos simultaneos, 60s, cero gaps en trade IDs

Instalacion

pip install binflow

O con dependencias de desarrollo:

pip install -e ".[dev]"

Requisitos

  • Python >= 3.11
  • websockets >= 13.0
  • panzer >= 2.4.0 (rate limiting, peticiones REST a Binance, liquidaciones)

Inicio rapido

import time
from market_streaming import Manager, Config

# Cada manager = un mercado + un tipo de stream + N simbolos
config = Config(
    market="spot",
    stream_type="trade",
    symbols=["BTCUSDT", "ETHUSDT"],
    max_history=500,
    log_level="info",
)

with Manager(config) as manager:
    time.sleep(5)

    # Un simbolo: devuelve Trade o None
    trade = manager.get("BTCUSDT")
    if trade:
        print(f"{trade.price} x {trade.quantity}")

    # Todos los simbolos: devuelve dict[str, Trade]
    all_trades = manager.get()

    # Historial reciente
    recent = manager.get_recent("BTCUSDT", limit=50)

    # Anadir simbolos en caliente
    manager.add_symbols(["SOLUSDT", "ADAUSDT"])

No es necesario importar Market, StreamType ni logging; todos los campos aceptan strings.

Multiples managers

Para diferentes tipos de datos, se usa un manager por tipo:

from market_streaming import Manager, Config

symbols = ["BTCUSDT", "ETHUSDT"]

trades = Manager(Config(
    market="spot", stream_type="trade", symbols=symbols,
    max_history=1000,
))
books = Manager(Config(
    market="spot", stream_type="depth", symbols=symbols,
    depth=200,
))

trades.start()
books.start()

# Ver todas las instancias en memoria
Manager.dashboard()

Order books profundos y sincronizados

Para profundidades > 20, el sistema usa automaticamente el diff depth stream de Binance con sincronizacion REST:

  1. Conecta al diff depth stream y buferea mensajes
  2. Pide un snapshot REST completo via panzer (con rate limiting automatico)
  3. Filtra los diffs anteriores al snapshot
  4. Aplica los diffs posteriores sobre el snapshot
  5. Marca el order book como is_synced=True
config = Config(
    stream_type="depth",
    symbols=["BTCUSDT", "ETHUSDT"],
    depth=200,
)

with Manager(config) as manager:
    time.sleep(10)

    book = manager.get("BTCUSDT")
    print(f"Niveles: {len(book.bids)} bids, {len(book.asks)} asks")
    print(f"Sincronizado: {book.is_synced}")

Para streams masivos (cientos de simbolos), panzer gestiona las peticiones REST en paralelo con bulk_depth(), respetando los rate limits de Binance.

Nota: profundidades de 5, 10 y 20 usan el Partial Book Depth Stream (snapshots completos cada 100ms, sin necesidad de REST sync). Cualquier otro valor usa el Diff Depth Stream (diffs cada 100ms) con sincronizacion REST.

Gestion dinamica de simbolos

Se pueden anadir o quitar simbolos sin recrear la conexion WebSocket:

with Manager(config) as manager:
    manager.add_symbols(["SOLUSDT", "ADAUSDT"])
    manager.remove_symbols("ETHUSDT")

Reconfiguracion en caliente

Se pueden cambiar parametros del stream sin detenerlo ni perder datos:

with Manager(config) as manager:
    time.sleep(5)

    # Cambiar tamano del buffer de trades/aggTrades/liquidaciones
    manager.reconfigure(max_history=2000)

    # Cambiar profundidad del order book
    manager.reconfigure(depth=500)

    # Cambiar intervalo de klines
    manager.reconfigure(interval="5m")

Cada stream_type acepta solo sus parametros relevantes:

stream_type Parametro reconfigurable Efecto
"trade", "agg_trade", "force_order" max_history Redimensiona el deque. Si se reduce, los datos mas antiguos se descartan
"depth" depth Cambia niveles del book. Si se reduce, trunca. Si se aumenta, resincroniza via REST
"kline" interval Cambia el intervalo de vela (resubscribe al nuevo stream)

No se pueden cambiar market, stream_type ni symbols (definen la identidad del Manager; usar add_symbols/remove_symbols para simbolos).

Futuros USD-M y COIN-M

config = Config(market="futures_usd", stream_type="trade", symbols=["BTCUSDT"], max_history=500)
# COIN-M: market="futures_coin", pares tipo BTCUSD_PERP

Liquidaciones (solo futuros)

config = Config(
    market="futures_usd",
    stream_type="force_order",   # o "liquidation"
    symbols=["BTCUSDT", "ETHUSDT"],
    max_history=500,
)

with Manager(config) as mgr:
    time.sleep(10)

    # Ultima liquidacion de un simbolo
    liq = mgr.get("BTCUSDT")
    if liq:
        print(f"{liq.symbol} {liq.side} {liq.price} x {liq.original_qty} ({liq.status})")

    # Historial reciente
    recent = mgr.get_recent("BTCUSDT", limit=50)

Solo disponible en market="futures_usd" y market="futures_coin". En spot lanza ValueError.

API publica

Manager

Fachada principal del sistema. Gestiona streams, expone datos del cache y metricas.

Ciclo de vida

Metodo Descripcion
start() Arranca el motor en un thread dedicado
stop() Detiene todos los streams y libera recursos

Soporta context manager (with Manager(config) as mgr:).

Gestion de simbolos

Metodo Descripcion
add_symbols(symbols, batch_size=200) Anade simbolos (str o lista, case-insensitive)
remove_symbols(symbols) Elimina simbolos del manager

Los simbolos se pueden registrar antes de start(). Se arrancan automaticamente al iniciar.

Reconfiguracion en caliente

Metodo Descripcion
reconfigure(max_history=N) Redimensiona el buffer de trades/aggTrades/liquidaciones sin detener el stream
reconfigure(depth=N) Cambia la profundidad del order book (trunca o resincroniza via REST)
reconfigure(interval="5m") Cambia el intervalo de klines (resubscribe al nuevo stream)

No se pueden cambiar market, stream_type ni symbols via reconfigure().

Dashboard

Metodo Descripcion
Manager.dashboard() Muestra todas las instancias en memoria

Consulta de datos

Metodo Descripcion
get(symbol) Devuelve el dato actual del simbolo, o None si no hay datos
get(symbols_list) Devuelve dict[str, dato] con los simbolos que tienen datos
get() Devuelve dict[str, dato] con todos los simbolos en cache
get_recent(symbol, limit=100) Historial reciente como lista

El tipo de retorno depende del stream_type del manager:

stream_type .get(symbol) .get_recent()
"trade" Trade list[Trade] (deque, hasta limit)
"agg_trade" AggTrade list[AggTrade] (deque, hasta limit)
"depth" OrderBookSnapshot [OrderBookSnapshot]
"kline" Kline [Kline]
"ticker" Ticker [Ticker]
"mini_ticker" MiniTicker [MiniTicker]
"book_ticker" BookTicker [BookTicker]
"force_order" Liquidation list[Liquidation] (deque, hasta limit)

Para trade, agg_trade y force_order, .get_recent() devuelve el historial del deque (hasta limit elementos). Para el resto de tipos, devuelve una lista con el dato actual si existe, o lista vacia.

Los simbolos son case-insensitive: "btcusdt" y "BTCUSDT" son equivalentes.

Lectura no destructiva: .get() y .get_recent() son de solo lectura. No consumen, borran ni alteran los datos del cache. Llamar a estos metodos cualquier numero de veces devuelve siempre el mismo estado (o uno mas reciente si llego un mensaje nuevo del WebSocket entre llamadas). Los datos solo cambian por dos motivos:

  1. Llegada de datos nuevos desde el WebSocket (el dato mas reciente se sobreescribe; en deques, los mas antiguos se descartan al llenarse).
  2. Reconfiguracion via reconfigure() (por ejemplo, reducir max_history descarta los elementos mas antiguos que no caben en el nuevo tamano).

Para order books, .get() devuelve una copia del snapshot actual, por lo que modificar el objeto devuelto no afecta al cache interno.

Observabilidad

Metodo Descripcion
get_stream_status(stream_id) Estado de un stream individual
get_all_streams_status() Estado de todos los streams
get_health() Diagnostico de salud global (SystemHealth)
get_stats() Estadisticas resumidas (dict)
is_healthy() True si todos los streams estan sanos
print_summary() Imprime resumen por logging

Modelos de datos

Trade

@dataclass(frozen=True)
class Trade:
    symbol: str          # par de trading (ej: "BTCUSDT")
    trade_id: int        # ID unico del trade en Binance
    price: float         # precio de ejecucion
    quantity: float      # cantidad ejecutada
    quote_quantity: float # cantidad en moneda cotizada
    buyer_maker: bool    # True si el comprador es el maker (trade = venta)
    timestamp_ms: int    # timestamp del trade en milisegundos
    price_str: str       # precio como cadena original (precision completa)
    quantity_str: str    # cantidad como cadena original
    quote_quantity_str: str
    is_best_match: bool  # solo Spot
    event_time_ms: int   # timestamp del evento WebSocket

Incluye to_rest_format() que devuelve un dict compatible con GET /api/v3/trades.

OrderBookSnapshot

@dataclass
class OrderBookSnapshot:
    symbol: str
    bids: list[OrderBookLevel]   # ordenados precio descendente (mejor bid primero)
    asks: list[OrderBookLevel]   # ordenados precio ascendente (mejor ask primero)
    last_update_id: int          # ID de la ultima actualizacion aplicada
    timestamp: float             # time.time() de la ultima actualizacion
    event_time_ms: int
    transaction_time_ms: int
    is_synced: bool              # True si paso por REST snapshot + diffs (book fiable)

is_synced indica si el order book fue sincronizado correctamente con un REST snapshot. Un book con is_synced=False solo contiene diffs parciales y puede estar incompleto. Tras la sincronizacion REST, is_synced pasa a True y se preserva en actualizaciones posteriores.

OrderBookLevel

@dataclass(frozen=True)
class OrderBookLevel:
    price: float
    quantity: float
    price_str: str       # cadena original de Binance
    quantity_str: str

Incluye to_rest_format() que devuelve [price_str, quantity_str] compatible con la REST API.

AggTrade

@dataclass(frozen=True)
class AggTrade:
    symbol: str
    agg_trade_id: int      # ID del trade agregado
    price: float
    quantity: float
    first_trade_id: int    # rango de trades individuales incluidos
    last_trade_id: int
    timestamp_ms: int
    buyer_maker: bool
    price_str: str
    quantity_str: str
    event_time_ms: int

Agrupa uno o mas trades individuales ejecutados al mismo precio y en la misma orden. Incluye to_rest_format() compatible con GET /api/v3/aggTrades.

Kline

@dataclass(frozen=True)
class Kline:
    symbol: str
    interval: str          # "1m", "5m", "1h", "1d", etc.
    open_time_ms: int
    close_time_ms: int
    open: float
    close: float
    high: float
    low: float
    volume: float
    quote_volume: float
    trades: int
    taker_buy_volume: float
    taker_buy_quote_volume: float
    is_closed: bool        # True cuando la vela es definitiva
    # + campos _str para precision original

Se actualiza en tiempo real mientras la vela esta abierta. Cuando is_closed=True, la vela es definitiva. Incluye to_rest_format() compatible con GET /api/v3/klines.

Ticker

@dataclass(frozen=True)
class Ticker:
    symbol: str
    price_change: float
    price_change_pct: float
    weighted_avg_price: float
    last_price: float
    best_bid_price: float
    best_ask_price: float
    open_price: float
    high_price: float
    low_price: float
    volume: float
    quote_volume: float
    trade_count: int
    # + 15 campos mas (prev_close, quantities, times, trade IDs, strings)

Estadisticas completas de 24h rolling. Incluye to_rest_format() compatible con GET /api/v3/ticker/24hr.

MiniTicker

@dataclass(frozen=True)
class MiniTicker:
    symbol: str
    close_price: float
    open_price: float
    high_price: float
    low_price: float
    volume: float
    quote_volume: float
    # + campos _str

Version ligera de Ticker con solo OHLC y volumen.

BookTicker

@dataclass(frozen=True)
class BookTicker:
    symbol: str
    update_id: int
    best_bid_price: float
    best_bid_quantity: float
    best_ask_price: float
    best_ask_quantity: float
    # + campos _str

Mejor bid/ask en tiempo real. El stream mas rapido para obtener spread. Incluye to_rest_format() compatible con GET /api/v3/ticker/bookTicker.

Liquidation

@dataclass(frozen=True)
class Liquidation:
    symbol: str
    side: str              # "SELL" (long liquidado) o "BUY" (short liquidado)
    order_type: str        # normalmente "LIMIT"
    time_in_force: str     # normalmente "IOC"
    original_qty: float
    price: float
    average_price: float
    status: str            # "NEW", "PARTIALLY_FILLED", "FILLED"
    last_filled_qty: float
    filled_qty: float
    trade_time_ms: int
    event_time_ms: int
    # + campos _str para precision original

Orden de liquidacion forzada en Binance Futures. Solo disponible en mercados de futuros (USD-M y COIN-M). Incluye to_rest_format() compatible con GET /fapi/v1/forceOrders.

StreamState

Estados posibles de un stream:

Estado Significado
starting Iniciando conexion
running Conectado y recibiendo datos
reconnecting Reconectando tras un error
stale Conectado pero sin datos recientes
stopped Detenido de forma limpia
failed Error fatal, no reconectable

SystemHealth

@dataclass
class SystemHealth:
    is_healthy: bool             # True si todos los streams estan sanos
    total_streams: int
    running_streams: int
    stale_streams: int
    reconnecting_streams: int
    stopped_streams: int
    uptime_s: float              # segundos desde el arranque
    total_messages: int          # total de mensajes procesados
    stream_details: dict[str, StreamStatus]

Configuracion

Toda la configuracion se centraliza en Config. Todos los campos tipo enum aceptan strings:

from market_streaming import Config

config = Config(
    # Mercado y stream
    market="spot",              # "spot" | "futures_usd" | "futures_coin"
    stream_type="trade",        # "trade" | "depth" | "agg_trade" | "kline" | "ticker" | "mini_ticker" | "book_ticker" | "force_order"
    symbols=["BTCUSDT"],        # case-insensitive, se normalizan a mayusculas
    interval="1m",              # solo para stream_type="kline"

    # Cache (obligatorios segun stream_type)
    max_history=1000,           # obligatorio para "trade" y "agg_trade": trades recientes en cache
    depth=20,                   # obligatorio para "depth": niveles por lado (depth=20 = 20 bids + 20 asks)

    # Reconexion
    reconnect_base_delay_s=1.0,
    reconnect_max_delay_s=60.0,
    max_reconnect_attempts=0,      # 0 = sin limite

    # Logging
    log_level="info",              # "debug" | "info" | "warning" | "error" | "critical"
    log_dir="logs",
)

Campos obligatorios segun tipo de stream

stream_type Campo obligatorio Descripcion
"trade" max_history Cuantos trades recientes mantener en cache
"agg_trade" max_history Cuantos aggTrades recientes mantener en cache
"depth" depth Niveles por lado del order book (20 = 20 bids + 20 asks)
"kline" -- Usa interval (default "1m")
"ticker" -- Sin campos adicionales
"mini_ticker" -- Sin campos adicionales
"book_ticker" -- Sin campos adicionales
"force_order" max_history Cuantas liquidaciones recientes mantener. Solo futuros

Mercados soportados

Market URL WebSocket Ejemplo de par
"spot" wss://stream.binance.com:9443/ws BTCUSDT
"futures_usd" wss://fstream.binance.com/ws BTCUSDT
"futures_coin" wss://dstream.binance.com/ws BTCUSD_PERP

Logging

El sistema genera dos archivos de log con rotacion automatica:

Archivo Nivel Uso
binflow.log DEBUG+ Analisis completo, debug, tracing
binflow.error.log WARNING+ Deteccion rapida de fallos e interrupciones

Rotacion y limites de disco

  • Cada archivo rota al alcanzar log_max_bytes (default: 10 MB)
  • Se mantienen como maximo log_backup_count archivos rotados (default: 5)
  • Espacio maximo en disco: ~120 MB (2 archivos x 6 versiones x 10 MB)
  • Los archivos rotados se nombran binflow.log.1, binflow.log.2, etc.

Formato de archivo

2026-03-06 20:15:32 | INFO     | market_streaming.worker:_connect_and_consume:178 | Stream trade_0: conexion establecida
2026-03-06 20:15:33 | WARNING  | market_streaming.worker:_run:123 | Error en stream depth_0 (intento 1): [Errno 111] Connection refused

Configuracion de logging

from market_streaming import Config

# Produccion: solo errores en consola, todo en disco
config = Config(stream_type="ticker", log_level="error", log_dir="logs")

# Desarrollo: verbose en consola, sin disco
config = Config(stream_type="ticker", log_level="debug", log_dir="")

# Directorio personalizado
config = Config(stream_type="ticker", log_dir="/var/log/binflow")

Arquitectura

Manager                       1 manager = 1 mercado + 1 tipo de stream
    |
    +-- threading.Thread       Thread dedicado con event loop asyncio
    |       |
    |       +-- Worker (trade_0)    hasta 200 simbolos / conexion
    |       +-- Worker (trade_1)    siguiente batch si > 200
    |       +-- ...
    |
    +-- BinancePublicClient    REST client (panzer) con rate limiting automatico
    |
    +-- MarketDataCache        Cache en memoria (threading.Lock, O(1))
    |
    +-- HealthMonitor          Evaluacion de salud
    |
    +-- Logging                RotatingFileHandler (general + errores)

Los workers usan el endpoint /stream?streams=sym1@trade/sym2@trade/... de Binance, agrupando hasta 200 simbolos por conexion WebSocket. Soportan SUBSCRIBE/UNSUBSCRIBE dinamico sin recrear la conexion.

Modelo de concurrencia

El sistema usa un thread dedicado con event loop asyncio:

  • API publica sincrona: el codigo cliente llama manager.start(), manager.get(), etc. sin await. Esto permite usar el motor desde cualquier contexto Python.
  • Event loop interno: un thread daemon ejecuta el loop asyncio donde corren todos los workers WebSocket de forma concurrente.
  • Cache thread-safe: threading.Lock protege las escrituras (desde el thread asyncio) y las lecturas (desde el thread principal). El contention es despreciable porque las operaciones son O(1).

Reconexion

Cada worker gestiona su propia reconexion:

  1. Error de conexion o desconexion detectada
  2. Backoff exponencial: min(base * 2^n, max_delay)
  3. Jitter aleatorio de +-20% sobre el delay calculado
  4. Contador de reconexiones incrementado
  5. Si se supera max_reconnect_attempts (y no es 0), el stream pasa a failed
  6. Errores fatales (ej: stream invalido) no reconectan

Depth streams y sincronizacion

Profundidad Stream Sincronizacion
5, 10, 20 symbol@depth{N}@100ms (Partial Book Depth) Snapshots completos cada 100ms, no requiere REST
Cualquier otro valor symbol@depth@100ms (Diff Depth) REST snapshot + diffs incrementales cada 100ms

Para el diff depth stream, el flujo de sincronizacion sigue el protocolo oficial de Binance:

  1. Conectar al diff depth stream y almacenar los mensajes en un buffer
  2. Pedir snapshot REST via panzer.BinancePublicClient.depth() (single) o bulk_depth() (masivo)
  3. Filtrar diffs: descartar los anteriores al lastUpdateId del snapshot (reglas Spot vs Futures)
  4. Aplicar el buffer filtrado sobre el snapshot
  5. Marcar is_synced=True y continuar con actualizaciones en tiempo real

El cache protege contra regresiones: si un snapshot REST llega con un lastUpdateId menor al que ya tiene el book (porque los diffs ya avanzaron), se rechaza. La excepcion es cuando el snapshot viene con is_synced=True y el book existente no esta sincronizado.

Cache en memoria

El cache es un repositorio permanente y en memoria que se actualiza continuamente. Los metodos .get() y .get_recent() son de solo lectura: no consumen ni borran datos. Puedes consultarlos tantas veces como quieras sin afectar el estado interno.

Dato Estructura Comportamiento
Ultimo trade dict[symbol, Trade] Se sobreescribe con cada trade nuevo
Trades recientes dict[symbol, deque(maxlen=max_history)] FIFO: los mas antiguos se descartan al llegar nuevos
Order book dict[symbol, OrderBookSnapshot] Se actualiza in-place con diffs; .get() devuelve copia
Ultimo aggTrade dict[symbol, AggTrade] Se sobreescribe con cada aggTrade nuevo
AggTrades recientes dict[symbol, deque(maxlen=max_history)] FIFO: igual que trades
Kline dict[symbol, dict[interval, Kline]] Se sobreescribe con cada actualizacion de vela
Ticker 24h dict[symbol, Ticker] Se sobreescribe con cada actualizacion
MiniTicker dict[symbol, MiniTicker] Se sobreescribe con cada actualizacion
BookTicker dict[symbol, BookTicker] Se sobreescribe con cada actualizacion
Ultima liquidacion dict[symbol, Liquidation] Se sobreescribe con cada liquidacion nueva
Liquidaciones recientes dict[symbol, deque(maxlen=max_history)] FIFO: igual que trades

El order book soporta tanto snapshots completos como actualizaciones incrementales con eliminacion de niveles con cantidad cero.

Proteccion anti-regresion: update_order_book() rechaza snapshots con lastUpdateId menor al existente, excepto cuando el snapshot nuevo esta sincronizado (is_synced=True) y el existente no. Esto protege contra respuestas REST que llegan tarde cuando los diffs del WebSocket ya avanzaron el estado del book.

Estructura del proyecto

market_streaming/
    __init__.py          Re-exports de la API publica
    config.py            Config, Market, MARKET_URLS
    models.py            Trade, AggTrade, Kline, Ticker, MiniTicker, BookTicker, Liquidation, OrderBook*, Stream*
    exceptions.py        Excepciones del dominio
    cache.py             MarketDataCache
    worker.py            Worker (combined streams, hasta 200 simbolos/conexion)
    health.py            HealthMonitor
    manager.py           Manager
    logging_config.py    Setup de logging con rotacion
tests/
    test_manager.py      84 tests - lifecycle, get/get_recent, symbols, stats, GC safety, config validation
    test_worker.py       69 tests - URLs, parsing, backoff, builders, dispatch, diff buffering, depth speed
    test_cache.py        62 tests - trades, order book, aggTrade, kline, ticker, liquidation, remove_symbol, invariantes
    test_models.py       44 tests - inmutabilidad, defaults, config, todos los modelos
    test_logging.py      13 tests - handlers, rotacion, archivos, teardown
    test_health.py        8 tests - deteccion stale, salud global
    test_cache_concurrent.py  3 tests - lectura/escritura concurrente multi-thread
    test_load.py          8 tests - carga: 728 symbols trades, 60s
    test_load_orderbook.py 13 tests - carga: 728 symbols order books profundos

Tests

# Ejecutar todos los tests unitarios (279 tests, 300 con carga)
pytest tests/ --ignore=tests/test_load.py --ignore=tests/test_load_orderbook.py -v

# Tests de carga contra Binance real (~70s cada uno)
pytest tests/test_load.py -v -s              # 728 symbols, trades, 60s
pytest tests/test_load_orderbook.py -v -s    # 728 symbols, depth 200, 60s

# Un test especifico
pytest tests/test_cache.py::TestTradeCache::test_trade_ids_continuous -v

Pruebas de carga

Los tests de carga (test_load.py, test_load_orderbook.py) conectan a Binance real y verifican:

  • Continuidad: trade IDs consecutivos sin huecos
  • Unicidad: cero duplicados
  • Orden: IDs estrictamente crecientes
  • Timestamps: no decrecientes
  • Profundidad: order books de hasta 200 niveles por lado
  • Integridad estructural: bids descendentes, asks ascendentes, spread positivo
  • Precision: cadenas originales de Binance preservadas
  • Estabilidad: todos los workers running, cero reconexiones

Limitaciones

  • No persiste datos en disco. Si el proceso muere, se pierde el cache.
  • Las metricas son in-process. No hay exportacion a Prometheus, StatsD u otros sistemas externos.
  • No implementa autenticacion para streams privados (user data streams).
  • Liquidaciones (force_order) solo disponibles en futuros; Binance Vision no publica datos historicos de liquidaciones.
  • No implementa throttle de conexiones WebSocket (max 5/s de Binance). Con muchos workers arrancando simultaneamente, existe riesgo de alcanzar el limite.

Licencia

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

binflow-0.6.0.tar.gz (74.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

binflow-0.6.0-py3-none-any.whl (41.5 kB view details)

Uploaded Python 3

File details

Details for the file binflow-0.6.0.tar.gz.

File metadata

  • Download URL: binflow-0.6.0.tar.gz
  • Upload date:
  • Size: 74.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for binflow-0.6.0.tar.gz
Algorithm Hash digest
SHA256 d38c697005d19615cc1b3a44f33859c63b492ebd30dbed0a27a23d46bde53b82
MD5 247e8a6dd9e43b0f42a7f99b707e0a99
BLAKE2b-256 db299ce482d0d3af11051ac6953c72ffe6e075d5c0a85856f61b32e9eb0eb94c

See more details on using hashes here.

File details

Details for the file binflow-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: binflow-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 41.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for binflow-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 fb7253c3ccec1c6651ae47fe7f2655ad53a062bfbbdbf09e90158d935d4f21ed
MD5 856c39df142260f82842729f99a829c0
BLAKE2b-256 9d8e258b4b09f985ef78bb1987ad3eedcf45fb464f54d0a60fb9ef1a7d78d123

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page