Skip to main content

Douyu Live Stream Danmu (弹幕) Collector - A modular, async-capable library for collecting chat messages from Douyu live streams.

Project description

dykit - 斗鱼弹幕采集与分析工具

PostgreSQL 架构,支持实时采集、数据分析和 CSV 导入导出。

v4.0.2 (2026-03-07)


功能特性

  • PostgreSQL 存储:采用 PostgreSQL 作为主要存储后端,支持高并发写入和高性能查询。
  • 工具链:提供 7 个核心子命令(collect, rank, prune, cluster, import, export, init-db)。
  • 数据结构:14 列扁平化数据结构,移除了复杂的 JSONB 字段。
  • CLI 接口:基于 Click 框架,支持环境变量配置与 DSN 连接。
  • 技术栈:使用 psycopg3 驱动和异步 WebSocket 采集。
  • 消息处理:增强的 UTF-8 缓冲区处理,解决断包导致的乱码问题。

系统要求

  • Python 3.12+
  • PostgreSQL 12+
  • uv (推荐) 或 pip

安装

# 使用 uv (推荐)
uv venv
source .venv/bin/activate
uv pip install .

# 或使用 pip
pip install .

快速开始

1. 设置数据库连接 (DSN)

export DYTOOLS_DSN="postgresql://user:pass@localhost:5432/douyu"

2. 初始化数据库

dykit init-db

3. 开始采集

dykit collect -r 6657

4. 查看排行

dykit rank -r 6657 --top 20

Service Management

Managing Long-Running Collectors

dykit supports managing long-running collectors as systemd --user services. This allows background collection that persists across sessions and restarts automatically.

Basic Workflow

# Set your database DSN (required for the service to connect)
export DYTOOLS_DSN="postgresql://douyu:douyu6657@localhost:5432/douyu_danmu"

# Create a service for a specific room (Format: NAME:ROOM_ID)
dykit service create test-room:9999

# List all managed services
dykit service list

# Check status of a specific service
dykit service status test-room-9999

# View recent logs
dykit service logs test-room-9999 --lines 10

# Stop a running service
dykit service stop test-room-9999

# Get the path to the unit file
dykit service where test-room-9999

# Remove the service completely
dykit service remove test-room-9999

Important Notes

  • Persistence: To ensure services keep running after you log out, run loginctl enable-linger $USER.
  • Storage: Service unit files are stored in ~/.config/systemd/user/.
  • Naming: When creating a service with NAME:ROOM_ID, the resulting systemd unit is named dykit-NAME-ROOM_ID.service. Use the NAME-ROOM_ID part with dykit service commands.

命令行参考

数据库管理

init-db

初始化数据库表结构和索引。

dykit init-db

输出示例:

Database schema initialized successfully
Table: danmaku
Indexes: idx_danmaku_room_time, idx_danmaku_user_id, idx_danmaku_msg_type

collect

实时采集直播间弹幕。

  • -r, --room: 直播间 ID
  • -v, --verbose: 打印调试日志
dykit collect -r 6657 -v

数据分析

rank

统计发送消息最多的用户或高频出现的重复弹幕。

  • -r, --room: 直播间 ID
  • --by user|content: 统计维度(默认 user)
  • --top N: 显示前 N 名 (默认 10)
  • --type TYPE: 过滤消息类型 (默认 chatmsg, 可选 dgb 等)
  • --user USERNAME: 按用户名过滤数据集
  • --user-id USER_ID: 按 user_id 过滤数据集
  • --from YYYY-MM-DD: 起始日期
  • --to YYYY-MM-DD: 结束日期(含当天)
  • --last N: 仅基于最近 N 条消息进行统计
  • --first N: 仅基于最早 N 条消息进行统计
  • -o, --output FILE: 导出排名结果 CSV
  • --days N: 统计最近 N 天的数据
# 查看最活跃的用户 (默认)
dykit rank -r 6657 --top 10

# 按用户统计送礼榜
dykit rank -r 6657 --by user --type dgb --top 5

# 查看重复弹幕
dykit rank -r 6657 --by content --top 10

cluster

使用文本相似度算法对弹幕进行聚类,识别重复模式。

  • --type TYPE: 过滤消息类型 (默认 chatmsg)
  • --user USERNAME: 按用户名过滤数据集
  • --user-id USER_ID: 按 user_id 过滤数据集
  • --from YYYY-MM-DD: 起始日期
  • --to YYYY-MM-DD: 结束日期(含当天)
  • --last N: 仅基于最近 N 条消息进行聚类
  • --first N: 仅基于最早 N 条消息进行聚类
  • --days N: 仅基于最近 N 天消息进行聚类
  • --threshold FLOAT: 相似度阈值 (默认 0.6)
  • -o, --output FILE: 将结果保存到 CSV 文件
dykit cluster -r 6657 --threshold 0.5 --limit 50

prune

清理数据库中的重复记录。

dykit prune -r 6657

导入与导出

import

将 CSV 采集文件导入到 PostgreSQL。

