Persistent WebSocket engine for Binance market data (Spot, USD-M, COIN-M)
Project description
binflow
Motor de WebSockets persistente para consumo de datos de mercado en tiempo real desde Binance.
Soporta Spot, USD-M Futures y COIN-M Futures. Componente de infraestructura reutilizable que mantiene un cache de mercado vivo en memoria, con reconexion automatica, observabilidad integrada y API sincrona simple.
Filosofia
binflow es una libreria de infraestructura de datos de mercado. Su responsabilidad es servir datos fiables, continuos y disponibles.
Lo que SI hace binflow:
- Mantener conexiones WebSocket vivas y reconectar automaticamente
- Garantizar continuidad del dato (sin gaps en trade IDs)
- Servir datos normalizados, listos para consumir
- Escalar a cientos de simbolos con eficiencia (combined streams)
- Order books correctos e integros en memoria
Lo que NO hace binflow (responsabilidad del consumidor):
- Persistir datos en base de datos (TimescaleDB, Parquet, etc.)
- Callbacks o event-driven processing
- Metricas y monitoring (Prometheus, Grafana)
- Logica de trading o senales
Otra libreria o servicio debe consumir de binflow via polling (get_recent_trades, get_order_book) y encargarse del resto.
Caracteristicas
- Multi-mercado -- Spot, USD-M Futures y COIN-M Futures con un solo parametro
- Streaming en tiempo real de trades y order book desde Binance
- Streams combinados -- multiplexa cientos de simbolos en pocas conexiones WebSocket
- Order books profundos -- hasta 200 niveles por lado via diff depth stream
- Cache en memoria thread-safe con acceso O(1)
- Reconexion automatica con backoff exponencial y jitter
- API sincrona -- se usa desde codigo Python normal, sin
async/await - Observabilidad -- estado de streams, metricas, deteccion de degradacion
- Logging persistente -- rotacion por tamano, archivo de errores separado, limites de disco
- Shutdown limpio -- sin tareas pendientes ni warnings
- Context manager -- soporte
withpara manejo automatico del ciclo de vida - Probado bajo carga -- 728 simbolos simultaneos, 60s, cero gaps en trade IDs
Instalacion
pip install -e .
O con dependencias de desarrollo:
pip install -e ".[dev]"
Requisitos
- Python >= 3.11
websockets >= 13.0
Inicio rapido
import time
from market_streaming import MarketStreamManager, StreamConfig
config = StreamConfig(
max_recent_trades=500,
default_order_book_depth=10,
)
with MarketStreamManager(config) as manager:
manager.add_trade_stream("BTCUSDT")
manager.add_order_book_stream("BTCUSDT", depth=10)
time.sleep(5) # esperar a que lleguen datos
# Ultimo trade
trade = manager.get_last_trade("BTCUSDT")
if trade:
print(f"{trade.price} x {trade.quantity}")
# Order book
book = manager.get_order_book("BTCUSDT")
if book:
print(f"Mejor bid: {book.bids[0].price}")
print(f"Mejor ask: {book.asks[0].price}")
# Salud del sistema
print(manager.is_healthy())
Streams masivos (cientos de simbolos)
Para suscribirse a muchos simbolos a la vez, usar add_trade_streams() o add_order_book_streams(). Estos metodos multiplexan automaticamente los simbolos en conexiones combinadas (hasta 200 por WebSocket), evitando el rate limit de conexiones de Binance:
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", ...] # cientos de simbolos
with MarketStreamManager(config) as manager:
# Registra todos los simbolos en pocas conexiones combinadas
manager.add_trade_streams(symbols)
time.sleep(10)
for symbol in symbols:
trade = manager.get_last_trade(symbol)
if trade:
print(f"{symbol}: {trade.price}")
Order books profundos
Para acumular mas de 20 niveles, configurar default_order_book_depth > 20. El sistema usara automaticamente el diff depth stream de Binance:
config = StreamConfig(default_order_book_depth=200)
with MarketStreamManager(config) as manager:
manager.add_order_book_streams(["BTCUSDT", "ETHUSDT"], depth=200)
time.sleep(60) # acumular niveles
book = manager.get_order_book("BTCUSDT")
print(f"Niveles bid: {len(book.bids)}, ask: {len(book.asks)}")
Nota: profundidades de 5, 10 y 20 usan el Partial Book Depth Stream (snapshots completos). Cualquier otro valor usa el Diff Depth Stream (actualizaciones incrementales), que acumula niveles con el tiempo.
Futuros USD-M
from market_streaming import MarketStreamManager, StreamConfig, Market
config = StreamConfig(market=Market.FUTURES_USD)
with MarketStreamManager(config) as manager:
manager.add_trade_stream("BTCUSDT")
manager.add_order_book_stream("BTCUSDT")
# ...
Futuros COIN-M
from market_streaming import StreamConfig, Market
config = StreamConfig(market=Market.FUTURES_COIN)
# Los pares de COIN-M usan formato distinto (ej: BTCUSD_PERP)
API publica
MarketStreamManager
Fachada principal del sistema. Gestiona streams, expone datos del cache y metricas.
Ciclo de vida
| Metodo | Descripcion |
|---|---|
start() |
Arranca el motor en un thread dedicado |
stop() |
Detiene todos los streams y libera recursos |
Soporta context manager (with MarketStreamManager(config) as mgr:).
Registro de streams individuales
| Metodo | Descripcion |
|---|---|
add_trade_stream(symbol) |
Suscribe al stream de trades de un simbolo |
add_order_book_stream(symbol, depth=20) |
Suscribe al stream de profundidad de un simbolo |
remove_stream(stream_id) |
Detiene y elimina un stream |
Cada metodo devuelve un stream_id (ej: trade_BTCUSDT, depth_BTCUSDT).
Registro de streams masivos (combinados)
| Metodo | Descripcion |
|---|---|
add_trade_streams(symbols, batch_size=200) |
Registra trades para multiples simbolos en conexiones combinadas |
add_order_book_streams(symbols, depth=None, batch_size=200) |
Registra order books para multiples simbolos en conexiones combinadas |
Estos metodos crean CombinedStreamWorkers que multiplexan hasta batch_size simbolos por conexion WebSocket usando el endpoint /stream?streams=... de Binance. Devuelven una lista de worker IDs.
Consulta de datos
| Metodo | Retorno |
|---|---|
get_last_trade(symbol) |
Trade o None |
get_recent_trades(symbol, limit=100) |
list[Trade] |
get_order_book(symbol) |
OrderBookSnapshot o None |
Los simbolos son case-insensitive: "btcusdt" y "BTCUSDT" son equivalentes.
Observabilidad
| Metodo | Descripcion |
|---|---|
get_stream_status(stream_id) |
Estado de un stream individual |
get_all_streams_status() |
Estado de todos los streams |
get_health() |
Diagnostico de salud global (SystemHealth) |
get_stats() |
Estadisticas resumidas (dict) |
is_healthy() |
True si todos los streams estan sanos |
print_summary() |
Imprime resumen por logging |
Modelos de datos
Trade
@dataclass(frozen=True)
class Trade:
symbol: str # par de trading (ej: "BTCUSDT")
trade_id: int # ID unico del trade en Binance
price: float # precio de ejecucion
quantity: float # cantidad ejecutada
quote_quantity: float # cantidad en moneda cotizada
buyer_maker: bool # True si el comprador es el maker (trade = venta)
timestamp_ms: int # timestamp del trade en milisegundos
price_str: str # precio como cadena original (precision completa)
quantity_str: str # cantidad como cadena original
quote_quantity_str: str
is_best_match: bool # solo Spot
event_time_ms: int # timestamp del evento WebSocket
Incluye to_rest_format() que devuelve un dict compatible con GET /api/v3/trades.
OrderBookSnapshot
@dataclass
class OrderBookSnapshot:
symbol: str
bids: list[OrderBookLevel] # ordenados precio descendente (mejor bid primero)
asks: list[OrderBookLevel] # ordenados precio ascendente (mejor ask primero)
last_update_id: int # ID de la ultima actualizacion aplicada
timestamp: float # time.time() de la ultima actualizacion
event_time_ms: int
transaction_time_ms: int
OrderBookLevel
@dataclass(frozen=True)
class OrderBookLevel:
price: float
quantity: float
price_str: str # cadena original de Binance
quantity_str: str
Incluye to_rest_format() que devuelve [price_str, quantity_str] compatible con la REST API.
StreamState
Estados posibles de un stream:
| Estado | Significado |
|---|---|
starting |
Iniciando conexion |
running |
Conectado y recibiendo datos |
reconnecting |
Reconectando tras un error |
stale |
Conectado pero sin datos recientes |
stopped |
Detenido de forma limpia |
failed |
Error fatal, no reconectable |
SystemHealth
@dataclass
class SystemHealth:
is_healthy: bool # True si todos los streams estan sanos
total_streams: int
running_streams: int
stale_streams: int
reconnecting_streams: int
stopped_streams: int
uptime_s: float # segundos desde el arranque
total_messages: int # total de mensajes procesados
stream_details: dict[str, StreamStatus]
Configuracion
Toda la configuracion se centraliza en StreamConfig:
from market_streaming import StreamConfig, Market
config = StreamConfig(
# Mercado (default: Spot)
market=Market.SPOT, # SPOT | FUTURES_USD | FUTURES_COIN
# base_url="wss://...", # si se pasa, tiene prioridad sobre market
# Conexion
ping_interval_s=20.0,
ping_timeout_s=10.0,
# Cache
max_recent_trades=1000,
default_order_book_depth=20, # >20 usa diff depth stream
# Reconexion
reconnect_base_delay_s=1.0,
reconnect_max_delay_s=60.0,
reconnect_jitter_factor=0.2,
max_reconnect_attempts=0, # 0 = sin limite
# Health
stale_threshold_s=30.0,
# Logging
log_level=logging.INFO,
log_dir="logs",
log_max_bytes=10 * 1024 * 1024, # 10 MB por archivo
log_backup_count=5, # max 5 archivos rotados
)
Mercados soportados
| Market | URL WebSocket | Ejemplo de par |
|---|---|---|
Market.SPOT |
wss://stream.binance.com:9443/ws |
BTCUSDT |
Market.FUTURES_USD |
wss://fstream.binance.com/ws |
BTCUSDT |
Market.FUTURES_COIN |
wss://dstream.binance.com/ws |
BTCUSD_PERP |
Todos los campos tienen valores por defecto razonables. StreamConfig() sin argumentos conecta a Spot.
Logging
El sistema genera dos archivos de log con rotacion automatica:
| Archivo | Nivel | Uso |
|---|---|---|
binflow.log |
DEBUG+ | Analisis completo, debug, tracing |
binflow.error.log |
WARNING+ | Deteccion rapida de fallos e interrupciones |
Rotacion y limites de disco
- Cada archivo rota al alcanzar
log_max_bytes(default: 10 MB) - Se mantienen como maximo
log_backup_countarchivos rotados (default: 5) - Espacio maximo en disco: ~120 MB (2 archivos x 6 versiones x 10 MB)
- Los archivos rotados se nombran
binflow.log.1,binflow.log.2, etc.
Formato de archivo
2026-03-06 20:15:32 | INFO | market_streaming.worker:_connect_and_consume:178 | Stream trade_BTCUSDT: conexion establecida
2026-03-06 20:15:33 | WARNING | market_streaming.worker:_run:123 | Error en stream depth_BTCUSDT (intento 1): [Errno 111] Connection refused
Configuracion de logging
import logging
from market_streaming import StreamConfig
# Produccion: solo errores en consola, todo en disco
config = StreamConfig(
log_level=logging.ERROR,
log_dir="logs",
log_max_bytes=10 * 1024 * 1024,
log_backup_count=5,
)
# Desarrollo: verbose en consola, sin disco
config = StreamConfig(
log_level=logging.DEBUG,
log_dir="", # desactiva archivos en disco
)
# Directorio personalizado
config = StreamConfig(log_dir="/var/log/binflow")
Arquitectura
MarketStreamManager API publica sincrona (fachada)
|
+-- threading.Thread Thread dedicado con event loop asyncio
| |
| +-- WebSocketStreamWorker (trade_BTCUSDT) 1 simbolo / conexion
| +-- WebSocketStreamWorker (depth_BTCUSDT) 1 simbolo / conexion
| +-- CombinedStreamWorker (combined_trades_0) hasta 200 simbolos / conexion
| +-- CombinedStreamWorker (combined_depth_0) hasta 200 simbolos / conexion
| +-- ...
|
+-- MarketDataCache Cache en memoria (threading.Lock)
|
+-- HealthMonitor Evaluacion de salud
|
+-- Logging RotatingFileHandler (general + errores)
Workers
| Worker | Uso | Conexiones |
|---|---|---|
WebSocketStreamWorker |
1 simbolo por conexion (add_trade_stream, add_order_book_stream) |
1 WS por stream |
CombinedStreamWorker |
Hasta 200 simbolos por conexion (add_trade_streams, add_order_book_streams) |
1 WS por batch |
Los workers combinados usan el endpoint /stream?streams=sym1@trade/sym2@trade/... de Binance y desenvuelven el formato {stream, data} de los mensajes.
Modelo de concurrencia
El sistema usa un thread dedicado con event loop asyncio:
- API publica sincrona: el codigo cliente llama
manager.start(),manager.get_last_trade(), etc. sinawait. Esto permite usar el motor desde cualquier contexto Python. - Event loop interno: un thread daemon ejecuta el loop asyncio donde corren todos los workers WebSocket de forma concurrente.
- Cache thread-safe:
threading.Lockprotege las escrituras (desde el thread asyncio) y las lecturas (desde el thread principal). El contention es despreciable porque las operaciones son O(1).
Reconexion
Cada worker gestiona su propia reconexion:
- Error de conexion o desconexion detectada
- Backoff exponencial:
min(base * 2^n, max_delay) - Jitter aleatorio de +-20% sobre el delay calculado
- Contador de reconexiones incrementado
- Si se supera
max_reconnect_attempts(y no es 0), el stream pasa afailed - Errores fatales (ej: stream invalido) no reconectan
Depth streams
| Profundidad | Stream | Tipo de mensaje |
|---|---|---|
| 5, 10, 20 | symbol@depth{N} (Partial Book Depth) |
Snapshots completos cada ~1s |
| Cualquier otro valor | symbol@depth (Diff Depth) |
Actualizaciones incrementales |
El diff depth stream acumula niveles con el tiempo. Para un order book completo desde el primer momento, usar profundidades de 5, 10 o 20.
Cache en memoria
| Dato | Estructura | Acceso |
|---|---|---|
| Ultimo trade por simbolo | dict[symbol, Trade] |
O(1) |
| Trades recientes por simbolo | dict[symbol, deque(maxlen=N)] |
O(1) amortizado |
| Order book por simbolo | dict[symbol, OrderBookSnapshot] |
O(1) |
El order book soporta tanto snapshots completos como actualizaciones incrementales con eliminacion de niveles con cantidad cero.
Estructura del proyecto
market_streaming/
__init__.py Re-exports de la API publica
config.py StreamConfig, Market, MARKET_URLS
models.py Trade, OrderBookSnapshot, StreamStatus, SystemHealth
exceptions.py Excepciones del dominio
cache.py MarketDataCache
worker.py WebSocketStreamWorker, CombinedStreamWorker
health.py HealthMonitor
manager.py MarketStreamManager
logging_config.py Setup de logging con rotacion
tests/
test_cache.py 18 tests - trades, order book, continuidad
test_models.py 16 tests - inmutabilidad, defaults, config, mercados
test_health.py 8 tests - deteccion stale, salud global
test_worker.py 12 tests - URLs por mercado, parsing mensajes, backoff
test_manager.py 13 tests - lifecycle, streams, observabilidad
test_logging.py 13 tests - handlers, rotacion, archivos, teardown
test_load.py 8 tests - carga: 728 symbols trades, 60s
test_load_orderbook.py 13 tests - carga: 728 symbols order books profundos
examples/
inspect_trade.py Estructura del Trade: campos, tipos, precision, formato REST
inspect_order_book.py OrderBookSnapshot: estructura, niveles, formato REST
inspect_config.py StreamConfig: campos, Market enum, URLs (sin conexion)
inspect_health.py SystemHealth, StreamStatus, StreamState
live_trades.py Feed de trades en tiempo real
live_order_book.py Order book visual con barras de volumen y spread
multi_market.py Tabla comparativa multi-simbolo en tiempo real
notebooks/
streaming_demo.ipynb Demo completa: stream, inspeccion, acumulacion, verificacion
Ejemplos
Todos los ejemplos se ejecutan con python examples/<nombre>.py.
Inspeccion de datos (sin conexion)
python examples/inspect_trade.py # estructura Trade
python examples/inspect_order_book.py # estructura OrderBookSnapshot
python examples/inspect_config.py # StreamConfig, Market, URLs
python examples/inspect_health.py # SystemHealth, StreamStatus
Streams en vivo
python examples/live_trades.py # feed de trades BTCUSDT en tiempo real
python examples/live_order_book.py # order book visual con spread
python examples/multi_market.py # tabla comparativa multi-simbolo
Notebook
jupyter notebook notebooks/streaming_demo.ipynb
Demo interactiva: arranque, inspeccion, acumulacion de 2 minutos con verificacion de continuidad, estadisticas y shutdown limpio.
Tests
# Ejecutar todos los tests unitarios (83 tests, <1s)
pytest tests/ --ignore=tests/test_load.py --ignore=tests/test_load_orderbook.py -v
# Tests de carga contra Binance real (~70s cada uno)
pytest tests/test_load.py -v -s # 728 symbols, trades, 60s
pytest tests/test_load_orderbook.py -v -s # 728 symbols, depth 200, 60s
# Todos los tests (102 tests)
pytest tests/ -v -s
# Un test especifico
pytest tests/test_cache.py::TestTradeCache::test_trade_ids_continuous -v
Pruebas de carga
Los tests de carga (test_load.py, test_load_orderbook.py) conectan a Binance real y verifican:
- Continuidad: trade IDs consecutivos sin huecos
- Unicidad: cero duplicados
- Orden: IDs estrictamente crecientes
- Timestamps: no decrecientes
- Profundidad: order books de hasta 200 niveles por lado
- Integridad estructural: bids descendentes, asks ascendentes, spread positivo
- Precision: cadenas originales de Binance preservadas
- Estabilidad: todos los workers running, cero reconexiones
Limitaciones
- No implementa el flujo completo de sincronizacion del order book de Binance (REST snapshot + WebSocket diff). Para profundidades > 20, el diff depth stream acumula niveles incrementalmente sin snapshot REST inicial.
- No persiste datos en disco. Si el proceso muere, se pierde el cache.
- Las metricas son in-process. No hay exportacion a Prometheus, StatsD u otros sistemas externos.
- No implementa autenticacion para streams privados (user data streams).
Licencia
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file binflow-0.1.1.tar.gz.
File metadata
- Download URL: binflow-0.1.1.tar.gz
- Upload date:
- Size: 33.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
169c396572bc68a06185914c22835da427d0361ea3f35f40b71268c0ddfa2e75
|
|
| MD5 |
818384425761366aaeb91d4be82b2192
|
|
| BLAKE2b-256 |
6d5c34fa9ec7c24a1e76b8760d2658104146c723bc8ee69f03642788ff3c8eec
|
File details
Details for the file binflow-0.1.1-py3-none-any.whl.
File metadata
- Download URL: binflow-0.1.1-py3-none-any.whl
- Upload date:
- Size: 25.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7339116d08a61f30d98f6653e58279effbe928cbb327ff16eadd5c342532503a
|
|
| MD5 |
9fe399f3d28ee02aa9422d5ab048269c
|
|
| BLAKE2b-256 |
87b6927cba71890869b8d193364e0a4b74c141ca0dbeb46bb65353fec37085c3
|