A simple library for managing events through an asynchronous queue

Project description

Installation

pip install aioevents-ng

Usage example

import asyncio
from dataclasses import dataclass

import aioevents


@dataclass
class MyEvent(aioevents.Event):
    payload: str


# Register the coroutine below as a handler for MyEvent.
@aioevents.manager.register(MyEvent)
async def event_handler(event: aioevents.Event):
    print(f"received: {event}")


async def produce():
    # Publish a single event.
    async with aioevents.events as events:
        await events.publish(MyEvent("Hello!"))


async def main():
    # Start the worker on the current event loop.
    aioevents.start(asyncio.get_event_loop())

    await produce()

    print("stopping worker")
    aioevents.stop()

    # Wait for all coroutines to finish.
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

License

The aioevents library is offered under the Apache 2 license.

Download files

Source Distribution

aioevents-ng-0.0.9.tar.gz (3.5 kB)

Built Distribution

aioevents_ng-0.0.9-py3-none-any.whl (7.8 kB, Python 3)
