Skip to main content

Persistent WebSocket engine for Binance market data (Spot, USD-M, COIN-M)

Project description

binflow

Motor de WebSockets persistente para consumo de datos de mercado en tiempo real desde Binance.

Soporta Spot, USD-M Futures y COIN-M Futures. Componente de infraestructura reutilizable que mantiene un cache de mercado vivo en memoria, con reconexion automatica, observabilidad integrada y API sincrona simple.

Filosofia

binflow es una libreria de infraestructura de datos de mercado. Su responsabilidad es servir datos fiables, continuos y disponibles.

Lo que SI hace binflow:

  • Mantener conexiones WebSocket vivas y reconectar automaticamente
  • Garantizar continuidad del dato (sin gaps en trade IDs)
  • Servir datos normalizados, listos para consumir
  • Escalar a cientos de simbolos con eficiencia (combined streams)
  • Order books correctos e integros en memoria

Lo que NO hace binflow (responsabilidad del consumidor):

  • Persistir datos en base de datos (TimescaleDB, Parquet, etc.)
  • Callbacks o event-driven processing
  • Metricas y monitoring (Prometheus, Grafana)
  • Logica de trading o senales

Otra libreria o servicio debe consumir de binflow via polling (get_recent_trades, get_order_book) y encargarse del resto.

Caracteristicas

  • Multi-mercado -- Spot, USD-M Futures y COIN-M Futures con un solo parametro
  • Streaming en tiempo real de trades y order book desde Binance
  • Streams combinados -- multiplexa cientos de simbolos en pocas conexiones WebSocket
  • Order books profundos -- hasta 200 niveles por lado via diff depth stream
  • Cache en memoria thread-safe con acceso O(1)
  • Reconexion automatica con backoff exponencial y jitter
  • API sincrona -- se usa desde codigo Python normal, sin async/await
  • Observabilidad -- estado de streams, metricas, deteccion de degradacion
  • Logging persistente -- rotacion por tamano, archivo de errores separado, limites de disco
  • Shutdown limpio -- sin tareas pendientes ni warnings
  • Context manager -- soporte with para manejo automatico del ciclo de vida
  • Probado bajo carga -- 728 simbolos simultaneos, 60s, cero gaps en trade IDs

Instalacion

pip install -e .

O con dependencias de desarrollo:

pip install -e ".[dev]"

Requisitos

  • Python >= 3.11
  • websockets >= 13.0

Inicio rapido

import time
from market_streaming import MarketStreamManager, StreamConfig

config = StreamConfig(
    max_recent_trades=500,
    default_order_book_depth=10,
)

with MarketStreamManager(config) as manager:
    manager.add_trade_stream("BTCUSDT")
    manager.add_order_book_stream("BTCUSDT", depth=10)

    time.sleep(5)  # esperar a que lleguen datos

    # Ultimo trade
    trade = manager.get_last_trade("BTCUSDT")
    if trade:
        print(f"{trade.price} x {trade.quantity}")

    # Order book
    book = manager.get_order_book("BTCUSDT")
    if book:
        print(f"Mejor bid: {book.bids[0].price}")
        print(f"Mejor ask: {book.asks[0].price}")

    # Salud del sistema
    print(manager.is_healthy())

Streams masivos (cientos de simbolos)

Para suscribirse a muchos simbolos a la vez, usar add_trade_streams() o add_order_book_streams(). Estos metodos multiplexan automaticamente los simbolos en conexiones combinadas (hasta 200 por WebSocket), evitando el rate limit de conexiones de Binance:

symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", ...]  # cientos de simbolos

with MarketStreamManager(config) as manager:
    # Registra todos los simbolos en pocas conexiones combinadas
    manager.add_trade_streams(symbols)

    time.sleep(10)

    for symbol in symbols:
        trade = manager.get_last_trade(symbol)
        if trade:
            print(f"{symbol}: {trade.price}")

Order books profundos

Para acumular mas de 20 niveles, configurar default_order_book_depth > 20. El sistema usara automaticamente el diff depth stream de Binance:

config = StreamConfig(default_order_book_depth=200)

