drive-flow

Build event-driven workflows with python async functions

🌬️ Zero dependency. No trouble, no loss.

🍰 With intuitive decorators, writing your async workflow is a piece of cake.

🔄 Supports dynamic dispatch (goto, abort). Create looping or if-else workflows with ease.

🔜 Fully asynchronous. Events that listen to the same group are always triggered at the same time!

Install

Install from PyPI

pip install drive-flow

Install from source

# clone this repo first
cd drive-flow
pip install -e .

Quick Start

A hello world example:

import asyncio
from drive_flow import EventInput, default_drive


@default_drive.make_event
async def hello(event: EventInput, global_ctx):
    print("hello")

@default_drive.listen_group([hello])
async def world(event: EventInput, global_ctx):
    print("world")

# display the dependencies of 'world' event
print(world.debug_string()) 
asyncio.run(default_drive.invoke_event(hello))

In this example, the return of the hello event triggers the world event.

Tip: Hello world not cool enough? Try building a ReAct agent workflow with drive-flow.

Break-down

An event function needs a few elements:

  • Input signature: must be (event: EventInput, global_ctx). EventInput carries the return values of the events this function listens to. global_ctx is whatever you pass when invoking an event; it can be any object and defaults to None.

    This example shows how to read return values from EventInput.

  • Make sure you decorate the function with @default_drive.make_event or @default_drive.listen_group([EVENT,...])

Then, run your workflow from any event:

await default_drive.invoke_event(EVENT, EVENT_INPUT, GLOBAL_CTX)

Check out the examples for more detailed usage and features!

Features

Multi-Recv

drive_flow allows an event to be triggered only after every event in a group has produced a result:

code snippet
import asyncio
from drive_flow import EventInput, default_drive

@default_drive.make_event
async def start(event: EventInput, global_ctx):
    print("start")
    
@default_drive.listen_group([start])
async def hello(event: EventInput, global_ctx):
    return 1


@default_drive.listen_group([start])
async def world(event: EventInput, global_ctx):
    return 2


@default_drive.listen_group([hello, world])
async def adding(event: EventInput, global_ctx):
    results = event.results
    print("adding", hello, world)
    return results[hello.id] + results[world.id]


results = asyncio.run(default_drive.invoke_event(start))
assert results[adding.id] == 3

adding is triggered for the first time once both hello and world are done.
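For comparison, the same "wait for the whole group" dependency can be wired by hand with plain asyncio. This is only a sketch of the idea and makes no claim about how drive_flow implements it; the dictionary keys and the main function are made up for illustration.

```python
import asyncio


async def hello():
    return 1


async def world():
    return 2


async def adding(results):
    # Runs only after every member of the group has produced a result.
    return results["hello"] + results["world"]


async def main():
    # gather() waits for the whole group before the dependent step starts.
    hello_result, world_result = await asyncio.gather(hello(), world())
    return await adding({"hello": hello_result, "world": world_result})


total = asyncio.run(main())
```

With decorators, this wiring is declared on the listener instead of being written out in a coordinating function.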

Re-trigger the event

drive_flow supports two behaviors for re-triggering an event that listens to multiple events:

  • all: retrigger this event only when all the listened events have been updated.
  • any: retrigger this event as soon as any one of the listened events is updated.

Check out this example for more details.
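One way to read the two modes is as a predicate over the set of listened events that have produced a new result since the last trigger. The helper below is hypothetical and not part of drive_flow; it assumes "updated" means exactly that, and only illustrates the difference between all and any.

```python
def should_retrigger(retrigger_type, group, updated):
    """Hypothetical helper: decide whether a listener fires again.

    group   -- names of the events being listened to
    updated -- names of events with a new result since the last trigger
    """
    group, updated = set(group), set(updated)
    if retrigger_type == "all":
        # Every listened event must have been updated.
        return group <= updated
    if retrigger_type == "any":
        # A single updated event in the group is enough.
        return bool(group & updated)
    raise ValueError(f"unknown retrigger_type: {retrigger_type!r}")


only_hello_all = should_retrigger("all", ["hello", "world"], {"hello"})
only_hello_any = should_retrigger("any", ["hello", "world"], {"hello"})
```

Under this reading, a single update to hello is enough for any but not for all.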

Parallel

drive_flow is a good fit for workflows with many network I/O calls that can be awaited in parallel. If two events listen to the same group of events, they are triggered at the same time:

code snippet
import asyncio
from datetime import datetime
from drive_flow import EventInput, default_drive

@default_drive.make_event
async def start(event: EventInput, global_ctx):
    print("start")

@default_drive.listen_group([start])
async def hello(event: EventInput, global_ctx):
    print(datetime.now(), "hello")
    await asyncio.sleep(0.2)
    print(datetime.now(), "hello done")


@default_drive.listen_group([start])
async def world(event: EventInput, global_ctx):
    print(datetime.now(), "world")
    await asyncio.sleep(0.2)
    print(datetime.now(), "world done")

asyncio.run(default_drive.invoke_event(start))
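The interleaving to expect from "triggered at the same time" can be reproduced with plain asyncio. This sketch does not use drive_flow; it assumes nothing beyond the standard library and records the order of steps in a list instead of printing timestamps.

```python
import asyncio

log = []


async def hello():
    log.append("hello")
    await asyncio.sleep(0.05)  # stands in for a network call
    log.append("hello done")


async def world():
    log.append("world")
    await asyncio.sleep(0.05)  # stands in for a network call
    log.append("world done")


async def main():
    # Both coroutines are started before either one finishes sleeping.
    await asyncio.gather(hello(), world())


asyncio.run(main())
```

Both start entries are logged before either done entry, so the two waits overlap rather than add up.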

Dynamic

drive_flow is dynamic. You can use goto and abort to change the workflow at runtime:

code snippet for abort_this
import asyncio
from drive_flow import EventInput, default_drive
from drive_flow.dynamic import abort_this

@default_drive.make_event
async def a(event: EventInput, global_ctx):
    return abort_this()

# abort_this does not exit the whole workflow. It only discards this event's
# return, with no other side effects. Because `a` aborts its return, no other
# event is triggered in this invocation, so the invocation ends here.
@default_drive.listen_group([a])
async def b(event: EventInput, global_ctx):
    assert False, "should not be called"
    
asyncio.run(default_drive.invoke_event(a))

code snippet for goto
import asyncio
from drive_flow import EventInput, default_drive
from drive_flow.types import ReturnBehavior
from drive_flow.dynamic import goto_events, abort_this

call_a_count = 0

@default_drive.make_event
async def a(event: EventInput, global_ctx):
    global call_a_count
    if call_a_count == 0:
        # first call: invoked directly, so there is no input
        assert event is None
    elif call_a_count == 1:
        # second call: reached through b's goto, carrying b's value
        assert event.behavior == ReturnBehavior.GOTO
        assert event.results == {b.id: 2}
        return abort_this()
    call_a_count += 1
    return 1

@default_drive.listen_group([a])
async def b(event: EventInput, global_ctx):
    # jump back to `a` with 2 as the result instead of triggering `c`
    return goto_events([a], 2)

@default_drive.listen_group([b])
async def c(event: EventInput, global_ctx):
    assert False, "should not be called"
    
asyncio.run(default_drive.invoke_event(a))
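The control flow of the goto snippet (a runs, b jumps back to a, a aborts, c never runs) can be traced with a toy dispatcher written in plain asyncio. Everything here is a hypothetical stand-in: Goto, ABORT, NEXT and run are invented for illustration and are not drive_flow's types or its implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Goto:  # hypothetical stand-in for a "goto" return value
    target: str
    payload: Any


ABORT = object()  # hypothetical stand-in for an "abort" return value
calls = []


async def a(payload):
    calls.append("a")
    if payload is not None:  # second visit, reached through the goto
        return ABORT
    return 1


async def b(payload):
    calls.append("b")
    return Goto("a", 2)  # jump back to `a` instead of moving on to `c`


async def c(payload):
    calls.append("c")


HANDLERS = {"a": a, "b": b, "c": c}
NEXT = {"a": "b", "b": "c", "c": None}  # default order: a -> b -> c


async def run(start):
    name, payload = start, None
    while name is not None:
        result = await HANDLERS[name](payload)
        if result is ABORT:
            break  # nothing downstream is triggered
        if isinstance(result, Goto):
            name, payload = result.target, result.payload
        else:
            name, payload = NEXT[name], result
    return calls


trace = asyncio.run(run("a"))
```

The recorded trace visits a, then b, then a again, and stops there without reaching c.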

TODO

  • fix: streaming event execution
  • fix: an event should never receive the listened events' results twice (de-duplication), unless the whole group has been updated for retrigger_type='all'
  • Add ReAct workflow example
