Persistent WebSocket engine for Binance market data (Spot, USD-M, COIN-M)
Project description
binflow
Motor de WebSockets persistente para consumo de datos de mercado en tiempo real desde Binance.
Soporta Spot, USD-M Futures y COIN-M Futures. Componente de infraestructura reutilizable que mantiene un cache de mercado vivo en memoria, con reconexion automatica, observabilidad integrada y API sincrona simple.
Filosofia
binflow es una libreria de infraestructura de datos de mercado. Su responsabilidad es servir datos fiables, continuos y disponibles.
Lo que SI hace binflow:
- Mantener conexiones WebSocket vivas y reconectar automaticamente
- Garantizar continuidad del dato (sin gaps en trade IDs)
- Servir datos normalizados, listos para consumir
- Escalar a cientos de simbolos con eficiencia (combined streams)
- Order books correctos e integros en memoria
Lo que NO hace binflow (responsabilidad del consumidor):
- Persistir datos en base de datos (TimescaleDB, Parquet, etc.)
- Callbacks o event-driven processing
- Metricas y monitoring (Prometheus, Grafana)
- Logica de trading o senales
Otra libreria o servicio debe consumir de binflow via polling (get_recent_trades, get_order_book, get_kline, etc.) y encargarse del resto.
Caracteristicas
- Multi-mercado -- Spot, USD-M Futures y COIN-M Futures con un solo parametro
- 7 tipos de stream -- trades, order book, aggTrades, klines, ticker 24h, miniTicker, bookTicker
- Streams combinados -- multiplexa cientos de simbolos en pocas conexiones WebSocket
- Gestion dinamica -- subscribe/unsubscribe de simbolos en combined streams sin recrear la conexion
- Order books sincronizados -- REST snapshot + diffs por WebSocket, con campo
is_syncedque indica integridad verificada - Order books profundos -- hasta 5000 niveles (Spot) o 1000 (Futures) por lado
- Rate limiting automatico -- delegado a panzer, respeta cabeceras
X-MBX-USED-WEIGHTde Binance - Anti-regresion -- el cache rechaza snapshots con
lastUpdateIdanterior al existente, protegiendo contra datos stale - Cache en memoria thread-safe con acceso O(1)
- Reconexion automatica con backoff exponencial y jitter
- API sincrona -- se usa desde codigo Python normal, sin
async/await - Observabilidad -- estado de streams, metricas, deteccion de degradacion
- Logging persistente -- rotacion por tamano, archivo de errores separado, limites de disco
- Shutdown limpio -- sin tareas pendientes ni warnings
- Context manager -- soporte
withpara manejo automatico del ciclo de vida - Probado bajo carga -- 728 simbolos simultaneos, 60s, cero gaps en trade IDs
Instalacion
pip install -e .
O con dependencias de desarrollo:
pip install -e ".[dev]"
Requisitos
- Python >= 3.11
websockets >= 13.0panzer >= 2.3.0(rate limiting y peticiones REST a Binance)
Inicio rapido
import time
from market_streaming import MarketStreamManager, StreamConfig
config = StreamConfig(
max_recent_trades=500,
default_order_book_depth=10,
)
with MarketStreamManager(config) as manager:
manager.add_trade_stream("BTCUSDT")
manager.add_order_book_stream("BTCUSDT", depth=10)
time.sleep(5) # esperar a que lleguen datos
# Ultimo trade
trade = manager.get_last_trade("BTCUSDT")
if trade:
print(f"{trade.price} x {trade.quantity}")
# Order book (sincronizado via REST + WebSocket)
book = manager.get_order_book("BTCUSDT")
if book:
print(f"Mejor bid: {book.bids[0].price}")
print(f"Mejor ask: {book.asks[0].price}")
print(f"Sincronizado: {book.is_synced}")
# Klines, tickers, bookTicker...
manager.add_kline_stream("BTCUSDT", interval="1m")
manager.add_ticker_stream("BTCUSDT")
manager.add_book_ticker_stream("BTCUSDT")
time.sleep(3)
kline = manager.get_kline("BTCUSDT", "1m")
if kline:
print(f"Vela 1m: O={kline.open} H={kline.high} L={kline.low} C={kline.close}")
ticker = manager.get_ticker("BTCUSDT")
if ticker:
print(f"24h: {ticker.price_change_pct:.2f}% vol={ticker.volume:.2f}")
bt = manager.get_book_ticker("BTCUSDT")
if bt:
print(f"Spread: {bt.best_ask_price - bt.best_bid_price:.2f}")
# Salud del sistema
print(manager.is_healthy())
Streams masivos (cientos de simbolos)
Para suscribirse a muchos simbolos a la vez, usar add_trade_streams() o add_order_book_streams(). Estos metodos multiplexan automaticamente los simbolos en conexiones combinadas (hasta 200 por WebSocket), evitando el rate limit de conexiones de Binance:
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", ...] # cientos de simbolos
with MarketStreamManager(config) as manager:
# Registra todos los simbolos en pocas conexiones combinadas
manager.add_trade_streams(symbols)
time.sleep(10)
for symbol in symbols:
trade = manager.get_last_trade(symbol)
if trade:
print(f"{symbol}: {trade.price}")
Order books profundos y sincronizados
Para profundidades > 20, el sistema usa automaticamente el diff depth stream de Binance con sincronizacion REST:
- Conecta al diff depth stream y buferea mensajes
- Pide un snapshot REST completo via panzer (con rate limiting automatico)
- Filtra los diffs anteriores al snapshot
- Aplica los diffs posteriores sobre el snapshot
- Marca el order book como
is_synced=True
config = StreamConfig(default_order_book_depth=200)
with MarketStreamManager(config) as manager:
manager.add_order_book_streams(["BTCUSDT", "ETHUSDT"], depth=200)
time.sleep(10) # esperar a sincronizacion REST
book = manager.get_order_book("BTCUSDT")
print(f"Niveles: {len(book.bids)} bids, {len(book.asks)} asks")
print(f"Sincronizado: {book.is_synced}") # True tras REST sync
Para streams masivos (cientos de simbolos), panzer gestiona las peticiones REST en paralelo con bulk_depth(), respetando los rate limits de Binance.
Nota: profundidades de 5, 10 y 20 usan el Partial Book Depth Stream (snapshots completos cada ~1s, sin necesidad de REST sync). Cualquier otro valor usa el Diff Depth Stream con sincronizacion REST.
Gestion dinamica de simbolos
En combined streams, se pueden anadir o quitar simbolos sin recrear la conexion WebSocket:
with MarketStreamManager(config) as manager:
ids = manager.add_trade_streams(["BTCUSDT", "ETHUSDT"])
time.sleep(5)
# Anadir un simbolo al primer worker combinado
manager.subscribe_symbol(ids[0], "SOLUSDT")
# Quitar un simbolo
manager.unsubscribe_symbol(ids[0], "ETHUSDT")
Futuros USD-M
from market_streaming import MarketStreamManager, StreamConfig, Market
config = StreamConfig(market=Market.FUTURES_USD)
with MarketStreamManager(config) as manager:
manager.add_trade_stream("BTCUSDT")
manager.add_order_book_stream("BTCUSDT")
# ...
Futuros COIN-M
from market_streaming import StreamConfig, Market
config = StreamConfig(market=Market.FUTURES_COIN)
# Los pares de COIN-M usan formato distinto (ej: BTCUSD_PERP)
API publica
MarketStreamManager
Fachada principal del sistema. Gestiona streams, expone datos del cache y metricas.
Ciclo de vida
| Metodo | Descripcion |
|---|---|
start() |
Arranca el motor en un thread dedicado |
stop() |
Detiene todos los streams y libera recursos |
Soporta context manager (with MarketStreamManager(config) as mgr:).
Registro de streams individuales
| Metodo | Descripcion |
|---|---|
add_trade_stream(symbol) |
Trades individuales |
add_order_book_stream(symbol, depth=20) |
Order book (snapshot o diff depth) |
add_agg_trade_stream(symbol) |
Trades agregados (menor volumen de mensajes) |
add_kline_stream(symbol, interval="1m") |
Velas/candlesticks en tiempo real |
add_ticker_stream(symbol) |
Estadisticas 24h completas (OHLC, volumen, bid/ask) |
add_mini_ticker_stream(symbol) |
Estadisticas 24h reducidas (OHLC, volumen) |
add_book_ticker_stream(symbol) |
Mejor bid/ask en tiempo real (minima latencia) |
remove_stream(stream_id) |
Detiene y elimina un stream |
Cada metodo devuelve un stream_id (ej: trade_BTCUSDT, kline_BTCUSDT_1m, ticker_BTCUSDT).
Registro de streams masivos (combinados)
| Metodo | Descripcion |
|---|---|
add_trade_streams(symbols, batch_size=200) |
Trades para multiples simbolos |
add_order_book_streams(symbols, depth=None, batch_size=200) |
Order books para multiples simbolos |
add_agg_trade_streams(symbols, batch_size=200) |
AggTrades para multiples simbolos |
add_kline_streams(symbols, interval="1m", batch_size=200) |
Klines para multiples simbolos |
add_ticker_streams(symbols, batch_size=200) |
Tickers 24h para multiples simbolos |
add_mini_ticker_streams(symbols, batch_size=200) |
MiniTickers para multiples simbolos |
add_book_ticker_streams(symbols, batch_size=200) |
BookTickers para multiples simbolos |
Estos metodos crean CombinedStreamWorkers que multiplexan hasta batch_size simbolos por conexion WebSocket usando el endpoint /stream?streams=... de Binance. Devuelven una lista de worker IDs.
Gestion dinamica de simbolos
| Metodo | Descripcion |
|---|---|
subscribe_symbol(stream_id, symbol) |
Anade un simbolo a un combined stream existente |
unsubscribe_symbol(stream_id, symbol) |
Quita un simbolo de un combined stream existente |
Estas operaciones envian SUBSCRIBE/UNSUBSCRIBE por el WebSocket existente, sin recrear la conexion.
Consulta de datos
| Metodo | Retorno |
|---|---|
get_last_trade(symbol) |
Trade o None |
get_recent_trades(symbol, limit=100) |
list[Trade] |
get_order_book(symbol) |
OrderBookSnapshot o None |
get_last_agg_trade(symbol) |
AggTrade o None |
get_recent_agg_trades(symbol, limit=100) |
list[AggTrade] |
get_kline(symbol, interval="1m") |
Kline o None |
get_ticker(symbol) |
Ticker o None |
get_mini_ticker(symbol) |
MiniTicker o None |
get_book_ticker(symbol) |
BookTicker o None |
Los simbolos son case-insensitive: "btcusdt" y "BTCUSDT" son equivalentes.
Observabilidad
| Metodo | Descripcion |
|---|---|
get_stream_status(stream_id) |
Estado de un stream individual |
get_all_streams_status() |
Estado de todos los streams |
get_health() |
Diagnostico de salud global (SystemHealth) |
get_stats() |
Estadisticas resumidas (dict) |
is_healthy() |
True si todos los streams estan sanos |
print_summary() |
Imprime resumen por logging |
Modelos de datos
Trade
@dataclass(frozen=True)
class Trade:
symbol: str # par de trading (ej: "BTCUSDT")
trade_id: int # ID unico del trade en Binance
price: float # precio de ejecucion
quantity: float # cantidad ejecutada
quote_quantity: float # cantidad en moneda cotizada
buyer_maker: bool # True si el comprador es el maker (trade = venta)
timestamp_ms: int # timestamp del trade en milisegundos
price_str: str # precio como cadena original (precision completa)
quantity_str: str # cantidad como cadena original
quote_quantity_str: str
is_best_match: bool # solo Spot
event_time_ms: int # timestamp del evento WebSocket
Incluye to_rest_format() que devuelve un dict compatible con GET /api/v3/trades.
OrderBookSnapshot
@dataclass
class OrderBookSnapshot:
symbol: str
bids: list[OrderBookLevel] # ordenados precio descendente (mejor bid primero)
asks: list[OrderBookLevel] # ordenados precio ascendente (mejor ask primero)
last_update_id: int # ID de la ultima actualizacion aplicada
timestamp: float # time.time() de la ultima actualizacion
event_time_ms: int
transaction_time_ms: int
is_synced: bool # True si paso por REST snapshot + diffs (book fiable)
is_synced indica si el order book fue sincronizado correctamente con un REST snapshot. Un book con is_synced=False solo contiene diffs parciales y puede estar incompleto. Tras la sincronizacion REST, is_synced pasa a True y se preserva en actualizaciones posteriores.
OrderBookLevel
@dataclass(frozen=True)
class OrderBookLevel:
price: float
quantity: float
price_str: str # cadena original de Binance
quantity_str: str
Incluye to_rest_format() que devuelve [price_str, quantity_str] compatible con la REST API.
AggTrade
@dataclass(frozen=True)
class AggTrade:
symbol: str
agg_trade_id: int # ID del trade agregado
price: float
quantity: float
first_trade_id: int # rango de trades individuales incluidos
last_trade_id: int
timestamp_ms: int
buyer_maker: bool
price_str: str
quantity_str: str
event_time_ms: int
Agrupa uno o mas trades individuales ejecutados al mismo precio y en la misma orden. Incluye to_rest_format() compatible con GET /api/v3/aggTrades.
Kline
@dataclass(frozen=True)
class Kline:
symbol: str
interval: str # "1m", "5m", "1h", "1d", etc.
open_time_ms: int
close_time_ms: int
open: float
close: float
high: float
low: float
volume: float
quote_volume: float
trades: int
taker_buy_volume: float
taker_buy_quote_volume: float
is_closed: bool # True cuando la vela es definitiva
# + campos _str para precision original
Se actualiza en tiempo real mientras la vela esta abierta. Cuando is_closed=True, la vela es definitiva. Incluye to_rest_format() compatible con GET /api/v3/klines.
Ticker
@dataclass(frozen=True)
class Ticker:
symbol: str
price_change: float
price_change_pct: float
weighted_avg_price: float
last_price: float
best_bid_price: float
best_ask_price: float
open_price: float
high_price: float
low_price: float
volume: float
quote_volume: float
trade_count: int
# + 15 campos mas (prev_close, quantities, times, trade IDs, strings)
Estadisticas completas de 24h rolling. Incluye to_rest_format() compatible con GET /api/v3/ticker/24hr.
MiniTicker
@dataclass(frozen=True)
class MiniTicker:
symbol: str
close_price: float
open_price: float
high_price: float
low_price: float
volume: float
quote_volume: float
# + campos _str
Version ligera de Ticker con solo OHLC y volumen.
BookTicker
@dataclass(frozen=True)
class BookTicker:
symbol: str
update_id: int
best_bid_price: float
best_bid_quantity: float
best_ask_price: float
best_ask_quantity: float
# + campos _str
Mejor bid/ask en tiempo real. El stream mas rapido para obtener spread. Incluye to_rest_format() compatible con GET /api/v3/ticker/bookTicker.
StreamState
Estados posibles de un stream:
| Estado | Significado |
|---|---|
starting |
Iniciando conexion |
running |
Conectado y recibiendo datos |
reconnecting |
Reconectando tras un error |
stale |
Conectado pero sin datos recientes |
stopped |
Detenido de forma limpia |
failed |
Error fatal, no reconectable |
SystemHealth
@dataclass
class SystemHealth:
is_healthy: bool # True si todos los streams estan sanos
total_streams: int
running_streams: int
stale_streams: int
reconnecting_streams: int
stopped_streams: int
uptime_s: float # segundos desde el arranque
total_messages: int # total de mensajes procesados
stream_details: dict[str, StreamStatus]
Configuracion
Toda la configuracion se centraliza en StreamConfig:
from market_streaming import StreamConfig, Market
config = StreamConfig(
# Mercado (default: Spot)
market=Market.SPOT, # SPOT | FUTURES_USD | FUTURES_COIN
# base_url="wss://...", # si se pasa, tiene prioridad sobre market
# Conexion
ping_interval_s=20.0,
ping_timeout_s=10.0,
# Cache
max_recent_trades=1000,
default_order_book_depth=20, # >20 usa diff depth stream
# Reconexion
reconnect_base_delay_s=1.0,
reconnect_max_delay_s=60.0,
reconnect_jitter_factor=0.2,
max_reconnect_attempts=0, # 0 = sin limite
# Health
stale_threshold_s=30.0,
# Logging
log_level=logging.INFO,
log_dir="logs",
log_max_bytes=10 * 1024 * 1024, # 10 MB por archivo
log_backup_count=5, # max 5 archivos rotados
)
Mercados soportados
| Market | URL WebSocket | Ejemplo de par |
|---|---|---|
Market.SPOT |
wss://stream.binance.com:9443/ws |
BTCUSDT |
Market.FUTURES_USD |
wss://fstream.binance.com/ws |
BTCUSDT |
Market.FUTURES_COIN |
wss://dstream.binance.com/ws |
BTCUSD_PERP |
Todos los campos tienen valores por defecto razonables. StreamConfig() sin argumentos conecta a Spot.
Logging
El sistema genera dos archivos de log con rotacion automatica:
| Archivo | Nivel | Uso |
|---|---|---|
binflow.log |
DEBUG+ | Analisis completo, debug, tracing |
binflow.error.log |
WARNING+ | Deteccion rapida de fallos e interrupciones |
Rotacion y limites de disco
- Cada archivo rota al alcanzar
log_max_bytes(default: 10 MB) - Se mantienen como maximo
log_backup_countarchivos rotados (default: 5) - Espacio maximo en disco: ~120 MB (2 archivos x 6 versiones x 10 MB)
- Los archivos rotados se nombran
binflow.log.1,binflow.log.2, etc.
Formato de archivo
2026-03-06 20:15:32 | INFO | market_streaming.worker:_connect_and_consume:178 | Stream trade_BTCUSDT: conexion establecida
2026-03-06 20:15:33 | WARNING | market_streaming.worker:_run:123 | Error en stream depth_BTCUSDT (intento 1): [Errno 111] Connection refused
Configuracion de logging
import logging
from market_streaming import StreamConfig
# Produccion: solo errores en consola, todo en disco
config = StreamConfig(
log_level=logging.ERROR,
log_dir="logs",
log_max_bytes=10 * 1024 * 1024,
log_backup_count=5,
)
# Desarrollo: verbose en consola, sin disco
config = StreamConfig(
log_level=logging.DEBUG,
log_dir="", # desactiva archivos en disco
)
# Directorio personalizado
config = StreamConfig(log_dir="/var/log/binflow")
Arquitectura
MarketStreamManager API publica sincrona (fachada)
|
+-- threading.Thread Thread dedicado con event loop asyncio
| |
| +-- WebSocketStreamWorker (trade_BTCUSDT) 1 simbolo / conexion
| +-- WebSocketStreamWorker (kline_BTCUSDT_1m) 1 simbolo / conexion
| +-- CombinedStreamWorker (combined_trades_0) hasta 200 simbolos / conexion
| +-- CombinedStreamWorker (combined_tickers_0) hasta 200 simbolos / conexion
| +-- ... (7 tipos de stream soportados)
|
+-- BinancePublicClient REST client (panzer) con rate limiting automatico
| Usado por workers para sync de order books
|
+-- MarketDataCache Cache en memoria (threading.Lock)
| Anti-regresion: rechaza snapshots con ID anterior
|
+-- HealthMonitor Evaluacion de salud
|
+-- Logging RotatingFileHandler (general + errores)
Workers
| Worker | Uso | Conexiones |
|---|---|---|
WebSocketStreamWorker |
1 simbolo por conexion (add_*_stream) |
1 WS por stream |
CombinedStreamWorker |
Hasta 200 simbolos por conexion (add_*_streams) |
1 WS por batch |
Ambos workers soportan los 7 tipos de stream. Los workers combinados usan el endpoint /stream?streams=sym1@trade/sym2@trade/... de Binance, desenvuelven el formato {stream, data}, y permiten SUBSCRIBE/UNSUBSCRIBE dinamico de simbolos sin recrear la conexion.
Modelo de concurrencia
El sistema usa un thread dedicado con event loop asyncio:
- API publica sincrona: el codigo cliente llama
manager.start(),manager.get_last_trade(), etc. sinawait. Esto permite usar el motor desde cualquier contexto Python. - Event loop interno: un thread daemon ejecuta el loop asyncio donde corren todos los workers WebSocket de forma concurrente.
- Cache thread-safe:
threading.Lockprotege las escrituras (desde el thread asyncio) y las lecturas (desde el thread principal). El contention es despreciable porque las operaciones son O(1).
Reconexion
Cada worker gestiona su propia reconexion:
- Error de conexion o desconexion detectada
- Backoff exponencial:
min(base * 2^n, max_delay) - Jitter aleatorio de +-20% sobre el delay calculado
- Contador de reconexiones incrementado
- Si se supera
max_reconnect_attempts(y no es 0), el stream pasa afailed - Errores fatales (ej: stream invalido) no reconectan
Depth streams y sincronizacion
| Profundidad | Stream | Sincronizacion |
|---|---|---|
| 5, 10, 20 | symbol@depth{N} (Partial Book Depth) |
Snapshots completos cada ~1s, no requiere REST |
| Cualquier otro valor | symbol@depth (Diff Depth) |
REST snapshot + diffs incrementales |
Para el diff depth stream, el flujo de sincronizacion sigue el protocolo oficial de Binance:
- Conectar al diff depth stream y almacenar los mensajes en un buffer
- Pedir snapshot REST via
panzer.BinancePublicClient.depth()(single) obulk_depth()(masivo) - Filtrar diffs: descartar los anteriores al
lastUpdateIddel snapshot (reglas Spot vs Futures) - Aplicar el buffer filtrado sobre el snapshot
- Marcar
is_synced=Truey continuar con actualizaciones en tiempo real
El cache protege contra regresiones: si un snapshot REST llega con un lastUpdateId menor al que ya tiene el book (porque los diffs ya avanzaron), se rechaza. La excepcion es cuando el snapshot viene con is_synced=True y el book existente no esta sincronizado.
Cache en memoria
| Dato | Estructura | Acceso |
|---|---|---|
| Ultimo trade por simbolo | dict[symbol, Trade] |
O(1) |
| Trades recientes por simbolo | dict[symbol, deque(maxlen=N)] |
O(1) amortizado |
| Order book por simbolo | dict[symbol, OrderBookSnapshot] |
O(1) |
| Ultimo aggTrade por simbolo | dict[symbol, AggTrade] |
O(1) |
| AggTrades recientes | dict[symbol, deque(maxlen=N)] |
O(1) amortizado |
| Kline por simbolo e intervalo | dict[symbol, dict[interval, Kline]] |
O(1) |
| Ticker 24h por simbolo | dict[symbol, Ticker] |
O(1) |
| MiniTicker por simbolo | dict[symbol, MiniTicker] |
O(1) |
| BookTicker por simbolo | dict[symbol, BookTicker] |
O(1) |
El order book soporta tanto snapshots completos como actualizaciones incrementales con eliminacion de niveles con cantidad cero. Cada worker puede especificar su propia profundidad maxima (max_depth), independiente del default global.
Proteccion anti-regresion: update_order_book() rechaza snapshots con lastUpdateId menor al existente, excepto cuando el snapshot nuevo esta sincronizado (is_synced=True) y el existente no. Esto protege contra respuestas REST que llegan tarde cuando los diffs del WebSocket ya avanzaron el estado del book.
Estructura del proyecto
market_streaming/
__init__.py Re-exports de la API publica
config.py StreamConfig, Market, MARKET_URLS
models.py Trade, AggTrade, Kline, Ticker, MiniTicker, BookTicker, OrderBook*, Stream*
exceptions.py Excepciones del dominio
cache.py MarketDataCache
worker.py WebSocketStreamWorker, CombinedStreamWorker
health.py HealthMonitor
manager.py MarketStreamManager
logging_config.py Setup de logging con rotacion
tests/
test_cache.py 40 tests - trades, order book, aggTrade, kline, ticker, miniTicker, bookTicker
test_cache_concurrent.py 3 tests - lectura/escritura concurrente multi-thread
test_models.py 37 tests - inmutabilidad, defaults, config, todos los modelos nuevos
test_health.py 8 tests - deteccion stale, salud global
test_worker.py 54 tests - URLs, parsing, backoff, builders y dispatch de todos los tipos
test_manager.py 12 tests - lifecycle, streams, observabilidad
test_logging.py 13 tests - handlers, rotacion, archivos, teardown
test_load.py 8 tests - carga: 728 symbols trades, 60s
test_load_orderbook.py 13 tests - carga: 728 symbols order books profundos
examples/
inspect_trade.py Estructura del Trade: campos, tipos, precision, formato REST
inspect_order_book.py OrderBookSnapshot: estructura, niveles, formato REST
inspect_config.py StreamConfig: campos, Market enum, URLs (sin conexion)
inspect_health.py SystemHealth, StreamStatus, StreamState
live_trades.py Feed de trades en tiempo real
live_order_book.py Order book visual con barras de volumen y spread
multi_market.py Tabla comparativa multi-simbolo en tiempo real
notebooks/
streaming_demo.ipynb Demo completa: stream, inspeccion, acumulacion, verificacion
Ejemplos
Todos los ejemplos se ejecutan con python examples/<nombre>.py.
Inspeccion de datos (sin conexion)
python examples/inspect_trade.py # estructura Trade
python examples/inspect_order_book.py # estructura OrderBookSnapshot
python examples/inspect_config.py # StreamConfig, Market, URLs
python examples/inspect_health.py # SystemHealth, StreamStatus
Streams en vivo
python examples/live_trades.py # feed de trades BTCUSDT en tiempo real
python examples/live_order_book.py # order book visual con spread
python examples/multi_market.py # tabla comparativa multi-simbolo
Notebook
jupyter notebook notebooks/streaming_demo.ipynb
Demo interactiva: arranque de trades y order book, inspeccion de estructuras, acumulacion de 2 minutos con verificacion de continuidad, order book sincronizado con spread, klines en tiempo real, tickers, bookTicker para spread, y shutdown limpio.
Tests
# Ejecutar todos los tests unitarios (169 tests)
pytest tests/ --ignore=tests/test_load.py --ignore=tests/test_load_orderbook.py -v
# Tests de carga contra Binance real (~70s cada uno)
pytest tests/test_load.py -v -s # 728 symbols, trades, 60s
pytest tests/test_load_orderbook.py -v -s # 728 symbols, depth 200, 60s
# Todos los tests (190 tests)
pytest tests/ -v -s
# Un test especifico
pytest tests/test_cache.py::TestTradeCache::test_trade_ids_continuous -v
Pruebas de carga
Los tests de carga (test_load.py, test_load_orderbook.py) conectan a Binance real y verifican:
- Continuidad: trade IDs consecutivos sin huecos
- Unicidad: cero duplicados
- Orden: IDs estrictamente crecientes
- Timestamps: no decrecientes
- Profundidad: order books de hasta 200 niveles por lado
- Integridad estructural: bids descendentes, asks ascendentes, spread positivo
- Precision: cadenas originales de Binance preservadas
- Estabilidad: todos los workers running, cero reconexiones
Limitaciones
- No persiste datos en disco. Si el proceso muere, se pierde el cache.
- Las metricas son in-process. No hay exportacion a Prometheus, StatsD u otros sistemas externos.
- No implementa autenticacion para streams privados (user data streams).
- No implementa throttle de conexiones WebSocket (max 5/s de Binance). Con muchos workers arrancando simultaneamente, existe riesgo de alcanzar el limite.
Licencia
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file binflow-0.3.0.tar.gz.
File metadata
- Download URL: binflow-0.3.0.tar.gz
- Upload date:
- Size: 51.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c16c718b865b5c7564d64eb96a68ecf4d2421661dd9058367175eb94ed85c392
|
|
| MD5 |
5eaace6ca89792d013b23d11b881aea7
|
|
| BLAKE2b-256 |
6c6fa7bf67857dfceefda9326fd80989c0df7d297322f589bb6d17615f6c4957
|
File details
Details for the file binflow-0.3.0-py3-none-any.whl.
File metadata
- Download URL: binflow-0.3.0-py3-none-any.whl
- Upload date:
- Size: 34.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7166116296c9c5df3a3898492f0fd6bfa43387c01c263f262d1f8e98680d956b
|
|
| MD5 |
5a59eb9a3d17f4a3057d38fff2f211cc
|
|
| BLAKE2b-256 |
7cb823daa69f99289b9e275e611aca3546ab842c24982877238d492e749093ce
|