Event-driven data pipelines

The primary use cases of eventkit are

  • to send events between loosely coupled components;
  • to compose all kinds of event-driven data pipelines.

The interface is kept as Pythonic as possible, using familiar names from Python and its libraries. Scheduling is done with asyncio, and eventkit integrates seamlessly with it.

See the examples and the introduction notebook to get a true feel for the possibilities.


pip3 install eventkit

Python version 3.6 or higher is required.


Create an event and connect two listeners

import eventkit as ev

def f(a, b):
    print(a * b)

def g(a, b):
    print(a / b)

event = ev.Event()
event += f
event += g
event.emit(10, 5)
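
To make the `+=` and `emit` mechanics concrete, here is a minimal standard-library sketch of the listener pattern. `MiniEvent` is a hypothetical stand-in written for illustration only; it is not eventkit's implementation, and it omits everything else an eventkit `Event` offers.

```python
class MiniEvent:
    """Hypothetical stand-in for an event object; not eventkit's code."""

    def __init__(self):
        self._listeners = []

    def __iadd__(self, listener):
        # `event += f` registers a listener; returning self keeps the
        # name `event` bound to the same object after the assignment.
        self._listeners.append(listener)
        return self

    def emit(self, *args):
        # Call every listener, in connection order, with the emitted values.
        for listener in list(self._listeners):
            listener(*args)


results = []
event = MiniEvent()
event += lambda a, b: results.append(a * b)
event += lambda a, b: results.append(a / b)
event.emit(10, 5)
print(results)  # [50, 2.0]
```

The emitter never needs to know who is listening, which is what keeps the components loosely coupled.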

Create a simple pipeline

import eventkit as ev

event = (
    ev.Sequence('abcde')
    .map(str.upper)
    .enumerate()
)

print(event.run())  # in Jupyter: await event.list()


[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]

Create a pipeline to get a running average and standard deviation

import random
import eventkit as ev

source = ev.Range(1000).map(lambda i: random.gauss(0, 1))

event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()

print(event.last().run())  # in Jupyter: await event.last()


[(0.00790957852672618, 1.0345673260655333)]
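
For comparison, the same kind of computation can be sketched with the standard library alone: a window of the 500 most recent samples, with the mean and standard deviation recomputed after every new sample. This is an illustration of the idea, not of how eventkit computes it. Using the population standard deviation (`statistics.pstdev`) is an assumption, and the fixed seed is there only to make runs repeatable.

```python
import random
import statistics
from collections import deque

random.seed(0)  # fixed seed so repeated runs give the same numbers

window = deque(maxlen=500)  # keeps only the 500 most recent samples
last = None
for _ in range(1000):
    window.append(random.gauss(0, 1))
    # Recompute both statistics over the current window contents.
    last = (statistics.fmean(window), statistics.pstdev(window))

mean, std = last
print(mean, std)
```

With samples drawn from a standard normal distribution, the final mean should be close to 0 and the standard deviation close to 1.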

Combine async iterators together

import asyncio
import eventkit as ev

async def ait(r):
    for i in r:
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for t in ev.Zip(ait('XYZ'), ait('123')):
        print(t)

asyncio.get_event_loop().run_until_complete(main())  # in Jupyter: await main()


('X', '1')
('Y', '2')
('Z', '3')
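
The pairing behaviour shown above can be sketched without eventkit. `azip` below is a hypothetical helper, not part of any library: it awaits the next item of each source in turn and stops as soon as one source is exhausted. How `ev.Zip` schedules its sources internally is not covered here.

```python
import asyncio


async def ait(r):
    for i in r:
        await asyncio.sleep(0.01)
        yield i


async def azip(*sources):
    """Hypothetical helper: yield tuples until any source is exhausted."""
    iterators = [s.__aiter__() for s in sources]
    while True:
        items = []
        for it in iterators:
            try:
                # Sources are awaited one after another, not concurrently.
                items.append(await it.__anext__())
            except StopAsyncIteration:
                return
        yield tuple(items)


async def main():
    return [t async for t in azip(ait('XYZ'), ait('123'))]


pairs = asyncio.run(main())
print(pairs)  # [('X', '1'), ('Y', '2'), ('Z', '3')]
```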

Real-time video analysis pipeline

self.video = VideoStream(conf.CAM_ID)
scene = self.video | FaceTracker | SceneAnalyzer
lastScene = scene.aiter(skip_to_last=True)
async for frame, persons in lastScene:
    ...

Full source code

Distributed computing

The distex library provides a poolmap extension method to put multiple cores or machines to use:

from distex import Pool
import eventkit as ev
import bz2

pool = Pool()
# await pool  # un-comment in Jupyter
data = [b'A' * 1000000] * 1000

pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

print(pipe.run())  # in Jupyter: print(await pipe)
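
As a rough single-machine analogue, the same map-then-average step can be sketched with the standard library's thread pool. This is not distex and does not spread work across machines; it only illustrates the shape of the computation. The data set is deliberately smaller than in the example above so that it runs quickly.

```python
import bz2
import statistics
from concurrent.futures import ThreadPoolExecutor

data = [b'A' * 100_000] * 16  # smaller than the example above, to run quickly

# Compress the blobs on a pool of worker threads and record each size.
with ThreadPoolExecutor(max_workers=4) as pool:
    sizes = list(pool.map(lambda blob: len(bz2.compress(blob)), data))

mean_size = statistics.mean(sizes)
print(mean_size)
```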


The complete API documentation.

Files for eventkit, version 0.8.6:

  • eventkit-0.8.6-py3-none-any.whl (31.1 kB), wheel, Python version py3
  • eventkit-0.8.6.tar.gz (26.2 kB), source