with MarketStreamManager(config) as manager:
    manager.add_order_book_streams(["BTCUSDT", "ETHUSDT"], depth=200)
    time.sleep(60)  # acumular niveles

    book = manager.get_order_book("BTCUSDT")
    print(f"Niveles bid: {len(book.bids)}, ask: {len(book.asks)}")

Nota: profundidades de 5, 10 y 20 usan el Partial Book Depth Stream (snapshots completos). Cualquier otro valor usa el Diff Depth Stream (actualizaciones incrementales), que acumula niveles con el tiempo.

Futuros USD-M

from market_streaming import MarketStreamManager, StreamConfig, Market

config = StreamConfig(market=Market.FUTURES_USD)

with MarketStreamManager(config) as manager:
    manager.add_trade_stream("BTCUSDT")
    manager.add_order_book_stream("BTCUSDT")
    # ...

Futuros COIN-M

from market_streaming import StreamConfig, Market

config = StreamConfig(market=Market.FUTURES_COIN)
# Los pares de COIN-M usan formato distinto (ej: BTCUSD_PERP)

API publica

MarketStreamManager

Fachada principal del sistema. Gestiona streams, expone datos del cache y metricas.

Ciclo de vida

Metodo Descripcion
start() Arranca el motor en un thread dedicado
stop() Detiene todos los streams y libera recursos

Soporta context manager (with MarketStreamManager(config) as mgr:).

Registro de streams individuales

Metodo Descripcion
add_trade_stream(symbol) Suscribe al stream de trades de un simbolo
add_order_book_stream(symbol, depth=20) Suscribe al stream de profundidad de un simbolo
remove_stream(stream_id) Detiene y elimina un stream

Cada metodo devuelve un stream_id (ej: trade_BTCUSDT, depth_BTCUSDT).

Registro de streams masivos (combinados)

Metodo Descripcion
add_trade_streams(symbols, batch_size=200) Registra trades para multiples simbolos en conexiones combinadas
add_order_book_streams(symbols, depth=None, batch_size=200) Registra order books para multiples simbolos en conexiones combinadas

Estos metodos crean CombinedStreamWorkers que multiplexan hasta batch_size simbolos por conexion WebSocket usando el endpoint /stream?streams=... de Binance. Devuelven una lista de worker IDs.

Consulta de datos

Metodo Retorno
get_last_trade(symbol) Trade o None
get_recent_trades(symbol, limit=100) list[Trade]
get_order_book(symbol) OrderBookSnapshot o None

Los simbolos son case-insensitive: "btcusdt" y "BTCUSDT" son equivalentes.

Observabilidad

Metodo Descripcion
get_stream_status(stream_id) Estado de un stream individual
get_all_streams_status() Estado de todos los streams
get_health() Diagnostico de salud global (SystemHealth)
get_stats() Estadisticas resumidas (dict)
is_healthy() True si todos los streams estan sanos
print_summary() Imprime resumen por logging

Modelos de datos

Trade

@dataclass(frozen=True)
class Trade:
    symbol: str          # par de trading (ej: "BTCUSDT")
    trade_id: int        # ID unico del trade en Binance
    price: float         # precio de ejecucion
    quantity: float      # cantidad ejecutada
    quote_quantity: float # cantidad en moneda cotizada
    buyer_maker: bool    # True si el comprador es el maker (trade = venta)
    timestamp_ms: int    # timestamp del trade en milisegundos
    price_str: str       # precio como cadena original (precision completa)
    quantity_str: str    # cantidad como cadena original
    quote_quantity_str: str
    is_best_match: bool  # solo Spot
    event_time_ms: int   # timestamp del evento WebSocket

Incluye to_rest_format() que devuelve un dict compatible con GET /api/v3/trades.

OrderBookSnapshot

@dataclass
class OrderBookSnapshot:
    symbol: str
    bids: list[OrderBookLevel]   # ordenados precio descendente (mejor bid primero)
    asks: list[OrderBookLevel]   # ordenados precio ascendente (mejor ask primero)
    last_update_id: int          # ID de la ultima actualizacion aplicada
    timestamp: float             # time.time() de la ultima actualizacion
    event_time_ms: int
    transaction_time_ms: int

