A simple library for managing events through an asynchronous queue
Project description
A simple library for managing events through an asynchronous queue
Installation
pip install aioevents-ng
Note: for python 3.6 you need to install dataclasses
pip install dataclasses
Usage example
import asyncio
from dataclasses import dataclass
import aioevents
@dataclass
class MyEvent(aioevents.Event):
payload: str
@aioevents.manager.register(MyEvent)
async def event_hadler(event: aioevents.Event):
print(f"recieved: {event}")
async def produce():
async with aioevents.events as events:
await events.publish(MyEvent("Hello!"))
async def main():
aioevents.start(asyncio.get_event_loop())
await produce()
print('stopping worker')
aioevents.stop()
# wait for all coroutines
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
License
aioevents library is offered under Apache 2 license.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aioevents-ng-0.0.3.tar.gz
(3.1 kB
view hashes)
Built Distribution
Close
Hashes for aioevents_ng-0.0.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 3eb8d9e3d1c8034788d40cee59b3fb45d4c5bf4714e4040f76471072070ac22d |
|
MD5 | b204b2d5db59fa6e25b707b131004d31 |
|
BLAKE2b-256 | 0f5668c2e510f5eec4371e273d71c13893460fdb57b106a25bcf597f61a8de7f |