Persistent WebSocket engine for Binance market data (Spot, USD-M, COIN-M)
Project description
binflow
Motor de WebSockets persistente para consumo de datos de mercado en tiempo real desde Binance.
Soporta Spot, USD-M Futures y COIN-M Futures. Componente de infraestructura reutilizable que mantiene un cache de mercado vivo en memoria, con reconexion automatica, observabilidad integrada y API sincrona simple.
Filosofia
binflow es una libreria de infraestructura de datos de mercado. Su responsabilidad es servir datos fiables, continuos y disponibles.
Lo que SI hace binflow:
- Mantener conexiones WebSocket vivas y reconectar automaticamente
- Garantizar continuidad del dato (sin gaps en trade IDs)
- Servir datos normalizados, listos para consumir
- Escalar a cientos de simbolos con eficiencia (combined streams)
- Order books correctos e integros en memoria
Lo que NO hace binflow (responsabilidad del consumidor):
- Persistir datos en base de datos (TimescaleDB, Parquet, etc.)
- Callbacks o event-driven processing
- Metricas y monitoring (Prometheus, Grafana)
- Logica de trading o senales
Otra libreria o servicio debe consumir de binflow via polling (.get(), .get_recent()) y encargarse del resto.
Caracteristicas
- Multi-mercado -- Spot, USD-M Futures y COIN-M Futures con un solo parametro
- 8 tipos de stream -- trades, order book, aggTrades, klines, ticker 24h, miniTicker, bookTicker, liquidaciones (forceOrder)
- Streams combinados -- multiplexa cientos de simbolos en pocas conexiones WebSocket
- Gestion dinamica -- subscribe/unsubscribe de simbolos en combined streams sin recrear la conexion
- Order books sincronizados -- REST snapshot + diffs por WebSocket, con campo
is_syncedque indica integridad verificada - Order books profundos -- hasta 5000 niveles (Spot) o 1000 (Futures) por lado
- Rate limiting automatico -- delegado a panzer, respeta cabeceras
X-MBX-USED-WEIGHTde Binance - Anti-regresion -- el cache rechaza snapshots con
lastUpdateIdanterior al existente, protegiendo contra datos stale - Cache en memoria thread-safe con acceso O(1)
- Reconexion automatica con backoff exponencial y jitter
- API sincrona -- se usa desde codigo Python normal, sin
async/await - Observabilidad -- estado de streams, metricas, deteccion de degradacion
- Logging persistente -- rotacion por tamano, archivo de errores separado, limites de disco
- Shutdown limpio -- sin tareas pendientes ni warnings
- Context manager -- soporte
withpara manejo automatico del ciclo de vida - GC-safe -- si se pierde la referencia sin llamar a
stop(), el garbage collector limpia automaticamente (ideal para notebooks) - Probado bajo carga -- 728 simbolos simultaneos, 60s, cero gaps en trade IDs
Instalacion
pip install binflow
O con dependencias de desarrollo:
pip install -e ".[dev]"
Requisitos
- Python >= 3.11
websockets >= 13.0panzer >= 2.4.0(rate limiting, peticiones REST a Binance, liquidaciones)
Inicio rapido
import time
from market_streaming import Manager, Config
# Cada manager = un mercado + un tipo de stream + N simbolos
config = Config(
market="spot",
stream_type="trade",
symbols=["BTCUSDT", "ETHUSDT"],
max_history=500,
log_level="info",
)
with Manager(config) as manager:
time.sleep(5)
# Un simbolo: devuelve Trade o None
trade = manager.get("BTCUSDT")
if trade:
print(f"{trade.price} x {trade.quantity}")
# Todos los simbolos: devuelve dict[str, Trade]
all_trades = manager.get()
# Historial reciente
recent = manager.get_recent("BTCUSDT", limit=50)
# Anadir simbolos en caliente
manager.add_symbols(["SOLUSDT", "ADAUSDT"])
No es necesario importar Market, StreamType ni logging; todos los campos aceptan strings.
Multiples managers
Para diferentes tipos de datos, se usa un manager por tipo:
from market_streaming import Manager, Config
symbols = ["BTCUSDT", "ETHUSDT"]
trades = Manager(Config(
market="spot", stream_type="trade", symbols=symbols,
max_history=1000,
))
books = Manager(Config(
market="spot", stream_type="depth", symbols=symbols,
depth=200,
))
trades.start()
books.start()
# Ver todas las instancias en memoria
Manager.dashboard()
Order books profundos y sincronizados
Para profundidades > 20, el sistema usa automaticamente el diff depth stream de Binance con sincronizacion REST:
- Conecta al diff depth stream y buferea mensajes
- Pide un snapshot REST completo via panzer (con rate limiting automatico)
- Filtra los diffs anteriores al snapshot
- Aplica los diffs posteriores sobre el snapshot
- Marca el order book como
is_synced=True
config = Config(
stream_type="depth",
symbols=["BTCUSDT", "ETHUSDT"],
depth=200,
)
with Manager(config) as manager:
time.sleep(10)
book = manager.get("BTCUSDT")
print(f"Niveles: {len(book.bids)} bids, {len(book.asks)} asks")
print(f"Sincronizado: {book.is_synced}")
Para streams masivos (cientos de simbolos), panzer gestiona las peticiones REST en paralelo con bulk_depth(), respetando los rate limits de Binance.
Nota: profundidades de 5, 10 y 20 usan el Partial Book Depth Stream (snapshots completos cada 100ms, sin necesidad de REST sync). Cualquier otro valor usa el Diff Depth Stream (diffs cada 100ms) con sincronizacion REST.
Gestion dinamica de simbolos
Se pueden anadir o quitar simbolos sin recrear la conexion WebSocket:
with Manager(config) as manager:
manager.add_symbols(["SOLUSDT", "ADAUSDT"])
manager.remove_symbols("ETHUSDT")
Reconfiguracion en caliente
Se pueden cambiar parametros del stream sin detenerlo ni perder datos:
with Manager(config) as manager:
time.sleep(5)
# Cambiar tamano del buffer de trades/aggTrades/liquidaciones
manager.reconfigure(max_history=2000)
# Cambiar profundidad del order book
manager.reconfigure(depth=500)
# Cambiar intervalo de klines
manager.reconfigure(interval="5m")
Cada stream_type acepta solo sus parametros relevantes:
stream_type |
Parametro reconfigurable | Efecto |
|---|---|---|
"trade", "agg_trade", "force_order" |
max_history |
Redimensiona el deque. Si se reduce, los datos mas antiguos se descartan |
"depth" |
depth |
Cambia niveles del book. Si se reduce, trunca. Si se aumenta, resincroniza via REST |
"kline" |
interval |
Cambia el intervalo de vela (resubscribe al nuevo stream) |
No se pueden cambiar market, stream_type ni symbols (definen la identidad del Manager; usar add_symbols/remove_symbols para simbolos).
Futuros USD-M y COIN-M
config = Config(market="futures_usd", stream_type="trade", symbols=["BTCUSDT"], max_history=500)
# COIN-M: market="futures_coin", pares tipo BTCUSD_PERP
Liquidaciones (solo futuros)
config = Config(
market="futures_usd",
stream_type="force_order", # o "liquidation"
symbols=["BTCUSDT", "ETHUSDT"],
max_history=500,
)
with Manager(config) as mgr:
time.sleep(10)
# Ultima liquidacion de un simbolo
liq = mgr.get("BTCUSDT")
if liq:
print(f"{liq.symbol} {liq.side} {liq.price} x {liq.original_qty} ({liq.status})")
# Historial reciente
recent = mgr.get_recent("BTCUSDT", limit=50)
Solo disponible en market="futures_usd" y market="futures_coin". En spot lanza ValueError.
API publica
Manager
Fachada principal del sistema. Gestiona streams, expone datos del cache y metricas.
Ciclo de vida
| Metodo | Descripcion |
|---|---|
start() |
Arranca el motor en un thread dedicado |
stop() |
Detiene todos los streams y libera recursos |
Soporta context manager (with Manager(config) as mgr:).
Gestion de simbolos
| Metodo | Descripcion |
|---|---|
add_symbols(symbols, batch_size=200) |
Anade simbolos (str o lista, case-insensitive) |
remove_symbols(symbols) |
Elimina simbolos del manager |
Los simbolos se pueden registrar antes de start(). Se arrancan automaticamente al iniciar.
Reconfiguracion en caliente
| Metodo | Descripcion |
|---|---|
reconfigure(max_history=N) |
Redimensiona el buffer de trades/aggTrades/liquidaciones sin detener el stream |
reconfigure(depth=N) |
Cambia la profundidad del order book (trunca o resincroniza via REST) |
reconfigure(interval="5m") |
Cambia el intervalo de klines (resubscribe al nuevo stream) |
No se pueden cambiar market, stream_type ni symbols via reconfigure().
Dashboard
| Metodo | Descripcion |
|---|---|
Manager.dashboard() |
Muestra todas las instancias en memoria |
Consulta de datos
| Metodo | Descripcion |
|---|---|
get(symbol) |
Devuelve el dato actual del simbolo, o None si no hay datos |
get(symbols_list) |
Devuelve dict[str, dato] con los simbolos que tienen datos |
get() |
Devuelve dict[str, dato] con todos los simbolos en cache |
get_recent(symbol, limit=100) |
Historial reciente como lista |
El tipo de retorno depende del stream_type del manager:
stream_type |
.get(symbol) |
.get_recent() |
|---|---|---|
"trade" |
Trade |
list[Trade] (deque, hasta limit) |
"agg_trade" |
AggTrade |
list[AggTrade] (deque, hasta limit) |
"depth" |
OrderBookSnapshot |
[OrderBookSnapshot] |
"kline" |
Kline |
[Kline] |
"ticker" |
Ticker |
[Ticker] |
"mini_ticker" |
MiniTicker |
[MiniTicker] |
"book_ticker" |
BookTicker |
[BookTicker] |
"force_order" |
Liquidation |
list[Liquidation] (deque, hasta limit) |
Para trade, agg_trade y force_order, .get_recent() devuelve el historial del deque (hasta limit elementos). Para el resto de tipos, devuelve una lista con el dato actual si existe, o lista vacia.
Los simbolos son case-insensitive: "btcusdt" y "BTCUSDT" son equivalentes.
Lectura no destructiva: .get() y .get_recent() son de solo lectura. No consumen, borran ni alteran los datos del cache. Llamar a estos metodos cualquier numero de veces devuelve siempre el mismo estado (o uno mas reciente si llego un mensaje nuevo del WebSocket entre llamadas). Los datos solo cambian por dos motivos:
- Llegada de datos nuevos desde el WebSocket (el dato mas reciente se sobreescribe; en deques, los mas antiguos se descartan al llenarse).
- Reconfiguracion via
reconfigure()(por ejemplo, reducirmax_historydescarta los elementos mas antiguos que no caben en el nuevo tamano).
Para order books, .get() devuelve una copia del snapshot actual, por lo que modificar el objeto devuelto no afecta al cache interno.
Observabilidad
| Metodo | Descripcion |
|---|---|
get_stream_status(stream_id) |
Estado de un stream individual |
get_all_streams_status() |
Estado de todos los streams |
get_health() |
Diagnostico de salud global (SystemHealth) |
get_stats() |
Estadisticas resumidas (dict) |
is_healthy() |
True si todos los streams estan sanos |
print_summary() |
Imprime resumen por logging |
Modelos de datos
Trade
@dataclass(frozen=True)
class Trade:
symbol: str # par de trading (ej: "BTCUSDT")
trade_id: int # ID unico del trade en Binance
price: float # precio de ejecucion
quantity: float # cantidad ejecutada
quote_quantity: float # cantidad en moneda cotizada
buyer_maker: bool # True si el comprador es el maker (trade = venta)
timestamp_ms: int # timestamp del trade en milisegundos
price_str: str # precio como cadena original (precision completa)
quantity_str: str # cantidad como cadena original
quote_quantity_str: str
is_best_match: bool # solo Spot
event_time_ms: int # timestamp del evento WebSocket
Incluye to_rest_format() que devuelve un dict compatible con GET /api/v3/trades.
OrderBookSnapshot
@dataclass
class OrderBookSnapshot:
symbol: str
bids: list[OrderBookLevel] # ordenados precio descendente (mejor bid primero)
asks: list[OrderBookLevel] # ordenados precio ascendente (mejor ask primero)
last_update_id: int # ID de la ultima actualizacion aplicada
timestamp: float # time.time() de la ultima actualizacion
event_time_ms: int
transaction_time_ms: int
is_synced: bool # True si paso por REST snapshot + diffs (book fiable)
is_synced indica si el order book fue sincronizado correctamente con un REST snapshot. Un book con is_synced=False solo contiene diffs parciales y puede estar incompleto. Tras la sincronizacion REST, is_synced pasa a True y se preserva en actualizaciones posteriores.
OrderBookLevel
@dataclass(frozen=True)
class OrderBookLevel:
price: float
quantity: float
price_str: str # cadena original de Binance
quantity_str: str
Incluye to_rest_format() que devuelve [price_str, quantity_str] compatible con la REST API.
AggTrade
@dataclass(frozen=True)
class AggTrade:
symbol: str
agg_trade_id: int # ID del trade agregado
price: float
quantity: float
first_trade_id: int # rango de trades individuales incluidos
last_trade_id: int
timestamp_ms: int
buyer_maker: bool
price_str: str
quantity_str: str
event_time_ms: int
Agrupa uno o mas trades individuales ejecutados al mismo precio y en la misma orden. Incluye to_rest_format() compatible con GET /api/v3/aggTrades.
Kline
@dataclass(frozen=True)
class Kline:
symbol: str
interval: str # "1m", "5m", "1h", "1d", etc.
open_time_ms: int
close_time_ms: int
open: float
close: float
high: float
low: float
volume: float
quote_volume: float
trades: int
taker_buy_volume: float
taker_buy_quote_volume: float
is_closed: bool # True cuando la vela es definitiva
# + campos _str para precision original
Se actualiza en tiempo real mientras la vela esta abierta. Cuando is_closed=True, la vela es definitiva. Incluye to_rest_format() compatible con GET /api/v3/klines.
Ticker
@dataclass(frozen=True)
class Ticker:
symbol: str
price_change: float
price_change_pct: float
weighted_avg_price: float
last_price: float
best_bid_price: float
best_ask_price: float
open_price: float
high_price: float
low_price: float
volume: float
quote_volume: float
trade_count: int
# + 15 campos mas (prev_close, quantities, times, trade IDs, strings)
Estadisticas completas de 24h rolling. Incluye to_rest_format() compatible con GET /api/v3/ticker/24hr.
MiniTicker
@dataclass(frozen=True)
class MiniTicker:
symbol: str
close_price: float
open_price: float
high_price: float
low_price: float
volume: float
quote_volume: float
# + campos _str
Version ligera de Ticker con solo OHLC y volumen.
BookTicker
@dataclass(frozen=True)
class BookTicker:
symbol: str
update_id: int
best_bid_price: float
best_bid_quantity: float
best_ask_price: float
best_ask_quantity: float
# + campos _str
Mejor bid/ask en tiempo real. El stream mas rapido para obtener spread. Incluye to_rest_format() compatible con GET /api/v3/ticker/bookTicker.
Liquidation
@dataclass(frozen=True)
class Liquidation:
symbol: str
side: str # "SELL" (long liquidado) o "BUY" (short liquidado)
order_type: str # normalmente "LIMIT"
time_in_force: str # normalmente "IOC"
original_qty: float
price: float
average_price: float
status: str # "NEW", "PARTIALLY_FILLED", "FILLED"
last_filled_qty: float
filled_qty: float
trade_time_ms: int
event_time_ms: int
# + campos _str para precision original
Orden de liquidacion forzada en Binance Futures. Solo disponible en mercados de futuros (USD-M y COIN-M). Incluye to_rest_format() compatible con GET /fapi/v1/forceOrders.
StreamState
Estados posibles de un stream:
| Estado | Significado |
|---|---|
starting |
Iniciando conexion |
running |
Conectado y recibiendo datos |
reconnecting |
Reconectando tras un error |
stale |
Conectado pero sin datos recientes |
stopped |
Detenido de forma limpia |
failed |
Error fatal, no reconectable |
SystemHealth
@dataclass
class SystemHealth:
is_healthy: bool # True si todos los streams estan sanos
total_streams: int
running_streams: int
stale_streams: int
reconnecting_streams: int
stopped_streams: int
uptime_s: float # segundos desde el arranque
total_messages: int # total de mensajes procesados
stream_details: dict[str, StreamStatus]
Configuracion
Toda la configuracion se centraliza en Config. Todos los campos tipo enum aceptan strings:
from market_streaming import Config
config = Config(
# Mercado y stream
market="spot", # "spot" | "futures_usd" | "futures_coin"
stream_type="trade", # "trade" | "depth" | "agg_trade" | "kline" | "ticker" | "mini_ticker" | "book_ticker" | "force_order"
symbols=["BTCUSDT"], # case-insensitive, se normalizan a mayusculas
interval="1m", # solo para stream_type="kline"
# Cache (obligatorios segun stream_type)
max_history=1000, # obligatorio para "trade" y "agg_trade": trades recientes en cache
depth=20, # obligatorio para "depth": niveles por lado (depth=20 = 20 bids + 20 asks)
# Reconexion
reconnect_base_delay_s=1.0,
reconnect_max_delay_s=60.0,
max_reconnect_attempts=0, # 0 = sin limite
# Logging
log_level="info", # "debug" | "info" | "warning" | "error" | "critical"
log_dir="logs",
)
Campos obligatorios segun tipo de stream
stream_type |
Campo obligatorio | Descripcion |
|---|---|---|
"trade" |
max_history |
Cuantos trades recientes mantener en cache |
"agg_trade" |
max_history |
Cuantos aggTrades recientes mantener en cache |
"depth" |
depth |
Niveles por lado del order book (20 = 20 bids + 20 asks) |
"kline" |
-- | Usa interval (default "1m") |
"ticker" |
-- | Sin campos adicionales |
"mini_ticker" |
-- | Sin campos adicionales |
"book_ticker" |
-- | Sin campos adicionales |
"force_order" |
max_history |
Cuantas liquidaciones recientes mantener. Solo futuros |
Mercados soportados
| Market | URL WebSocket | Ejemplo de par |
|---|---|---|
"spot" |
wss://stream.binance.com:9443/ws |
BTCUSDT |
"futures_usd" |
wss://fstream.binance.com/ws |
BTCUSDT |
"futures_coin" |
wss://dstream.binance.com/ws |
BTCUSD_PERP |
Logging
El sistema genera dos archivos de log con rotacion automatica:
| Archivo | Nivel | Uso |
|---|---|---|
binflow.log |
DEBUG+ | Analisis completo, debug, tracing |
binflow.error.log |
WARNING+ | Deteccion rapida de fallos e interrupciones |
Rotacion y limites de disco
- Cada archivo rota al alcanzar
log_max_bytes(default: 10 MB) - Se mantienen como maximo
log_backup_countarchivos rotados (default: 5) - Espacio maximo en disco: ~120 MB (2 archivos x 6 versiones x 10 MB)
- Los archivos rotados se nombran
binflow.log.1,binflow.log.2, etc.
Formato de archivo
2026-03-06 20:15:32 | INFO | market_streaming.worker:_connect_and_consume:178 | Stream trade_0: conexion establecida
2026-03-06 20:15:33 | WARNING | market_streaming.worker:_run:123 | Error en stream depth_0 (intento 1): [Errno 111] Connection refused
Configuracion de logging
from market_streaming import Config
# Produccion: solo errores en consola, todo en disco
config = Config(stream_type="ticker", log_level="error", log_dir="logs")
# Desarrollo: verbose en consola, sin disco
config = Config(stream_type="ticker", log_level="debug", log_dir="")
# Directorio personalizado
config = Config(stream_type="ticker", log_dir="/var/log/binflow")
Arquitectura
Manager 1 manager = 1 mercado + 1 tipo de stream
|
+-- threading.Thread Thread dedicado con event loop asyncio
| |
| +-- Worker (trade_0) hasta 200 simbolos / conexion
| +-- Worker (trade_1) siguiente batch si > 200
| +-- ...
|
+-- BinancePublicClient REST client (panzer) con rate limiting automatico
|
+-- MarketDataCache Cache en memoria (threading.Lock, O(1))
|
+-- HealthMonitor Evaluacion de salud
|
+-- Logging RotatingFileHandler (general + errores)
Los workers usan el endpoint /stream?streams=sym1@trade/sym2@trade/... de Binance, agrupando hasta 200 simbolos por conexion WebSocket. Soportan SUBSCRIBE/UNSUBSCRIBE dinamico sin recrear la conexion.
Modelo de concurrencia
El sistema usa un thread dedicado con event loop asyncio:
- API publica sincrona: el codigo cliente llama
manager.start(),manager.get(), etc. sinawait. Esto permite usar el motor desde cualquier contexto Python. - Event loop interno: un thread daemon ejecuta el loop asyncio donde corren todos los workers WebSocket de forma concurrente.
- Cache thread-safe:
threading.Lockprotege las escrituras (desde el thread asyncio) y las lecturas (desde el thread principal). El contention es despreciable porque las operaciones son O(1).
Reconexion
Cada worker gestiona su propia reconexion:
- Error de conexion o desconexion detectada
- Backoff exponencial:
min(base * 2^n, max_delay) - Jitter aleatorio de +-20% sobre el delay calculado
- Contador de reconexiones incrementado
- Si se supera
max_reconnect_attempts(y no es 0), el stream pasa afailed - Errores fatales (ej: stream invalido) no reconectan
Depth streams y sincronizacion
| Profundidad | Stream | Sincronizacion |
|---|---|---|
| 5, 10, 20 | symbol@depth{N}@100ms (Partial Book Depth) |
Snapshots completos cada 100ms, no requiere REST |
| Cualquier otro valor | symbol@depth@100ms (Diff Depth) |
REST snapshot + diffs incrementales cada 100ms |
Para el diff depth stream, el flujo de sincronizacion sigue el protocolo oficial de Binance:
- Conectar al diff depth stream y almacenar los mensajes en un buffer
- Pedir snapshot REST via
panzer.BinancePublicClient.depth()(single) obulk_depth()(masivo) - Filtrar diffs: descartar los anteriores al
lastUpdateIddel snapshot (reglas Spot vs Futures) - Aplicar el buffer filtrado sobre el snapshot
- Marcar
is_synced=Truey continuar con actualizaciones en tiempo real
El cache protege contra regresiones: si un snapshot REST llega con un lastUpdateId menor al que ya tiene el book (porque los diffs ya avanzaron), se rechaza. La excepcion es cuando el snapshot viene con is_synced=True y el book existente no esta sincronizado.
Cache en memoria
El cache es un repositorio permanente y en memoria que se actualiza continuamente. Los metodos .get() y .get_recent() son de solo lectura: no consumen ni borran datos. Puedes consultarlos tantas veces como quieras sin afectar el estado interno.
| Dato | Estructura | Comportamiento |
|---|---|---|
| Ultimo trade | dict[symbol, Trade] |
Se sobreescribe con cada trade nuevo |
| Trades recientes | dict[symbol, deque(maxlen=max_history)] |
FIFO: los mas antiguos se descartan al llegar nuevos |
| Order book | dict[symbol, OrderBookSnapshot] |
Se actualiza in-place con diffs; .get() devuelve copia |
| Ultimo aggTrade | dict[symbol, AggTrade] |
Se sobreescribe con cada aggTrade nuevo |
| AggTrades recientes | dict[symbol, deque(maxlen=max_history)] |
FIFO: igual que trades |
| Kline | dict[symbol, dict[interval, Kline]] |
Se sobreescribe con cada actualizacion de vela |
| Ticker 24h | dict[symbol, Ticker] |
Se sobreescribe con cada actualizacion |
| MiniTicker | dict[symbol, MiniTicker] |
Se sobreescribe con cada actualizacion |
| BookTicker | dict[symbol, BookTicker] |
Se sobreescribe con cada actualizacion |
| Ultima liquidacion | dict[symbol, Liquidation] |
Se sobreescribe con cada liquidacion nueva |
| Liquidaciones recientes | dict[symbol, deque(maxlen=max_history)] |
FIFO: igual que trades |
El order book soporta tanto snapshots completos como actualizaciones incrementales con eliminacion de niveles con cantidad cero.
Proteccion anti-regresion: update_order_book() rechaza snapshots con lastUpdateId menor al existente, excepto cuando el snapshot nuevo esta sincronizado (is_synced=True) y el existente no. Esto protege contra respuestas REST que llegan tarde cuando los diffs del WebSocket ya avanzaron el estado del book.
Estructura del proyecto
market_streaming/
__init__.py Re-exports de la API publica
config.py Config, Market, MARKET_URLS
models.py Trade, AggTrade, Kline, Ticker, MiniTicker, BookTicker, Liquidation, OrderBook*, Stream*
exceptions.py Excepciones del dominio
cache.py MarketDataCache
worker.py Worker (combined streams, hasta 200 simbolos/conexion)
health.py HealthMonitor
manager.py Manager
logging_config.py Setup de logging con rotacion
tests/
test_manager.py 84 tests - lifecycle, get/get_recent, symbols, stats, GC safety, config validation
test_worker.py 69 tests - URLs, parsing, backoff, builders, dispatch, diff buffering, depth speed
test_cache.py 62 tests - trades, order book, aggTrade, kline, ticker, liquidation, remove_symbol, invariantes
test_models.py 44 tests - inmutabilidad, defaults, config, todos los modelos
test_logging.py 13 tests - handlers, rotacion, archivos, teardown
test_health.py 8 tests - deteccion stale, salud global
test_cache_concurrent.py 3 tests - lectura/escritura concurrente multi-thread
test_load.py 8 tests - carga: 728 symbols trades, 60s
test_load_orderbook.py 13 tests - carga: 728 symbols order books profundos
Tests
# Ejecutar todos los tests unitarios (279 tests, 300 con carga)
pytest tests/ --ignore=tests/test_load.py --ignore=tests/test_load_orderbook.py -v
# Tests de carga contra Binance real (~70s cada uno)
pytest tests/test_load.py -v -s # 728 symbols, trades, 60s
pytest tests/test_load_orderbook.py -v -s # 728 symbols, depth 200, 60s
# Un test especifico
pytest tests/test_cache.py::TestTradeCache::test_trade_ids_continuous -v
Pruebas de carga
Los tests de carga (test_load.py, test_load_orderbook.py) conectan a Binance real y verifican:
- Continuidad: trade IDs consecutivos sin huecos
- Unicidad: cero duplicados
- Orden: IDs estrictamente crecientes
- Timestamps: no decrecientes
- Profundidad: order books de hasta 200 niveles por lado
- Integridad estructural: bids descendentes, asks ascendentes, spread positivo
- Precision: cadenas originales de Binance preservadas
- Estabilidad: todos los workers running, cero reconexiones
Limitaciones
- No persiste datos en disco. Si el proceso muere, se pierde el cache.
- Las metricas son in-process. No hay exportacion a Prometheus, StatsD u otros sistemas externos.
- No implementa autenticacion para streams privados (user data streams).
- Liquidaciones (
force_order) solo disponibles en futuros; Binance Vision no publica datos historicos de liquidaciones. - No implementa throttle de conexiones WebSocket (max 5/s de Binance). Con muchos workers arrancando simultaneamente, existe riesgo de alcanzar el limite.
Licencia
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file binflow-0.6.0.tar.gz.
File metadata
- Download URL: binflow-0.6.0.tar.gz
- Upload date:
- Size: 74.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d38c697005d19615cc1b3a44f33859c63b492ebd30dbed0a27a23d46bde53b82
|
|
| MD5 |
247e8a6dd9e43b0f42a7f99b707e0a99
|
|
| BLAKE2b-256 |
db299ce482d0d3af11051ac6953c72ffe6e075d5c0a85856f61b32e9eb0eb94c
|
File details
Details for the file binflow-0.6.0-py3-none-any.whl.
File metadata
- Download URL: binflow-0.6.0-py3-none-any.whl
- Upload date:
- Size: 41.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fb7253c3ccec1c6651ae47fe7f2655ad53a062bfbbdbf09e90158d935d4f21ed
|
|
| MD5 |
856c39df142260f82842729f99a829c0
|
|
| BLAKE2b-256 |
9d8e258b4b09f985ef78bb1987ad3eedcf45fb464f54d0a60fb9ef1a7d78d123
|