Skip to main content

A library for real-time tick-level data slice feed

Project description

l2data_feeder

一个用于实时 Level2 行情数据处理的高效 Python 库

简介

l2data_feeder 是一个专为金融市场 Level2 数据处理设计的 Python 库。它能够从 Level2 行情数据文件中读取 tick、订单和成交数据,并按照指定的时间间隔将这些数据聚合到 Slice 对象中,便于后续分析和处理。

该库基于 l2data_reader 模块实现,支持处理由 protobuf 定义的 Level2 行情数据结构,包括 SecuDepthMarketDataTransactionEntrustDataTransactionTradeData 等。

特性

  • 数据预取模式:可以预先读取指定数量的消息,快速获取历史数据
  • 实时更新模式:支持实时读取新数据,通过生成器方式返回每个时间片的数据
  • 灵活的时间间隔设置:可自定义时间片的间隔(毫秒级)
  • 多股票并行处理:自动按股票代码聚合数据
  • 完整的数据结构:保留原始 Level2 数据的所有字段信息

依赖

本库依赖于 l2data-reader 库,用于读取 Level2 行情数据。安装时会自动安装此依赖。

安装

pip install l2data_feeder

快速开始

基本用法

from l2data_feeder import Level2SliceFeeder

# 初始化数据源
index_file = "path/to/index.idx"
data_file = "path/to/data.bin"
header_file = "path/to/header.hdr"

# 创建 Level2SliceFeeder 实例
feeder = Level2SliceFeeder(
    index_file=index_file,
    data_file=data_file,
    header_file=header_file,
    time_slice_ms=100,  # 时间片间隔为100毫秒
    prefetch_count=100  # 预取100条消息
)

# 预取数据
slices = feeder.prefetch()
print(f"预取了 {len(slices)} 个时间片")

# 打印第一个时间片的信息
first_slice = slices[0]
for symbol, snapshot in first_slice.Ticks.items():
    print(f"股票: {symbol}")
    if snapshot.Tick:
        print(f"  最新价: {snapshot.Tick.last_price}")
        print(f"  成交量: {snapshot.Tick.volume}")
    print(f"  委托数量: {len(snapshot.Orders)}")
    print(f"  成交数量: {len(snapshot.Transactions)}")

实时模式

# 创建实时模式的 Level2SliceFeeder
realtime_feeder = Level2SliceFeeder(
    index_file=index_file,
    data_file=data_file,
    header_file=header_file,
    time_slice_ms=100,
    prefetch_count=0,  # 不预取数据
    realtime=True      # 启用实时模式
)

# 实时处理数据
for slice_data in realtime_feeder.feed():
    # 处理每个时间片的数据
    for symbol, snapshot in slice_data.Ticks.items():
        if snapshot.Tick:
            # 处理 tick 数据
            process_tick(snapshot.Tick)
        
        # 处理委托数据
        for order in snapshot.Orders:
            process_order(order)
        
        # 处理成交数据
        for trade in snapshot.Transactions:
            process_trade(trade)

使用辅助函数create_feeder, 更方便地使用库:

from l2data_feeder import create_feeder

# 创建 feeder 实例
feeder = create_feeder(
    index_file="path/to/index.idx",
    data_file="path/to/data.bin",
    header_file="path/to/header.hdr",
    slice_interval_ms=100,
    realtime=True
)

# 使用 feeder
for slice_data in feeder.run():
    # 处理每个时间片的数据
    pass

完整示例

from l2data_feeder import Level2SliceFeeder
import time

