Event-driven data pipelines

Introduction

The primary use cases of eventkit are:

  • to send events between loosely coupled components;

  • to compose all kinds of event-driven data pipelines.

The interface is kept as Pythonic as possible, using familiar names from Python and its libraries wherever it can. Scheduling is built on asyncio, and events integrate seamlessly with it.

See the examples and the introduction notebook to get a true feel for the possibilities.

Note: The package is now published on PyPI as aeventkit, but the import name is still eventkit.

Installation

pip3 install aeventkit

Python version 3.10+ is required.

Examples

Create an event and connect two listeners

import eventkit as ev

def f(a, b):
    print(a * b)

def g(a, b):
    print(a / b)

event = ev.Event()
event += f  # connect f as a listener
event += g  # connect g as a listener
event.emit(10, 5)  # calls f, then g: prints 50 and 2.0
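
To make the listener mechanics concrete, here is a minimal pure-Python sketch of the same connect-and-emit pattern. The MiniEvent class is hypothetical and written only for illustration; it is not eventkit's implementation and leaves out everything eventkit adds on top (pipelines, asyncio integration and so on).

```python
class MiniEvent:
    """Toy stand-in for an event; NOT eventkit's implementation."""

    def __init__(self):
        self._listeners = []

    def __iadd__(self, listener):
        # `event += f` registers f and hands back the same event object
        self._listeners.append(listener)
        return self

    def __isub__(self, listener):
        # `event -= f` removes a previously registered listener
        self._listeners.remove(listener)
        return self

    def emit(self, *args):
        # Call listeners in connection order; iterate over a copy so a
        # listener may disconnect itself while being called
        for listener in list(self._listeners):
            listener(*args)


results = []
event = MiniEvent()
event += lambda a, b: results.append(a * b)
event += lambda a, b: results.append(a / b)
event.emit(10, 5)
print(results)  # [50, 2.0]
```

The emitter never needs to know who is listening, which is what keeps the components loosely coupled.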

Create a simple pipeline

import eventkit as ev

event = (
    ev.Sequence('abcde')
    .map(str.upper)
    .enumerate()
)

print(event.run())  # in Jupyter: await event.list()

Output:

[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
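
For comparison, the same transformation can be written with plain Python built-ins. This sketch uses no eventkit at all; it only shows that the operator names in the pipeline mirror their synchronous counterparts.

```python
# Synchronous equivalent of Sequence('abcde').map(str.upper).enumerate()
pairs = list(enumerate(map(str.upper, 'abcde')))
print(pairs)  # [(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
```

The difference is that the eventkit version is event-driven: values flow through the pipeline as they are emitted rather than being pulled from an iterator.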

Create a pipeline to get a running average and standard deviation

import random
import eventkit as ev

source = ev.Range(1000).map(lambda i: random.gauss(0, 1))

event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()

print(event.last().run())  # in Jupyter: await event.last()

Output:

[(0.00790957852672618, 1.0345673260655333)]
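
As a rough illustration of what this pipeline computes, the sketch below keeps a window of the 500 most recent samples using only the standard library and reports the mean and standard deviation of the final window. Two assumptions are made here that the text above does not confirm: the sample (n - 1) form of the standard deviation is used, and statistics are only computed once the window is full. The fixed seed is also an addition for repeatability.

```python
import random
import statistics
from collections import deque

random.seed(42)  # fixed seed so the sketch is repeatable (an addition)

window = deque(maxlen=500)  # keeps only the 500 most recent samples
mean = std = None

for _ in range(1000):
    window.append(random.gauss(0, 1))
    if len(window) == window.maxlen:
        # Recompute over the current window; fine for a sketch, although
        # an incremental update would be cheaper
        mean = statistics.fmean(window)
        std = statistics.stdev(window)  # sample std dev (assumption)

print((mean, std))
```

With standard normal input the mean should land near 0 and the standard deviation near 1, as in the output above.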

Combine async iterators together

import asyncio
import eventkit as ev

async def ait(r):
    for i in r:
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for t in ev.Zip(ait('XYZ'), ait('123')):
        print(t)

asyncio.get_event_loop().run_until_complete(main())  # in Jupyter: await main()

Output:

('X', '1')
('Y', '2')
('Z', '3')

Real-time video analysis pipeline

self.video = VideoStream(conf.CAM_ID)
scene = self.video | FaceTracker | SceneAnalyzer
lastScene = scene.aiter(skip_to_last=True)
async for frame, persons in lastScene:
    ...

Full source code

Distributed computing

The distex library provides a poolmap extension method to put multiple cores or machines to use:

from distex import Pool
import eventkit as ev
import bz2

pool = Pool()
# await pool  # un-comment in Jupyter
data = [b'A' * 1000000] * 1000

pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

print(pipe.run())  # in Jupyter: print(await pipe)
pool.shutdown()
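
For readers without distex installed, the computation itself (compress every blob, then average the compressed sizes) can be approximated on a single machine with the standard library. This is only a stand-in under stated assumptions: it uses a thread pool rather than distex's Pool, runs on one machine, and shrinks the payload so it finishes quickly.

```python
import bz2
from concurrent.futures import ThreadPoolExecutor
from statistics import fmean

# Smaller payload than the distex example so the sketch finishes quickly
data = [b'A' * 100_000] * 20

# Fan the compression work out over worker threads and collect the sizes
with ThreadPoolExecutor() as pool:
    sizes = list(pool.map(lambda blob: len(bz2.compress(blob)), data))

mean_size = fmean(sizes)
print(mean_size)
```

Unlike this sketch, the poolmap pipeline above streams results through the event chain as they arrive instead of collecting them into a list first.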

Documentation

The complete API documentation.
