Skip to main content

Persistent WebSocket engine for Binance market data (Spot, USD-M, COIN-M)

Project description

binflow

Motor de WebSockets persistente para consumo de datos de mercado en tiempo real desde Binance.

Soporta Spot, USD-M Futures y COIN-M Futures. Componente de infraestructura reutilizable que mantiene un cache de mercado vivo en memoria, con reconexion automatica, observabilidad integrada y API sincrona simple.

Filosofia

binflow es una libreria de infraestructura de datos de mercado. Su responsabilidad es servir datos fiables, continuos y disponibles.

Lo que SI hace binflow:

  • Mantener conexiones WebSocket vivas y reconectar automaticamente
  • Garantizar continuidad del dato (sin gaps en trade IDs)
  • Servir datos normalizados, listos para consumir
  • Escalar a cientos de simbolos con eficiencia (combined streams)
  • Order books correctos e integros en memoria

Lo que NO hace binflow (responsabilidad del consumidor):

  • Persistir datos en base de datos (TimescaleDB, Parquet, etc.)
  • Callbacks o event-driven processing
  • Metricas y monitoring (Prometheus, Grafana)
  • Logica de trading o senales

Otra libreria o servicio debe consumir de binflow via polling (get_recent_trades, get_order_book, get_kline, etc.) y encargarse del resto.

Caracteristicas

  • Multi-mercado -- Spot, USD-M Futures y COIN-M Futures con un solo parametro
  • 7 tipos de stream -- trades, order book, aggTrades, klines, ticker 24h, miniTicker, bookTicker
  • Streams combinados -- multiplexa cientos de simbolos en pocas conexiones WebSocket
  • Gestion dinamica -- subscribe/unsubscribe de simbolos en combined streams sin recrear la conexion
  • Order books sincronizados -- REST snapshot + diffs por WebSocket, con campo is_synced que indica integridad verificada
  • Order books profundos -- hasta 5000 niveles (Spot) o 1000 (Futures) por lado
  • Rate limiting automatico -- delegado a panzer, respeta cabeceras X-MBX-USED-WEIGHT de Binance
  • Anti-regresion -- el cache rechaza snapshots con lastUpdateId anterior al existente, protegiendo contra datos stale
  • Cache en memoria thread-safe con acceso O(1)
  • Reconexion automatica con backoff exponencial y jitter
  • API sincrona -- se usa desde codigo Python normal, sin async/await
  • Observabilidad -- estado de streams, metricas, deteccion de degradacion
  • Logging persistente -- rotacion por tamano, archivo de errores separado, limites de disco
  • Shutdown limpio -- sin tareas pendientes ni warnings
  • Context manager -- soporte with para manejo automatico del ciclo de vida
  • Probado bajo carga -- 728 simbolos simultaneos, 60s, cero gaps en trade IDs

Instalacion

pip install -e .

O con dependencias de desarrollo:

pip install -e ".[dev]"

Requisitos

  • Python >= 3.11
  • websockets >= 13.0
  • panzer >= 2.3.0 (rate limiting y peticiones REST a Binance)

Inicio rapido

import time
from market_streaming import MarketStreamManager, StreamConfig

config = StreamConfig(
    max_recent_trades=500,
    default_order_book_depth=10,
)

with MarketStreamManager(config) as manager:
    manager.add_trade_stream("BTCUSDT")
    manager.add_order_book_stream("BTCUSDT", depth=10)

    time.sleep(5)  # esperar a que lleguen datos

    # Ultimo trade
    trade = manager.get_last_trade("BTCUSDT")
    if trade:
        print(f"{trade.price} x {trade.quantity}")

    # Order book (sincronizado via REST + WebSocket)
    book = manager.get_order_book("BTCUSDT")
    if book:
        print(f"Mejor bid: {book.bids[0].price}")
        print(f"Mejor ask: {book.asks[0].price}")
        print(f"Sincronizado: {book.is_synced}")

    # Klines, tickers, bookTicker...
    manager.add_kline_stream("BTCUSDT", interval="1m")
    manager.add_ticker_stream("BTCUSDT")
    manager.add_book_ticker_stream("BTCUSDT")

    time.sleep(3)

    kline = manager.get_kline("BTCUSDT", "1m")
    if kline:
        print(f"Vela 1m: O={kline.open} H={kline.high} L={kline.low} C={kline.close}")

    ticker = manager.get_ticker("BTCUSDT")
    if ticker:
        print(f"24h: {ticker.price_change_pct:.2f}% vol={ticker.volume:.2f}")

    bt = manager.get_book_ticker("BTCUSDT")
    if bt:
        print(f"Spread: {bt.best_ask_price - bt.best_bid_price:.2f}")

    # Salud del sistema
    print(manager.is_healthy())