OrderBookLevel

@dataclass(frozen=True)
class OrderBookLevel:
    price: float
    quantity: float
    price_str: str       # cadena original de Binance
    quantity_str: str

Incluye to_rest_format() que devuelve [price_str, quantity_str] compatible con la REST API.

StreamState

Estados posibles de un stream:

Estado Significado
starting Iniciando conexion
running Conectado y recibiendo datos
reconnecting Reconectando tras un error
stale Conectado pero sin datos recientes
stopped Detenido de forma limpia
failed Error fatal, no reconectable

SystemHealth

@dataclass
class SystemHealth:
    is_healthy: bool             # True si todos los streams estan sanos
    total_streams: int
    running_streams: int
    stale_streams: int
    reconnecting_streams: int
    stopped_streams: int
    uptime_s: float              # segundos desde el arranque
    total_messages: int          # total de mensajes procesados
    stream_details: dict[str, StreamStatus]

Configuracion

Toda la configuracion se centraliza en StreamConfig:

from market_streaming import StreamConfig, Market

config = StreamConfig(
    # Mercado (default: Spot)
    market=Market.SPOT,       # SPOT | FUTURES_USD | FUTURES_COIN
    # base_url="wss://...",   # si se pasa, tiene prioridad sobre market

    # Conexion
    ping_interval_s=20.0,
    ping_timeout_s=10.0,

    # Cache
    max_recent_trades=1000,
    default_order_book_depth=20,   # >20 usa diff depth stream

    # Reconexion
    reconnect_base_delay_s=1.0,
    reconnect_max_delay_s=60.0,
    reconnect_jitter_factor=0.2,
    max_reconnect_attempts=0,  # 0 = sin limite

    # Health
    stale_threshold_s=30.0,

    # Logging
    log_level=logging.INFO,
    log_dir="logs",
    log_max_bytes=10 * 1024 * 1024,  # 10 MB por archivo
    log_backup_count=5,               # max 5 archivos rotados
)

Mercados soportados

Market URL WebSocket Ejemplo de par
Market.SPOT wss://stream.binance.com:9443/ws BTCUSDT
Market.FUTURES_USD wss://fstream.binance.com/ws BTCUSDT
Market.FUTURES_COIN wss://dstream.binance.com/ws BTCUSD_PERP

Todos los campos tienen valores por defecto razonables. StreamConfig() sin argumentos conecta a Spot.

Logging

El sistema genera dos archivos de log con rotacion automatica:

Archivo Nivel Uso
binflow.log DEBUG+ Analisis completo, debug, tracing
binflow.error.log WARNING+ Deteccion rapida de fallos e interrupciones

Rotacion y limites de disco

  • Cada archivo rota al alcanzar log_max_bytes (default: 10 MB)
  • Se mantienen como maximo log_backup_count archivos rotados (default: 5)
  • Espacio maximo en disco: ~120 MB (2 archivos x 6 versiones x 10 MB)
  • Los archivos rotados se nombran binflow.log.1, binflow.log.2, etc.

Formato de archivo

2026-03-06 20:15:32 | INFO     | market_streaming.worker:_connect_and_consume:178 | Stream trade_BTCUSDT: conexion establecida
2026-03-06 20:15:33 | WARNING  | market_streaming.worker:_run:123 | Error en stream depth_BTCUSDT (intento 1): [Errno 111] Connection refused

Configuracion de logging

import logging
from market_streaming import StreamConfig

# Produccion: solo errores en consola, todo en disco
config = StreamConfig(
    log_level=logging.ERROR,
    log_dir="logs",
    log_max_bytes=10 * 1024 * 1024,
    log_backup_count=5,
)

# Desarrollo: verbose en consola, sin disco
config = StreamConfig(
    log_level=logging.DEBUG,
    log_dir="",  # desactiva archivos en disco
)

# Directorio personalizado
config = StreamConfig(log_dir="/var/log/binflow")

Arquitectura

