Skip to main content

A workflow engine to run AI applications with event-driven, stepwise control.

Project description

Novastack workflows Logo

A powerful and flexible event-driven workflow engine for Python, designed to build complex asynchronous workflows with ease.

PyPI - Version Testing Ruff Pepy Total Downloads PyPI - License

Installation

pip install novastack-workflows

Quick Start

Here's a simple example to get you started with Novastack Workflows:

from novastack_workflows import Workflow, Context, step
from novastack_workflows.events import Event, StartEvent, StopEvent


class MessageEvent(Event):
    message: str

class MyWorkflow(Workflow):

    @step(depends_on=StartEvent)
    async def start(self, ctx: Context, ev: StartEvent) -> MessageEvent:
        
        input_msg = ev.get("message", "")
        return MessageEvent(message=f"Processed: {input_msg}")

    @step(depends_on=MessageEvent)
    async def process(self, ctx: Context, ev: MessageEvent) -> StopEvent:
        return StopEvent(result=ev.message)


async def main():
    workflow = MyWorkflow()
    result = await workflow.run(input_msg="Hello, World!")

    print(result)

Core Concepts

Workflow

A workflow is a class that inherits from Workflow and contains one or more steps. It orchestrates the execution of steps based on events.

Steps

Steps are asynchronous methods decorated with @step(depends_on=EventType) that define what happens when a specific event is received.

  • Steps receive a Context and an Event.
  • Steps can return new events to trigger subsequent steps.

Events

Events are the building blocks of workflows. They carry data between steps and trigger step execution.

  • StartEvent: Automatically triggered when a workflow starts
  • StopEvent: Signals the end of a workflow and carries the final result
  • Custom Events: Define your own events by inheriting from Event

Context

The Context object provides access to workflow state and allows steps to share data throughout the workflow execution.

# Read-only access
current_value = ctx.state.count

# Edit state
async with ctx.store.edit_state() as state:
    state.count = current_counter + 1

# Send events
ctx.send_event(MyEvent(...))

Features

Feature Novastack Workflows
Event-driven execution
Fan-out (parallelism)
Async execution
Shared state
Event joins
Internal buffer.
Declarative API

License

Apache License 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

novastack_workflows-2.0.1.tar.gz (26.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

novastack_workflows-2.0.1-py3-none-any.whl (34.7 kB view details)

Uploaded Python 3

File details

Details for the file novastack_workflows-2.0.1.tar.gz.

File metadata

  • Download URL: novastack_workflows-2.0.1.tar.gz
  • Upload date:
  • Size: 26.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: python-httpx/0.28.1

File hashes

Hashes for novastack_workflows-2.0.1.tar.gz
Algorithm Hash digest
SHA256 65b19c469c6608a42b73c2f41af02fbfb240d4bdb3c3a948cd32d2a40098ffa5
MD5 449bf1225868eeec9cacc98132ab57c8
BLAKE2b-256 d71340530dbec0ab7f50ddc255a7fa60464f3dbfba7c88b3fd243daa72b788d1

See more details on using hashes here.

File details

Details for the file novastack_workflows-2.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for novastack_workflows-2.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 8a3d7e387f86aa9e9635896133cf2171a72d5c1da3455afc99a2d20e513a0032
MD5 48e7729de743589b987c241bd4d9f066
BLAKE2b-256 e2c7ce28e38e0358912753f7b20b5d38d72d0b5003206f743c40a6718d8a4d49

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page