Streams masivos (cientos de simbolos)

Para suscribirse a muchos simbolos a la vez, usar add_trade_streams() o add_order_book_streams(). Estos metodos multiplexan automaticamente los simbolos en conexiones combinadas (hasta 200 por WebSocket), evitando el rate limit de conexiones de Binance:

symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", ...]  # cientos de simbolos

with MarketStreamManager(config) as manager:
    # Registra todos los simbolos en pocas conexiones combinadas
    manager.add_trade_streams(symbols)

    time.sleep(10)

    for symbol in symbols:
        trade = manager.get_last_trade(symbol)
        if trade:
            print(f"{symbol}: {trade.price}")

Order books profundos y sincronizados

Para profundidades > 20, el sistema usa automaticamente el diff depth stream de Binance con sincronizacion REST:

  1. Conecta al diff depth stream y buferea mensajes
  2. Pide un snapshot REST completo via panzer (con rate limiting automatico)
  3. Filtra los diffs anteriores al snapshot
  4. Aplica los diffs posteriores sobre el snapshot
  5. Marca el order book como is_synced=True
config = StreamConfig(default_order_book_depth=200)

with MarketStreamManager(config) as manager:
    manager.add_order_book_streams(["BTCUSDT", "ETHUSDT"], depth=200)
    time.sleep(10)  # esperar a sincronizacion REST

    book = manager.get_order_book("BTCUSDT")
    print(f"Niveles: {len(book.bids)} bids, {len(book.asks)} asks")
    print(f"Sincronizado: {book.is_synced}")  # True tras REST sync

Para streams masivos (cientos de simbolos), panzer gestiona las peticiones REST en paralelo con bulk_depth(), respetando los rate limits de Binance.

Nota: profundidades de 5, 10 y 20 usan el Partial Book Depth Stream (snapshots completos cada ~1s, sin necesidad de REST sync). Cualquier otro valor usa el Diff Depth Stream con sincronizacion REST.

Gestion dinamica de simbolos

En combined streams, se pueden anadir o quitar simbolos sin recrear la conexion WebSocket:

with MarketStreamManager(config) as manager:
    ids = manager.add_trade_streams(["BTCUSDT", "ETHUSDT"])

    time.sleep(5)

    # Anadir un simbolo al primer worker combinado
    manager.subscribe_symbol(ids[0], "SOLUSDT")

    # Quitar un simbolo
    manager.unsubscribe_symbol(ids[0], "ETHUSDT")

Futuros USD-M

from market_streaming import MarketStreamManager, StreamConfig, Market

config = StreamConfig(market=Market.FUTURES_USD)

with MarketStreamManager(config) as manager:
    manager.add_trade_stream("BTCUSDT")
    manager.add_order_book_stream("BTCUSDT")
    # ...

Futuros COIN-M

from market_streaming import StreamConfig, Market

config = StreamConfig(market=Market.FUTURES_COIN)
# Los pares de COIN-M usan formato distinto (ej: BTCUSD_PERP)

API publica

MarketStreamManager

Fachada principal del sistema. Gestiona streams, expone datos del cache y metricas.

Ciclo de vida

Metodo Descripcion
start() Arranca el motor en un thread dedicado
stop() Detiene todos los streams y libera recursos

Soporta context manager (with MarketStreamManager(config) as mgr:).

Registro de streams individuales

Metodo Descripcion
add_trade_stream(symbol) Trades individuales
add_order_book_stream(symbol, depth=20) Order book (snapshot o diff depth)
add_agg_trade_stream(symbol) Trades agregados (menor volumen de mensajes)
add_kline_stream(symbol, interval="1m") Velas/candlesticks en tiempo real
add_ticker_stream(symbol) Estadisticas 24h completas (OHLC, volumen, bid/ask)
add_mini_ticker_stream(symbol) Estadisticas 24h reducidas (OHLC, volumen)
add_book_ticker_stream(symbol) Mejor bid/ask en tiempo real (minima latencia)
remove_stream(stream_id) Detiene y elimina un stream

