High-performance exchange feed parser and orderflow analytics engine with Rust and Python bindings
# OrderPulse / fastreader
A high-performance Python library for reading NSE binary order/trade feed files and building real-time orderbook snapshots.
Written in Rust and exposed to Python via PyO3. Binary parsing and orderbook processing run entirely in Rust, behind a fast and simple Python API.
## What this library does
- Builds NSE feed file paths dynamically from segment, stream ID, and date.
- Reads NSE binary feed files (CM and FO segments).
- Extracts order and trade messages.
- Supports both RAM-based (cache) and streaming (low-memory) reading.
- Builds token-level orderbook state from decoded messages.
- Returns best bid, best ask, spread, mid price, top levels, full depth, and CSV rows.
- Loads the NSE contract master CSV in Rust and enriches every message with symbol, strike price, option type, expiry, and lot size, with no CSV parsing in Python.
## Architecture
```text
FeedPathBuilder
    |
    +--> build()              construct path string from segment + stream_id + date
    +--> build_and_verify()   same, but also checks the file exists on disk
    |
    v
Binary Feed File (.bin)
    |
    v
Rust Binary Parser
    |
    +--> MessageCacheReader       load all messages into RAM at once
    +--> StreamingBinaryLoader    read one message at a time from disk
    |        |
    |        +--> attach_symbol_master()   auto-enrich every get_next_msg() call
    |
    v
Decoded Order / Trade Messages   ← token_symbol / strike_price / option_type / expiry populated
    |
    v
OrderbookBuilder
    |
    +--> apply_filter()         restrict which message types to process
    +--> build_from_source()    build from reader (recommended)
    +--> build_from_list()      build from cache reader or list[dict]
    +--> orderbook_add_msg()    feed one message at a time manually
    |
    v
Snapshots / Full Depth / CSV Rows


NSE Contract Master CSV
    |
    v
SymbolMaster (Rust CSV parser, no Python overhead)
    |
    +--> load(csv_path)                 load from explicit path
    +--> load_for_date(seg, d, m, y)    build path automatically and load
    +--> lookup(token)                  single-token info dict
    +--> enrich(msg)                    populate symbol fields in a msg dict
```
## Classes

| Class | Purpose | When to use |
|---|---|---|
| `FeedPathBuilder` | Constructs NSE feed file paths | Avoid hardcoded path strings |
| `MessageCacheReader` | Loads entire file into RAM | Small/medium files, repeated analysis |
| `StreamingBinaryLoader` | Reads messages one by one from disk | Large files, Jupyter, low memory |
| `OrderbookBuilder` | Processes messages and queries orderbook state | Snapshot and depth analysis |
| `SymbolMaster` | Loads contract master CSV, looks up symbol info by token | Enriching messages with instrument metadata |

## Installation
Build and install locally with maturin:
```bash
maturin develop --release
```
Build a distributable wheel:
```bash
maturin build --release
```
Import in Python:
```python
from fastreader import (
    FeedPathBuilder, MessageCacheReader,
    StreamingBinaryLoader, OrderbookBuilder,
    SymbolMaster,
)
```
## Message Types

| Type | Meaning |
|---|---|
| `N` | New order |
| `M` | Modify order |
| `X` | Cancel / delete order |
| `T` | Trade |

Order side values:

| Side | Meaning |
|---|---|
| `B` | Buy (bid) |
| `S` | Sell (ask) |

## Quick Start
Copy and run this block to get a snapshot from your first NSE file:
```python
from fastreader import FeedPathBuilder, StreamingBinaryLoader, OrderbookBuilder

# 1. Build and verify the file path
b = FeedPathBuilder()
file_path = b.build_and_verify("NSE_FO", stream_id=1, day=21, month=5, year=2026)
print("File:", file_path)

# 2. Open the stream (fast: no full-file scan)
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)

# 3. Process the first 500 000 messages into the orderbook
builder = OrderbookBuilder()
processed = builder.build_from_source(reader, limit=500_000)
print(f"Processed: {processed} messages")

# 4. Discover active tokens
tokens = builder.get_active_tokens()
print(f"Active tokens: {len(tokens)} | first 5: {tokens[:5]}")

# 5. Query a snapshot
snap = builder.get_snapshot(token=tokens[0], levels=5)
print(snap)
```
Expected terminal output (illustrative; counts, tokens, and prices depend on your file):
```text
File: /nas/50.30/NSE_FO/Feed_FO_StreamID_1_21_05_2026.bin
Processed: 500000 messages
Active tokens: 261 | first 5: [36687, 40434, 42174, 42175, 42194]
{'token': 36687, 'found': True, 'mid_price': 408562, 'best_bid': (322585, 1740), 'best_ask': (494540, 120), 'spread': 171955, 'bids': [(322585, 1740), (322580, 120)], 'asks': [(494540, 120)]}
```
The first active token is not guaranteed to have resting orders. If its `bids` and `asks` come back empty, see Common Mistakes below.

Note on `get_next_message()`: this method returns a `(payload, is_end_of_stream)` tuple. `is_end_of_stream` is `True` only when the file is exhausted.
```python
payload, is_end = reader.get_next_message()
print(payload)  # "Order Message: SeqNo 1, MsgType 'N', ..."
print(is_end)   # False
```
## 1. FeedPathBuilder
Constructs NSE binary feed file paths from components. Avoids hardcoding paths and keeps naming consistent with the NSE format.

Path format produced:
```text
{base_path}/{FOLDER}/Feed_{SHORT}_StreamID_{id}_{DD}_{MM}_{YYYY}.bin
```
Default `base_path` is `/nas/50.30`.

| Segment input | Folder | Short name |
|---|---|---|
| `"NSE_CM"` or `"CM"` | `NSE_CM` | `CM` |
| `"NSE_FO"` or `"FO"` | `NSE_FO` | `FO` |

All segment values are case-insensitive.
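The naming scheme is simple enough to reproduce in plain Python. This is handy for checking which file a call will resolve to without touching the disk. The sketch below is a hypothetical re-implementation: `build_feed_path` is not part of fastreader. It assumes the validation ranges documented in 1.2, and it raises `ValueError` where the real builder raises `RuntimeError`.

```python
import posixpath

# Segment alias -> (folder, short name), following the table above.
_SEGMENTS = {
    "NSE_CM": ("NSE_CM", "CM"),
    "CM": ("NSE_CM", "CM"),
    "NSE_FO": ("NSE_FO", "FO"),
    "FO": ("NSE_FO", "FO"),
}


def build_feed_path(segment, stream_id, day, month, year, base_path="/nas/50.30"):
    """Hypothetical pure-Python mirror of FeedPathBuilder.build() (not part of fastreader)."""
    key = segment.upper()  # segment values are case-insensitive
    if key not in _SEGMENTS:
        raise ValueError(f"unknown segment {segment!r}")
    if stream_id <= 0:
        raise ValueError("stream_id must be > 0")
    if not (1 <= day <= 31 and 1 <= month <= 12 and 2000 <= year <= 2100):
        raise ValueError(f"invalid date {day}/{month}/{year}")
    folder, short = _SEGMENTS[key]
    # Day and month are zero-padded to two digits.
    name = f"Feed_{short}_StreamID_{stream_id}_{day:02d}_{month:02d}_{year}.bin"
    return posixpath.join(base_path, folder, name)


print(build_feed_path("cm", stream_id=5, day=3, month=1, year=2025))
# /nas/50.30/NSE_CM/Feed_CM_StreamID_5_03_01_2025.bin
```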
### 1.1 Create
```python
from fastreader import FeedPathBuilder
b = FeedPathBuilder()
```
### 1.2 `build(segment, stream_id, day, month, year, base_path=None)`
Constructs and returns a file path string. Does not check whether the file exists on disk.

| Parameter | Type | Required | Description |
|---|---|---|---|
| `segment` | `str` | Yes | `"NSE_CM"`, `"CM"`, `"NSE_FO"`, or `"FO"` |
| `stream_id` | `int` | Yes | Stream identifier, must be > 0 |
| `day` | `int` | Yes | Day of month, 1–31 |
| `month` | `int` | Yes | Month, 1–12 |
| `year` | `int` | Yes | Four-digit year, 2000–2100 |
| `base_path` | `str` | No | Root directory; defaults to `/nas/50.30` |

Returns `str`. Raises `RuntimeError` for invalid inputs.
```python
b = FeedPathBuilder()
print(b.build("NSE_CM", stream_id=2, day=29, month=12, year=2025))
# /nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin
print(b.build("CM", stream_id=5, day=3, month=1, year=2025))
# /nas/50.30/NSE_CM/Feed_CM_StreamID_5_03_01_2025.bin
print(b.build("NSE_FO", stream_id=1, day=1, month=6, year=2026, base_path="/mnt/data"))
# /mnt/data/NSE_FO/Feed_FO_StreamID_1_01_06_2026.bin
```
Validation errors (each call raises):
```python
b.build("INVALID", stream_id=1, day=1, month=1, year=2026)
# RuntimeError: unknown segment 'INVALID' — expected one of: NSE_CM, CM, NSE_FO, FO
b.build("NSE_CM", stream_id=0, day=1, month=1, year=2026)
# RuntimeError: stream_id must be > 0
b.build("NSE_CM", stream_id=1, day=1, month=13, year=2026)
# RuntimeError: invalid month 13 — must be 1–12
```
### 1.3 `build_and_verify(segment, stream_id, day, month, year, base_path=None)`
Same as `build()`, but also checks that the file exists on disk and raises `RuntimeError` if it does not. Takes the same parameters as `build()`.
```python
b = FeedPathBuilder()
try:
    path = b.build_and_verify("NSE_CM", stream_id=2, day=29, month=12, year=2025)
    print("File ready:", path)
except RuntimeError as e:
    print("Not found:", e)
```
Expected output when the file exists:
```text
File ready: /nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin
```
Expected output when the file is missing:
```text
Not found: file not found: /nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin
```
## 2. MessageCacheReader
Loads all decoded messages from a binary file into RAM. Good for repeated analysis of the same file.

Tip: for very large files, use `StreamingBinaryLoader` instead to avoid high RAM usage.
### 2.1 Create
```python
from fastreader import MessageCacheReader
reader = MessageCacheReader()
```
### 2.2 `load_to_cache(file_path)`
Reads and decodes the entire binary file into memory.

| Parameter | Type | Description |
|---|---|---|
| `file_path` | `str` | Full path to the binary file |

Returns `int`: the number of messages loaded.
```python
reader = MessageCacheReader()
count = reader.load_to_cache("/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin")
print("Loaded:", count)
```
Expected output:
```text
Loaded: 1250000
```
### 2.3 `get_all_messages()`
Returns all cached messages (orders and trades) as formatted strings.
```python
messages = reader.get_all_messages()
print(messages[0])
print(messages[1])
```
Expected output:
```text
Order Message: SeqNo 42, MsgLen 10, MsgType 'N', ExchTs 100000, LocalTs 200000, OrderId 55, Token 1001, Side 'B', Price 500, Quantity 100, Missed 0
Trade Message: SeqNo 99, MsgLen 10, MsgType 'T', ExchTs 300000, LocalTs 400000, BuyOrderId 10, SellOrderId 20, Token 5000, Price 750, Quantity 30, Missed 1
```
### 2.4 `get_order_message()`
Returns only order-type messages (`N`, `M`, `X`) as formatted strings.
```python
orders = reader.get_order_message()
print(f"Total orders: {len(orders)}")
print(orders[0])
```
Expected output:
```text
Total orders: 900000
Order Message: SeqNo 42, MsgLen 10, MsgType 'N', ExchTs 100000, LocalTs 200000, OrderId 55, Token 1001, Side 'B', Price 500, Quantity 100, Missed 0
```
### 2.5 `get_trade_message()`
Returns only trade messages (`T`) as formatted strings.
```python
trades = reader.get_trade_message()
print(f"Total trades: {len(trades)}")
print(trades[0])
```
Expected output:
```text
Total trades: 350000
Trade Message: SeqNo 99, MsgLen 10, MsgType 'T', ExchTs 300000, LocalTs 400000, BuyOrderId 10, SellOrderId 20, Token 5000, Price 750, Quantity 30, Missed 1
```
### 2.6 `get_all_trade_message()`
Alias for `get_trade_message()`. Identical behaviour.
```python
trades = reader.get_all_trade_message()
```
### 2.7 `get_cache_summary()`
Returns a dictionary with file and memory statistics.

| Key | Description |
|---|---|
| `file_source` | Path of the loaded file |
| `total_messages` | Total messages in cache |
| `total_orders` | Order messages count |
| `total_trades` | Trade messages count |
| `memory_usage_bytes` | Estimated RAM usage in bytes |

```python
summary = reader.get_cache_summary()
print(summary)
```
Expected output:
```text
{
    'file_source': '/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin',
    'total_messages': 1250000,
    'total_orders': 900000,
    'total_trades': 350000,
    'memory_usage_bytes': 80000000
}
```
## 3. StreamingBinaryLoader
Reads messages one by one directly from disk without loading everything into RAM. The right choice for large files.
### 3.1 Create
```python
from fastreader import StreamingBinaryLoader
reader = StreamingBinaryLoader()
```
### 3.2 `open_stream(file_path, count_messages=True)`
Opens a binary file for sequential streaming.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `file_path` | `str` | Required | Full path to the binary file |
| `count_messages` | `bool` | `True` | Whether to scan the whole file to count messages |

Returns `int`: the message count when `count_messages=True`, or `0` when `False`.

Tip: for large files, always use `count_messages=False`. Counting requires a full scan of the file.
```python
reader = StreamingBinaryLoader()
# Fast open: skip counting
count = reader.open_stream("/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin", count_messages=False)
print(count)  # 0 (counting was skipped; it does not mean the file is empty)
# Slow open: includes full count
count = reader.open_stream("/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin", count_messages=True)
print(count)  # 1250000
```
### 3.3 `get_next_message()`
Reads the next message and returns a `(message, is_end)` tuple, where `message` is a formatted string. At end of file it returns `("END", True)`.
```python
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
for _ in range(3):
    message, is_end = reader.get_next_message()
    print(message)
```
Expected output:
```text
Order Message: SeqNo 1, MsgLen 10, MsgType 'N', ExchTs 100001, LocalTs 200001, OrderId 10, Token 200, Side 'B', Price 100, Quantity 5, Missed 0
Order Message: SeqNo 2, MsgLen 10, MsgType 'M', ExchTs 100002, LocalTs 200002, OrderId 10, Token 200, Side 'B', Price 102, Quantity 5, Missed 0
Trade Message: SeqNo 3, MsgLen 10, MsgType 'T', ExchTs 300003, LocalTs 400003, BuyOrderId 10, SellOrderId 20, Token 200, Price 100, Quantity 5, Missed 0
```
### Checking for end of stream
`StreamingBinaryLoader` provides two ways to detect the end of a binary feed file.
#### 1. `get_next_message()`
Reads the next message from the stream and returns a tuple `(message, is_end)`, where:
- `message` is the formatted message string.
- `is_end` is a boolean. `False` means a valid message was read. `True` means the end of the file/message stream has been reached.

Example:
```python
from fastreader import StreamingBinaryLoader

reader = StreamingBinaryLoader()
file_path = "/path/to/feed_file.bin"
count = reader.open_stream(file_path, count_messages=False)

while True:
    message, is_end = reader.get_next_message()
    if is_end:
        print("End of message stream reached")
        break
    print(message)
```
Expected output:
```text
Order Message: SeqNo 1, MsgLen 38, MsgType 'N', ExchTs 100001, LocalTs 200001, OrderId 1001, Token 12345, Side 'B', Price 25000, Quantity 50, Missed 0
Trade Message: SeqNo 2, MsgLen 45, MsgType 'T', ExchTs 100002, LocalTs 200002, BuyOrderId 1001, SellOrderId 1002, Token 12345, Price 25010, Quantity 25, Missed 0
End of message stream reached
```
#### 2. `is_get_next_message_end()`
Checks whether the stream is exhausted without consuming a message. It returns `True` when there is no next message and `False` when one exists.

It does not permanently move the file cursor. It reads ahead to check the next message, then resets the cursor back to the same position.

Example:
```python
from fastreader import StreamingBinaryLoader

reader = StreamingBinaryLoader()
file_path = "/path/to/feed_file.bin"
reader.open_stream(file_path, count_messages=False)

print(reader.is_get_next_message_end())
message, is_end = reader.get_next_message()
print(message)
print(is_end)
print(reader.is_get_next_message_end())
```
Expected output:
```text
False
Order Message: SeqNo 1, MsgLen 38, MsgType 'N', ExchTs 100001, LocalTs 200001, OrderId 1001, Token 12345, Side 'B', Price 25000, Quantity 50, Missed 0
False
False
```
When all messages have been consumed:
```python
while True:
    message, is_end = reader.get_next_message()
    if is_end:
        print(message)
        print(is_end)
        break
```
Final output at end of stream:
```text
END
True
```
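To see both end-of-stream checks side by side without a feed file, the sketch below models them with a hypothetical in-memory `FakeStream` class. It mimics only the documented contract: a `(message, is_end)` tuple with `"END"` as the terminal payload, and a peek that does not advance the cursor. It says nothing about how `StreamingBinaryLoader` is implemented.

```python
class FakeStream:
    """Hypothetical stand-in that mimics StreamingBinaryLoader's end-of-stream contract."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._pos = 0  # index of the next message to return

    def get_next_message(self):
        # Returns (payload, is_end); "END" marks the exhausted stream.
        if self._pos >= len(self._messages):
            return "END", True
        msg = self._messages[self._pos]
        self._pos += 1
        return msg, False

    def is_get_next_message_end(self):
        # Peek only: report whether a next message exists without moving the cursor.
        return self._pos >= len(self._messages)


def read_all_payloads(stream):
    """Collect every payload until the end-of-stream flag is set."""
    out = []
    while True:
        message, is_end = stream.get_next_message()
        if is_end:
            break
        out.append(message)
    return out


s = FakeStream(["msg 1", "msg 2"])
print(s.is_get_next_message_end())  # False
print(read_all_payloads(s))         # ['msg 1', 'msg 2']
print(s.is_get_next_message_end())  # True
print(s.get_next_message())         # ('END', True)
```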
### 3.4 `get_next_msg()`
Reads and returns the next message as a Python dictionary. Returns `None` at end of file.
Use this when you need to inspect individual fields or pass messages to `orderbook_add_msg()`.
```python
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
msg = reader.get_next_msg()
print(msg)
```
Expected output for an order message:
```text
{
    'message_kind': 'order',
    'seq_no': 1,
    'msg_len': 10,
    'stream_id': 2,
    'msg_type': 'N',
    'exch_ts': 100001,
    'local_ts': 200001,
    'order_id': 10,
    'token': 200,
    'order_type': 'B',
    'price': 100,
    'quantity': 5,
    'flags': False
}
```
Expected output for a trade message:
```text
{
    'message_kind': 'trade',
    'seq_no': 3,
    'msg_len': 10,
    'stream_id': 2,
    'msg_type': 'T',
    'exch_ts': 300003,
    'local_ts': 400003,
    'buy_order_id': 10,
    'sell_order_id': 20,
    'token': 200,
    'trade_price': 100,
    'trade_quantity': 5,
    'flags': False
}
```
At end of file: returns `None`.
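Because `get_next_msg()` signals end of file with `None`, Python's two-argument `iter(callable, sentinel)` can replace an explicit `while True` loop. With a real reader this reads `for msg in iter(reader.get_next_msg, None):`. The self-contained sketch below uses a hypothetical `make_source` helper in place of the reader. Only the `None` sentinel behaviour comes from this page.

```python
from collections import Counter


def make_source(messages):
    """Hypothetical stand-in for reader.get_next_msg: returns dicts, then None forever."""
    it = iter(messages)
    return lambda: next(it, None)


get_next_msg = make_source([
    {"message_kind": "order", "msg_type": "N", "token": 200},
    {"message_kind": "order", "msg_type": "M", "token": 200},
    {"message_kind": "trade", "msg_type": "T", "token": 200},
])

# iter() keeps calling get_next_msg() until it returns the sentinel None.
kinds = Counter(msg["message_kind"] for msg in iter(get_next_msg, None))
print(kinds)  # Counter({'order': 2, 'trade': 1})
```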
### 3.5 `reset_cursor()`
Moves the stream read position back to the start of the file so you can read it again.
```python
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
first = reader.get_next_message()
reader.reset_cursor()
first_again = reader.get_next_message()
print(first == first_again)  # True
```
### 3.6 `attach_symbol_master(sm)`
Attaches a loaded `SymbolMaster` to the stream. After this call, every `get_next_msg()` dict has its symbol fields populated automatically.

| Parameter | Type | Description |
|---|---|---|
| `sm` | `SymbolMaster` | A loaded `SymbolMaster` instance |

The lookup runs entirely in Rust, with no Python overhead per message.
```python
from fastreader import SymbolMaster, StreamingBinaryLoader

sm = SymbolMaster()
sm.load_for_date("NSE_FO", day=21, month=5, year=2026)

reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
reader.attach_symbol_master(sm)

msg = reader.get_next_msg()
print(msg['token_symbol'])  # e.g. 'FINNIFTY'
print(msg['strike_price'])  # e.g. 21700 (rupees)
print(msg['option_type'])   # e.g. 'CE'
print(msg['expiry'])        # e.g. '26-May-2026'
print(msg['name'])          # e.g. 'FINNIFTY26MAY21700CE'
print(msg['lot_size'])      # e.g. 40
```
When no match is found for a token, the symbol fields keep their original `None` values.
### 3.7 `detach_symbol_master()`
Removes the attached `SymbolMaster`. After this call, `get_next_msg()` again returns `None` for the symbol fields.
```python
reader.detach_symbol_master()
```
## 4. OrderbookBuilder
Processes decoded messages and maintains live orderbook state per token. Query snapshots and depth after processing.
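For intuition about what "orderbook state per token" means, here is a small pure-Python model of a price-level book. It is a sketch under stated assumptions, not fastreader's Rust logic:

- `N` adds a resting order.
- `M` overwrites that order's price and quantity.
- `X` removes it.
- `T` messages are ignored entirely, because this page does not document how the real builder applies trades.

The message keys follow the dicts shown in 4.4. `TinyBook` is hypothetical.

```python
from collections import defaultdict


class TinyBook:
    """Hypothetical per-token price-level book keyed by order_id."""

    def __init__(self):
        self.orders = {}  # order_id -> (token, side, price, qty)

    def apply(self, msg):
        t, oid = msg["msg_type"], msg.get("order_id")
        if t in ("N", "M"):
            # Assumption: a modify simply overwrites the resting order.
            self.orders[oid] = (msg["token"], msg["order_type"], msg["price"], msg["quantity"])
        elif t == "X":
            self.orders.pop(oid, None)
        # "T" (trade) messages are ignored in this sketch.

    def snapshot(self, token, levels=5):
        bids, asks = defaultdict(int), defaultdict(int)
        for tok, side, price, qty in self.orders.values():
            if tok == token:
                (bids if side == "B" else asks)[price] += qty  # aggregate by price level
        bid_levels = sorted(bids.items(), reverse=True)[:levels]  # highest bid first
        ask_levels = sorted(asks.items())[:levels]                # lowest ask first
        best_bid = bid_levels[0] if bid_levels else None
        best_ask = ask_levels[0] if ask_levels else None
        both = best_bid is not None and best_ask is not None
        return {
            "token": token,
            "mid_price": (best_bid[0] + best_ask[0]) // 2 if both else 0,
            "best_bid": best_bid,
            "best_ask": best_ask,
            "spread": best_ask[0] - best_bid[0] if both else None,
            "bids": bid_levels,
            "asks": ask_levels,
        }


book = TinyBook()
for m in [
    {"msg_type": "N", "order_id": 1, "token": 777, "order_type": "B", "price": 1000, "quantity": 40},
    {"msg_type": "N", "order_id": 2, "token": 777, "order_type": "S", "price": 1100, "quantity": 15},
    {"msg_type": "N", "order_id": 3, "token": 777, "order_type": "B", "price": 1000, "quantity": 10},
    {"msg_type": "X", "order_id": 3, "token": 777},
]:
    book.apply(m)
print(book.snapshot(777))
# best_bid (1000, 40), best_ask (1100, 15), mid_price 1050, spread 100
```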
### 4.1 Create
```python
from fastreader import OrderbookBuilder
builder = OrderbookBuilder()
```
### 4.2 `apply_filter(logic_criteria=None)`
Restricts which message types are processed. By default, all types are processed.

| Value | Effect |
|---|---|
| `None` | Process all message types (default) |
| `["N", "M", "X"]` | Process only order messages |
| `["N"]` | Process only new orders |
| `["T"]` | Process only trades |

```python
builder = OrderbookBuilder()
builder.apply_filter(["N", "M", "X"])  # orders only
```
Clear the filter (process everything again):
```python
builder.apply_filter(None)
```
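The filter table amounts to a membership test on `msg_type`, with `None` meaning "accept everything". The hypothetical pure-Python equivalent below shows that logic; `make_filter` is illustrative and not a fastreader function.

```python
def make_filter(logic_criteria=None):
    """Return a predicate mirroring the documented apply_filter() semantics."""
    if logic_criteria is None:
        return lambda msg: True  # default: process every message type
    allowed = frozenset(logic_criteria)
    return lambda msg: msg["msg_type"] in allowed


messages = [{"msg_type": t} for t in ("N", "M", "T", "X", "T")]
orders_only = make_filter(["N", "M", "X"])
print([m["msg_type"] for m in messages if orders_only(m)])  # ['N', 'M', 'X']
print(sum(map(make_filter(None), messages)))                # 5
```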
### 4.3 `build_from_source(source, limit=None)`
Builds the orderbook by reading from a `StreamingBinaryLoader` or `MessageCacheReader`. This is the recommended method for most use cases.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `source` | reader object | Required | `StreamingBinaryLoader` or `MessageCacheReader` |
| `limit` | `int` or `None` | `None` (all) | Maximum number of messages to process |

Returns `int`: the number of messages processed.
```python
from fastreader import StreamingBinaryLoader, OrderbookBuilder

reader = StreamingBinaryLoader()
reader.open_stream("/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin", count_messages=False)
builder = OrderbookBuilder()
processed = builder.build_from_source(reader, limit=500000)
print("Processed:", processed)
```
Expected output:
```text
Processed: 500000
```
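The `limit` argument caps how many messages are pulled from the reader. Below is a sketch of that kind of loop using `itertools.islice`. `consume` is hypothetical, and `next_msg` stands in for any `get_next_msg`-style callable that returns `None` at end of file. The sketch counts every message read; whether the real builder's return value counts filtered-out messages is not specified on this page.

```python
from itertools import islice


def consume(next_msg, process, limit=None):
    """Feed at most `limit` messages (all if None) from next_msg() into process().

    Hypothetical helper; returns the number of messages read.
    """
    count = 0
    # islice stops after `limit` items without pulling an extra one from the source.
    for msg in islice(iter(next_msg, None), limit):
        process(msg)
        count += 1
    return count


feed = iter(range(10))  # stand-in for ten decoded messages


def next_msg():
    return next(feed, None)


seen = []
print(consume(next_msg, seen.append, limit=4))  # 4
print(consume(next_msg, seen.append))           # 6 (the rest of the stream)
print(seen == list(range(10)))                  # True
```

Because the source keeps its position, a second call resumes where the first one stopped. That is the property a streaming reader gives you over re-reading a file.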
### 4.4 `build_from_list(source)`
Builds the orderbook from either a `MessageCacheReader` or a Python `list[dict]` of decoded message dictionaries.
Returns `int`: the number of messages processed.
```python
# From MessageCacheReader
reader = MessageCacheReader()
reader.load_to_cache(file_path)
builder = OrderbookBuilder()
processed = builder.build_from_list(reader)
print("Processed:", processed)
# Processed: 1250000

# From a list of message dicts
messages = [
    {
        "msg_type": "N", "exch_ts": 100000, "order_id": 1,
        "token": 777, "order_type": "B", "price": 1000,
        "quantity": 40, "local_ts": 200000, "flags": False,
    },
    {
        "msg_type": "N", "exch_ts": 100001, "order_id": 2,
        "token": 777, "order_type": "S", "price": 1100,
        "quantity": 15, "local_ts": 200001, "flags": False,
    },
]
builder = OrderbookBuilder()
processed = builder.build_from_list(messages)
print("Processed:", processed)
# Processed: 2
```
### 4.5 `orderbook_add_msg(msg)`
Processes exactly one decoded message dictionary, as returned by `reader.get_next_msg()`.

| Return | Meaning |
|---|---|
| `True` | Message accepted and applied to the orderbook |
| `False` | Message skipped by the filter or by business rules |

```python
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
builder = OrderbookBuilder()

msg = reader.get_next_msg()
if msg is not None:
    result = builder.orderbook_add_msg(msg)
    print("Accepted:", result)
    # Accepted: True
```
Loop example:
```python
while True:
    msg = reader.get_next_msg()
    if msg is None:
        break
    builder.orderbook_add_msg(msg)
```
Note: pass one message dict, not the reader object.
```python
builder.orderbook_add_msg(reader)                  # wrong: raises TypeError
builder.orderbook_add_msg(reader.get_next_msg())   # right
```
### 4.6 `get_active_tokens()`
Returns a sorted list of all token IDs seen during processing. Use it to discover which instruments are in your data before querying snapshots.
Returns `list[int]`.
```python
tokens = builder.get_active_tokens()
print(f"Active tokens ({len(tokens)} total):", tokens[:10])
```
Expected output:
```text
Active tokens (261 total): [36687, 40434, 42174, 42175, 42194, 42195, 42219, 42244, 42258, 42259]
```
Get active tokens with symbol names:
```python
from fastreader import FeedPathBuilder, StreamingBinaryLoader, OrderbookBuilder, SymbolMaster

# Build feed path and process orderbook
b = FeedPathBuilder()
file_path = b.build_and_verify("NSE_FO", stream_id=1, day=21, month=5, year=2026)
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
builder = OrderbookBuilder()
processed = builder.build_from_source(reader, limit=500000)
tokens = builder.get_active_tokens()

# Load contract master for the same date/segment
sm = SymbolMaster()
loaded = sm.load_for_date("NSE_FO", day=21, month=5, year=2026)

# Token -> symbol mapping
token_symbol = []
for t in tokens:
    info = sm.lookup(token=t)  # returns dict
    sym = info["symbol"] if info["found"] else None
    token_symbol.append((t, sym))

print(f"Processed: {processed}")
print(f"Loaded contracts: {loaded}")
print(f"Active tokens: {len(tokens)}")
print("First 10 token-symbol pairs:")
for row in token_symbol[:10]:
    print(row)

# Snapshots are still queried by token
if tokens:
    print(builder.get_snapshot(token=tokens[0], levels=5))
```
Expected output (example):
```text
Processed: 500000
Loaded contracts: 95632
Active tokens: 261
First 10 token-symbol pairs:
(36687, '011NSETEST')
(40434, 'FINNIFTY')
(42174, 'NIFTY')
(42175, 'NIFTY')
...
{'token': 36687, 'found': True, 'mid_price': 0, 'best_bid': None, 'best_ask': None, 'spread': None, 'bids': [], 'asks': []}
```
Find tokens that have live bid or ask depth:
```python
live = []
for t in builder.get_active_tokens():
    top = builder.get_snapshot(token=t, levels=1)  # one query per token
    if top["best_bid"] is not None or top["best_ask"] is not None:
        live.append(t)

print(f"Tokens with depth: {len(live)}")
if live:
    print(builder.get_snapshot(token=live[0], levels=5))
```
Always call `get_active_tokens()` first when working with a new file: token numbers are NSE-specific and cannot be guessed.
### 4.7 `get_snapshot(token, levels=None)`
Returns the top bid and ask levels for a token.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `token` | `int` | Required | Instrument token |
| `levels` | `int` or `None` | `None` (5 levels) | Number of price levels to return |

Returned dictionary:

| Key | Type | Description |
|---|---|---|
| `token` | `int` | Requested token |
| `found` | `bool` | Whether the token has data |
| `mid_price` | `int` | `(best_bid_price + best_ask_price) // 2`, in paise (`0` when the book is empty) |
| `best_bid` | `(int, int)` or `None` | Best bid `(price, quantity)` |
| `best_ask` | `(int, int)` or `None` | Best ask `(price, quantity)` |
| `spread` | `int` or `None` | `best_ask_price - best_bid_price` |
| `bids` | `list[(int, int)]` | Top bid levels, best first |
| `asks` | `list[(int, int)]` | Top ask levels, best first |

All prices are in paise (1 rupee = 100 paise).
```python
snapshot = builder.get_snapshot(token=40434, levels=5)
print(snapshot)
```
Expected output (token found):
```text
{
    'token': 40434,
    'found': True,
    'mid_price': 408562,
    'best_bid': (322585, 1740),
    'best_ask': (494540, 120),
    'spread': 171955,
    'bids': [(322585, 1740), (322580, 120)],
    'asks': [(494540, 120)]
}
```
Expected output (token not found):
```text
{
    'token': 99999,
    'found': False,
    'mid_price': 0,
    'best_bid': None,
    'best_ask': None,
    'spread': None,
    'bids': [],
    'asks': []
}
```
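Prices are integer paise, so convert to rupees only for display. The sketch below recomputes `mid_price` and `spread` from the token-found example above. It uses `decimal.Decimal` for an exact conversion with no float rounding. `to_rupees` is a hypothetical helper. The floor division matches the example's `mid_price` of 408562, but that rounding rule is inferred from the sample output rather than documented.

```python
from decimal import Decimal


def to_rupees(paise):
    """Convert an integer paise amount to an exact Decimal rupee value."""
    return Decimal(paise) / 100


best_bid, best_ask = (322585, 1740), (494540, 120)  # (price in paise, quantity)
mid_price = (best_bid[0] + best_ask[0]) // 2  # integer paise, rounded down
spread = best_ask[0] - best_bid[0]

print(mid_price, spread)                          # 408562 171955
print(to_rupees(best_bid[0]), to_rupees(spread))  # 3225.85 1719.55
```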
### 4.8 `get_orderbook_snapshot(token, levels=None)`
Alias for `get_snapshot()`. Identical behaviour and output.
```python
snapshot = builder.get_orderbook_snapshot(token=40434, levels=5)
```
### 4.9 `get_full_depth(token)`
Returns all available bid and ask levels for a token, with no top-N limit.

| Parameter | Type | Description |
|---|---|---|
| `token` | `int` | Instrument token |

Returned dictionary:

| Key | Description |
|---|---|
| `token` | Requested token |
| `found` | Whether the token has data |
| `best_bid` | Best bid level `(price, qty)` |
| `best_ask` | Best ask level `(price, qty)` |
| `spread` | Ask price − bid price |
| `bids` | All bid levels, best first |
| `asks` | All ask levels, best first |

```python
depth = builder.get_full_depth(token=40434)
print(depth)
```
Expected output:
```text
{
    'token': 40434,
    'found': True,
    'best_bid': (322585, 1740),
    'best_ask': (494540, 120),
    'spread': 171955,
    'bids': [(322585, 1740), (322580, 120), (322575, 300)],
    'asks': [(494540, 120), (495000, 250)]
}
```
### 4.10 `snapshot_header()`
Returns the CSV column header string for snapshot rows.
```python
print(builder.snapshot_header())
```
Expected output:
```text
local_ts,exch_ts,mid_price,bid_price_0,bid_qty_0,ask_price_0,ask_qty_0,bid_price_1,bid_qty_1,ask_price_1,ask_qty_1,bid_price_2,bid_qty_2,ask_price_2,ask_qty_2,bid_price_3,bid_qty_3,ask_price_3,ask_qty_3,bid_price_4,bid_qty_4,ask_price_4,ask_qty_4
```
### 4.11 `get_snapshot_row(token, levels=None)`
Returns one CSV-formatted data row for a token snapshot. Pair it with `snapshot_header()` to write CSV files.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `token` | `int` | Required | Instrument token |
| `levels` | `int` or `None` | `None` (5 levels) | Number of price levels |

Returns `str`.
```python
print(builder.snapshot_header())
print(builder.get_snapshot_row(token=40434, levels=5))
```
Expected output:
```text
local_ts,exch_ts,mid_price,bid_price_0,bid_qty_0,ask_price_0,ask_qty_0,...
0,0,408562,322585,1740,494540,120,322580,120,0,0,0,0,0,0,0,0,0,0,0,0,0,0
```
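The header follows a fixed pattern: three leading columns, then four columns per level. It can therefore be generated for any `levels` value and zipped with a row to get named fields. The sketch below uses only the standard library. `snapshot_columns` is hypothetical. The rule that a row for `levels=n` has exactly `3 + 4*n` fields, with `0` marking empty levels, is inferred from the 5-level example above.

```python
import csv
import io


def snapshot_columns(levels=5):
    """Hypothetical helper: rebuild snapshot_header() column names for `levels` levels."""
    cols = ["local_ts", "exch_ts", "mid_price"]
    for i in range(levels):
        cols += [f"bid_price_{i}", f"bid_qty_{i}", f"ask_price_{i}", f"ask_qty_{i}"]
    return cols


# The 5-level example row shown above, with its 14 trailing zero fields written compactly.
row = "0,0,408562,322585,1740,494540,120,322580,120" + ",0" * 14
values = [int(v) for v in next(csv.reader(io.StringIO(row)))]
record = dict(zip(snapshot_columns(5), values))
print(len(values), record["bid_price_1"], record["bid_qty_1"], record["ask_price_1"])
# 23 322580 120 0
```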
## 5. SymbolMaster
Loads the NSE FO or CM contract master CSV entirely in Rust and provides fast O(1) token → symbol lookups. Use it to populate token_symbol, strike_price, option_type, expiry, lot_size, and name on every decoded message.
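An O(1) token lookup is essentially a hash map built once from the CSV. The sketch below shows the idea with `csv.DictReader` over a tiny in-memory file. Only the `FinInstrmId` column name appears on this page, in the Error Reference. The `Symbol` and `Strike` column names and the sample row are hypothetical placeholders, not the real NSE header, and `load_master`/`lookup` are illustrative helpers rather than fastreader code.

```python
import csv
import io

# Hypothetical one-row contract master. Only the FinInstrmId column name comes from
# this page; "Symbol" and "Strike" are placeholders, not the real NSE header.
SAMPLE_CSV = "FinInstrmId,Symbol,Strike\n40434,FINNIFTY,21700\n"


def load_master(text):
    """Index rows by integer token so each lookup is one dict access."""
    return {int(row["FinInstrmId"]): row for row in csv.DictReader(io.StringIO(text))}


def lookup(master, token):
    """Return a lookup()-style dict; fields are None when the token is unknown."""
    row = master.get(token)
    return {
        "token": token,
        "found": row is not None,
        "symbol": row["Symbol"] if row is not None else None,
        "strike": int(row["Strike"]) if row is not None else None,
    }


master = load_master(SAMPLE_CSV)
print(lookup(master, 40434))
# {'token': 40434, 'found': True, 'symbol': 'FINNIFTY', 'strike': 21700}
print(lookup(master, 99999)["found"])  # False
```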
### 5.1 Create
```python
from fastreader import SymbolMaster
sm = SymbolMaster()
```
### 5.2 `load(csv_path)`
Loads the contract master from an explicit file path.

| Parameter | Type | Description |
|---|---|---|
| `csv_path` | `str` | Full path to the NSE contract master CSV |

Returns `int`: the number of contracts loaded. Raises `RuntimeError` if the file cannot be opened or a required column is missing.
```python
count = sm.load("/nas/50.30/CONTRACT/21_05_2026/NSE_FO_contract_21052026.csv")
print(count)  # 95632
```
### 5.3 `load_for_date(segment, day, month, year, base_path=None)`
Builds the standard NSE contract master path from date components and loads it.

Path pattern produced:
```text
{base_path}/CONTRACT/{DD}_{MM}_{YYYY}/NSE_{FO|CM}_contract_{DD}{MM}{YYYY}.csv
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `segment` | `str` | Required | `"NSE_FO"`, `"FO"`, `"NSE_CM"`, or `"CM"` |
| `day` | `int` | Required | Day of month, 1–31 |
| `month` | `int` | Required | Month, 1–12 |
| `year` | `int` | Required | Four-digit year |
| `base_path` | `str` | `None` (uses `/nas/50.30`) | Root directory |

Returns `int`: the number of contracts loaded.
```python
sm = SymbolMaster()
count = sm.load_for_date("NSE_FO", day=21, month=5, year=2026)
print(count)  # 95632
print(sm)     # SymbolMaster(contracts=95632)
```
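The contract master path pattern can be reproduced with string formatting, for example to check which CSV `load_for_date()` will look for before calling it. `contract_master_path` is a hypothetical helper. It assumes the same two-digit zero-padding used in the feed file names and accepts either letter case.

```python
import posixpath

_SEG_SHORT = {"NSE_FO": "FO", "FO": "FO", "NSE_CM": "CM", "CM": "CM"}


def contract_master_path(segment, day, month, year, base_path="/nas/50.30"):
    """Hypothetical mirror of the path that SymbolMaster.load_for_date() builds."""
    short = _SEG_SHORT[segment.upper()]  # KeyError for unknown segments in this sketch
    folder = f"{day:02d}_{month:02d}_{year}"
    filename = f"NSE_{short}_contract_{day:02d}{month:02d}{year}.csv"
    return posixpath.join(base_path, "CONTRACT", folder, filename)


print(contract_master_path("NSE_FO", 21, 5, 2026))
# /nas/50.30/CONTRACT/21_05_2026/NSE_FO_contract_21052026.csv
```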
### 5.4 `lookup(token)`
Returns a dictionary with full contract metadata for a token.

| Parameter | Type | Description |
|---|---|---|
| `token` | `int` | Instrument token ID |

Returned dictionary:

| Key | Type | Description |
|---|---|---|
| `token` | `int` | The requested token |
| `found` | `bool` | Whether the token is in the loaded master |
| `symbol` | `str` or `None` | Ticker symbol, e.g. `"FINNIFTY"` |
| `name` | `str` or `None` | Full instrument name, e.g. `"FINNIFTY26MAY21700CE"` |
| `option_type` | `str` or `None` | `"CE"`, `"PE"`, or `"XX"` (futures) |
| `strike` | `int` or `None` | Strike price in rupees (`-1` for futures) |
| `expiry` | `str` or `None` | Expiry date, e.g. `"26-May-2026"` |
| `lot_size` | `int` or `None` | Lot size |

```python
info = sm.lookup(token=40434)
print(info)
```
Expected output (token found):
```text
{
    'token': 40434,
    'found': True,
    'symbol': 'FINNIFTY',
    'name': 'FINNIFTY26MAY21700CE',
    'option_type': 'CE',
    'strike': 21700,
    'expiry': '26-May-2026',
    'lot_size': 40
}
```
Expected output (token not in master):
```text
{'token': 99999, 'found': False, 'symbol': None, 'name': None,
 'option_type': None, 'strike': None, 'expiry': None, 'lot_size': None}
```
### 5.5 `enrich(msg)`
Populates symbol fields directly on a message dict returned by `get_next_msg()`, modifying the dict in place.
Added or overwritten keys: `token_symbol`, `strike_price`, `option_type`, `expiry`, `lot_size`, `name`.
Returns `True` if the token was found and the dict was enriched, `False` otherwise.

| Parameter | Type | Description |
|---|---|---|
| `msg` | `dict` | A message dict from `get_next_msg()` |

```python
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
msg = reader.get_next_msg()
found = sm.enrich(msg)
if found:
    print(msg['token_symbol'], msg['strike_price'], msg['option_type'])
    # FINNIFTY 21700 CE
```
When the token is not in the loaded master, `enrich()` is a no-op and the dict is left unmodified.
```python
for msg in iter(reader.get_next_msg, None):
    sm.enrich(msg)
    # known tokens are now enriched; unknown tokens are unchanged
```
Tip: for bulk streaming, use `attach_symbol_master()` instead (section 3.6). It enriches every message automatically, so you do not have to call `enrich()` in a loop.
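`enrich()` follows the common pattern of mutating in place and returning a success flag. The pure-Python sketch below shows that contract. It assumes the key mapping implied by sections 3.6, 5.4 and 5.5: `symbol` becomes `token_symbol`, `strike` becomes `strike_price`, and the other fields keep their names. `enrich_msg` and the `master` dict are hypothetical; the two `TypeError` messages mirror the Error Reference.

```python
# Lookup-dict key -> message-dict key (assumed mapping, see lead-in).
FIELD_MAP = {
    "symbol": "token_symbol",
    "strike": "strike_price",
    "option_type": "option_type",
    "expiry": "expiry",
    "lot_size": "lot_size",
    "name": "name",
}


def enrich_msg(msg, master):
    """Copy contract fields into msg in place; return False and leave msg untouched if unknown."""
    if not isinstance(msg, dict):
        raise TypeError("enrich() expects a message dict from get_next_msg()")
    if "token" not in msg:
        raise TypeError("msg dict missing 'token' key")
    info = master.get(msg["token"])
    if info is None:
        return False
    for src, dst in FIELD_MAP.items():
        msg[dst] = info[src]
    return True


master = {40434: {"symbol": "FINNIFTY", "strike": 21700, "option_type": "CE",
                  "expiry": "26-May-2026", "lot_size": 40, "name": "FINNIFTY26MAY21700CE"}}
msg = {"token": 40434, "msg_type": "N"}
print(enrich_msg(msg, master), msg["token_symbol"], msg["strike_price"])  # True FINNIFTY 21700
```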
### 5.6 `len()` and `repr()`
```python
print(len(sm))  # 95632
print(sm)       # SymbolMaster(contracts=95632)
```
## Recommended Workflows
### Workflow A: Large file, fastest approach
```python
from fastreader import FeedPathBuilder, StreamingBinaryLoader, OrderbookBuilder

b = FeedPathBuilder()
file_path = b.build_and_verify("NSE_FO", stream_id=1, day=21, month=5, year=2026)
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
builder = OrderbookBuilder()
processed = builder.build_from_source(reader, limit=500000)
tokens = builder.get_active_tokens()
print(f"Processed {processed} messages, {len(tokens)} tokens")
print(builder.get_snapshot(token=tokens[0], levels=5))
```
### Workflow B: Load once, analyse many times
```python
from fastreader import MessageCacheReader, OrderbookBuilder

reader = MessageCacheReader()
reader.load_to_cache("/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025.bin")
print(reader.get_cache_summary())
builder = OrderbookBuilder()
builder.build_from_source(reader)
tokens = builder.get_active_tokens()
print(builder.get_snapshot(token=tokens[0], levels=5))
```
### Workflow C: Process messages one at a time
```python
from fastreader import StreamingBinaryLoader, OrderbookBuilder

reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
builder = OrderbookBuilder()
while True:
    msg = reader.get_next_msg()
    if msg is None:
        break
    builder.orderbook_add_msg(msg)
tokens = builder.get_active_tokens()
print(builder.get_snapshot(token=tokens[0], levels=5))
```
### Workflow D: Orders only (no trades)
```python
from fastreader import StreamingBinaryLoader, OrderbookBuilder

reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
builder = OrderbookBuilder()
builder.apply_filter(["N", "M", "X"])
processed = builder.build_from_source(reader)
print("Order messages processed:", processed)
```
### Workflow E: Export all tokens to CSV
```python
from fastreader import StreamingBinaryLoader, OrderbookBuilder

reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
builder = OrderbookBuilder()
builder.build_from_source(reader, limit=500000)
tokens = builder.get_active_tokens()

with open("snapshots.csv", "w") as f:
    f.write(builder.snapshot_header() + "\n")
    for token in tokens:
        f.write(builder.get_snapshot_row(token=token, levels=5) + "\n")

print(f"Wrote {len(tokens)} rows to snapshots.csv")
```
### Workflow F: Enrich streaming messages with symbol metadata
```python
from fastreader import FeedPathBuilder, StreamingBinaryLoader, OrderbookBuilder, SymbolMaster

# Load contract master once
sm = SymbolMaster()
sm.load_for_date("NSE_FO", day=21, month=5, year=2026)
print(sm)  # SymbolMaster(contracts=95632)

# Attach to the stream: all get_next_msg() calls will be auto-enriched
b = FeedPathBuilder()
file_path = b.build_and_verify("NSE_FO", stream_id=1, day=21, month=5, year=2026)
reader = StreamingBinaryLoader()
reader.open_stream(file_path, count_messages=False)
reader.attach_symbol_master(sm)

# Also build the orderbook in the same pass
builder = OrderbookBuilder()
for _ in range(500_000):
    msg = reader.get_next_msg()
    if msg is None:
        break
    builder.orderbook_add_msg(msg)
    # msg already has: token_symbol, strike_price, option_type, expiry, lot_size, name

# Snapshot with symbol info; tokens missing from the master print "?" instead of failing
for token in builder.get_active_tokens()[:5]:
    snap = builder.get_snapshot(token=token, levels=3)
    info = sm.lookup(token=token)
    symbol = info['symbol'] or '?'
    name = info['name'] or '?'
    option_type = info['option_type'] or '?'
    strike = info['strike'] if info['strike'] is not None else '?'
    print(f"{symbol:>12} {name:<25} {option_type} "
          f"strike={strike!s:>6} bid={snap['best_bid']} ask={snap['best_ask']}")
```
Expected output (illustrative):
```text
  011NSETEST 011NSETEST36DECFUT        XX strike=     0 bid=None ask=None
    FINNIFTY FINNIFTY26MAY21700CE      CE strike= 21700 bid=(322585, 1740) ask=(494540, 120)
       NIFTY NIFTY2660921350CE         CE strike= 21350 bid=... ask=...
```
Common Mistakes
found: False or empty bids / asks
Token was never seen in processed messages, or all its orders were cancelled before you called the snapshot.
Fix: Use get_active_tokens() first, then pick a token from that list.
tokens = builder.get_active_tokens()
print(builder.get_snapshot(token=tokens[0], levels=5)) # guaranteed to exist
If bids and asks are still empty even on a known token, you processed the full day and end-of-day cancellations cleared the book. Process fewer messages to capture mid-session state.
Using a future date in build_and_verify()
```python
b.build_and_verify("NSE_FO", stream_id=1, day=1, month=6, year=2026)
# raises RuntimeError: file not found (the file has not been written yet)
```
Use the latest date for which a file actually exists. Files are written daily.
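Rather than guessing the date, you can probe backwards from today until a file turns up. A minimal sketch, assuming FeedPathBuilder.build() only constructs the path (as the API reference describes) so that existence can be tested with os.path.exists. latest_available_date and the has_file callback are hypothetical names, and the no-argument FeedPathBuilder() constructor in the comment is also an assumption.

```python
import datetime as dt
from typing import Callable, Optional


def latest_available_date(
    has_file: Callable[[dt.date], bool],
    start: Optional[dt.date] = None,
    max_days_back: int = 10,
) -> Optional[dt.date]:
    """Walk back day by day from `start` (default: today) until has_file(date) is True."""
    day = start or dt.date.today()
    for _ in range(max_days_back + 1):
        if has_file(day):
            return day
        day -= dt.timedelta(days=1)
    return None


# Real use (assumes FeedPathBuilder() takes no constructor arguments):
#   import os
#   b = FeedPathBuilder()
#   def has_fo_file(d):
#       return os.path.exists(
#           b.build("NSE_FO", stream_id=1, day=d.day, month=d.month, year=d.year))
#   latest_available_date(has_fo_file)

available = {dt.date(2026, 5, 21)}  # pretend only this day's file exists
print(latest_available_date(available.__contains__, start=dt.date(2026, 5, 24)))  # 2026-05-21
```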
count_messages=True on a large file is slow
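```python
reader.open_stream(large_file, count_messages=True)  # full file scan before you can read
```
Fix: open_stream() defaults to count_messages=True, so pass False explicitly:
```python
reader.open_stream(large_file, count_messages=False)  # opens immediately
```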
Error Reference
| Situation | Error message |
|---|---|
| File does not exist (open_stream) | RuntimeError: No such file or directory |
| File does not exist (build_and_verify) | RuntimeError: file not found: /path/to/file.bin |
| Unknown segment | RuntimeError: unknown segment 'X' — expected one of: NSE_CM, CM, NSE_FO, FO |
| stream_id is 0 | RuntimeError: stream_id must be > 0 |
| Invalid month | RuntimeError: invalid month 13 — must be 1–12 |
| SymbolMaster.load(): file not found | RuntimeError: cannot open /path/to/file.csv: No such file or directory |
| SymbolMaster.load(): column missing | RuntimeError: column 'FinInstrmId' not found in /path/to/file.csv |
| SymbolMaster.enrich(): argument is not a dict | TypeError: enrich() expects a message dict from get_next_msg() |
| SymbolMaster.enrich(): no token key | TypeError: msg dict missing 'token' key |
| load_for_date(): unknown segment | RuntimeError: unknown segment 'X' — expected NSE_FO, FO, NSE_CM, or CM |
| Invalid day | RuntimeError: invalid day 0 — must be 1–31 |
| Invalid year | RuntimeError: invalid year 1999 — must be 2000–2100 |
| Wrong object passed to orderbook_add_msg | TypeError: orderbook_add_msg expects one message dict from get_next_msg() |
| Wrong object passed to build_from_source | TypeError: build_from_source expects MessageCacheReader or StreamingBinaryLoader |
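The argument limits in this table are simple enough to check up front, for example when the date comes from user input. A minimal pure-Python sketch mirroring the listed rules (segments NSE_CM, CM, NSE_FO, FO; stream_id greater than 0; month 1 to 12; day 1 to 31; year 2000 to 2100). check_feed_args is a hypothetical name; the library still performs its own validation, and the day check is only the 1 to 31 range from the table, not a calendar check.

```python
VALID_SEGMENTS = {"NSE_CM", "CM", "NSE_FO", "FO"}


def check_feed_args(segment: str, stream_id: int, day: int, month: int, year: int) -> list[str]:
    """Return a list of problems; an empty list means all values are in the documented ranges."""
    problems = []
    if segment not in VALID_SEGMENTS:
        problems.append(f"unknown segment {segment!r}")
    if stream_id <= 0:
        problems.append("stream_id must be > 0")
    if not 1 <= month <= 12:
        problems.append(f"invalid month {month}")
    if not 1 <= day <= 31:
        problems.append(f"invalid day {day}")
    if not 2000 <= year <= 2100:
        problems.append(f"invalid year {year}")
    return problems


print(check_feed_args("NSE_FO", 1, 21, 5, 2026))  # []
print(check_feed_args("X", 0, 0, 13, 1999))
```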
Full API Reference
FeedPathBuilder
| Method | Signature | Returns | Description |
|---|---|---|---|
| build | (segment, stream_id, day, month, year, base_path=None) | str | Construct path string |
| build_and_verify | (segment, stream_id, day, month, year, base_path=None) | str | Construct path and verify file exists |
MessageCacheReader
| Method | Signature | Returns | Description |
|---|---|---|---|
| load_to_cache | (file_path) | int | Load binary file into RAM |
| get_all_messages | () | list[str] | All messages as formatted strings |
| get_order_message | () | list[str] | Order messages only |
| get_trade_message | () | list[str] | Trade messages only |
| get_all_trade_message | () | list[str] | Alias for get_trade_message() |
| get_cache_summary | () | dict | File stats and memory usage |
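MessageCacheReader loads the whole file into RAM, while StreamingBinaryLoader reads one message at a time. A hypothetical rule of thumb for choosing between them compares the file size on disk with a memory budget you pick. The threshold and the choose_reader name are assumptions; decoded messages may take more memory than the raw file, so leave headroom (get_cache_summary() reports actual usage after loading).

```python
import os
import tempfile


def choose_reader(file_path: str, max_cache_bytes: int = 2 * 1024**3) -> str:
    """Return "cache" if the file fits the RAM budget, else "stream" (hypothetical heuristic)."""
    size = os.path.getsize(file_path)
    return "cache" if size <= max_cache_bytes else "stream"


# Demo on a throwaway 1 KiB file instead of a real feed file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"\x00" * 1024)
mode_big_budget = choose_reader(tmp.name, max_cache_bytes=4096)
mode_small_budget = choose_reader(tmp.name, max_cache_bytes=512)
os.remove(tmp.name)
print(mode_big_budget, mode_small_budget)  # cache stream
```

With the real library, "cache" would correspond to calling load_to_cache(path) on a MessageCacheReader and "stream" to open_stream(path, count_messages=False) on a StreamingBinaryLoader.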
StreamingBinaryLoader
| Method | Signature | Returns | Description |
|---|---|---|---|
| open_stream | (file_path, count_messages=True) | int | Open file for streaming |
| get_next_message | () | str | Next message as formatted string, or "END" |
| get_next_msg | () | dict or None | Next message as Python dict, or None at EOF |
| reset_cursor | () | None | Rewind stream to start of file |
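Both read methods signal end of file with a sentinel: get_next_msg() returns None and get_next_message() returns "END". Python's built-in iter(callable, sentinel) turns either into an ordinary iterator, so the manual loop with break becomes a plain for loop. A minimal sketch; iter_messages is a hypothetical helper, and FakeLoader stands in for StreamingBinaryLoader so the sketch runs without a feed file.

```python
from itertools import islice


def iter_messages(reader, limit=None):
    """Yield message dicts from reader.get_next_msg() until it returns None (at most `limit`)."""
    return islice(iter(reader.get_next_msg, None), limit)


class FakeLoader:
    """Hypothetical stand-in mimicking get_next_msg()'s None-at-EOF contract."""

    def __init__(self, msgs):
        self._msgs = iter(msgs)

    def get_next_msg(self):
        return next(self._msgs, None)


msgs = list(iter_messages(FakeLoader([{"token": 1}, {"token": 2}, {"token": 3}]), limit=2))
print(msgs)  # [{'token': 1}, {'token': 2}]
```

With a real loader, the orderbook loop reads: for msg in iter_messages(reader, limit=500_000): builder.orderbook_add_msg(msg). For the string API, iter(reader.get_next_message, "END") works the same way.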
OrderbookBuilder
| Method | Signature | Returns | Description |
|---|---|---|---|
| apply_filter | (logic_criteria=None) | None | Filter message types to process |
| build_from_source | (source, limit=None) | int | Build from reader object (recommended) |
| build_from_list | (source) | int | Build from cache reader or list of dicts |
| orderbook_add_msg | (msg) | bool | Process one decoded message dict |
| get_active_tokens | () | list[int] | All token IDs seen during processing |
| get_snapshot | (token, levels=None) | dict | Top-N bid/ask levels for a token |
| get_orderbook_snapshot | (token, levels=None) | dict | Alias for get_snapshot() |
| get_full_depth | (token) | dict | All bid/ask levels for a token |
| snapshot_header | () | str | CSV column header |
| get_snapshot_row | (token, levels=None) | str | CSV data row for a token |
Download files
File details
Details for the file orderpulse-0.2.41.tar.gz.
File metadata
- Download URL: orderpulse-0.2.41.tar.gz
- Upload date:
- Size: 60.9 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.12.4
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 29faa44f86d51c14e1202c7955d006c851d3cb0f5aa90dd5ed7d92852815870e |
| MD5 | 4f845b1a123b1f2029ee34b6466f7eb0 |
| BLAKE2b-256 | ae40ff6bf56701a95b6a0032b2e62a292fb3f58b19ba3fc3fa9c5069fc51195c |
File details
Details for the file orderpulse-0.2.41-cp39-cp39-manylinux_2_34_x86_64.whl.
File metadata
- Download URL: orderpulse-0.2.41-cp39-cp39-manylinux_2_34_x86_64.whl
- Upload date:
- Size: 313.6 kB
- Tags: CPython 3.9, manylinux: glibc 2.34+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.12.4
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ff0ecf9248d726b858c859014eadbf0d087244d9ff4e5b4b2d9384f5be3734cf |
| MD5 | b206182b25c84ef5a835ce2c36cfcb16 |
| BLAKE2b-256 | 08cb02f972dd53cb83047ad9671c4224126a156f4824d41b3048bcb9f2a3cad2 |
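To check a downloaded archive or wheel against the SHA256 digests listed above, hash the file in chunks with Python's standard hashlib module and compare the hex strings. A minimal sketch; hashlib.file_digest() would also work but needs Python 3.11+, while the wheel above targets CPython 3.9, so the chunked loop is used here.

```python
import hashlib
import os
import tempfile


def sha256_of(path: str, chunk_size: int = 1 << 20) -> str:
    """Hex SHA256 digest of a file, read in chunks so large files never sit fully in RAM."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


# Usage with a real download (digest copied from the source distribution table):
#   expected = "29faa44f86d51c14e1202c7955d006c851d3cb0f5aa90dd5ed7d92852815870e"
#   assert sha256_of("orderpulse-0.2.41.tar.gz") == expected

# Self-check on a throwaway file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"orderpulse")
digest = sha256_of(tmp.name)
os.remove(tmp.name)
print(digest == hashlib.sha256(b"orderpulse").hexdigest())  # True
```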