MarketStreamManager           API publica sincrona (fachada)
    |
    +-- threading.Thread       Thread dedicado con event loop asyncio
    |       |
    |       +-- WebSocketStreamWorker (trade_BTCUSDT)     1 simbolo / conexion
    |       +-- WebSocketStreamWorker (depth_BTCUSDT)     1 simbolo / conexion
    |       +-- CombinedStreamWorker (combined_trades_0)  hasta 200 simbolos / conexion
    |       +-- CombinedStreamWorker (combined_depth_0)   hasta 200 simbolos / conexion
    |       +-- ...
    |
    +-- MarketDataCache        Cache en memoria (threading.Lock)
    |
    +-- HealthMonitor          Evaluacion de salud
    |
    +-- Logging                RotatingFileHandler (general + errores)

Workers

Worker Uso Conexiones
WebSocketStreamWorker 1 simbolo por conexion (add_trade_stream, add_order_book_stream) 1 WS por stream
CombinedStreamWorker Hasta 200 simbolos por conexion (add_trade_streams, add_order_book_streams) 1 WS por batch

Los workers combinados usan el endpoint /stream?streams=sym1@trade/sym2@trade/... de Binance y desenvuelven el formato {stream, data} de los mensajes.

Modelo de concurrencia

El sistema usa un thread dedicado con event loop asyncio:

  • API publica sincrona: el codigo cliente llama manager.start(), manager.get_last_trade(), etc. sin await. Esto permite usar el motor desde cualquier contexto Python.
  • Event loop interno: un thread daemon ejecuta el loop asyncio donde corren todos los workers WebSocket de forma concurrente.
  • Cache thread-safe: threading.Lock protege las escrituras (desde el thread asyncio) y las lecturas (desde el thread principal). El contention es despreciable porque las operaciones son O(1).

Reconexion

Cada worker gestiona su propia reconexion:

  1. Error de conexion o desconexion detectada
  2. Backoff exponencial: min(base * 2^n, max_delay)
  3. Jitter aleatorio de +-20% sobre el delay calculado
  4. Contador de reconexiones incrementado
  5. Si se supera max_reconnect_attempts (y no es 0), el stream pasa a failed
  6. Errores fatales (ej: stream invalido) no reconectan

Depth streams

Profundidad Stream Tipo de mensaje
5, 10, 20 symbol@depth{N} (Partial Book Depth) Snapshots completos cada ~1s
Cualquier otro valor symbol@depth (Diff Depth) Actualizaciones incrementales

El diff depth stream acumula niveles con el tiempo. Para un order book completo desde el primer momento, usar profundidades de 5, 10 o 20.

Cache en memoria

Dato Estructura Acceso
Ultimo trade por simbolo dict[symbol, Trade] O(1)
Trades recientes por simbolo dict[symbol, deque(maxlen=N)] O(1) amortizado
Order book por simbolo dict[symbol, OrderBookSnapshot] O(1)

El order book soporta tanto snapshots completos como actualizaciones incrementales con eliminacion de niveles con cantidad cero.

Estructura del proyecto

market_streaming/
    __init__.py          Re-exports de la API publica
    config.py            StreamConfig, Market, MARKET_URLS
    models.py            Trade, OrderBookSnapshot, StreamStatus, SystemHealth
    exceptions.py        Excepciones del dominio
    cache.py             MarketDataCache
    worker.py            WebSocketStreamWorker, CombinedStreamWorker
    health.py            HealthMonitor
    manager.py           MarketStreamManager
    logging_config.py    Setup de logging con rotacion
tests/
    test_cache.py        18 tests - trades, order book, continuidad
    test_models.py       16 tests - inmutabilidad, defaults, config, mercados
    test_health.py        8 tests - deteccion stale, salud global
    test_worker.py       12 tests - URLs por mercado, parsing mensajes, backoff
    test_manager.py      13 tests - lifecycle, streams, observabilidad
    test_logging.py      13 tests - handlers, rotacion, archivos, teardown
    test_load.py          8 tests - carga: 728 symbols trades, 60s
    test_load_orderbook.py 13 tests - carga: 728 symbols order books profundos