Cada metodo devuelve un stream_id (ej: trade_BTCUSDT, kline_BTCUSDT_1m, ticker_BTCUSDT).

Registro de streams masivos (combinados)

Metodo Descripcion
add_trade_streams(symbols, batch_size=200) Trades para multiples simbolos
add_order_book_streams(symbols, depth=None, batch_size=200) Order books para multiples simbolos
add_agg_trade_streams(symbols, batch_size=200) AggTrades para multiples simbolos
add_kline_streams(symbols, interval="1m", batch_size=200) Klines para multiples simbolos
add_ticker_streams(symbols, batch_size=200) Tickers 24h para multiples simbolos
add_mini_ticker_streams(symbols, batch_size=200) MiniTickers para multiples simbolos
add_book_ticker_streams(symbols, batch_size=200) BookTickers para multiples simbolos

Estos metodos crean CombinedStreamWorkers que multiplexan hasta batch_size simbolos por conexion WebSocket usando el endpoint /stream?streams=... de Binance. Devuelven una lista de worker IDs.

Gestion dinamica de simbolos

Metodo Descripcion
subscribe_symbol(stream_id, symbol) Anade un simbolo a un combined stream existente
unsubscribe_symbol(stream_id, symbol) Quita un simbolo de un combined stream existente

Estas operaciones envian SUBSCRIBE/UNSUBSCRIBE por el WebSocket existente, sin recrear la conexion.

Consulta de datos

Metodo Retorno
get_last_trade(symbol) Trade o None
get_recent_trades(symbol, limit=100) list[Trade]
get_order_book(symbol) OrderBookSnapshot o None
get_last_agg_trade(symbol) AggTrade o None
get_recent_agg_trades(symbol, limit=100) list[AggTrade]
get_kline(symbol, interval="1m") Kline o None
get_ticker(symbol) Ticker o None
get_mini_ticker(symbol) MiniTicker o None
get_book_ticker(symbol) BookTicker o None

Los simbolos son case-insensitive: "btcusdt" y "BTCUSDT" son equivalentes.

Observabilidad

Metodo Descripcion
get_stream_status(stream_id) Estado de un stream individual
get_all_streams_status() Estado de todos los streams
get_health() Diagnostico de salud global (SystemHealth)
get_stats() Estadisticas resumidas (dict)
is_healthy() True si todos los streams estan sanos
print_summary() Imprime resumen por logging

Modelos de datos

Trade

@dataclass(frozen=True)
class Trade:
    symbol: str          # par de trading (ej: "BTCUSDT")
    trade_id: int        # ID unico del trade en Binance
    price: float         # precio de ejecucion
    quantity: float      # cantidad ejecutada
    quote_quantity: float # cantidad en moneda cotizada
    buyer_maker: bool    # True si el comprador es el maker (trade = venta)
    timestamp_ms: int    # timestamp del trade en milisegundos
    price_str: str       # precio como cadena original (precision completa)
    quantity_str: str    # cantidad como cadena original
    quote_quantity_str: str
    is_best_match: bool  # solo Spot
    event_time_ms: int   # timestamp del evento WebSocket

Incluye to_rest_format() que devuelve un dict compatible con GET /api/v3/trades.

OrderBookSnapshot

@dataclass
class OrderBookSnapshot:
    symbol: str
    bids: list[OrderBookLevel]   # ordenados precio descendente (mejor bid primero)
    asks: list[OrderBookLevel]   # ordenados precio ascendente (mejor ask primero)
    last_update_id: int          # ID de la ultima actualizacion aplicada
    timestamp: float             # time.time() de la ultima actualizacion
    event_time_ms: int
    transaction_time_ms: int
    is_synced: bool              # True si paso por REST snapshot + diffs (book fiable)

is_synced indica si el order book fue sincronizado correctamente con un REST snapshot. Un book con is_synced=False solo contiene diffs parciales y puede estar incompleto. Tras la sincronizacion REST, is_synced pasa a True y se preserva en actualizaciones posteriores.

