Skip to main content

Lightweight event bus support sync and async, inspired by google guava event bus

Project description

同时支持的同步和异步消息通知的eventbus,灵感来自与google guava eventbus

用法: 1.定义接收消息类型:

class IntMessage(int):
    pass

class DictMessage(dict):
    pass

2.定义需要接收消息的Observer类,需继承subscriber.DefaultSubscriber, 并实现subscribe方法,subscribe方法的参数类型与自定义的消息类型保持一致

class CConcreteObserver(DefaultSubscriber):

    def subscribe(self, msg: msg.IntMessage):
        time.sleep(2)
        print(f"{self.__class__.__name__}#{inspect.stack()[0][3]}: msg:{msg}")

3.根据消息类型,添加接收者

OBSERVER_REGISTRY = [
    {
        msg_type(msg.DictMessage()): [
            subscriber.AConcreteObserver()
        ]
    },
    {
        msg_type(msg.IntMessage()): [
            subscriber.BConcreteObserver(),
            subscriber.CConcreteObserver(),

        ]
    },
]

4.在需要的地方引入eventbus,根据自己的需求选择支持同步的方式(eventbus.EventBus)或异步的方式(eventbus.AsyncEventBus) 在消息接收处,调用notify接口发送消息给对应的所有观察者

sync_event_bus = eventbus.AsyncEventBus()
sync_event_bus.notify(msg.IntMessage(1211))

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyeventbus2-1.0.0.tar.gz (6.9 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page