Skip to main content

Event-driven data pipelines

Project description

Travis build PyPi Documentation

Introduction

The primary use cases of eventmix are

  • to send events between loosely coupled components;

  • to compose all kinds of event-driven data pipelines.

The interface is kept as Pythonic as possible, with familiar names from Python and its libraries where possible. For scheduling asyncio is used and there is seamless integration with it.

See the examples and the introduction notebook to get a true feel for the possibilities.

Installation

pip3 install eventkit

Python version 3.6 or higher is required.

Examples

Create an event and connect two listeners

import eventkit as ev

def f(a, b):
    print(a * b)

def g(a, b):
    print(a / b)

event = ev.Event()
event += f
event += g
event.emit(10, 5)

Create a simple pipeline

import eventkit as ev

event = (
    ev.Sequence('abcde')
    .map(str.upper)
    .enumerate()
)

print(event.run())  # in Jupyter: await event.list()

Output:

[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]

Create a pipeline to get a running average and standard deviation

import random
import eventkit as ev

source = ev.Range(1000).map(lambda i: random.gauss(0, 1))

event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()

print(event.last().run())  # in Jupyter: await event.last()

Output:

[(0.00790957852672618, 1.0345673260655333)]

Combine async iterators together

import asyncio
import eventkit as ev

async def ait(r):
    for i in r:
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for t in ev.Zip(ait('XYZ'), ait('123')):
        print(t)

asyncio.get_event_loop().run_until_complete(main())  # in Jupyter: await main()

Output:

('X', '1')
('Y', '2')
('Z', '3')

Realtime video analysis pipeline

self.video = VideoStream(conf.CAM_ID)
scene = self.video | FaceTracker | SceneAnalyzer
lastScene = scene.aiter(skip_to_last=True)
async for frame, faces, persons in lastScene:
    ...

Full source code

Distributed computing

The distex library provides a poolmap extension method to put multiple cores or machines to use:

from distex import Pool
import eventkit as ev
import bz2

pool = Pool()
# await pool  # un-comment in Jupyter
data = [b'A' * 1000000] * 1000

pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

print(pipe.run())  # in Jupyter: print(await pipe)
pool.shutdown()

Inspired by:

Documentation

The complete API documentation.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventkit-0.8.3.tar.gz (26.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventkit-0.8.3-py3-none-any.whl (30.9 kB view details)

Uploaded Python 3

File details

Details for the file eventkit-0.8.3.tar.gz.

File metadata

  • Download URL: eventkit-0.8.3.tar.gz
  • Upload date:
  • Size: 26.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.7.2

File hashes

Hashes for eventkit-0.8.3.tar.gz
Algorithm Hash digest
SHA256 1bae5afeb37c98cda273cbb9374c4eafa76be0f9ba4f4ac3cb42c688a373422e
MD5 c219a05414a45ec0fe9cb2aa96a37d54
BLAKE2b-256 6ba20df01762f4095cd06464e4fe179b2937a91e008e35c8a50a3f8062860735

See more details on using hashes here.

File details

Details for the file eventkit-0.8.3-py3-none-any.whl.

File metadata

  • Download URL: eventkit-0.8.3-py3-none-any.whl
  • Upload date:
  • Size: 30.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.7.2

File hashes

Hashes for eventkit-0.8.3-py3-none-any.whl
Algorithm Hash digest
SHA256 7edc349cfa46fea86c7409346d7e28838f4ad2a2d59497314ae4a9fbe3882916
MD5 499eb3e1f908dde2ce3e0ce44ba684f4
BLAKE2b-256 7275b0a2c985aaedb5131dd76486fb266a612f6c68200e5154f17ba280bf9fdc

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page