dykit import data.csv -r 6657

export

将数据库数据导出为 CSV 文件。

dykit export -r 6657 -o backup.csv

数据库字段

dykit 将所有消息存储在 danmaku 表中:

列名 类型 说明
timestamp TIMESTAMP 接收时间
room_id TEXT 直播间 ID
msg_type TEXT 消息类型 (chatmsg, dgb, uenter 等)
user_id TEXT 用户 UID
username TEXT 用户昵称
content TEXT 消息内容
user_level INTEGER 用户等级
gift_id TEXT 礼物 ID (可选)
gift_count INTEGER 礼物数量 (可选)
gift_name TEXT 礼物名称 (可选)
badge_level INTEGER 粉丝牌等级 (可选)
badge_name TEXT 粉丝牌名称 (可选)
noble_level INTEGER 贵族等级 (可选)
avatar_url TEXT 头像 URL (可选)

Python API

import asyncio
from dykit.storage import PostgreSQLStorage
from dykit.collectors import AsyncCollector

async def main():
    storage = PostgreSQLStorage(
        room_id=6657,
        host='localhost',
        port=5432,
        database='douyu',
        user='douyu',
        password='pass'
    )
    
    with storage:
        collector = AsyncCollector(6657, storage)
        try:
            await collector.connect()
        except KeyboardInterrupt:
            await collector.stop()

if __name__ == "__main__":
    asyncio.run(main())

项目结构

dykit/
├── __main__.py          # CLI 入口
├── types.py             # 数据类定义
├── protocol.py          # 协议解析
├── collectors/
│   └── async_.py        # 异步采集器
├── storage/
│   ├── postgres.py      # PostgreSQL 实现
│   └── csv.py           # CSV 导入导出
└── tools/               # 分析工具
    ├── rank.py          # 排行榜 (支持用户和内容双模式)
    ├── prune.py         # 去重
    └── cluster.py       # 相似度聚类

常见问题

Q: 如何配置数据库?
A: 使用环境变量 DYTOOLS_DSN 或参数 --dsn 指定 PostgreSQL 连接字符串。

Q: CSV 文件去哪了?
A: v4.0.2 默认使用数据库。如果需要 CSV,请在采集后运行 export 命令。

Q: 兼容旧版 CSV 吗?
A: 兼容。使用 import 命令即可将旧版 8 列格式的数据导入数据库。


TODO

  • 保存更多字段 — 利用 raw_data JSONB 字段提取额外信息(如弹幕颜色、特殊标识等)
  • systemd 服务管理 — 添加 systemd user service unit 文件用于后台采集
  • 历史数据迁移 — 已完成 room_id 统一迁移,迁移脚本已从仓库移除
  • construct typing 跟踪 — 关注上游 issue https://github.com/construct/construct/issues/1125 ,上游提供官方 typing/stub 后评估移除本地 typings/construct 临时桩

Collector Keepalive Contract

  • Do NOT enable websockets built-in keepalive (ping_interval / ping_timeout) for Douyu collection.
  • Collector liveness policy is:
    • protocol heartbeat: send mrkl every WS_DOUYU_HEARTBEAT_SECONDS
    • idle detection: reconnect when no messages within WS_READ_IDLE_TIMEOUT_SECONDS
  • Regression guard:
    • tests/test_collector_keepalive_contract.py asserts connect kwargs keep ping_interval=None and ping_timeout=None, and asserts heartbeat loop sends mrkl.

仅供学习研究使用。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dykit-4.0.2.tar.gz (56.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dykit-4.0.2-py3-none-any.whl (60.1 kB view details)

Uploaded Python 3

File details

Details for the file dykit-4.0.2.tar.gz.

File metadata

  • Download URL: dykit-4.0.2.tar.gz
  • Upload date:
  • Size: 56.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dykit-4.0.2.tar.gz
Algorithm Hash digest
SHA256 27c9f101174e4e3e8fa2d25fc736614277dd5281418aabfde1c4cb45b05f1236
MD5 9f1f668532f8a8c0b2fb93c8af393650
BLAKE2b-256 70930c42d0dac397bcb0640bb494a14a3143934f05f25f3a49a0f37502bca20e

See more details on using hashes here.

Provenance

The following attestation bundles were made for dykit-4.0.2.tar.gz:

Publisher: publish.yml on Joxos/dykit

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dykit-4.0.2-py3-none-any.whl.

File metadata

  • Download URL: dykit-4.0.2-py3-none-any.whl
  • Upload date:
  • Size: 60.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dykit-4.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 991e6669ff001afb48e1182144048976af77fdfca7ac22fbdb9c6734957dfe1b
MD5 f2e72389685797e77cb9c6956eff38b1
BLAKE2b-256 3d8bb991f3f6947ff1f8ea67e33fd0584bcc03d8fe0b2f89da97e5bc94511e1f

See more details on using hashes here.

Provenance

The following attestation bundles were made for dykit-4.0.2-py3-none-any.whl:

Publisher: publish.yml on Joxos/dykit

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page