Douyu Live Stream Danmu (弹幕) Collector - A modular, async-capable library for collecting chat messages from Douyu live streams.
dykit - Douyu danmaku collection and analysis tool
Built on PostgreSQL, with support for real-time collection, data analysis, and CSV import/export.
v4.0.0 (2026-03-04)
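Features
- PostgreSQL storage: PostgreSQL is the primary storage backend, supporting high-concurrency writes and fast queries.
- Toolchain: seven core subcommands (collect, rank, prune, cluster, import, export, init-db).
- Data structure: a flat 14-column schema; the complex JSONB field has been removed.
- CLI: built on the Click framework, configurable through environment variables and a DSN connection string.
- Tech stack: the psycopg3 driver and asynchronous WebSocket collection.
- Message handling: improved UTF-8 buffer handling that fixes garbled text caused by split packets.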
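The split-packet problem in the last bullet arises when a multi-byte UTF-8 character is cut across two reads, so decoding each chunk on its own fails or produces garbage. A minimal sketch of the general technique, using the standard library's incremental decoder; this is not necessarily how dykit implements it, and decode_chunks is a hypothetical name:

```python
import codecs


def decode_chunks(chunks):
    """Decode byte chunks that may split UTF-8 characters at their edges.

    The incremental decoder keeps the trailing bytes of an incomplete
    character in its internal buffer until the next chunk completes it.
    """
    decoder = codecs.getincrementaldecoder("utf-8")()
    parts = [decoder.decode(chunk) for chunk in chunks]
    # final=True flushes the buffer and raises if bytes are left dangling.
    parts.append(decoder.decode(b"", final=True))
    return "".join(parts)


data = "弹幕".encode("utf-8")  # 6 bytes, 3 per character
# Split in the middle of the first character: data[:2].decode() alone would raise.
print(decode_chunks([data[:2], data[2:]]))
```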
Requirements
- Python 3.12+
- PostgreSQL 12+
- uv (推荐) 或 pip
Installation
# Using uv (recommended)
uv venv
source .venv/bin/activate
uv pip install .
# Or using pip
pip install .
Quick Start
1. Set the database connection (DSN)
export DYTOOLS_DSN="postgresql://user:pass@localhost:5432/douyu"
2. Initialize the database
dykit init-db
3. Start collecting
dykit collect -r 6657
4. View the rankings
dykit rank -r 6657 --top 20
Service Management
Managing Long-Running Collectors
dykit supports managing long-running collectors as systemd --user services. This allows background collection that persists across sessions and restarts automatically.
Basic Workflow
# Set your database DSN (required for the service to connect)
export DYTOOLS_DSN="postgresql://douyu:douyu6657@localhost:5432/douyu_danmu"
# Create a service for a specific room (Format: NAME:ROOM_ID)
dykit service create test-room:9999
# List all managed services
dykit service list
# Check status of a specific service
dykit service status test-room-9999
# View recent logs
dykit service logs test-room-9999 --lines 10
# Stop a running service
dykit service stop test-room-9999
# Get the path to the unit file
dykit service where test-room-9999
# Remove the service completely
dykit service remove test-room-9999
Important Notes
- Persistence: To keep services running after you log out, run loginctl enable-linger $USER.
- Storage: Service unit files are stored in ~/.config/systemd/user/.
- Naming: When you create a service with NAME:ROOM_ID, the resulting systemd unit is named dykit-NAME-ROOM_ID.service. Use the NAME-ROOM_ID part with dykit service commands.
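The naming rule above can be expressed as a small function, which may help when scripting around dykit service. This is a hypothetical helper that only mirrors the documented rule; it is not part of dykit's API, and rejecting non-numeric room IDs is an assumption based on the examples here:

```python
def unit_name(spec: str) -> tuple[str, str]:
    """Map a NAME:ROOM_ID spec to (service id, systemd unit file name).

    Hypothetical helper mirroring the README's naming rule; not dykit's API.
    """
    # rpartition splits on the LAST colon, so NAME itself may contain colons.
    name, sep, room_id = spec.rpartition(":")
    if not sep or not name or not room_id.isdigit():
        raise ValueError(f"expected NAME:ROOM_ID, got {spec!r}")
    service_id = f"{name}-{room_id}"
    return service_id, f"dykit-{service_id}.service"


service_id, unit = unit_name("test-room:9999")
print(service_id)  # the id to pass to `dykit service status/logs/stop/...`
print(unit)        # the file name under ~/.config/systemd/user/
```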
Command Reference
Database Management
init-db
Create the database tables and indexes.
dykit init-db
Example output:
Database schema initialized successfully
Table: danmaku
Indexes: idx_danmaku_room_time, idx_danmaku_user_id, idx_danmaku_msg_type
collect
Collect danmaku from a live room in real time.
- -r, --room: live room ID
- -v, --verbose: print debug logs
dykit collect -r 6657 -v
Data Analysis
rank
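Rank the users who send the most messages, or the most frequently repeated danmaku.
- -r, --room: live room ID
- --by user|content: ranking dimension (default user)
- --top N: show the top N entries (default 10)
- --type TYPE: filter by message type (default chatmsg; dgb and others are also available)
- --user USERNAME: filter the dataset by username
- --user-id USER_ID: filter the dataset by user_id
- --from YYYY-MM-DD: start date
- --to YYYY-MM-DD: end date (inclusive)
- --last N: rank only the most recent N messages
- --first N: rank only the earliest N messages
- -o, --output FILE: export the ranking to a CSV file
- --days N: rank only data from the last N days
# Most active users (default)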
dykit rank -r 6657 --top 10
# Gift leaderboard, ranked by user
dykit rank -r 6657 --by user --type dgb --top 5
# Most repeated danmaku
dykit rank -r 6657 --by content --top 10
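Conceptually, both rank modes are a filtered count followed by a top-N cut. A minimal in-memory sketch, assuming each message is a dict keyed by the danmaku column names; dykit's real implementation queries PostgreSQL, and grouping users by username (rather than user_id) is an assumption:

```python
from collections import Counter


def rank(messages, by="user", msg_type="chatmsg", top=10):
    """Count messages per user or per content and return the top N pairs.

    Sketch only: mirrors the --by / --type / --top options in memory.
    """
    key = "username" if by == "user" else "content"
    counts = Counter(m[key] for m in messages if m["msg_type"] == msg_type)
    # most_common keeps first-seen order among equal counts.
    return counts.most_common(top)


msgs = [
    {"username": "a", "content": "hi", "msg_type": "chatmsg"},
    {"username": "b", "content": "hi", "msg_type": "chatmsg"},
    {"username": "a", "content": "666", "msg_type": "chatmsg"},
    {"username": "c", "content": "", "msg_type": "dgb"},
]
print(rank(msgs))                  # [('a', 2), ('b', 1)]
print(rank(msgs, by="content", top=1))  # [('hi', 2)]
```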
cluster
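Cluster danmaku with a text-similarity algorithm to identify recurring patterns.
- --type TYPE: filter by message type (default chatmsg)
- --user USERNAME: filter the dataset by username
- --user-id USER_ID: filter the dataset by user_id
- --from YYYY-MM-DD: start date
- --to YYYY-MM-DD: end date (inclusive)
- --last N: cluster only the most recent N messages
- --first N: cluster only the earliest N messages
- --days N: cluster only messages from the last N days
- --threshold FLOAT: similarity threshold (default 0.6)
- -o, --output FILE: save the results to a CSV file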
dykit cluster -r 6657 --threshold 0.5 --limit 50
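The README does not say which similarity measure dykit uses. As an illustration of what a threshold such as 0.6 means, here is a swapped-in technique: greedy single-pass clustering with the standard library's difflib.SequenceMatcher ratio. It is a sketch, not dykit's algorithm, and it is quadratic in the worst case:

```python
from difflib import SequenceMatcher


def cluster(texts, threshold=0.6):
    """Greedy single-pass clustering by string similarity.

    Each text joins the first cluster whose representative (its first
    member) has a similarity ratio >= threshold; otherwise it starts a
    new cluster. Clusters are returned largest first.
    """
    clusters: list[list[str]] = []
    for text in texts:
        for group in clusters:
            if SequenceMatcher(None, group[0], text).ratio() >= threshold:
                group.append(text)
                break
        else:  # no existing cluster was similar enough
            clusters.append([text])
    return sorted(clusters, key=len, reverse=True)


texts = ["主播好强", "主播好强啊", "666", "6666", "晚上好"]
for group in cluster(texts):
    print(group)
```

Raising the threshold makes clusters stricter: at 0.95 every text above ends up in its own cluster.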
prune
Remove duplicate records from the database.
dykit prune -r 6657
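The README does not define what counts as a duplicate. A minimal sketch of first-occurrence deduplication over a chosen key, where the default key (timestamp, room_id, user_id, msg_type, content) is an assumption; dykit's prune may compare different columns and works inside PostgreSQL:

```python
def dedupe(rows, key_fields=("timestamp", "room_id", "user_id", "msg_type", "content")):
    """Keep the first occurrence of each row, comparing only key_fields.

    The default key_fields are an assumption, not dykit's definition.
    """
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row.get(f) for f in key_fields)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


rows = [
    {"timestamp": "t1", "room_id": "6657", "user_id": "1", "msg_type": "chatmsg", "content": "hi"},
    {"timestamp": "t1", "room_id": "6657", "user_id": "1", "msg_type": "chatmsg", "content": "hi"},
    {"timestamp": "t2", "room_id": "6657", "user_id": "1", "msg_type": "chatmsg", "content": "hi"},
]
print(len(dedupe(rows)))  # the second row is an exact duplicate of the first
```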
Import and Export
import
Import collected CSV files into PostgreSQL.
dykit import data.csv -r 6657
export
Export database records to a CSV file.
dykit export -r 6657 -o backup.csv
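For scripts that consume or produce these files, a CSV round trip over the 14 schema columns can be sketched with the standard csv module. That dykit's export uses exactly this header and column order is an assumption (the order below follows the schema table); note that every value comes back as a string:

```python
import csv
import io

# Column order follows the schema table in this README; whether dykit's
# export uses the same header and order is an assumption.
COLUMNS = [
    "timestamp", "room_id", "msg_type", "user_id", "username", "content",
    "user_level", "gift_id", "gift_count", "gift_name", "badge_level",
    "badge_name", "noble_level", "avatar_url",
]


def to_csv(rows) -> str:
    """Serialize dict rows to CSV text; missing optional fields become empty."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=COLUMNS, restval="")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


def from_csv(text: str) -> list[dict]:
    """Parse CSV text back into dicts (all values are strings)."""
    return list(csv.DictReader(io.StringIO(text)))


row = {"timestamp": "2026-03-04 12:00:00", "room_id": "6657", "msg_type": "chatmsg",
       "user_id": "1", "username": "a", "content": "hi, there"}
back = from_csv(to_csv([row]))
print(back[0]["content"])  # the embedded comma survives thanks to CSV quoting
```

When writing to a real file, open it with newline="" as the csv module documentation recommends.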
Database Columns
dykit stores all messages in the danmaku table:
| Column | Type | Description |
|---|---|---|
| timestamp | TIMESTAMP | Time received |
| room_id | TEXT | Live room ID |
| msg_type | TEXT | Message type (chatmsg, dgb, uenter, etc.) |
| user_id | TEXT | User UID |
| username | TEXT | User nickname |
| content | TEXT | Message content |
| user_level | INTEGER | User level |
| gift_id | TEXT | Gift ID (optional) |
| gift_count | INTEGER | Gift count (optional) |
| gift_name | TEXT | Gift name (optional) |
| badge_level | INTEGER | Fan badge level (optional) |
| badge_name | TEXT | Fan badge name (optional) |
| noble_level | INTEGER | Noble rank (optional) |
| avatar_url | TEXT | Avatar URL (optional) |
Python API
import asyncio

from dykit.storage import PostgreSQLStorage
from dykit.collectors import AsyncCollector


async def main():
    storage = PostgreSQLStorage(
        room_id=6657,
        host='localhost',
        port=5432,
        database='douyu',
        user='douyu',
        password='pass'
    )
    with storage:
        collector = AsyncCollector(6657, storage)
        try:
            await collector.connect()
        finally:
            # Under asyncio.run (Python 3.11+), Ctrl+C cancels main() with
            # CancelledError instead of raising KeyboardInterrupt here,
            # so the collector is stopped in a finally block.
            await collector.stop()


if __name__ == "__main__":
    asyncio.run(main())
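The CLI reads a DSN, while the constructor above takes separate host, port, database, user, and password arguments. To reuse DYTOOLS_DSN in a script, a small standard-library sketch can split it; dsn_to_kwargs is a hypothetical helper, and the fallbacks for a missing host or port (localhost, 5432) are assumptions. psycopg 3 also provides psycopg.conninfo.conninfo_to_dict, which returns libpq key names (dbname rather than database).

```python
from urllib.parse import unquote, urlsplit


def dsn_to_kwargs(dsn: str) -> dict:
    """Split a postgresql:// DSN into host/port/database/user/password."""
    parts = urlsplit(dsn)
    if parts.scheme not in ("postgresql", "postgres"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or 5432,
        "database": parts.path.lstrip("/"),
        # Credentials may be percent-encoded (e.g. @ written as %40).
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
    }


print(dsn_to_kwargs("postgresql://user:pass@localhost:5432/douyu"))
```

With import os added, the result could then be passed as PostgreSQLStorage(room_id=6657, **dsn_to_kwargs(os.environ["DYTOOLS_DSN"])).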
Project Structure
dykit/
├── __main__.py # CLI entry point
├── types.py # Dataclass definitions
├── protocol.py # Protocol parsing
├── collectors/
│   └── async_.py # Async collector
├── storage/
│   ├── postgres.py # PostgreSQL implementation
│   └── csv.py # CSV import/export
└── tools/ # Analysis tools
    ├── rank.py # Rankings (user and content modes)
    ├── prune.py # Deduplication
    └── cluster.py # Similarity clustering
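FAQ
Q: How do I configure the database?
A: Set the PostgreSQL connection string with the DYTOOLS_DSN environment variable or the --dsn option.
Q: Where did the CSV files go?
A: As of v4.0.0, data is stored in the database by default. If you need CSV, run the export command after collecting.
Q: Is the old CSV format still supported?
A: Yes. Use the import command to load data in the legacy 8-column format into the database.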
TODO
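- Save more fields: use the raw_data JSONB field to extract extra information (such as danmaku color and special markers)
- systemd service management: add systemd user service unit files for background collection
- Historical data migration: the room_id unification migration is complete, and the migration script has been removed from the repository
- construct typing tracking: follow upstream issue https://github.com/construct/construct/issues/1125; once upstream provides official typing/stubs, evaluate removing the local temporary stubs in typings/construct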
Collector Keepalive Contract
- Do NOT enable the websockets library's built-in keepalive (ping_interval/ping_timeout) for Douyu collection.
- The collector's liveness policy is:
  - protocol heartbeat: send mrkl every WS_DOUYU_HEARTBEAT_SECONDS
  - idle detection: reconnect when no message arrives within WS_READ_IDLE_TIMEOUT_SECONDS
- Regression guard: tests/test_collector_keepalive_contract.py asserts that the connect kwargs keep ping_interval=None and ping_timeout=None, and that the heartbeat loop sends mrkl.
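The contract above can be sketched with plain asyncio: one task sends an mrkl frame on a fixed interval, and the reader gives up (so the caller can reconnect) when asyncio.wait_for times out. The frame layout (length twice, type 689 for client-to-server, encrypt and reserved bytes, NUL-terminated STT body such as type@=mrkl/) follows public third-party descriptions of Douyu's protocol and is an assumption here, not taken from dykit's protocol.py; encode_frame, heartbeat_loop, and read_until_idle are hypothetical names.

```python
import asyncio
import struct

CLIENT_MSG_TYPE = 689  # client-to-server code in public write-ups (assumption)


def encode_frame(body: str) -> bytes:
    """Pack an STT body as: len | len | type | encrypt | reserved | body | NUL.

    Integers are little-endian; the length counts every byte after the
    first length field (assumed layout).
    """
    payload = body.encode("utf-8") + b"\x00"
    length = len(payload) + 8  # second length (4) + type (2) + 2 flag bytes
    return struct.pack("<IIHBB", length, length, CLIENT_MSG_TYPE, 0, 0) + payload


async def heartbeat_loop(ws, interval: float) -> None:
    """Send an mrkl heartbeat every `interval` seconds until cancelled."""
    while True:
        await ws.send(encode_frame("type@=mrkl/"))
        await asyncio.sleep(interval)


async def read_until_idle(ws, idle_timeout: float, on_message) -> None:
    """Pass each frame to on_message; return once none arrives in time.

    Returning instead of raising lets the caller decide to reconnect.
    """
    while True:
        try:
            frame = await asyncio.wait_for(ws.recv(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return
        on_message(frame)
```

With the websockets library, the connection would be opened as websockets.connect(url, ping_interval=None, ping_timeout=None) (both are real parameters), with heartbeat_loop running as a task beside read_until_idle. The URL and the values of the two WS_* settings are not given in this README.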
For learning and research purposes only.