An event bus
Project description
fancy-event
Usage
import fancy.eventbus as fe
class MyEvent:
    """Example event carrying a single integer payload."""

    # Payload made available to handlers that receive this event.
    value: int

    def __init__(self, value: int) -> None:
        """Store ``value`` as the event's payload."""
        self.value = value
class MyEvent2:
    """Example event with no payload."""
class Target:
    """Example subscriber whose handlers are chained through posted events.

    Each handler is marked with ``@fe.SubscribeEvent()``; the annotation on
    its event parameter names the event type it is meant to receive
    (NOTE(review): the matching itself happens inside the library).
    """

    def __init__(self):
        """Register this instance with the event bus."""
        fe.register(self)

    @fe.SubscribeEvent()
    def on_start(self, event: fe.StartEvent):
        """Handle the start event and post a ``MyEvent`` with value 1."""
        print("start!")
        fe.post(MyEvent(1))

    @fe.SubscribeEvent()
    def my_event(self, event: MyEvent):
        """Print the payload, then post a ``MyEvent2`` and this object itself."""
        print("my_event", event.value)
        fe.post(MyEvent2())
        # Any object can be posted as an event, including the subscriber.
        fe.post(self)

    @fe.SubscribeEvent()
    def my_event2(self, event: MyEvent2):
        """Raise on purpose so the exception handlers below are exercised."""
        print("my_event2")
        raise RuntimeError("an error")

    @fe.SubscribeEvent(priority=fe.EventPriority.HIGHEST)  # runs before my_event2
    def self_as_event(self, event: "Target"):  # forward reference to this class
        """Receive the ``Target`` instance that ``my_event`` posted."""
        print("self_as_event", event is self)

    @fe.SubscribeEvent()
    def error_handler(self, e: RuntimeError):
        """Handle the ``RuntimeError`` raised by ``my_event2``."""
        print("handle by runtime error", e)

    # Handler method names must be distinct from one another.
    @fe.SubscribeEvent(priority=fe.EventPriority.LOW)
    def error_handler2(self, e: Exception):
        """Handle the same error at low priority, then cancel the event."""
        print("handle by exception", e)
        return False  # returning False cancels the event

    @fe.SubscribeEvent(priority=fe.EventPriority.LOWEST)
    def error_handler3(self, e: BaseException):
        """Lowest-priority handler; not reached, since ``error_handler2`` cancels."""
        print("this should be canceled.", e)
# Create the event loop before any subscriber is constructed.
loop = fe.EventLoop()
# Instantiating Target registers it: its __init__ calls fe.register(self).
Target()
# Start the loop. Presumably this posts fe.StartEvent, which Target.on_start
# handles first -- TODO confirm against the library.
loop.start()
start!
my_event 1
self_as_event True
my_event2
handle by runtime error an error
handle by exception an error
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
fancy-eventbus-0.1.0a0.tar.gz
(7.5 kB
view hashes)
Built Distribution
Close
Hashes for fancy_eventbus-0.1.0a0-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 7a9b9903c0d9b4b11e4ae35be0661a4329a5d55046346b6eac21b71d81aaadb2 |
MD5 | 16c2f89009959ed136b1732ef21f23c6 |
BLAKE2b-256 | 16b0d8941c6b9c69df0b9c4a248479f107b547ae17840cf1b5b5bd79f48a9d9b |