Remote Graia Broadcast.
Project description
安装
pip install calamity
pdm add calamity
特性
- 兼容 Graia Broadcast
- 兼容 Lacia
使用
入门
Server 端
from calamity import Calamity
# Start the Calamity server side.
# NOTE(review): presumably this call blocks while serving clients — confirm.
Calamity.server()
Client 端
- 客户端 A
import asyncio
from calamity import Calamity
from graia.broadcast import Broadcast, Dispatchable, BaseDispatcher, DispatcherInterface
# Create and register the loop explicitly: asyncio.get_event_loop() with no
# current loop is deprecated (Python 3.12) and raises RuntimeError on 3.14.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
bcc = Broadcast(loop=loop)  # declare the local Broadcast instance
# Rebind bcc to whatever Calamity.patch() returns.
# NOTE(review): presumably the remote-capable broadcast — confirm.
bcc = Calamity.patch()
class ExampleEvent(Dispatchable):
    """Example event posted through ``bcc`` in this script."""

    class Dispatcher(BaseDispatcher):
        """Dispatcher nested inside the event.

        NOTE(review): presumably consulted by Broadcast to resolve receiver
        parameters — confirm against the Graia Broadcast documentation.
        """

        @staticmethod
        async def catch(interface: DispatcherInterface):
            """Return ``"Calamity"`` when the requested annotation is ``str``.

            For any other annotation the method falls through and implicitly
            returns ``None``.
            """
            if interface.annotation is str:
                return "Calamity"
@bcc.receiver(ExampleEvent, remote=True)
async def event_receiver(event: str):
    """Print a greeting followed by the ``event`` string.

    NOTE(review): ``remote=True`` presumably registers this receiver for
    events posted by other clients — confirm against Calamity's docs.
    """
    print("Hello Broadcast!", event)
@bcc.receiver(ExampleEvent)
async def event_receiver1(event: str):
    """Print the "Hello Lacia!" greeting followed by the ``event`` string."""
    print("Hello Lacia!", event)
async def main():
    """Post a single ``ExampleEvent`` through ``bcc``."""
    bcc.postEvent(ExampleEvent())
# Run main() to completion first so the event gets posted.
loop.run_until_complete(main())
# Then keep the loop running.
# NOTE(review): presumably so receivers can keep handling events — confirm.
loop.run_forever()
- 客户端 B
import asyncio
from calamity import Calamity
from graia.broadcast import Broadcast, Dispatchable, BaseDispatcher, DispatcherInterface
# Create and register the loop explicitly: asyncio.get_event_loop() with no
# current loop is deprecated (Python 3.12) and raises RuntimeError on 3.14.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
bcc = Broadcast(loop=loop)  # declare the local Broadcast instance
# Rebind bcc to whatever Calamity.patch() returns, with debug enabled.
# NOTE(review): presumably the remote-capable broadcast — confirm.
bcc = Calamity.patch(debug=True)
class ExampleEvent(Dispatchable):
    """Example event posted through ``bcc`` in this script."""

    class Dispatcher(BaseDispatcher):
        """Dispatcher nested inside the event.

        NOTE(review): presumably consulted by Broadcast to resolve receiver
        parameters — confirm against the Graia Broadcast documentation.
        """

        @staticmethod
        async def catch(interface: DispatcherInterface):
            """Return ``"Calamity"`` when the requested annotation is ``str``.

            For any other annotation the method falls through and implicitly
            returns ``None``.
            """
            if interface.annotation is str:
                return "Calamity"
@bcc.receiver(ExampleEvent)
async def event_receiver(event: str):
    """Print the "Hello Broadcast!" greeting followed by the ``event`` string."""
    print("Hello Broadcast!", event)
@bcc.receiver(ExampleEvent)
async def event_receiver1(event: str):
    """Print the "Hello Lacia!" greeting followed by the ``event`` string."""
    print("Hello Lacia!", event)
async def main():
    """Post a single ``ExampleEvent`` through ``bcc`` with ``remote=True``.

    NOTE(review): ``remote=True`` presumably forwards the event to other
    connected clients — confirm against Calamity's docs.
    """
    bcc.postEvent(ExampleEvent(), remote=True)
# Run main() to completion first so the event gets posted.
loop.run_until_complete(main())
# Then keep the loop running.
# NOTE(review): presumably so receivers can keep handling events — confirm.
loop.run_forever()
提示
bcc = Broadcast(loop=loop)  # declare bcc
bcc = Calamity.patch(debug=True)  # remote bcc
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
calamity-0.1.0.tar.gz
(4.7 kB
view hashes)