Event-driven data pipelines
The primary use cases of eventkit are:
- to send events between loosely coupled components;
- to compose all kinds of event-driven data pipelines.
The interface is kept as Pythonic as possible and uses familiar names from Python and its libraries. Scheduling is handled by asyncio, with which eventkit integrates seamlessly.
See the examples and the introduction notebook to get a true feel for the possibilities.
    pip3 install eventkit
Python version 3.6 or higher is required.
Create an event and connect two listeners
    import eventkit as ev

    def f(a, b):
        print(a * b)

    def g(a, b):
        print(a / b)

    event = ev.Event()
    event += f
    event += g
    event.emit(10, 5)
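To make the loose coupling concrete, here is a minimal stdlib-only sketch of the listener idea. `ToyEvent` is a hypothetical stand-in written for illustration and is not how eventkit is implemented; it only mirrors the `+=` and `emit` syntax of the example above.

```python
# Toy stand-in for the listener idea; NOT eventkit's implementation.
class ToyEvent:
    """Hypothetical minimal event: stores listeners and calls them in order."""

    def __init__(self):
        self._listeners = []

    def __iadd__(self, listener):
        # Supports `event += listener`, mirroring the syntax used above.
        self._listeners.append(listener)
        return self

    def emit(self, *args):
        # The emitter only requires that each listener is callable.
        for listener in self._listeners:
            listener(*args)


results = []
event = ToyEvent()
event += lambda a, b: results.append(a * b)
event += lambda a, b: results.append(a / b)
event.emit(10, 5)
print(results)
```

The emitting side never names its listeners, so components can be connected or swapped without changing the code that emits.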
Create a simple pipeline
    import eventkit as ev

    event = (
        ev.Sequence('abcde')
        .map(str.upper)
        .enumerate()
    )

    print(event.run())  # in Jupyter: await event.list()
    [(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
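The operator names in the pipeline above follow the Python built-ins of the same name. As a point of comparison, the same values can be computed eagerly with the built-ins alone; this sketch does not use eventkit and does not reproduce its event-by-event, asyncio-scheduled behaviour.

```python
# Built-ins only: same values as the pipeline above, computed eagerly.
result = list(enumerate(map(str.upper, 'abcde')))
print(result)
```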
Create a pipeline to get a running average and standard deviation
    import random
    import eventkit as ev

    source = ev.Range(1000).map(lambda i: random.gauss(0, 1))

    event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()

    print(event.last().run())  # in Jupyter: await event.last()
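To show what the pipeline above is computing, here is a rough stdlib-only sketch of a running mean and standard deviation over the 500 most recent samples. It rests on several assumptions that the text does not confirm: the fixed seed is only there for repeatability, the population form of the standard deviation is used, and statistics are updated from the first sample on rather than only once the window is full.

```python
import math
import random
from collections import deque

random.seed(42)  # assumption: fixed seed, only so the sketch is repeatable

window = deque(maxlen=500)  # keeps the 500 most recent samples
last = None
for _ in range(1000):
    window.append(random.gauss(0, 1))
    mean = sum(window) / len(window)
    # Assumption: population variance (divide by n, not n - 1).
    variance = sum((x - mean) ** 2 for x in window) / len(window)
    last = (mean, math.sqrt(variance))

print(last)  # mean near 0 and standard deviation near 1
```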
Combine async iterators together
    import asyncio
    import eventkit as ev

    async def ait(r):
        for i in r:
            await asyncio.sleep(0.1)
            yield i

    async def main():
        async for t in ev.Zip(ait('XYZ'), ait('123')):
            print(t)

    asyncio.get_event_loop().run_until_complete(main())  # in Jupyter: await main()
    ('X', '1')
    ('Y', '2')
    ('Z', '3')
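For readers who want to see the idea without the library, the following sketch zips async iterators with plain asyncio. `azip` is a hypothetical helper written for this illustration; it is not eventkit's `Zip`, which may schedule and buffer items differently. The sketch uses `asyncio.run`, which needs Python 3.7 or higher.

```python
import asyncio


async def ait(r):
    # Same async generator as in the example above.
    for i in r:
        await asyncio.sleep(0.1)
        yield i


async def azip(*aiters):
    """Hypothetical helper: yield tuples until any input is exhausted."""
    its = [a.__aiter__() for a in aiters]
    while True:
        try:
            # Advance all inputs concurrently, then emit one tuple.
            items = await asyncio.gather(*(it.__anext__() for it in its))
        except StopAsyncIteration:
            return
        yield tuple(items)


async def main():
    return [t async for t in azip(ait('XYZ'), ait('123'))]


pairs = asyncio.run(main())
print(pairs)
```

Because each round waits on all inputs at once, one tuple takes about as long as the slowest input rather than the sum of all delays.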
Real-time video analysis pipeline
    self.video = VideoStream(conf.CAM_ID)
    scene = self.video | FaceTracker | SceneAnalyzer
    lastScene = scene.aiter(skip_to_last=True)
    async for frame, persons in lastScene:
        ...
The distex library provides a poolmap extension method to put multiple cores or machines to use:
    from distex import Pool
    import eventkit as ev
    import bz2

    pool = Pool()
    # await pool  # un-comment in Jupyter
    data = [b'A' * 1000000] * 1000

    pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

    print(pipe.run())  # in Jupyter: print(await pipe)
    pool.shutdown()
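As a rough stdlib-only analogue of the parallel map step above, the sketch below compresses chunks on a thread pool and averages the compressed sizes. It is an illustration under stated assumptions, not a replacement for distex: threads stand in for the cores or machines mentioned above, and the payload is deliberately smaller so that it runs quickly.

```python
import bz2
from concurrent.futures import ThreadPoolExecutor

# Assumption: a smaller payload than the example above, to keep this fast.
data = [b'A' * 100000] * 20

# Assumption: local threads stand in for the distex pool.
with ThreadPoolExecutor() as pool:
    sizes = list(pool.map(lambda chunk: len(bz2.compress(chunk)), data))

mean_size = sum(sizes) / len(sizes)
print(mean_size)
```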
The complete API documentation.