A workflow engine to run AI applications with event-driven, stepwise control.
A powerful and flexible event-driven workflow engine for Python, designed to build complex asynchronous workflows with ease.
Installation
pip install novastack-workflows
Quick Start
Here's a simple example to get you started with Novastack Workflows:
import asyncio

from novastack_workflows import Workflow, Context, step
from novastack_workflows.events import Event, StartEvent, StopEvent


class MessageEvent(Event):
    message: str


class MyWorkflow(Workflow):
    @step(depends_on=StartEvent)
    async def start(self, ctx: Context, ev: StartEvent) -> MessageEvent:
        # Read the keyword argument that was passed to workflow.run()
        input_msg = ev.get("input_msg", "")
        return MessageEvent(message=f"Processed: {input_msg}")

    @step(depends_on=MessageEvent)
    async def process(self, ctx: Context, ev: MessageEvent) -> StopEvent:
        # Returning a StopEvent ends the workflow and carries the result
        return StopEvent(result=ev.message)


async def main():
    workflow = MyWorkflow()
    result = await workflow.run(input_msg="Hello, World!")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
Core Concepts
Workflow
A workflow is a class that inherits from Workflow and contains one or more steps. It orchestrates the execution of steps based on events.
Steps
Steps are asynchronous methods decorated with @step(depends_on=EventType) that define what happens when a specific event is received.
- Steps receive a Context and an Event.
- Steps can return new events to trigger subsequent steps.
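To make the routing idea concrete, here is a minimal, self-contained sketch of how a decorator can tie a step to an event type and how a loop can pass each returned event to the next step. It is a toy illustration only, not the Novastack Workflows implementation: `ToyWorkflow`, `Greeter`, the `_depends_on` attribute, and the dataclass-based events are all hypothetical, and the steps here take only the event (no Context) to keep the example short.

```python
import asyncio
from dataclasses import dataclass


class Event:
    """Base class for the toy events in this sketch."""


@dataclass
class StartEvent(Event):
    message: str = ""


@dataclass
class MessageEvent(Event):
    message: str = ""


@dataclass
class StopEvent(Event):
    result: object = None


def step(depends_on):
    """Tag a coroutine function with the event type that triggers it."""
    def decorator(func):
        func._depends_on = depends_on  # hypothetical marker attribute
        return func
    return decorator


class ToyWorkflow:
    """Hypothetical mini-engine: routes each event to the step that depends on it."""

    def _handlers(self):
        # Build a map of event type -> bound step method from the decorator tags.
        handlers = {}
        for name in dir(self):
            attr = getattr(self, name)
            event_type = getattr(attr, "_depends_on", None)
            if event_type is not None:
                handlers[event_type] = attr
        return handlers

    async def run(self, **kwargs):
        handlers = self._handlers()
        event = StartEvent(**kwargs)
        # Keep dispatching until some step returns a StopEvent.
        while not isinstance(event, StopEvent):
            event = await handlers[type(event)](event)
        return event.result


class Greeter(ToyWorkflow):
    @step(depends_on=StartEvent)
    async def start(self, ev):
        return MessageEvent(message=f"Processed: {ev.message}")

    @step(depends_on=MessageEvent)
    async def finish(self, ev):
        return StopEvent(result=ev.message)


if __name__ == "__main__":
    print(asyncio.run(Greeter().run(message="Hello")))
```

A real engine would also need to handle events with no matching step, several steps listening to the same event, and concurrent execution; the sketch leaves all of that out.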
Events
Events are the building blocks of workflows. They carry data between steps and trigger step execution.
- StartEvent: Automatically triggered when a workflow starts
- StopEvent: Signals the end of a workflow and carries the final result
- Custom Events: Define your own events by inheriting from Event
Context
The Context object provides access to workflow state and allows steps to share data throughout the workflow execution.
# Read-only access
current_value = ctx.state.count

# Edit state
async with ctx.store.edit_state() as state:
    state.count = current_value + 1

# Send events
ctx.send_event(MyEvent(...))
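One plausible reason for editing state through an async context manager is to keep read-modify-write updates safe when several steps run concurrently. The sketch below shows that pattern with plain asyncio. It is an assumption about the general technique, not a description of how Novastack Workflows implements its store: `ToyStore`, `CounterState`, and `increment` are hypothetical names.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass


@dataclass
class CounterState:
    count: int = 0


class ToyStore:
    """Hypothetical store that serializes edits with an asyncio.Lock."""

    def __init__(self, state):
        self._state = state
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def edit_state(self):
        # Only one coroutine at a time may hold the state for editing.
        async with self._lock:
            yield self._state


async def increment(store):
    async with store.edit_state() as state:
        current = state.count
        # Yield control to other tasks; the lock keeps the update atomic.
        await asyncio.sleep(0)
        state.count = current + 1


async def main():
    store = ToyStore(CounterState())
    # 100 concurrent increments, each a read-modify-write on shared state.
    await asyncio.gather(*(increment(store) for _ in range(100)))
    return store._state.count


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without the lock, the `await` between reading and writing `count` would let other tasks read the same stale value, and some increments would be lost.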
Features
| Feature | Novastack Workflows |
|---|---|
| Event-driven execution | ✅ |
| Fan-out (parallelism) | ✅ |
| Async execution | ✅ |
| Shared state | ✅ |
| Event joins | ✅ |
| Internal buffer | ✅ |
| Declarative API | ✅ |
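For readers unfamiliar with the terms, fan-out means starting several units of work concurrently from one trigger, and a join means waiting until all of their results are available before continuing. The sketch below shows the pattern with plain asyncio only. It does not use the Novastack Workflows API, and `worker` and `fan_out_and_join` are hypothetical names chosen for illustration.

```python
import asyncio


async def worker(item: int) -> int:
    # Stand-in for real work such as an API call or a model invocation.
    await asyncio.sleep(0.01)
    return item * item


async def fan_out_and_join(items):
    # Fan-out: start one task per item so they run concurrently.
    tasks = [asyncio.create_task(worker(i)) for i in items]
    # Join: wait for every task; results come back in input order.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(fan_out_and_join([1, 2, 3])))
```

In an event-driven engine the same shape would typically appear as one step emitting several events and a later step waiting for all of the resulting events before it runs.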
License
Apache License 2.0