Event system

About eventkit

Send events between loosely coupled components and compose all kinds of data pipelines.

See the examples and the introduction notebook to get a true feel for the possibilities.

Installation

pip3 install eventkit

Python version 3.6 or higher is required.

Examples

Create an event and connect two listeners

import eventkit as ev

def f(a, b):
    print(a * b)

def g(a, b):
    print(a / b)

event = ev.Event()
event += f  # connect f as a listener
event += g  # connect g as a listener
event.emit(10, 5)  # calls f(10, 5) and g(10, 5)
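
To make the listener mechanics concrete, here is a minimal sketch of the idea in plain Python. MiniEvent is a hypothetical toy class written for illustration; it is not eventkit's implementation and omits everything except connecting, disconnecting and emitting.

```python
class MiniEvent:
    """Toy stand-in for an event; hypothetical, not eventkit's code."""

    def __init__(self):
        self._listeners = []

    def __iadd__(self, listener):
        # `event += listener` connects a listener.
        self._listeners.append(listener)
        return self

    def __isub__(self, listener):
        # `event -= listener` disconnects it again.
        self._listeners.remove(listener)
        return self

    def emit(self, *args):
        # Call every connected listener with the emitted values.
        for listener in list(self._listeners):
            listener(*args)


results = []
event = MiniEvent()
event += lambda a, b: results.append(a * b)
event += lambda a, b: results.append(a / b)
event.emit(10, 5)
print(results)  # [50, 2.0]
```

The emitter only knows that it has listeners, not what they do, which is what keeps the components loosely coupled.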

Create a simple pipeline

import eventkit as ev

event = (
    ev.Sequence('abcde')
    .map(str.upper)
    .enumerate()
)

print(event.run())  # in Jupyter: await event.list()

Output:

[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
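
For comparison, the same transformation can be written with plain Python iterators. This is only a synchronous analogue of what the pipeline computes, not how eventkit evaluates it.

```python
letters = 'abcde'

# map(str.upper) transforms each item, enumerate() attaches an index.
upper = map(str.upper, letters)
result = list(enumerate(upper))

print(result)  # [(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
```

The difference is that the eventkit pipeline is driven by events as they arrive, while the iterator version pulls values from a finished sequence.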

Create a pipeline to get a running average and standard deviation

import random
import eventkit as ev

source = ev.Range(1000).map(lambda i: random.gauss(0, 1))

event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()

print(event.last().run())  # in Jupyter: await event.last()

Output:

[(0.00790957852672618, 1.0345673260655333)]
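
The computation behind this pipeline can be sketched with the standard library alone. The helper running_stats below is hypothetical, and two details are assumptions rather than documented eventkit behaviour: it reports statistics before the window is full, and it uses the population standard deviation.

```python
import random
from collections import deque
from statistics import mean, pstdev


def running_stats(values, window):
    """Yield (mean, std) over the most recent `window` values."""
    recent = deque(maxlen=window)  # old values fall off automatically
    for value in values:
        recent.append(value)
        # Assumption: population standard deviation (pstdev).
        yield mean(recent), pstdev(recent)


random.seed(0)  # fixed seed so repeated runs give the same numbers
samples = [random.gauss(0, 1) for _ in range(1000)]

# Keep only the final (mean, std) pair, like .last() in the pipeline.
*_, last = running_stats(samples, 500)
print(last)
```

With 1000 standard-normal samples and a window of 500, the final mean should be close to 0 and the final standard deviation close to 1, as in the output above.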

Combine async iterators together

import asyncio
import eventkit as ev

async def ait(r):
    for i in r:
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for t in ev.Zip(ait('XYZ'), ait('123')):
        print(t)

asyncio.get_event_loop().run_until_complete(main())  # in Jupyter: await main()

Output:

('X', '1')
('Y', '2')
('Z', '3')
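
As a rough illustration of what zipping async iterators involves, here is a sketch using only asyncio. The azip function and the _next helper are hypothetical names for this example; they do not reflect how ev.Zip is implemented. The sketch uses asyncio.run, which needs Python 3.7 or higher.

```python
import asyncio


async def ait(items, delay=0.01):
    for item in items:
        await asyncio.sleep(delay)
        yield item


async def _next(iterator):
    # Small wrapper so each "get next item" is a proper coroutine.
    return await iterator.__anext__()


async def azip(*iterables):
    """Yield tuples with one item from each async iterable."""
    iterators = [it.__aiter__() for it in iterables]
    while True:
        try:
            # Wait for the next item of every iterator concurrently.
            values = await asyncio.gather(*(_next(it) for it in iterators))
        except StopAsyncIteration:
            return  # stop as soon as any iterator is exhausted
        yield tuple(values)


async def collect():
    return [t async for t in azip(ait('XYZ'), ait('123'))]


pairs = asyncio.run(collect())
print(pairs)  # [('X', '1'), ('Y', '2'), ('Z', '3')]
```

Because the items are awaited concurrently, each tuple takes about one delay to produce rather than one delay per source.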

Realtime video analysis pipeline

self.video = VideoStream(conf.CAM_ID)
scene = self.video | FaceTracker | SceneAnalyzer
lastScene = scene.aiter(skip_to_last=True)
async for frame, faces, persons in lastScene:
    ...

Full source code

Distributed computing

The distex library provides a poolmap extension method to put multiple cores or machines to use:

from distex import Pool
import eventkit as ev
import bz2

pool = Pool()
# await pool  # un-comment in Jupyter
data = [b'A' * 1000000] * 1000

pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

print(pipe.run())  # in Jupyter: print(await pipe)
pool.shutdown()
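
For readers without distex installed, the shape of this computation can be tried on a single machine with the standard library. This is a stand-in that uses a thread pool and a smaller data set; it does not reproduce distex's ability to spread work over processes or machines.

```python
import bz2
from concurrent.futures import ThreadPoolExecutor
from statistics import mean

# Smaller than the data set above so the sketch finishes quickly.
data = [b'A' * 100_000] * 20

with ThreadPoolExecutor() as pool:
    # pool.map plays the role of poolmap: compress the items in parallel.
    sizes = [len(blob) for blob in pool.map(bz2.compress, data)]

average = mean(sizes)
print(average)
```

Since every item in data is identical, the average equals the compressed size of a single item.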

Documentation

The complete API documentation.