OrderBookLevel

@dataclass(frozen=True)
class OrderBookLevel:
    price: float
    quantity: float
    price_str: str       # cadena original de Binance
    quantity_str: str

Incluye to_rest_format() que devuelve [price_str, quantity_str] compatible con la REST API.

AggTrade

@dataclass(frozen=True)
class AggTrade:
    symbol: str
    agg_trade_id: int      # ID del trade agregado
    price: float
    quantity: float
    first_trade_id: int    # rango de trades individuales incluidos
    last_trade_id: int
    timestamp_ms: int
    buyer_maker: bool
    price_str: str
    quantity_str: str
    event_time_ms: int

Agrupa uno o mas trades individuales ejecutados al mismo precio y en la misma orden. Incluye to_rest_format() compatible con GET /api/v3/aggTrades.

Kline

@dataclass(frozen=True)
class Kline:
    symbol: str
    interval: str          # "1m", "5m", "1h", "1d", etc.
    open_time_ms: int
    close_time_ms: int
    open: float
    close: float
    high: float
    low: float
    volume: float
    quote_volume: float
    trades: int
    taker_buy_volume: float
    taker_buy_quote_volume: float
    is_closed: bool        # True cuando la vela es definitiva
    # + campos _str para precision original

Se actualiza en tiempo real mientras la vela esta abierta. Cuando is_closed=True, la vela es definitiva. Incluye to_rest_format() compatible con GET /api/v3/klines.

Ticker

@dataclass(frozen=True)
class Ticker:
    symbol: str
    price_change: float
    price_change_pct: float
    weighted_avg_price: float
    last_price: float
    best_bid_price: float
    best_ask_price: float
    open_price: float
    high_price: float
    low_price: float
    volume: float
    quote_volume: float
    trade_count: int
    # + 15 campos mas (prev_close, quantities, times, trade IDs, strings)

Estadisticas completas de 24h rolling. Incluye to_rest_format() compatible con GET /api/v3/ticker/24hr.

MiniTicker

@dataclass(frozen=True)
class MiniTicker:
    symbol: str
    close_price: float
    open_price: float
    high_price: float
    low_price: float
    volume: float
    quote_volume: float
    # + campos _str

Version ligera de Ticker con solo OHLC y volumen.

BookTicker

@dataclass(frozen=True)
class BookTicker:
    symbol: str
    update_id: int
    best_bid_price: float
    best_bid_quantity: float
    best_ask_price: float
    best_ask_quantity: float
    # + campos _str

Mejor bid/ask en tiempo real. El stream mas rapido para obtener spread. Incluye to_rest_format() compatible con GET /api/v3/ticker/bookTicker.

StreamState

Estados posibles de un stream:

Estado Significado
starting Iniciando conexion
running Conectado y recibiendo datos
reconnecting Reconectando tras un error
stale Conectado pero sin datos recientes
stopped Detenido de forma limpia
failed Error fatal, no reconectable

SystemHealth

@dataclass
class SystemHealth:
    is_healthy: bool             # True si todos los streams estan sanos
    total_streams: int
    running_streams: int
    stale_streams: int
    reconnecting_streams: int
    stopped_streams: int
    uptime_s: float              # segundos desde el arranque
    total_messages: int          # total de mensajes procesados
    stream_details: dict[str, StreamStatus]

Configuracion

Toda la configuracion se centraliza en StreamConfig:

from market_streaming import StreamConfig, Market

config = StreamConfig(
    # Mercado (default: Spot)
    market=Market.SPOT,       # SPOT | FUTURES_USD | FUTURES_COIN
    # base_url="wss://...",   # si se pasa, tiene prioridad sobre market

    # Conexion
    ping_interval_s=20.0,
    ping_timeout_s=10.0,

    # Cache
    max_recent_trades=1000,
    default_order_book_depth=20,   # >20 usa diff depth stream

    # Reconexion
    reconnect_base_delay_s=1.0,
    reconnect_max_delay_s=60.0,
    reconnect_jitter_factor=0.2,
    max_reconnect_attempts=0,  # 0 = sin limite

    # Health
    stale_threshold_s=30.0,

    # Logging
    log_level=logging.INFO,
    log_dir="logs",
    log_max_bytes=10 * 1024 * 1024,  # 10 MB por archivo
    log_backup_count=5,               # max 5 archivos rotados
)

