High-performance exchange feed parser and orderflow analytics engine with Rust and Python bindings
fastreader
fastreader is a Python extension module implemented in Rust with PyO3.
It is designed for binary market data workflows:
- parse order and trade packets
- read from cache or stream from disk
- build token-level order books
- export snapshots in dict or CSV-row form
The module currently exposes these core classes:
- MessageCacheReader
- StreamingBinaryLoader
- OrderbookBuilder
It also exposes compatibility wrappers:
- BinaryDataLoader
- BinaryLoader
- ReadMsgFromBinary
1. Architecture
The data path is:
Binary file on disk
    |
    +--> MessageCacheReader (load full file into RAM)
    |
    +--> StreamingBinaryLoader (read one message at a time)
                    |
                    v
            OrderbookBuilder
                    |
       +------------+-------------------+
       |                                |
       v                                v
get_snapshot / get_orderbook_snapshot   get_snapshot_row / snapshot_header
(dict output)                           (CSV output)
Reader strategy
| Reader | Behavior | Best for |
|---|---|---|
| MessageCacheReader | Loads all supported messages into memory | repeated analysis, debug, backtest |
| StreamingBinaryLoader | Keeps file handle open and reads sequentially | very large files, low memory |
2. Install and import
pip install fastreader
from fastreader import MessageCacheReader, StreamingBinaryLoader, OrderbookBuilder
3. Supported message types
The parser supports these message types:
- T : trade
- N : new order
- M : modify order
- X : cancel order
Space bytes (0x20) at message boundaries are skipped.
4. Class: MessageCacheReader
4.1 Constructor
reader = MessageCacheReader()
print(reader)
Expected output:
<builtins.MessageCacheReader object at 0x...>
4.2 load_to_cache(file_path)
Loads the full file into memory and returns the number of parsed messages.
reader = MessageCacheReader()
count = reader.load_to_cache("data/orders_trades.bin")
print("Loaded:", count)
Expected output (example):
Loaded: 152340
4.3 get_all_messages()
Returns all cached messages as formatted strings.
reader = MessageCacheReader()
reader.load_to_cache("data/orders_trades.bin")
msgs = reader.get_all_messages()
print(msgs[0])
print(msgs[1])
Expected output (example):
Order Message: SeqNo 1, MsgLen 40, MsgType 'N', ExchTs 1710000000000000000, LocalTs 1710000000000000100, OrderId 10001, Token 26000, Side 'B', Price 2250000, Quantity 75, Missed 0
Trade Message: SeqNo 2, MsgLen 48, MsgType 'T', ExchTs 1710000000000000500, LocalTs 1710000000000000600, BuyOrderId 10001, SellOrderId 10002, Token 26000, Price 2250100, Quantity 50, Missed 0
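Because get_all_messages returns formatted text rather than structured records, downstream code may want the fields back as a dict. The sketch below is one way to do that in plain Python. parse_formatted_message is a hypothetical helper, not part of fastreader, and it assumes the "Kind: Key Value, Key Value, ..." layout shown in the example output above.

```python
def parse_formatted_message(line: str) -> dict:
    """Split 'Kind: Key Value, Key Value, ...' into a dict (hypothetical helper)."""
    kind, _, body = line.partition(": ")
    fields = {"kind": kind}
    for part in body.split(", "):
        key, _, raw = part.partition(" ")
        raw = raw.strip()
        # Quoted values such as 'N' or 'B' stay strings; everything else is an integer.
        if len(raw) >= 2 and raw[0] == "'" and raw[-1] == "'":
            fields[key] = raw[1:-1]
        else:
            fields[key] = int(raw)
    return fields


# Sample line copied from the example output above.
sample = (
    "Order Message: SeqNo 1, MsgLen 40, MsgType 'N', "
    "ExchTs 1710000000000000000, LocalTs 1710000000000000100, "
    "OrderId 10001, Token 26000, Side 'B', Price 2250000, Quantity 75, Missed 0"
)
order = parse_formatted_message(sample)
print(order["kind"], order["MsgType"], order["Price"], order["Quantity"])
```

If the formatted layout changes between releases, a parser like this would need to change with it, so treat it as a convenience for inspection rather than a stable interface.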
4.4 get_cache_summary()
Returns a dictionary summarizing the cached data.
reader = MessageCacheReader()
reader.load_to_cache("data/orders_trades.bin")
summary = reader.get_cache_summary()
print(summary)
Expected output shape:
{
"file_source": "data/orders_trades.bin",
"total_messages": 152340,
"total_orders": 120000,
"total_trades": 32340,
"memory_usage_bytes": 7312320,
}
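The summary can double as a quick sanity check after loading. The sketch below uses the example figures above; check_summary is a hypothetical helper, and the expectation that total_orders plus total_trades equals total_messages is an assumption taken from the example, not a documented guarantee.

```python
# Example figures copied from the output shape above.
summary = {
    "file_source": "data/orders_trades.bin",
    "total_messages": 152340,
    "total_orders": 120000,
    "total_trades": 32340,
    "memory_usage_bytes": 7312320,
}


def check_summary(s: dict) -> float:
    """Verify the counts add up and return average bytes per cached message."""
    # Assumption: every cached message is counted as either an order or a trade.
    if s["total_orders"] + s["total_trades"] != s["total_messages"]:
        raise ValueError("order and trade counts do not add up to total_messages")
    if s["total_messages"] == 0:
        return 0.0
    return s["memory_usage_bytes"] / s["total_messages"]


bytes_per_message = check_summary(summary)
print(bytes_per_message)  # 48.0 for the example figures
```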
5. Class: StreamingBinaryLoader
5.1 Constructor
stream = StreamingBinaryLoader()
print(stream)
Expected output:
<builtins.StreamingBinaryLoader object at 0x...>
5.2 open_stream(file_path, count_messages=True)
Opens the stream source, validates the header, and resets the cursor to the start of the file.
- count_messages=True: pre-scan to count messages, returns count
- count_messages=False: skip pre-scan, returns 0 (faster startup)
stream = StreamingBinaryLoader()
count = stream.open_stream("data/orders_trades.bin", count_messages=True)
print("Count:", count)
Expected output (example):
Count: 152340
Fast-start mode:
stream = StreamingBinaryLoader()
count = stream.open_stream("data/orders_trades.bin", count_messages=False)
print("Count:", count)
Expected output:
Count: 0
5.3 get_next_message()
Returns one formatted message per call. Returns the string "END" at EOF.
stream = StreamingBinaryLoader()
stream.open_stream("data/orders_trades.bin", count_messages=False)
print(stream.get_next_message())
print(stream.get_next_message())
Expected output (example):
Order Message: SeqNo 1, MsgLen 40, MsgType 'N', ExchTs ..., LocalTs ..., OrderId ..., Token ..., Side 'B', Price ..., Quantity ..., Missed 0
Trade Message: SeqNo 2, MsgLen 48, MsgType 'T', ExchTs ..., LocalTs ..., BuyOrderId ..., SellOrderId ..., Token ..., Price ..., Quantity ..., Missed 0
EOF behavior:
while True:
m = stream.get_next_message()
if m == "END":
print(m)
break
Expected output tail:
END
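The sentinel loop above can also be written with Python's two-argument iter(callable, sentinel), which stops as soon as the callable returns the sentinel. The sketch below shows the pattern. FakeStream is a stand-in written only so the example runs without a data file; it imitates the documented get_next_message behavior and is not part of fastreader. iter_messages is likewise a hypothetical helper.

```python
class FakeStream:
    """Stand-in for StreamingBinaryLoader: yields messages, then "END" forever."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._pos = 0

    def get_next_message(self) -> str:
        if self._pos >= len(self._messages):
            return "END"
        message = self._messages[self._pos]
        self._pos += 1
        return message


def iter_messages(stream):
    """Iterate over messages until get_next_message returns "END"."""
    return iter(stream.get_next_message, "END")


stream = FakeStream(["Order Message: ...", "Trade Message: ..."])
collected = list(iter_messages(stream))
print(len(collected))  # 2
```

With a real loader, the same iter_messages(stream) call should work on any object whose get_next_message returns "END" at EOF, and it combines naturally with itertools.islice to cap the number of messages read.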
5.4 reset_cursor()
Moves stream cursor back to file start.
stream = StreamingBinaryLoader()
stream.open_stream("data/orders_trades.bin", count_messages=False)
a = stream.get_next_message()
_ = stream.get_next_message()
stream.reset_cursor()
b = stream.get_next_message()
print(a)
print(b)
Expected behavior:
- both printed lines show the same first message, because b is read after the cursor has been reset
6. Class: OrderbookBuilder
OrderbookBuilder applies order book logic using data from:
- MessageCacheReader
- StreamingBinaryLoader
- decoded list[dict] payloads
6.1 Constructor
builder = OrderbookBuilder()
print(builder)
Expected output:
<builtins.OrderbookBuilder object at 0x...>
6.2 apply_filter(logic_criteria=None)
Sets the message-type filter. Only the first character of each string in logic_criteria is used; passing None clears the filter.
builder = OrderbookBuilder()
builder.apply_filter(["N", "M", "X"]) # order lifecycle only
builder.apply_filter(["T"]) # trades only
builder.apply_filter(None) # clear filter
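To make the first-character rule concrete, here is a small pure-Python model of how such a filter could behave. normalize_criteria and passes are hypothetical names, and this is only one reading of the description above, not the library's code. In particular, how the library treats an empty list is not stated; in this sketch an empty list blocks every message.

```python
from typing import Iterable, Optional, Set


def normalize_criteria(criteria: Optional[Iterable[str]]) -> Optional[Set[str]]:
    """Reduce each criterion to its first character; None means 'no filter'."""
    if criteria is None:
        return None
    # Empty strings carry no message type and are dropped.
    return {c[0] for c in criteria if c}


def passes(msg_type: str, allowed: Optional[Set[str]]) -> bool:
    """A message passes when no filter is set or its type is in the allowed set."""
    return allowed is None or msg_type in allowed


allowed = normalize_criteria(["New", "Modify", "X"])
print(sorted(allowed))                        # ['M', 'N', 'X']
print(passes("T", allowed))                   # False
print(passes("T", normalize_criteria(None)))  # True
```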
6.3 build_from_list(source)
Accepts:
- MessageCacheReader
- list[dict] decoded messages
Returns the number of messages processed after filtering.
Example A: from MessageCacheReader
reader = MessageCacheReader()
reader.load_to_cache("data/orders_trades.bin")
builder = OrderbookBuilder()
processed = builder.build_from_list(reader)
print("Processed:", processed)
Expected output (example):
Processed: 152340
Example B: from decoded list[dict]
builder = OrderbookBuilder()
decoded = [
{
"msg_type": "N",
"exch_ts": 1710000000000000000,
"order_id": 1,
"token": 26000,
"order_type": "B",
"price": 2250000,
"quantity": 150,
"local_ts": 1710000000000000100,
"flags": False,
},
{
"msg_type": "N",
"exch_ts": 1710000000000000200,
"order_id": 2,
"token": 26000,
"order_type": "S",
"price": 2250100,
"quantity": 75,
"local_ts": 1710000000000000300,
"flags": False,
},
]
processed = builder.build_from_list(decoded)
print("Processed:", processed)
Expected output:
Processed: 2
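For intuition about what happens to decoded messages, the toy model below aggregates N, M, and X messages into per-token price levels. It is a simplified sketch, not OrderbookBuilder's implementation: build_levels is a hypothetical function, a modify is assumed to replace the order's price and quantity, trades are ignored, and only the dict keys needed for the aggregation are included.

```python
from collections import defaultdict


def build_levels(messages):
    """Toy aggregation: {token: {"B": {price: qty}, "S": {price: qty}}}."""
    live = {}  # order_id -> (token, side, price, quantity)
    for m in messages:
        kind = m["msg_type"]
        if kind in ("N", "M"):
            # Assumption: a modify replaces the resting order's price and quantity.
            live[m["order_id"]] = (m["token"], m["order_type"], m["price"], m["quantity"])
        elif kind == "X":
            live.pop(m["order_id"], None)
        # Trades ("T") are ignored in this toy model.
    books = defaultdict(lambda: {"B": defaultdict(int), "S": defaultdict(int)})
    for token, side, price, qty in live.values():
        books[token][side][price] += qty
    return books


decoded = [
    {"msg_type": "N", "order_id": 1, "token": 26000, "order_type": "B", "price": 2250000, "quantity": 150},
    {"msg_type": "N", "order_id": 2, "token": 26000, "order_type": "S", "price": 2250100, "quantity": 75},
    {"msg_type": "N", "order_id": 3, "token": 26000, "order_type": "B", "price": 2249900, "quantity": 200},
    {"msg_type": "X", "order_id": 3, "token": 26000, "order_type": "B", "price": 2249900, "quantity": 200},
]
books = build_levels(decoded)
best_bid = max(books[26000]["B"].items())  # highest bid price wins
best_ask = min(books[26000]["S"].items())  # lowest ask price wins
print(best_bid, best_ask)  # (2250000, 150) (2250100, 75)
```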
6.4 build_from_source(source, limit=None)
Accepts:
- MessageCacheReader
- StreamingBinaryLoader
Returns the number of messages processed after filtering.
stream = StreamingBinaryLoader()
stream.open_stream("data/orders_trades.bin", count_messages=False)
builder = OrderbookBuilder()
processed = builder.build_from_source(stream, limit=10000)
print("Processed:", processed)
Expected output:
Processed: 10000
6.5 get_snapshot(token, levels=None)
Returns a snapshot dict with the best bid, best ask, mid price, spread, and up to `levels` price levels per side.
snapshot = builder.get_snapshot(token=26000, levels=5)
print(snapshot)
Expected output shape:
{
"token": 26000,
"found": True,
"mid_price": 2250050,
"best_bid": (2250000, 150),
"best_ask": (2250100, 75),
"spread": 100,
"bids": [(2250000, 150), (2249900, 200), (2249800, 100), (2249700, 50), (2249600, 25)],
"asks": [(2250100, 75), (2250200, 125), (2250300, 100), (2250400, 80), (2250500, 60)],
}
Unknown token output shape:
{
"token": 999999,
"found": False,
"mid_price": 0,
"best_bid": None,
"best_ask": None,
"spread": None,
"bids": [],
"asks": [],
}
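The derived fields in the two shapes above follow from the level lists. The sketch below rebuilds a dict of the same shape from (price, quantity) tuples. summarize is a hypothetical helper; the integer floor division for mid_price and the rule that "found" is true whenever any level exists are assumptions made for illustration, since the library's exact rules are not stated here.

```python
def summarize(token, bids, asks, levels=5):
    """Build a snapshot-shaped dict from lists of (price, quantity) tuples."""
    bids = sorted(bids, reverse=True)[:levels]  # highest bid first
    asks = sorted(asks)[:levels]                # lowest ask first
    best_bid = bids[0] if bids else None
    best_ask = asks[0] if asks else None
    two_sided = best_bid is not None and best_ask is not None
    return {
        "token": token,
        "found": bool(bids or asks),
        # Assumption: integer midpoint, 0 when either side is empty.
        "mid_price": (best_bid[0] + best_ask[0]) // 2 if two_sided else 0,
        "best_bid": best_bid,
        "best_ask": best_ask,
        "spread": best_ask[0] - best_bid[0] if two_sided else None,
        "bids": bids,
        "asks": asks,
    }


snap = summarize(
    26000,
    bids=[(2249900, 200), (2250000, 150)],
    asks=[(2250200, 125), (2250100, 75)],
)
print(snap["mid_price"], snap["spread"])  # 2250050 100
print(summarize(999999, [], [])["found"])  # False
```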
6.6 get_orderbook_snapshot(token, levels=None)
Compatibility alias for get_snapshot.
snap_a = builder.get_snapshot(26000, levels=5)
snap_b = builder.get_orderbook_snapshot(26000, levels=5)
print(snap_a == snap_b)
Expected output:
True
6.7 get_full_depth(token)
Returns every bid and ask level with non-zero quantity for the token.
depth = builder.get_full_depth(26000)
print(depth["found"])
print(depth["best_bid"], depth["best_ask"], depth["spread"])
print(len(depth["bids"]), len(depth["asks"]))
Expected output shape:
{
"token": 26000,
"found": True,
"best_bid": (2250000, 150),
"best_ask": (2250100, 75),
"spread": 100,
"bids": [(2250000, 150), (2249900, 200), ...],
"asks": [(2250100, 75), (2250200, 125), ...],
}
6.8 snapshot_header()
Returns the fixed 5-level CSV header string.
header = builder.snapshot_header()
print(header)
Expected output:
local_ts,exch_ts,mid_price,bid_price_0,bid_qty_0,ask_price_0,ask_qty_0,bid_price_1,bid_qty_1,ask_price_1,ask_qty_1,bid_price_2,bid_qty_2,ask_price_2,ask_qty_2,bid_price_3,bid_qty_3,ask_price_3,ask_qty_3,bid_price_4,bid_qty_4,ask_price_4,ask_qty_4
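The column order follows a regular pattern: three leading columns, then bid price, bid quantity, ask price, and ask quantity for each level. The sketch below regenerates the same string, which can be handy for checking column positions. make_header is a hypothetical helper; the library itself only exposes the fixed 5-level header.

```python
def make_header(levels: int = 5) -> str:
    """Rebuild the snapshot CSV header for a given number of levels."""
    cols = ["local_ts", "exch_ts", "mid_price"]
    for i in range(levels):
        cols += [f"bid_price_{i}", f"bid_qty_{i}", f"ask_price_{i}", f"ask_qty_{i}"]
    return ",".join(cols)


header = make_header()
print(len(header.split(",")))  # 23 columns: 3 leading + 4 per level * 5 levels
```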
6.9 get_snapshot_row(token, levels=None)
Returns one CSV row string matching snapshot_header column order.
Current behavior in code:
- local_ts is set to 0
- exch_ts is set to 0
row = builder.get_snapshot_row(token=26000, levels=5)
print(row)
Expected output (example):
0,0,2250050,2250000,150,2250100,75,2249900,200,2250200,125,2249800,100,2250300,100,2249700,50,2250400,80,2249600,25,2250500,60
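Since the row matches the header column for column, the two strings can be zipped into a named record with the standard csv module. The sketch below uses the header and row shown above. row_to_record is a hypothetical helper, and converting every value to int is an assumption based on the example row; rows for unknown tokens may look different.

```python
import csv
import io

HEADER = (
    "local_ts,exch_ts,mid_price,"
    "bid_price_0,bid_qty_0,ask_price_0,ask_qty_0,"
    "bid_price_1,bid_qty_1,ask_price_1,ask_qty_1,"
    "bid_price_2,bid_qty_2,ask_price_2,ask_qty_2,"
    "bid_price_3,bid_qty_3,ask_price_3,ask_qty_3,"
    "bid_price_4,bid_qty_4,ask_price_4,ask_qty_4"
)
ROW = (
    "0,0,2250050,2250000,150,2250100,75,2249900,200,2250200,125,"
    "2249800,100,2250300,100,2249700,50,2250400,80,2249600,25,2250500,60"
)


def row_to_record(header: str, row: str) -> dict:
    """Pair header names with row values; assumes every value is an integer."""
    names = next(csv.reader(io.StringIO(header)))
    values = next(csv.reader(io.StringIO(row)))
    if len(names) != len(values):
        raise ValueError("row does not match header length")
    return {name: int(value) for name, value in zip(names, values)}


record = row_to_record(HEADER, ROW)
print(record["bid_price_0"], record["ask_price_0"], record["mid_price"])
# 2250000 2250100 2250050
```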
7. End-to-end examples
7.1 Cache mode end-to-end
from fastreader import MessageCacheReader, OrderbookBuilder
reader = MessageCacheReader()
loaded = reader.load_to_cache("data/orders_trades.bin")
print("Loaded:", loaded)
print("Summary:", reader.get_cache_summary())
builder = OrderbookBuilder()
processed = builder.build_from_source(reader)
print("Processed:", processed)
snap = builder.get_orderbook_snapshot(token=26000, levels=5)
print("Best bid:", snap["best_bid"])
print("Best ask:", snap["best_ask"])
print("Spread:", snap["spread"])
7.2 Stream mode end-to-end
from fastreader import StreamingBinaryLoader, OrderbookBuilder
stream = StreamingBinaryLoader()
stream.open_stream("data/orders_trades.bin", count_messages=False)
builder = OrderbookBuilder()
processed = builder.build_from_source(stream, limit=200000)
print("Processed:", processed)
header = builder.snapshot_header()
row = builder.get_snapshot_row(token=26000, levels=5)
print(header)
print(row)
8. Notes and limitations
- snapshot_header is fixed to 5 levels.
- get_snapshot_row returns a row shaped for 5 levels and currently sets local_ts=0 and exch_ts=0.
- get_next_message returns formatted text, not raw struct data.
- Stream parsing is strict: invalid or truncated data now raises an error instead of being silently treated as EOF.
9. Error examples
Missing file
reader = MessageCacheReader()
reader.load_to_cache("missing.bin")
Expected:
RuntimeError: No such file or directory
Wrong source type for build_from_source
builder = OrderbookBuilder()
builder.build_from_source("wrong")
Expected:
TypeError: build_from_source expects MessageCacheReader or StreamingBinaryLoader
Wrong source type for build_from_list
builder = OrderbookBuilder()
builder.build_from_list("wrong")
Expected:
TypeError: build_from_list expects MessageCacheReader or list[dict] decoded messages