examples/
    inspect_trade.py     Estructura del Trade: campos, tipos, precision, formato REST
    inspect_order_book.py  OrderBookSnapshot: estructura, niveles, formato REST
    inspect_config.py    StreamConfig: campos, Market enum, URLs (sin conexion)
    inspect_health.py    SystemHealth, StreamStatus, StreamState
    live_trades.py       Feed de trades en tiempo real
    live_order_book.py   Order book visual con barras de volumen y spread
    multi_market.py      Tabla comparativa multi-simbolo en tiempo real
notebooks/
    streaming_demo.ipynb Demo completa: stream, inspeccion, acumulacion, verificacion

Ejemplos

Todos los ejemplos se ejecutan con python examples/<nombre>.py.

Inspeccion de datos (sin conexion)

python examples/inspect_trade.py        # estructura Trade
python examples/inspect_order_book.py   # estructura OrderBookSnapshot
python examples/inspect_config.py       # StreamConfig, Market, URLs
python examples/inspect_health.py       # SystemHealth, StreamStatus

Streams en vivo

python examples/live_trades.py          # feed de trades BTCUSDT en tiempo real
python examples/live_order_book.py      # order book visual con spread
python examples/multi_market.py         # tabla comparativa multi-simbolo

Notebook

jupyter notebook notebooks/streaming_demo.ipynb

Demo interactiva: arranque, inspeccion, acumulacion de 2 minutos con verificacion de continuidad, estadisticas y shutdown limpio.

Tests

# Ejecutar todos los tests unitarios (83 tests, <1s)
pytest tests/ --ignore=tests/test_load.py --ignore=tests/test_load_orderbook.py -v

# Tests de carga contra Binance real (~70s cada uno)
pytest tests/test_load.py -v -s              # 728 symbols, trades, 60s
pytest tests/test_load_orderbook.py -v -s    # 728 symbols, depth 200, 60s

# Todos los tests (102 tests)
pytest tests/ -v -s

# Un test especifico
pytest tests/test_cache.py::TestTradeCache::test_trade_ids_continuous -v

Pruebas de carga

Los tests de carga (test_load.py, test_load_orderbook.py) conectan a Binance real y verifican:

  • Continuidad: trade IDs consecutivos sin huecos
  • Unicidad: cero duplicados
  • Orden: IDs estrictamente crecientes
  • Timestamps: no decrecientes
  • Profundidad: order books de hasta 200 niveles por lado
  • Integridad estructural: bids descendentes, asks ascendentes, spread positivo
  • Precision: cadenas originales de Binance preservadas
  • Estabilidad: todos los workers running, cero reconexiones

Limitaciones

  • No implementa el flujo completo de sincronizacion del order book de Binance (REST snapshot + WebSocket diff). Para profundidades > 20, el diff depth stream acumula niveles incrementalmente sin snapshot REST inicial.
  • No persiste datos en disco. Si el proceso muere, se pierde el cache.
  • Las metricas son in-process. No hay exportacion a Prometheus, StatsD u otros sistemas externos.
  • No implementa autenticacion para streams privados (user data streams).

Licencia

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

binflow-0.1.1.tar.gz (33.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

binflow-0.1.1-py3-none-any.whl (25.4 kB view details)

Uploaded Python 3

File details

Details for the file binflow-0.1.1.tar.gz.

File metadata

  • Download URL: binflow-0.1.1.tar.gz
  • Upload date:
  • Size: 33.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for binflow-0.1.1.tar.gz
Algorithm Hash digest
SHA256 169c396572bc68a06185914c22835da427d0361ea3f35f40b71268c0ddfa2e75
MD5 818384425761366aaeb91d4be82b2192
BLAKE2b-256 6d5c34fa9ec7c24a1e76b8760d2658104146c723bc8ee69f03642788ff3c8eec

See more details on using hashes here.

File details

Details for the file binflow-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: binflow-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 25.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for binflow-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 7339116d08a61f30d98f6653e58279effbe928cbb327ff16eadd5c342532503a
MD5 9fe399f3d28ee02aa9422d5ab048269c
BLAKE2b-256 87b6927cba71890869b8d193364e0a4b74c141ca0dbeb46bb65353fec37085c3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page