Mercados soportados

Market URL WebSocket Ejemplo de par
Market.SPOT wss://stream.binance.com:9443/ws BTCUSDT
Market.FUTURES_USD wss://fstream.binance.com/ws BTCUSDT
Market.FUTURES_COIN wss://dstream.binance.com/ws BTCUSD_PERP

Todos los campos tienen valores por defecto razonables. StreamConfig() sin argumentos conecta a Spot.

Logging

El sistema genera dos archivos de log con rotacion automatica:

Archivo Nivel Uso
binflow.log DEBUG+ Analisis completo, debug, tracing
binflow.error.log WARNING+ Deteccion rapida de fallos e interrupciones

Rotacion y limites de disco

  • Cada archivo rota al alcanzar log_max_bytes (default: 10 MB)
  • Se mantienen como maximo log_backup_count archivos rotados (default: 5)
  • Espacio maximo en disco: ~120 MB (2 archivos x 6 versiones x 10 MB)
  • Los archivos rotados se nombran binflow.log.1, binflow.log.2, etc.

Formato de archivo

2026-03-06 20:15:32 | INFO     | market_streaming.worker:_connect_and_consume:178 | Stream trade_BTCUSDT: conexion establecida
2026-03-06 20:15:33 | WARNING  | market_streaming.worker:_run:123 | Error en stream depth_BTCUSDT (intento 1): [Errno 111] Connection refused

Configuracion de logging

import logging
from market_streaming import StreamConfig

# Produccion: solo errores en consola, todo en disco
config = StreamConfig(
    log_level=logging.ERROR,
    log_dir="logs",
    log_max_bytes=10 * 1024 * 1024,
    log_backup_count=5,
)

# Desarrollo: verbose en consola, sin disco
config = StreamConfig(
    log_level=logging.DEBUG,
    log_dir="",  # desactiva archivos en disco
)

# Directorio personalizado
config = StreamConfig(log_dir="/var/log/binflow")

Arquitectura

MarketStreamManager           API publica sincrona (fachada)
    |
    +-- threading.Thread       Thread dedicado con event loop asyncio
    |       |
    |       +-- WebSocketStreamWorker (trade_BTCUSDT)       1 simbolo / conexion
    |       +-- WebSocketStreamWorker (kline_BTCUSDT_1m)   1 simbolo / conexion
    |       +-- CombinedStreamWorker (combined_trades_0)   hasta 200 simbolos / conexion
    |       +-- CombinedStreamWorker (combined_tickers_0)  hasta 200 simbolos / conexion
    |       +-- ...                                        (7 tipos de stream soportados)
    |
    +-- BinancePublicClient    REST client (panzer) con rate limiting automatico
    |                          Usado por workers para sync de order books
    |
    +-- MarketDataCache        Cache en memoria (threading.Lock)
    |                          Anti-regresion: rechaza snapshots con ID anterior
    |
    +-- HealthMonitor          Evaluacion de salud
    |
    +-- Logging                RotatingFileHandler (general + errores)

Workers

Worker Uso Conexiones
WebSocketStreamWorker 1 simbolo por conexion (add_*_stream) 1 WS por stream
CombinedStreamWorker Hasta 200 simbolos por conexion (add_*_streams) 1 WS por batch

Ambos workers soportan los 7 tipos de stream. Los workers combinados usan el endpoint /stream?streams=sym1@trade/sym2@trade/... de Binance, desenvuelven el formato {stream, data}, y permiten SUBSCRIBE/UNSUBSCRIBE dinamico de simbolos sin recrear la conexion.

Modelo de concurrencia

El sistema usa un thread dedicado con event loop asyncio:

  • API publica sincrona: el codigo cliente llama manager.start(), manager.get_last_trade(), etc. sin await. Esto permite usar el motor desde cualquier contexto Python.
  • Event loop interno: un thread daemon ejecuta el loop asyncio donde corren todos los workers WebSocket de forma concurrente.
  • Cache thread-safe: threading.Lock protege las escrituras (desde el thread asyncio) y las lecturas (desde el thread principal). El contention es despreciable porque las operaciones son O(1).

