Event-driven data pipelines

Introduction

The primary use cases of eventkit are:

  • to send events between loosely coupled components;

  • to compose all kinds of event-driven data pipelines.

The interface is kept as Pythonic as possible, using familiar names from Python and its libraries. Scheduling is handled by asyncio, and events integrate seamlessly with it.

See the examples and the introduction notebook to get a true feel for the possibilities.

Installation

pip3 install eventkit

Python version 3.6 or higher is required.

Examples

Create an event and connect two listeners

import eventkit as ev

def f(a, b):
    print(a * b)

def g(a, b):
    print(a / b)

event = ev.Event()
event += f
event += g
event.emit(10, 5)
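
To make the `+=` and `emit` idiom concrete, here is a minimal sketch in plain Python of what connecting listeners and emitting to them amounts to. The `MiniEvent` class is hypothetical: it is not part of eventkit and does not reflect how eventkit is implemented. It only illustrates the listener-list idea behind the example above.

```python
class MiniEvent:
    """Hypothetical stand-in for an event; not eventkit's implementation."""

    def __init__(self):
        self._listeners = []

    def __iadd__(self, listener):
        # `event += listener` connects a listener.
        self._listeners.append(listener)
        return self

    def __isub__(self, listener):
        # `event -= listener` disconnects it again.
        self._listeners.remove(listener)
        return self

    def emit(self, *args):
        # Call every connected listener, in connection order.
        # Iterate over a copy so listeners may disconnect while being called.
        for listener in list(self._listeners):
            listener(*args)


results = []


def f(a, b):
    results.append(a * b)


def g(a, b):
    results.append(a / b)


event = MiniEvent()
event += f
event += g
event.emit(10, 5)  # both listeners run
event -= g
event.emit(2, 3)   # only f runs

print(results)  # [50, 2.0, 6]
```

The emitter never needs to know who is listening, which is what keeps the components loosely coupled.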

Create a simple pipeline

import eventkit as ev

event = (
    ev.Sequence('abcde')
    .map(str.upper)
    .enumerate()
)

print(event.run())  # in Jupyter: await event.list()

Output:

[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
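
For comparison, the same transformation can be written with Python's built-in `map` and `enumerate`, whose names the pipeline operators mirror. This is only a plain-Python analogue operating on a sequence that is already complete; it does not use eventkit.

```python
letters = 'abcde'

# Built-in equivalents of the .map(str.upper) and .enumerate() steps.
result = list(enumerate(map(str.upper, letters)))

print(result)  # [(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
```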

Create a pipeline to get a running average and standard deviation

import random
import eventkit as ev

source = ev.Range(1000).map(lambda i: random.gauss(0, 1))

event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()

print(event.last().run())  # in Jupyter: await event.last()

Output:

[(0.00790957852672618, 1.0345673260655333)]
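
As a rough cross-check of what the pipeline above computes, the following standard-library sketch keeps a sliding window and recomputes both statistics on every new value. Two assumptions are made here that the text above does not confirm: that `array(500)` acts as a window over the 500 most recent values, and that the standard deviation is the population form. The fixed seed is added only to make the sketch repeatable.

```python
import random
import statistics
from collections import deque

random.seed(42)  # fixed seed so the sketch is repeatable

window = deque(maxlen=500)  # keeps only the 500 most recent values
mean = std = None

for _ in range(1000):
    window.append(random.gauss(0, 1))
    # Recompute both statistics over the current window.
    mean = statistics.mean(window)
    std = statistics.pstdev(window)  # population std: an assumption

# Final (mean, std) pair, comparable to taking the last emitted value.
print((mean, std))
```

With standard normal input the final pair should land near (0, 1), as in the output above.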

Combine async iterators together

import asyncio
import eventkit as ev

async def ait(r):
    for i in r:
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for t in ev.Zip(ait('XYZ'), ait('123')):
        print(t)

asyncio.get_event_loop().run_until_complete(main())  # in Jupyter: await main()

Output:

('X', '1')
('Y', '2')
('Z', '3')
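
The zipping semantics shown above can be sketched without eventkit. The `azip` helper below is hypothetical and deliberately simple: it awaits the sources one after another rather than concurrently, and stops as soon as any source is exhausted. It uses `asyncio.run`, which requires Python 3.7 or higher.

```python
import asyncio


async def ait(values, delay=0.01):
    # Yield each value after a short pause, like the example above.
    for v in values:
        await asyncio.sleep(delay)
        yield v


async def azip(*sources):
    """Hypothetical helper: zip async iterators, stopping at the shortest."""
    iterators = [s.__aiter__() for s in sources]
    while True:
        items = []
        for it in iterators:
            try:
                items.append(await it.__anext__())
            except StopAsyncIteration:
                return  # one source is exhausted, so the zip ends
        yield tuple(items)


async def main():
    return [t async for t in azip(ait('XYZ'), ait('123'))]


pairs = asyncio.run(main())
print(pairs)  # [('X', '1'), ('Y', '2'), ('Z', '3')]
```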

Real-time video analysis pipeline

self.video = VideoStream(conf.CAM_ID)
scene = self.video | FaceTracker | SceneAnalyzer
lastScene = scene.aiter(skip_to_last=True)
async for frame, persons in lastScene:
    ...

Full source code

Distributed computing

The distex library provides a poolmap extension method to put multiple cores or machines to use:

from distex import Pool
import eventkit as ev
import bz2

pool = Pool()
# await pool  # un-comment in Jupyter
data = [b'A' * 1000000] * 1000

pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

print(pipe.run())  # in Jupyter: print(await pipe)
pool.shutdown()
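
To try the same map-then-average shape without installing distex, the sketch below swaps in `ThreadPoolExecutor` from the standard library. This is a different technique from `poolmap`: it runs on threads in a single process, not across cores or machines through a distex pool, and it uses no eventkit pipeline. The data set is reduced to keep the run short.

```python
import bz2
from concurrent.futures import ThreadPoolExecutor
from statistics import mean

data = [b'A' * 100_000] * 20  # smaller than the example above to stay quick


def compressed_size(blob):
    # Compress one blob and report the size of the result in bytes.
    return len(bz2.compress(blob))


with ThreadPoolExecutor() as executor:
    # executor.map preserves input order, like the built-in map.
    sizes = list(executor.map(compressed_size, data))

average_size = mean(sizes)
print(average_size)
```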

Documentation

The complete API documentation.
