Build event-driven workflows with Python functions

drive-events

Build event-driven workflows with Python async functions

Install

Install from PyPI

pip install drive-events

Install from source

# clone this repo first
cd drive-events
pip install -e .

Quick Start

A hello world example:

import asyncio
from drive_events import EventInput, default_drive


@default_drive.make_event
async def hello(event: EventInput, global_ctx):
    print("hello")

@default_drive.listen_group([hello])
async def world(event: EventInput, global_ctx):
    print("world")

asyncio.run(default_drive.invoke_event(hello))

In this example, the return of the hello event triggers the world event.
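To make the triggering rule concrete, here is a minimal standard-library sketch of the idea. It is not how drive_events is implemented: the MiniDrive class and its make_event, listen, and invoke methods are hypothetical names invented for this illustration, it only models listeners of a single event, and it passes the raw return value along instead of an EventInput.

```python
import asyncio


class MiniDrive:
    """Hypothetical toy registry for illustration; not part of drive_events."""

    def __init__(self):
        # Maps each registered function to the functions that listen to it.
        self.listeners = {}

    def make_event(self, func):
        self.listeners.setdefault(func, [])
        return func

    def listen(self, source):
        def decorator(func):
            self.listeners.setdefault(func, [])
            self.listeners[source].append(func)
            return func

        return decorator

    async def invoke(self, func, event=None, global_ctx=None):
        result = await func(event, global_ctx)
        # The return value of `func` becomes the input of every listener.
        await asyncio.gather(
            *(self.invoke(nxt, result, global_ctx) for nxt in self.listeners[func])
        )


drive = MiniDrive()
calls = []


@drive.make_event
async def hello(event, global_ctx):
    calls.append("hello")
    return "from hello"


@drive.listen(hello)
async def world(event, global_ctx):
    calls.append(f"world got {event!r}")


asyncio.run(drive.invoke(hello))
print(calls)
```

The point of the sketch is only the ordering: world never runs on its own, it runs because hello returned.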

An event function needs two things:

  • Input signature: it must be (event: EventInput, global_ctx)
    • EventInput carries the return values of the events in the listening group.
    • global_ctx is whatever you pass when invoking an event; it can be anything and defaults to None.
  • Decorator: the function must be decorated with @default_drive.make_event or @default_drive.listen_group([EVENT,...])

Then, run your workflow from any event:

await default_drive.invoke_event(EVENT, EVENT_INPUT, GLOBAL_CTX)

Check out the examples for more use cases!

Features

Multi-Recv

drive_events allows an event to be triggered only after every event in a group has produced a result:

code snippet
import asyncio

from drive_events import EventInput, default_drive

@default_drive.make_event
async def start(event: EventInput, global_ctx):
    print("start")
    
@default_drive.listen_group([start])
async def hello(event: EventInput, global_ctx):
    return 1


@default_drive.listen_group([start])
async def world(event: EventInput, global_ctx):
    return 2


@default_drive.listen_group([hello, world])
async def adding(event: EventInput, global_ctx):
    results = event.results
    print("adding", hello, world)
    return results[hello.id] + results[world.id]


results = asyncio.run(default_drive.invoke_event(start))
assert results[adding.id] == 3
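
The join behaviour can also be illustrated with plain asyncio, independent of the library. The sketch below is an assumption-labeled analogy, not drive_events code: asyncio.gather plays the role of the listening group, and the string keys in the results dictionary are hypothetical stand-ins for the event ids used above.

```python
import asyncio


async def hello():
    return 1


async def world():
    return 2


async def adding(results):
    # Runs only after every member of the group has produced a result.
    return results["hello"] + results["world"]


async def main():
    # gather waits for both producers, similar in spirit to
    # listening to the group [hello, world].
    hello_result, world_result = await asyncio.gather(hello(), world())
    return await adding({"hello": hello_result, "world": world_result})


total = asyncio.run(main())
print(total)
```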

Parallel

drive_events is well suited to workflows with a lot of network I/O that can be awaited in parallel. If two events listen to the same group of events, they are triggered at the same time:

code snippet
import asyncio
from datetime import datetime

from drive_events import EventInput, default_drive

@default_drive.make_event
async def start(event: EventInput, global_ctx):
    print("start")

@default_drive.listen_group([start])
async def hello(event: EventInput, global_ctx):
    print(datetime.now(), "hello")
    await asyncio.sleep(0.2)
    print(datetime.now(), "hello done")


@default_drive.listen_group([start])
async def world(event: EventInput, global_ctx):
    print(datetime.now(), "world")
    await asyncio.sleep(0.2)
    print(datetime.now(), "world done")


asyncio.run(default_drive.invoke_event(start))
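
To see why running listeners concurrently matters, the following standard-library sketch times two 0.2 second waits started together. It does not use drive_events; the step helper and the log list are hypothetical names for this illustration, and the exact elapsed time will vary by machine.

```python
import asyncio
import time


async def step(name, log):
    log.append(f"{name} start")
    await asyncio.sleep(0.2)  # stands in for a network call
    log.append(f"{name} done")


async def main():
    log = []
    started = time.perf_counter()
    # Both steps are scheduled together, so their waits overlap.
    await asyncio.gather(step("hello", log), step("world", log))
    return log, time.perf_counter() - started


log, elapsed = asyncio.run(main())
print(log, round(elapsed, 2))
```

Both "start" entries are logged before either "done" entry, and the total time is close to one sleep rather than the sum of the two.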

Dynamic

drive_events is dynamic. You can use goto and abort to change the workflow at runtime:

code snippet for abort
import asyncio

from drive_events import EventInput, default_drive
from drive_events.dynamic import abort_this

@default_drive.make_event
async def a(event: EventInput, global_ctx):
    return abort_this()

@default_drive.listen_group([a])
async def b(event: EventInput, global_ctx):
    assert False, "should not be called"
    
asyncio.run(default_drive.invoke_event(a))

code snippet for goto
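import asyncio

from drive_events import EventInput, default_drive
from drive_events.types import ReturnBehavior
from drive_events.dynamic import goto_events, abort_this

# Counts how many times `a` has run, so the call reached via goto can be told apart.
call_a_count = 0

@default_drive.make_event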
async def a(event: EventInput, global_ctx):
    global call_a_count
    if call_a_count == 0:
        assert event is None
    elif call_a_count == 1:
        assert event.behavior == ReturnBehavior.GOTO
        assert event.results == {b.id: 2}
        return abort_this()
    call_a_count += 1
    return 1

@default_drive.listen_group([a])
async def b(event: EventInput, global_ctx):
    return goto_events([a], 2)

@default_drive.listen_group([b])
async def c(event: EventInput, global_ctx):
    assert False, "should not be called"
    
asyncio.run(default_drive.invoke_event(a))
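
The control flow of the goto example (a, then b, then back to a, then abort, with c never reached) can be traced with a small standard-library model. This is only a sketch under stated assumptions: ABORT, Goto, and run are hypothetical stand-ins invented here for abort_this, goto_events, and the dispatcher, and the model handles a simple linear chain rather than listening groups.

```python
import asyncio

ABORT = object()  # hypothetical sentinel standing in for abort_this()


class Goto:
    """Hypothetical stand-in for goto_events(): jump to `target` with `value`."""

    def __init__(self, target, value):
        self.target = target
        self.value = value


async def run(start, next_step):
    trace = []
    current, event = start, None
    while current is not None:
        trace.append(current.__name__)
        result = await current(event)
        if result is ABORT:
            break  # stop the workflow; nothing downstream runs
        if isinstance(result, Goto):
            current, event = result.target, result.value
        else:
            current, event = next_step.get(current), result
    return trace


async def a(event):
    # The first call has no input; the second call arrives via the goto from b.
    return ABORT if event == 2 else 1


async def b(event):
    return Goto(a, 2)


async def c(event):
    raise AssertionError("should not be called")


trace = asyncio.run(run(a, {a: b, b: c}))
print(trace)
```

Because b returns a Goto instead of a normal value, its regular listener c is skipped and a runs a second time, where it aborts.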