Reconexion

Cada worker gestiona su propia reconexion:

  1. Error de conexion o desconexion detectada
  2. Backoff exponencial: min(base * 2^n, max_delay)
  3. Jitter aleatorio de +-20% sobre el delay calculado
  4. Contador de reconexiones incrementado
  5. Si se supera max_reconnect_attempts (y no es 0), el stream pasa a failed
  6. Errores fatales (ej: stream invalido) no reconectan

Depth streams y sincronizacion

Profundidad Stream Sincronizacion
5, 10, 20 symbol@depth{N} (Partial Book Depth) Snapshots completos cada ~1s, no requiere REST
Cualquier otro valor symbol@depth (Diff Depth) REST snapshot + diffs incrementales

Para el diff depth stream, el flujo de sincronizacion sigue el protocolo oficial de Binance:

  1. Conectar al diff depth stream y almacenar los mensajes en un buffer
  2. Pedir snapshot REST via panzer.BinancePublicClient.depth() (single) o bulk_depth() (masivo)
  3. Filtrar diffs: descartar los anteriores al lastUpdateId del snapshot (reglas Spot vs Futures)
  4. Aplicar el buffer filtrado sobre el snapshot
  5. Marcar is_synced=True y continuar con actualizaciones en tiempo real

El cache protege contra regresiones: si un snapshot REST llega con un lastUpdateId menor al que ya tiene el book (porque los diffs ya avanzaron), se rechaza. La excepcion es cuando el snapshot viene con is_synced=True y el book existente no esta sincronizado.

Cache en memoria

Dato Estructura Acceso
Ultimo trade por simbolo dict[symbol, Trade] O(1)
Trades recientes por simbolo dict[symbol, deque(maxlen=N)] O(1) amortizado
Order book por simbolo dict[symbol, OrderBookSnapshot] O(1)
Ultimo aggTrade por simbolo dict[symbol, AggTrade] O(1)
AggTrades recientes dict[symbol, deque(maxlen=N)] O(1) amortizado
Kline por simbolo e intervalo dict[symbol, dict[interval, Kline]] O(1)
Ticker 24h por simbolo dict[symbol, Ticker] O(1)
MiniTicker por simbolo dict[symbol, MiniTicker] O(1)
BookTicker por simbolo dict[symbol, BookTicker] O(1)

El order book soporta tanto snapshots completos como actualizaciones incrementales con eliminacion de niveles con cantidad cero. Cada worker puede especificar su propia profundidad maxima (max_depth), independiente del default global.

Proteccion anti-regresion: update_order_book() rechaza snapshots con lastUpdateId menor al existente, excepto cuando el snapshot nuevo esta sincronizado (is_synced=True) y el existente no. Esto protege contra respuestas REST que llegan tarde cuando los diffs del WebSocket ya avanzaron el estado del book.

Estructura del proyecto

market_streaming/
    __init__.py          Re-exports de la API publica
    config.py            StreamConfig, Market, MARKET_URLS
    models.py            Trade, AggTrade, Kline, Ticker, MiniTicker, BookTicker, OrderBook*, Stream*
    exceptions.py        Excepciones del dominio
    cache.py             MarketDataCache
    worker.py            WebSocketStreamWorker, CombinedStreamWorker
    health.py            HealthMonitor
    manager.py           MarketStreamManager
    logging_config.py    Setup de logging con rotacion
tests/
    test_cache.py        40 tests - trades, order book, aggTrade, kline, ticker, miniTicker, bookTicker
    test_cache_concurrent.py  3 tests - lectura/escritura concurrente multi-thread
    test_models.py       37 tests - inmutabilidad, defaults, config, todos los modelos nuevos
    test_health.py        8 tests - deteccion stale, salud global
    test_worker.py       54 tests - URLs, parsing, backoff, builders y dispatch de todos los tipos
    test_manager.py      12 tests - lifecycle, streams, observabilidad
    test_logging.py      13 tests - handlers, rotacion, archivos, teardown
    test_load.py          8 tests - carga: 728 symbols trades, 60s
    test_load_orderbook.py 13 tests - carga: 728 symbols order books profundos
