Skip to main content

No project description provided

Project description

B 站弹幕监听框架

特点

  • 简单,只需房间号即可监听
  • 异步,io 不阻塞,及时获取消息

B 站直播弹幕 websocket 协议分析

PROTOCOL 分析

快速开始

  1. 安装

    pip install blive

  2. 创建 app

    from blive import  BLiver
    
    app = BLiver(123) #123为房间号
    
  3. 创建处理器

    from blive import  BLiver, Events, BLiverCtx
    from blive.msg import DanMuMsg
    
    app = BLiver(123)
    
    # 标记该方法监听弹幕消息,更多消息类型请参考 Events 类源代码
    @app.on(Events.DANMU_MSG)
    async def listen_danmu(ctx: BLiverCtx):
        danmu = DanMuMsg(ctx.body) #ctx.body 套上相应的消息操作类即可得到消息的基本内容,也可直接操作 ctx.body
        print(danmu.content)
        print(danmu.sender)
        print(danmu.timestamp)
    
  4. 运行

    from blive import  BLiver, Events, BLiverCtx
    from blive.msg import DanMuMsg
    
    app = BLiver(123)
    
    @app.on(Events.DANMU_MSG)
    async def listen_danmu(ctx: BLiverCtx):
        danmu = DanMuMsg(ctx.body)
        print(danmu.content)
        print(danmu.sender)
        print(danmu.timestamp)
    
    app.run() # 运行app!
    

同时监听多个直播间

import asyncio
from blive import BLiver, Events, BLiverCtx
from blive.msg import DanMuMsg


# 定义弹幕事件handler
async def listen(ctx: BLiverCtx):
   danmu = DanMuMsg(ctx.body)
   print(
      f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
   )


async def main():
   # 两个直播间
   ke = BLiver(605)
   azi = BLiver(510)

   # 注册handler
   ke.on(Events.DANMU_MSG, listen)
   azi.on(Events.DANMU_MSG, listen)

   # 以异步task的形式运行
   task1 = ke.run_as_task()
   task2 = azi.run_as_task()

   # await 两个任务
   await asyncio.gather(*[task1, task2])


if __name__ == "__main__":
   loop = asyncio.get_event_loop()
   loop.run_until_complete(main()) 

作为协议解析工具在其他地方使用(伪代码)

from blive.core import BWS_MsgPackage

packman = BWS_MsgPackage() # 实例化一个消息包处理器

while True:
   data = ws.receive() # 当收到消息时
   msg = packman.unpack(data) # 使用packman解析消息,返回一个形如 [(header,body), (header,body), ... ] 数组
   print(msg)

与 fastapi (其他asyncio生态框架) 配合使用

from fastapi import FastAPI
from blive import BLiver,Events
from blive.msg import DanMuMsg

app = FastAPI()

BLIVER_POOL = {}


# 定义弹幕事件handler
async def handler(ctx):
   danmu = DanMuMsg(ctx.body)
   print(
      f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
   )

def create_bliver(roomid):
    b = BLiver(roomid)
    b.on(Events.DANMU_MSG,handler)
    return b


@app.get("/create")
async def create_new_bliver(roomid:int):
    room = BLIVER_POOL.get(roomid,None)
    if not room:
        b = create_bliver(roomid)
        BLIVER_POOL[roomid] = b.run_as_task() # 启动监听
    return {"msg":"创建一个新直播间弹幕监听成功"}


@app.get("/del")
async def rm_bliver(roomid:int):
    room = BLIVER_POOL.get(roomid,None)
    if room:
        room.cancel()
        BLIVER_POOL.pop(roomid)
    return {"msg":"移除直播间弹幕监听成功"}


@app.get("/show")
async def show():
    return list(BLIVER_POOL.keys())

项目简介

  • blive 文件夹为框架代码

    • core.py 为B站ws直播聊天室协议包处理的核心代码

    • eeframework.py 为框架代码

    • msg.py 为消息操作类代码

  • example/app.py 以框架形式运行例子

  • example/multi_room.py 同时监听多个直播间的实现

  • example/with_fastapi.py 与fastapi 配合使用的例子

TODO

  • 更多的消息操作类(欢迎各位提pr)
  • 尝试加入中间件架构(目前感觉需求不大)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

blive-0.2.5.tar.gz (13.1 kB view hashes)

Uploaded Source

Built Distribution

blive-0.2.5-py3-none-any.whl (13.3 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page