Skip to main content

A workflow engine to run AI applications with event-driven, stepwise control.

Project description

NovaStack Workflows

PyPI - Version Hatch project Testing Ruff Pepy Total Downloads PyPI - License

A powerful and flexible event-driven workflow engine for Python, designed to build complex asynchronous workflows with ease.

Installation

pip install novastack-workflows

Quick Start

Here's a simple example to get you started with NovaStack Workflows:

from novastack.workflows import Workflow, Context, step
from novastack.workflows.events import Event, StartEvent, StopEvent


class MessageEvent(Event):
    message: str

class MyWorkflow(Workflow):

    @step(depends_on=StartEvent)
    async def start(self, ctx: Context, ev: StartEvent) -> MessageEvent:
        
        input_msg = ev.get("input_msg", "")
        return MessageEvent(message=f"Processed: {input_msg}")

    @step(depends_on=MessageEvent)
    async def process(self, ctx: Context, ev: MessageEvent) -> StopEvent:
        return StopEvent(result=ev.message)


async def main():
    workflow = MyWorkflow()
    result = await workflow.run(input_msg="Hello, World!")

    print(result)

Core Concepts

Workflow

A workflow is a class that inherits from Workflow and contains one or more steps. It orchestrates the execution of steps based on events.

Steps

Steps are asynchronous methods decorated with @step(depends_on=EventType) that define what happens when a specific event is received.

  • Steps receive a Context and an Event.
  • Steps can return new events to trigger subsequent steps.

Events

Events are the building blocks of workflows. They carry data between steps and trigger step execution.

  • StartEvent: Automatically triggered when a workflow starts
  • StopEvent: Signals the end of a workflow and carries the final result
  • Custom Events: Define your own events by inheriting from Event

Context

The Context object provides access to workflow state and allows steps to share data throughout the workflow execution.

# Read-only access
current_value = ctx.state.count

# Edit state
async with ctx.store.edit_state() as state:
    state.count = current_counter + 1

# Emit events
ctx.send_event(MyEvent(...))

Features

Feature NovaStack Workflows
Event-driven execution
Fan-out (parallelism)
Async execution
Shared state
Event joins
Internal buffer.
Declarative API

License

Apache License 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

novastack_workflows-1.2.1.tar.gz (24.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

novastack_workflows-1.2.1-py3-none-any.whl (31.7 kB view details)

Uploaded Python 3

File details

Details for the file novastack_workflows-1.2.1.tar.gz.

File metadata

  • Download URL: novastack_workflows-1.2.1.tar.gz
  • Upload date:
  • Size: 24.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: python-httpx/0.28.1

File hashes

Hashes for novastack_workflows-1.2.1.tar.gz
Algorithm Hash digest
SHA256 7dad6dbfc17946a541c2a2da6c92db42418da1d6f3fd43fa581b22e49be3515c
MD5 a8f66028e97373a5a679e22d73dcac89
BLAKE2b-256 fab302518a3aba74efa7f22652b6e75650359b5423f211949e22091ece5d22f0

See more details on using hashes here.

File details

Details for the file novastack_workflows-1.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for novastack_workflows-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 9e569a666ba4b8a2616bbeb7c30b3be94ff9c71e42cd76f7168cc6e7081bb922
MD5 1c599243afa035710af6163176626ef9
BLAKE2b-256 52bad0af0c9d8cad94e76042d5fb9a35a9ca0337f1e5f1a2a86842b13140f611

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page