examples/
    inspect_trade.py     Estructura del Trade: campos, tipos, precision, formato REST
    inspect_order_book.py  OrderBookSnapshot: estructura, niveles, formato REST
    inspect_config.py    StreamConfig: campos, Market enum, URLs (sin conexion)
    inspect_health.py    SystemHealth, StreamStatus, StreamState
    live_trades.py       Feed de trades en tiempo real
    live_order_book.py   Order book visual con barras de volumen y spread
    multi_market.py      Tabla comparativa multi-simbolo en tiempo real
notebooks/
    streaming_demo.ipynb Demo completa: stream, inspeccion, acumulacion, verificacion

Ejemplos

Todos los ejemplos se ejecutan con python examples/<nombre>.py.

Inspeccion de datos (sin conexion)

python examples/inspect_trade.py        # estructura Trade
python examples/inspect_order_book.py   # estructura OrderBookSnapshot
python examples/inspect_config.py       # StreamConfig, Market, URLs
python examples/inspect_health.py       # SystemHealth, StreamStatus

Streams en vivo

python examples/live_trades.py          # feed de trades BTCUSDT en tiempo real
python examples/live_order_book.py      # order book visual con spread
python examples/multi_market.py         # tabla comparativa multi-simbolo

Notebook

jupyter notebook notebooks/streaming_demo.ipynb

Demo interactiva: arranque de trades y order book, inspeccion de estructuras, acumulacion de 2 minutos con verificacion de continuidad, order book sincronizado con spread, klines en tiempo real, tickers, bookTicker para spread, y shutdown limpio.

Tests

# Ejecutar todos los tests unitarios (169 tests)
pytest tests/ --ignore=tests/test_load.py --ignore=tests/test_load_orderbook.py -v

# Tests de carga contra Binance real (~70s cada uno)
pytest tests/test_load.py -v -s              # 728 symbols, trades, 60s
pytest tests/test_load_orderbook.py -v -s    # 728 symbols, depth 200, 60s

# Todos los tests (190 tests)
pytest tests/ -v -s

# Un test especifico
pytest tests/test_cache.py::TestTradeCache::test_trade_ids_continuous -v

Pruebas de carga

Los tests de carga (test_load.py, test_load_orderbook.py) conectan a Binance real y verifican:

  • Continuidad: trade IDs consecutivos sin huecos
  • Unicidad: cero duplicados
  • Orden: IDs estrictamente crecientes
  • Timestamps: no decrecientes
  • Profundidad: order books de hasta 200 niveles por lado
  • Integridad estructural: bids descendentes, asks ascendentes, spread positivo
  • Precision: cadenas originales de Binance preservadas
  • Estabilidad: todos los workers running, cero reconexiones

Limitaciones

  • No persiste datos en disco. Si el proceso muere, se pierde el cache.
  • Las metricas son in-process. No hay exportacion a Prometheus, StatsD u otros sistemas externos.
  • No implementa autenticacion para streams privados (user data streams).
  • No implementa throttle de conexiones WebSocket (max 5/s de Binance). Con muchos workers arrancando simultaneamente, existe riesgo de alcanzar el limite.

Licencia

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

binflow-0.3.0.tar.gz (51.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

binflow-0.3.0-py3-none-any.whl (34.5 kB view details)

Uploaded Python 3

File details

Details for the file binflow-0.3.0.tar.gz.

File metadata

  • Download URL: binflow-0.3.0.tar.gz
  • Upload date:
  • Size: 51.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for binflow-0.3.0.tar.gz
Algorithm Hash digest
SHA256 c16c718b865b5c7564d64eb96a68ecf4d2421661dd9058367175eb94ed85c392
MD5 5eaace6ca89792d013b23d11b881aea7
BLAKE2b-256 6c6fa7bf67857dfceefda9326fd80989c0df7d297322f589bb6d17615f6c4957

See more details on using hashes here.

File details

Details for the file binflow-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: binflow-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 34.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for binflow-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7166116296c9c5df3a3898492f0fd6bfa43387c01c263f262d1f8e98680d956b
MD5 5a59eb9a3d17f4a3057d38fff2f211cc
BLAKE2b-256 7cb823daa69f99289b9e275e611aca3546ab842c24982877238d492e749093ce

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page