Event bus implementation in Python with sync and async support
Project description
event-bus
EventBus implementation in Python
Examples
sync
from multi_event_bus import EventBus
eb = EventBus(redis_host="127.0.0.1", redis_port=6379)
eb.dispatch("event-name", payload={"num": 1})
eb.subscribe_to("event-name", consumer_id="consumer-1", offset=0)
event, topic = eb.get(consumer_id="consumer-1") # Blocking
print(event.payload) # -> {"num": 1}
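The example above relies on a per-consumer offset into a topic's event stream. The following is a minimal in-memory sketch of that idea, assuming that `offset=0` means "start from the first event ever dispatched to the topic". `InMemoryBus` and `Event` are hypothetical names used only for illustration; this is not how multi_event_bus is implemented (the library talks to Redis), and unlike the real `get`, this one does not block.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass
class Event:
    payload: Dict[str, Any]


class InMemoryBus:
    """Hypothetical stand-in for illustration; not the multi_event_bus implementation."""

    def __init__(self) -> None:
        # topic -> append-only list of events
        self._log: Dict[str, List[Event]] = defaultdict(list)
        # consumer_id -> {topic: index of the next event to deliver}
        self._cursors: Dict[str, Dict[str, int]] = defaultdict(dict)

    def dispatch(self, topic: str, payload: Dict[str, Any]) -> None:
        self._log[topic].append(Event(payload))

    def subscribe_to(self, topic: str, consumer_id: str, offset: int = 0) -> None:
        self._cursors[consumer_id][topic] = offset

    def get(self, consumer_id: str) -> Tuple[Event, str]:
        # Non-blocking for simplicity: raise if nothing is pending.
        for topic, index in self._cursors[consumer_id].items():
            if index < len(self._log[topic]):
                self._cursors[consumer_id][topic] = index + 1
                return self._log[topic][index], topic
        raise LookupError("no pending events")


bus = InMemoryBus()
bus.dispatch("event-name", payload={"num": 1})
bus.dispatch("event-name", payload={"num": 2})
bus.subscribe_to("event-name", consumer_id="consumer-1", offset=0)
bus.subscribe_to("event-name", consumer_id="late-consumer", offset=1)

first, topic = bus.get(consumer_id="consumer-1")
late, _ = bus.get(consumer_id="late-consumer")
print(first.payload, topic)  # {'num': 1} event-name
print(late.payload)  # {'num': 2}
```

Under this assumption, a consumer that subscribes after events were dispatched still receives them, which is why the sync example can call `dispatch` before `subscribe_to`.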
async
import asyncio
from multi_event_bus import AsyncEventBus

# `await` needs a running event loop, so the calls are wrapped in a coroutine
async def main():
    eb = AsyncEventBus(redis_host="127.0.0.1", redis_port=6379)
    eb.dispatch("event-name", payload={"num": 1})
    eb.subscribe_to("event-name", consumer_id="consumer-2", offset=0)
    event, topic = await eb.get(consumer_id="consumer-2")
    print(event.payload) # -> {"num": 1}

asyncio.run(main())
register event schema
from multi_event_bus import EventBus
eb = EventBus(redis_host="127.0.0.1", redis_port=6379)
# Enforce json schema of event
json_schema = {
    "type": "object",
    "properties": {"num": {"type": "string"}}
}
eb.register_event_schema("event-name", schema=json_schema)
eb.dispatch("event-name", payload={"num": "7854"})
eb.subscribe_to("event-name", consumer_id="consumer-3", offset=0)
event, topic = eb.get(consumer_id="consumer-3") # Blocking
print(event.payload) # -> {"num": "7854"}
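To make the schema above concrete, here is a rough sketch of the kind of check it expresses: the payload must be an object, and `num`, if present, must be a string. `check_payload` is a hypothetical helper that handles only a tiny subset of JSON Schema (`type` and `properties`). It is not the validator multi_event_bus uses, and how the library itself reacts to an invalid payload is not documented here.

```python
from typing import Any, Dict

# Maps the JSON Schema type names used here to Python types (small subset only).
_TYPES = {"object": dict, "string": str, "boolean": bool}


def check_payload(payload: Any, schema: Dict[str, Any]) -> None:
    """Hypothetical helper: raise ValueError if payload violates the schema subset."""
    expected = _TYPES[schema["type"]]
    if not isinstance(payload, expected):
        raise ValueError(f"expected {schema['type']}, got {type(payload).__name__}")
    if schema["type"] == "object":
        # Only properties that are present are checked; none are required.
        for key, sub_schema in schema.get("properties", {}).items():
            if key in payload:
                check_payload(payload[key], sub_schema)


json_schema = {
    "type": "object",
    "properties": {"num": {"type": "string"}},
}

check_payload({"num": "7854"}, json_schema)  # passes silently

try:
    check_payload({"num": 7854}, json_schema)
    rejected = False
except ValueError as exc:
    rejected = True
    print(exc)  # expected string, got int
```

The string `"7854"` satisfies the schema while the integer `7854` does not, which is why the example dispatches `{"num": "7854"}`.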
Development
scripts
poetry run run_pytest
poetry run run_flake8
poetry run run_mypy
poetry run run_black
run tests
poetry run test
run all (test and black)
poetry run all
Download files
Source Distribution
multi_event_bus-0.2.0.tar.gz (4.2 kB)
Built Distribution
Hashes for multi_event_bus-0.2.0-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 6a55ae9a7dcdaf294f3f26eb93bf526ed6ef1f76956879a59e79390817c42739
MD5 | 4ff492e96290cd41c43aae97805f51c2
BLAKE2b-256 | b3300923f0bdc90d27f2e59254563cda53b340d08c9b42c5375b99ac261ae30c