def main():
    # 初始化数据源
    index_file = "path/to/index.idx"
    data_file = "path/to/data.bin"
    header_file = "path/to/header.hdr"
    
    # 创建 Level2SliceFeeder 实例
    feeder = Level2SliceFeeder(
        index_file=index_file,
        data_file=data_file,
        header_file=header_file,
        time_slice_ms=100,
        prefetch_count=100,
        realtime=True
    )
    
    # 预取数据
    print("开始预取数据...")
    slices = feeder.prefetch()
    print(f"预取完成,共 {len(slices)} 个时间片")
    
    # 打印预取数据的统计信息
    symbols = set()
    tick_count = 0
    order_count = 0
    trade_count = 0
    
    for slice_data in slices:
        for symbol, snapshot in slice_data.Ticks.items():
            symbols.add(symbol)
            if snapshot.Tick:
                tick_count += 1
            order_count += len(snapshot.Orders)
            trade_count += len(snapshot.Transactions)
    
    print(f"预取数据统计:")
    print(f"  股票数量: {len(symbols)}")
    print(f"  Tick数量: {tick_count}")
    print(f"  委托数量: {order_count}")
    print(f"  成交数量: {trade_count}")
    
    # 进入实时模式
    print("\n进入实时模式,按 'q' 退出,按 'y' 打印当前时间片详情:")
    try:
        for slice_data in feeder.feed():
            # 打印简要信息
            symbols_in_slice = list(slice_data.Ticks.keys())
            print(f"\r当前时间片包含 {len(symbols_in_slice)} 个股票: {', '.join(symbols_in_slice[:3])}{'...' if len(symbols_in_slice) > 3 else ''}", end="")
            
            # 检查用户输入
            user_input = input("\n是否打印详情? (y/n/q): ")
            if user_input.lower() == 'q':
                break
            elif user_input.lower() == 'y':
                # 打印详情
                for symbol, snapshot in slice_data.Ticks.items():
                    print(f"股票: {symbol}")
                    if snapshot.Tick:
                        print(f"  时间: {snapshot.Tick.time}")
                        print(f"  最新价: {snapshot.Tick.last_price}")
                        print(f"  开盘价: {snapshot.Tick.open_price}")
                        print(f"  最高价: {snapshot.Tick.high_price}")
                        print(f"  最低价: {snapshot.Tick.low_price}")
                        print(f"  成交量: {snapshot.Tick.volume}")
                    print(f"  委托数量: {len(snapshot.Orders)}")
                    print(f"  成交数量: {len(snapshot.Transactions)}")
            
            # 模拟处理时间
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n程序被中断")
    finally:
        feeder.stop()
        print("数据读取已停止")

if __name__ == "__main__":
    main()

API 参考

Level2SliceFeeder

主要类,用于读取和处理 Level2 行情数据。

参数

  • index_file (str): 索引文件路径
  • data_file (str): 数据文件路径
  • header_file (str): 头文件路径
  • time_slice_ms (int, 可选): 时间片间隔,单位毫秒,默认为 100
  • prefetch_count (int, 可选): 预取消息数量,0 表示不预取,-1 表示全部读取,默认为 0
  • realtime (bool, 可选): 是否启用实时模式,默认为 False
  • logger (logging.Logger, 可选): 日志记录器,默认为 None

方法

  • prefetch(): 预取数据,返回 Slice 对象列表
  • feed(): 生成器方法,实时返回每个时间片的 Slice 对象
  • stop(): 停止数据读取

Slice

表示一个时间片的数据集合。

属性

  • Ticks (Dict[str, Snapshot]): 按股票代码索引的 Snapshot 对象字典

Snapshot

表示一个股票在特定时间点的快照数据。

属性

  • Symbol (str): 股票代码
  • Tick (SecuDepthMarketData): Tick 数据
  • Orders (List[TransactionEntrustData]): 委托数据列表
  • Transactions (List[TransactionTradeData]): 成交数据列表

构建与发布

要构建并发布此包到 PyPI,请按照以下步骤操作:

  1. 确保已安装必要的工具:

    pip install build twine
    
  2. 构建包:

    python -m build
    
  3. 上传到 PyPI:

    python -m twine upload dist/*
    

许可证

MIT

贡献

欢迎提交 Pull Request 或创建 Issue 来帮助改进此项目。

联系方式

如有问题,请通过 GitHub Issues 联系我们。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

l2data_feeder-0.1.0.tar.gz (8.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

l2data_feeder-0.1.0-py3-none-any.whl (9.3 kB view details)

Uploaded Python 3

File details

Details for the file l2data_feeder-0.1.0.tar.gz.

File metadata

  • Download URL: l2data_feeder-0.1.0.tar.gz
  • Upload date:
  • Size: 8.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for l2data_feeder-0.1.0.tar.gz
Algorithm Hash digest
SHA256 8386f7683425b556280c0f37a5966b976b346b3217f1100fe0234b0b6ac33c4b
MD5 7b60d2d1e7c2f039939e1a8c3a991037
BLAKE2b-256 0ba1787959642bbb0c1886db199eceb706e61e647c3aa0d2c2c7fdb02ce1d36b

See more details on using hashes here.

File details

Details for the file l2data_feeder-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: l2data_feeder-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 9.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for l2data_feeder-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 dbea795b845c0c5ae447f6639867868caeb876ba357476eb30017a347ee5a38b
MD5 2886dd1caa743b892e660d60594bb4ee
BLAKE2b-256 fd2991106293fd8ca62b1e3ac290cf6b9bbb22ac65bb54c353ce9b97f50b8ffb

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page