Lightweight event bus support sync and async, inspired by google guava event bus
Project description
同时支持的同步和异步消息通知的eventbus,灵感来自与google guava eventbus
用法: 1.定义接收消息类型:
class IntMessage(int):
pass
class DictMessage(dict):
pass
2.定义需要接收消息的Observer类,需继承subscriber.DefaultSubscriber, 并实现subscribe方法,subscribe方法的参数类型与自定义的消息类型保持一致
class CConcreteObserver(DefaultSubscriber):
def subscribe(self, msg: msg.IntMessage):
time.sleep(2)
print(f"{self.__class__.__name__}#{inspect.stack()[0][3]}: msg:{msg}")
3.根据消息类型,添加接收者
OBSERVER_REGISTRY = [
{
msg_type(msg.DictMessage()): [
subscriber.AConcreteObserver()
]
},
{
msg_type(msg.IntMessage()): [
subscriber.BConcreteObserver(),
subscriber.CConcreteObserver(),
]
},
]
4.在需要的地方引入eventbus,根据自己的需求选择支持同步的方式(eventbus.EventBus)或异步的方式(eventbus.AsyncEventBus) 在消息接收处,调用notify接口发送消息给对应的所有观察者
sync_event_bus = eventbus.AsyncEventBus()
sync_event_bus.notify(msg.IntMessage(1211))
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pyeventbus2-1.0.0.tar.gz
(6.9 kB
view details)
File details
Details for the file pyeventbus2-1.0.0.tar.gz
.
File metadata
- Download URL: pyeventbus2-1.0.0.tar.gz
- Upload date:
- Size: 6.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.7.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0461eaa5e2948e9d0ff5ccf51ed5d668a0716d7b4e809d7e92851f97e476e003 |
|
MD5 | 986dbe15187a2698617692056096f32c |
|
BLAKE2b-256 | 8e713f4043f359bf0eb859305589103e3ac6f5fddf3a8dff7948be808de8c3f9 |