
High-performance exchange feed parser and orderflow analytics engine with Rust and Python bindings

fastreader

fastreader is a Python extension module implemented in Rust with PyO3.

It is designed for binary market data workflows:

  • parse order and trade packets
  • load a full file into an in-memory cache, or stream it from disk
  • build token-level order books
  • export snapshots in dict or CSV-row form

The module currently exposes these core classes:

  • MessageCacheReader
  • StreamingBinaryLoader
  • OrderbookBuilder

It also exposes compatibility wrappers:

  • BinaryDataLoader
  • BinaryLoader
  • ReadMsgFromBinary

1. Architecture

The data path is:

Binary file on disk
      |
      +--> MessageCacheReader (load full file into RAM)
      |
      +--> StreamingBinaryLoader (read one message at a time)
                   |
                   v
            OrderbookBuilder
                   |
      +------------+-------------------+
      |                                |
      v                                v
get_snapshot / get_orderbook_snapshot  get_snapshot_row / snapshot_header
(dict output)                          (CSV output)

Reader strategy

| Reader | Behavior | Best for |
|---|---|---|
| MessageCacheReader | Loads all supported messages into memory | Repeated analysis, debugging, backtesting |
| StreamingBinaryLoader | Keeps the file handle open and reads sequentially | Very large files, low memory use |

2. Install and import

The distribution files on this page are named orderpulse; the importable module is fastreader:

pip install orderpulse

from fastreader import MessageCacheReader, StreamingBinaryLoader, OrderbookBuilder

3. Supported message types

The parser supports four message types:

  • T : trade
  • N : new order
  • M : modify order
  • X : cancel order

Space bytes (0x20) at message boundaries are skipped.
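The following pure-Python sketch shows how a reader can skip boundary spaces and then dispatch on the message type. It is not fastreader's parser. The binary layout is not documented here, so the sketch assumes, purely hypothetically, that each message starts with its one-byte type code. The helper name next_message_type is also made up.

```python
from typing import Optional, Tuple

SPACE = 0x20  # ASCII space byte

# Type codes listed above; the labels are for illustration only.
MSG_TYPES = {
    ord("T"): "trade",
    ord("N"): "new order",
    ord("M"): "modify order",
    ord("X"): "cancel order",
}


def next_message_type(buf: bytes, pos: int) -> Optional[Tuple[int, str]]:
    """Hypothetical helper: skip space bytes from pos, then classify the
    next message by its (assumed) leading type byte.

    Returns (offset, label), or None if only spaces remain.
    """
    while pos < len(buf) and buf[pos] == SPACE:
        pos += 1
    if pos >= len(buf):
        return None
    label = MSG_TYPES.get(buf[pos])
    if label is None:
        raise ValueError(f"unsupported type byte {buf[pos]:#04x} at offset {pos}")
    return pos, label
```

For example, next_message_type(b"  N", 0) skips the two spaces and returns (2, "new order").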

4. Class: MessageCacheReader

4.1 Constructor

reader = MessageCacheReader()
print(reader)

Expected output:

<builtins.MessageCacheReader object at 0x...>

4.2 load_to_cache(file_path)

Loads the full file into memory and returns the number of parsed messages.

reader = MessageCacheReader()
count = reader.load_to_cache("data/orders_trades.bin")
print("Loaded:", count)

Expected output (example):

Loaded: 152340

4.3 get_all_messages()

Returns all cached messages as formatted strings.

reader = MessageCacheReader()
reader.load_to_cache("data/orders_trades.bin")
msgs = reader.get_all_messages()
print(msgs[0])
print(msgs[1])

Expected output (example):

Order Message: SeqNo 1, MsgLen 40, MsgType 'N', ExchTs 1710000000000000000, LocalTs 1710000000000000100, OrderId 10001, Token 26000, Side 'B', Price 2250000, Quantity 75, Missed 0
Trade Message: SeqNo 2, MsgLen 48, MsgType 'T', ExchTs 1710000000000000500, LocalTs 1710000000000000600, BuyOrderId 10001, SellOrderId 10002, Token 26000, Price 2250100, Quantity 50, Missed 0
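These strings use a regular "Key value, Key value" layout, so they can be turned back into dicts for quick inspection. The sketch below uses a hypothetical helper, parse_message_line, which is not part of fastreader. It assumes the format stays exactly as shown:

  • a "<Kind> Message: " prefix
  • comma-separated fields
  • integers unquoted
  • single characters in single quotes

```python
import re
from typing import Dict, Union

# Matches one "Key value" field such as "SeqNo 1" or "Side 'B'".
_FIELD = re.compile(r"(\w+) ('[^']*'|-?\d+)")


def parse_message_line(line: str) -> Dict[str, Union[int, str]]:
    """Hypothetical helper: convert one formatted message string to a dict."""
    kind, _, body = line.partition(": ")
    fields: Dict[str, Union[int, str]] = {"Kind": kind}
    for part in body.split(", "):
        m = _FIELD.fullmatch(part)
        if m is None:
            raise ValueError(f"unrecognized field: {part!r}")
        key, raw = m.groups()
        # Quoted values are single characters (MsgType, Side); others are ints.
        fields[key] = raw.strip("'") if raw.startswith("'") else int(raw)
    return fields
```
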

4.4 get_cache_summary()

Returns a dictionary summarizing the cached data.

reader = MessageCacheReader()
reader.load_to_cache("data/orders_trades.bin")
summary = reader.get_cache_summary()
print(summary)

Expected output shape:

{
    "file_source": "data/orders_trades.bin",
    "total_messages": 152340,
    "total_orders": 120000,
    "total_trades": 32340,
    "memory_usage_bytes": 7312320,
}
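The example numbers are internally consistent:

  • 120000 orders + 32340 trades = 152340 messages
  • 7312320 bytes / 152340 messages = 48 bytes per message

A small hypothetical check like the one below can catch a mismatch early. It assumes total_orders and total_trades together cover every cached message. This holds for the example but is not documented.

```python
from typing import Any, Dict


def check_cache_summary(summary: Dict[str, Any]) -> float:
    """Hypothetical sanity check for a get_cache_summary() dict.

    Raises ValueError if orders + trades != total_messages (an assumption
    based on the example output) and returns the average cache bytes
    per message.
    """
    total = summary["total_messages"]
    if summary["total_orders"] + summary["total_trades"] != total:
        raise ValueError("total_orders + total_trades != total_messages")
    return summary["memory_usage_bytes"] / total if total else 0.0
```
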

5. Class: StreamingBinaryLoader

5.1 Constructor

stream = StreamingBinaryLoader()
print(stream)

Expected output:

<builtins.StreamingBinaryLoader object at 0x...>

5.2 open_stream(file_path, count_messages=True)

Opens the file as a stream source, validates its header, and resets the cursor to the start.

  • count_messages=True: pre-scans the file to count messages and returns the count
  • count_messages=False: skips the pre-scan and returns 0 (faster startup)

stream = StreamingBinaryLoader()
count = stream.open_stream("data/orders_trades.bin", count_messages=True)
print("Count:", count)

Expected output (example):

Count: 152340

Fast-start mode:

stream = StreamingBinaryLoader()
count = stream.open_stream("data/orders_trades.bin", count_messages=False)
print("Count:", count)

Expected output:

Count: 0

5.3 get_next_message()

Returns one formatted message string per call, and the string "END" at end of file.

stream = StreamingBinaryLoader()
stream.open_stream("data/orders_trades.bin", count_messages=False)
print(stream.get_next_message())
print(stream.get_next_message())

Expected output (example):

Order Message: SeqNo 1, MsgLen 40, MsgType 'N', ExchTs ..., LocalTs ..., OrderId ..., Token ..., Side 'B', Price ..., Quantity ..., Missed 0
Trade Message: SeqNo 2, MsgLen 48, MsgType 'T', ExchTs ..., LocalTs ..., BuyOrderId ..., SellOrderId ..., Token ..., Price ..., Quantity ..., Missed 0

EOF behavior:

while True:
    m = stream.get_next_message()
    if m == "END":
        print(m)
        break

Expected output tail:

END
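Because get_next_message signals EOF with the "END" string, Python's two-argument iter(callable, sentinel) can replace the manual while loop. This is a sketch: iter_messages is a hypothetical helper name, and it assumes stream has already been opened with open_stream. Any parse error raised by the stream propagates out of the loop unchanged.

```python
from typing import Iterator


def iter_messages(stream) -> Iterator[str]:
    """Return an iterator over formatted messages from an opened stream.

    iter(callable, sentinel) calls stream.get_next_message() repeatedly
    and stops, without yielding it, as soon as the call returns "END".
    """
    return iter(stream.get_next_message, "END")


# Usage (requires fastreader and a data file):
# stream = StreamingBinaryLoader()
# stream.open_stream("data/orders_trades.bin", count_messages=False)
# n = sum(1 for _ in iter_messages(stream))
```
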

5.4 reset_cursor()

Moves the stream cursor back to the start of the file.

stream = StreamingBinaryLoader()
stream.open_stream("data/orders_trades.bin", count_messages=False)
a = stream.get_next_message()
_ = stream.get_next_message()
stream.reset_cursor()
b = stream.get_next_message()
print(a)
print(b)

Expected behavior: both printed lines show the same message, the first one in the file.

6. Class: OrderbookBuilder

OrderbookBuilder applies order book updates from any of these inputs:

  • MessageCacheReader
  • StreamingBinaryLoader
  • decoded list[dict] payloads

6.1 Constructor

builder = OrderbookBuilder()
print(builder)

Expected output:

<builtins.OrderbookBuilder object at 0x...>

6.2 apply_filter(logic_criteria=None)

Sets a message-type filter. Only the first character of each string in logic_criteria is used; passing None clears the filter.

builder = OrderbookBuilder()
builder.apply_filter(["N", "M", "X"])  # order lifecycle only
builder.apply_filter(["T"])            # trades only
builder.apply_filter(None)             # clear filter
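The pure-Python model below makes the first-character rule concrete. It is not fastreader code. make_type_filter is a hypothetical name, and skipping empty strings in the criteria is an assumption.

```python
from typing import Callable, Iterable, Optional


def make_type_filter(logic_criteria: Optional[Iterable[str]]) -> Callable[[str], bool]:
    """Model of apply_filter: only the first character of each criterion
    matters, and None accepts every message type."""
    if logic_criteria is None:
        return lambda msg_type: True
    allowed = {c[0] for c in logic_criteria if c}  # "Trade" -> "T"
    return lambda msg_type: msg_type[:1] in allowed


lifecycle = make_type_filter(["N", "M", "X"])
print([t for t in "TNMXT" if lifecycle(t)])  # ['N', 'M', 'X']
```
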

6.3 build_from_list(source)

Accepts:

  • MessageCacheReader
  • list[dict] decoded messages

Returns the number of messages processed after the filter is applied.

Example A: from MessageCacheReader

reader = MessageCacheReader()
reader.load_to_cache("data/orders_trades.bin")
builder = OrderbookBuilder()
processed = builder.build_from_list(reader)
print("Processed:", processed)

Expected output (example):

Processed: 152340

Example B: from decoded list[dict]

builder = OrderbookBuilder()

decoded = [
    {
        "msg_type": "N",
        "exch_ts": 1710000000000000000,
        "order_id": 1,
        "token": 26000,
        "order_type": "B",
        "price": 2250000,
        "quantity": 150,
        "local_ts": 1710000000000000100,
        "flags": False,
    },
    {
        "msg_type": "N",
        "exch_ts": 1710000000000000200,
        "order_id": 2,
        "token": 26000,
        "order_type": "S",
        "price": 2250100,
        "quantity": 75,
        "local_ts": 1710000000000000300,
        "flags": False,
    },
]

processed = builder.build_from_list(decoded)
print("Processed:", processed)

Expected output:

Processed: 2
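When building decoded payloads by hand, a missing key is easy to overlook. The hypothetical helper below checks new-order ("N") dicts against the keys used in Example B. The key set is inferred from that example only. The required keys for M, X, and T messages are not documented here, so those messages are not checked.

```python
from typing import Any, Dict, List, Set

# Keys used by the "N" dicts in Example B (inferred, not an official schema).
NEW_ORDER_KEYS = {
    "msg_type", "exch_ts", "order_id", "token", "order_type",
    "price", "quantity", "local_ts", "flags",
}


def missing_new_order_keys(decoded: List[Dict[str, Any]]) -> Dict[int, Set[str]]:
    """Return {index: missing keys} for each "N" message lacking a key."""
    problems: Dict[int, Set[str]] = {}
    for i, msg in enumerate(decoded):
        if msg.get("msg_type") == "N":
            missing = NEW_ORDER_KEYS.difference(msg)
            if missing:
                problems[i] = missing
    return problems
```
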

6.4 build_from_source(source, limit=None)

Accepts:

  • MessageCacheReader
  • StreamingBinaryLoader

Returns the number of messages processed after the filter is applied.

stream = StreamingBinaryLoader()
stream.open_stream("data/orders_trades.bin", count_messages=False)

builder = OrderbookBuilder()
processed = builder.build_from_source(stream, limit=10000)
print("Processed:", processed)

Expected output:

Processed: 10000

6.5 get_snapshot(token, levels=None)

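Returns a snapshot dict with the best bid and ask, spread, mid price, and up to levels price levels per side. Each level is a (price, quantity) tuple.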

snapshot = builder.get_snapshot(token=26000, levels=5)
print(snapshot)

Expected output shape:

{
    "token": 26000,
    "found": True,
    "mid_price": 2250050,
    "best_bid": (2250000, 150),
    "best_ask": (2250100, 75),
    "spread": 100,
    "bids": [(2250000, 150), (2249900, 200), (2249800, 100), (2249700, 50), (2249600, 25)],
    "asks": [(2250100, 75), (2250200, 125), (2250300, 100), (2250400, 80), (2250500, 60)],
}

Unknown token output shape:

{
    "token": 999999,
    "found": False,
    "mid_price": 0,
    "best_bid": None,
    "best_ask": None,
    "spread": None,
    "bids": [],
    "asks": [],
}
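The derived fields follow from the level lists. In the example:

  • spread = 2250100 - 2250000 = 100
  • mid_price = (2250000 + 2250100) / 2 = 2250050

The sketch below recomputes them from best-first (price, quantity) lists, for example to cross-check a snapshot. book_summary is a hypothetical helper. Two details are assumptions, because the library's rounding and one-sided-book behavior are not documented:

  • integer floor division for the mid price
  • the handling of a one-sided book

```python
from typing import List, Optional, Tuple

Level = Tuple[int, int]  # (price, quantity)


def book_summary(bids: List[Level], asks: List[Level]) -> dict:
    """Recompute best bid/ask, spread, and mid price from best-first lists."""
    best_bid: Optional[Level] = bids[0] if bids else None
    best_ask: Optional[Level] = asks[0] if asks else None
    if best_bid is None or best_ask is None:
        # Matches the unknown-token shape; one-sided handling is assumed.
        return {"best_bid": best_bid, "best_ask": best_ask,
                "spread": None, "mid_price": 0}
    return {
        "best_bid": best_bid,
        "best_ask": best_ask,
        "spread": best_ask[0] - best_bid[0],
        "mid_price": (best_bid[0] + best_ask[0]) // 2,  # assumed floor division
    }
```
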

6.6 get_orderbook_snapshot(token, levels=None)

Compatibility alias for get_snapshot.

snap_a = builder.get_snapshot(26000, levels=5)
snap_b = builder.get_orderbook_snapshot(26000, levels=5)
print(snap_a == snap_b)

Expected output:

True

6.7 get_full_depth(token)

Returns every bid and ask level with non-zero quantity for token.

depth = builder.get_full_depth(26000)
print(depth["found"])
print(depth["best_bid"], depth["best_ask"], depth["spread"])
print(len(depth["bids"]), len(depth["asks"]))

Shape of the returned depth dict:

{
    "token": 26000,
    "found": True,
    "best_bid": (2250000, 150),
    "best_ask": (2250100, 75),
    "spread": 100,
    "bids": [(2250000, 150), (2249900, 200), ...],
    "asks": [(2250100, 75), (2250200, 125), ...],
}
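Since each level is a (price, quantity) tuple, the total resting size per side is a one-line sum. side_totals is a hypothetical helper that works on any dict with bids and asks lists, such as get_full_depth or get_snapshot output.

```python
from typing import Tuple


def side_totals(depth: dict) -> Tuple[int, int]:
    """Return (total bid quantity, total ask quantity) across all levels."""
    bid_qty = sum(qty for _price, qty in depth["bids"])
    ask_qty = sum(qty for _price, qty in depth["asks"])
    return bid_qty, ask_qty
```

For the 5-level example in 6.5, this gives (525, 440).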

6.8 snapshot_header()

Returns the CSV header for a fixed 5-level snapshot.

header = builder.snapshot_header()
print(header)

Expected output:

local_ts,exch_ts,mid_price,bid_price_0,bid_qty_0,ask_price_0,ask_qty_0,bid_price_1,bid_qty_1,ask_price_1,ask_qty_1,bid_price_2,bid_qty_2,ask_price_2,ask_qty_2,bid_price_3,bid_qty_3,ask_price_3,ask_qty_3,bid_price_4,bid_qty_4,ask_price_4,ask_qty_4
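The header follows a simple pattern: three leading columns, then bid price, bid quantity, ask price, and ask quantity for each level from 0 through 4. The sketch below regenerates it, which can help when adapting the layout to a different depth. build_snapshot_header and its levels parameter are hypothetical. fastreader's own header is fixed at 5 levels.

```python
def build_snapshot_header(levels: int = 5) -> str:
    """Hypothetical helper: rebuild the snapshot CSV header for N levels."""
    cols = ["local_ts", "exch_ts", "mid_price"]
    for i in range(levels):
        cols += [f"bid_price_{i}", f"bid_qty_{i}", f"ask_price_{i}", f"ask_qty_{i}"]
    return ",".join(cols)
```
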

6.9 get_snapshot_row(token, levels=None)

Returns one CSV row string whose columns follow the snapshot_header order.

In the current implementation, local_ts and exch_ts are always written as 0.

row = builder.get_snapshot_row(token=26000, levels=5)
print(row)

Expected output (example):

0,0,2250050,2250000,150,2250100,75,2249900,200,2250200,125,2249800,100,2250300,100,2249700,50,2250400,80,2249600,25,2250500,60
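The row is the snapshot flattened in header order. The sketch below rebuilds it from a get_snapshot-style dict to make the column mapping explicit. snapshot_to_row is hypothetical. Padding missing levels with 0 is an assumption, since the library's handling of a book shallower than 5 levels is not documented.

```python
def snapshot_to_row(snap: dict, levels: int = 5) -> str:
    """Hypothetical helper: flatten a snapshot dict into a CSV row."""
    # local_ts and exch_ts are 0, matching the current library behavior.
    values = [0, 0, snap["mid_price"]]
    for i in range(levels):
        # Assumed: a missing level is written as price 0, quantity 0.
        bid = snap["bids"][i] if i < len(snap["bids"]) else (0, 0)
        ask = snap["asks"][i] if i < len(snap["asks"]) else (0, 0)
        values += [bid[0], bid[1], ask[0], ask[1]]
    return ",".join(str(v) for v in values)
```
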

7. End-to-end examples

7.1 Cache mode end-to-end

from fastreader import MessageCacheReader, OrderbookBuilder

reader = MessageCacheReader()
loaded = reader.load_to_cache("data/orders_trades.bin")
print("Loaded:", loaded)
print("Summary:", reader.get_cache_summary())

builder = OrderbookBuilder()
processed = builder.build_from_source(reader)
print("Processed:", processed)

snap = builder.get_orderbook_snapshot(token=26000, levels=5)
print("Best bid:", snap["best_bid"])
print("Best ask:", snap["best_ask"])
print("Spread:", snap["spread"])

7.2 Stream mode end-to-end

from fastreader import StreamingBinaryLoader, OrderbookBuilder

stream = StreamingBinaryLoader()
stream.open_stream("data/orders_trades.bin", count_messages=False)

builder = OrderbookBuilder()
processed = builder.build_from_source(stream, limit=200000)
print("Processed:", processed)

header = builder.snapshot_header()
row = builder.get_snapshot_row(token=26000, levels=5)
print(header)
print(row)
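Because the header and row are plain CSV, the standard csv module can pair them into a dict keyed by column name. row_to_dict is a hypothetical helper. Converting every field with int assumes all columns are integers, as in the examples above.

```python
import csv
import io
from typing import Dict


def row_to_dict(header: str, row: str) -> Dict[str, int]:
    """Pair a snapshot_header() string with one get_snapshot_row() string."""
    reader = csv.DictReader(io.StringIO(header + "\n" + row + "\n"))
    record = next(reader)
    return {key: int(value) for key, value in record.items()}


# Usage with the builder above:
# snap = row_to_dict(header, row)
# print(snap["mid_price"], snap["bid_price_0"], snap["ask_price_0"])
```
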

8. Notes and limitations

  • snapshot_header is fixed to 5 levels.
  • get_snapshot_row returns a row shaped for 5 levels and currently writes local_ts=0 and exch_ts=0.
  • get_next_message returns formatted text, not raw struct data.
  • Stream parsing is strict: invalid or truncated data raises an error instead of being silently treated as EOF.

9. Error examples

Missing file

reader = MessageCacheReader()
reader.load_to_cache("missing.bin")

Expected:

RuntimeError: No such file or directory

Wrong source type for build_from_source

builder = OrderbookBuilder()
builder.build_from_source("wrong")

Expected:

TypeError: build_from_source expects MessageCacheReader or StreamingBinaryLoader

Wrong source type for build_from_list

builder = OrderbookBuilder()
builder.build_from_list("wrong")

Expected:

TypeError: build_from_list expects MessageCacheReader or list[dict] decoded messages
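In batch jobs, it can be preferable to log and skip unreadable files rather than abort the whole run. The sketch below is minimal and assumes load_to_cache reports I/O failures as RuntimeError, as in the missing-file example above. try_load is a hypothetical helper name.

```python
from typing import Optional


def try_load(reader, path: str) -> Optional[int]:
    """Return reader.load_to_cache(path), or None if it raises RuntimeError."""
    try:
        return reader.load_to_cache(path)
    except RuntimeError as exc:
        print(f"skipping {path}: {exc}")
        return None


# Usage (requires fastreader):
# counts = {p: try_load(MessageCacheReader(), p) for p in ["a.bin", "missing.bin"]}
```

TypeError from a wrong source type is deliberately not caught, since it signals a programming error rather than a bad input file.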

Download files

Source Distribution

orderpulse-0.2.27.tar.gz (16.9 MB)

Uploaded Source

Built Distribution

orderpulse-0.2.27-cp39-cp39-manylinux_2_34_x86_64.whl (257.4 kB)

Uploaded CPython 3.9, manylinux: glibc 2.34+ x86-64

File details

Details for the file orderpulse-0.2.27.tar.gz.

File metadata

  • Download URL: orderpulse-0.2.27.tar.gz
  • Size: 16.9 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: maturin/1.12.4

File hashes

Hashes for orderpulse-0.2.27.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2c30de3ef7ca62734ab5a7c15d50e02c81f3b5d0a2751313566e8b6a6f5645c8 |
| MD5 | 4074891ae841e39eb61959d32d948006 |
| BLAKE2b-256 | b4fede57bd1240142b420c531db4ba0b650dbfcd9d7be374514998c7509503dc |


File details

Details for the file orderpulse-0.2.27-cp39-cp39-manylinux_2_34_x86_64.whl.

File hashes

Hashes for orderpulse-0.2.27-cp39-cp39-manylinux_2_34_x86_64.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | e4fb01715cb48d2c99bb264259f6e25ed528ee3991f9198eb25819a3e92fdf69 |
| MD5 | 129c549e660b3c5e68f54c9e093c4848 |
| BLAKE2b-256 | 4d64194f0eb68c6f707633a8140fe68f52cbcf452cf4d9c611d9a9108967730c |
