A powerful workflow orchestration engine for composing complex computational tasks from modular, type-safe nodes
Project description
Aceteam Workflow Engine
A powerful, modular workflow orchestration system designed for composing complex computational tasks from smaller, configurable steps. This engine powers the workflow functionality in Aceteam.ai and is now available as an open-source package.
Overview
The Workflow Engine enables you to:
- Define workflows as directed acyclic graphs (DAGs)
- Chain node-based tasks with type-safe data passing
- Persist and retrieve node outputs using various storage backends
- Execute workflows programmatically or via API
Installation
pip install aceteam-workflow-engine
Example
import asyncio
from workflow_engine import IntegerValue, Workflow
import workflow_engine.nodes
from workflow_engine.contexts import LocalContext
from workflow_engine.execution import TopologicalExecutionAlgorithm
context = LocalContext()
algorithm = TopologicalExecutionAlgorithm()
# Load and run a workflow
with open("examples/addition.json") as f:
workflow = Workflow.model_validate_json(f.read())
result = asyncio.run(algorithm.execute(
context=context,
workflow=workflow,
input={"c": IntegerValue(-256)},
)) # {'sum': 1811}
Check the examples directory for more sample workflows in JSON form:
addition.json: basic arithmetic operationsappend.json: text file manipulationerror.json: graceful error handling
Key Features
Core Functionality
- Graph-Based Execution: Workflows are executed as DAGs with automatic dependency resolution
- Type-Safe Data Flow: Data passing between nodes is validated using MIME types
- Flexible Storage: Supports multiple storage backends (Supabase, Local, In-Memory)
- Error Handling: Robust error propagation and logging system
- Versioning: Built-in support for workflow versioning
Workflow Structure
Workflows are defined as JSON with:
- input_node: Defines the workflow's input schema using
InputNode - inner_nodes: The processing nodes that perform computations
- output_node: Defines the workflow's output schema using
OutputNode - edges: Connect node outputs to inputs (including to/from input/output nodes)
Node Types
- InputNode: Special node defining workflow input fields and their types
- OutputNode: Special node defining workflow output fields and their types
- Processing Nodes: Execute computational tasks with configurable parameters
Storage Backends
- Supabase: Primary storage backend for production use
- Local: File-system based storage for development
- In-Memory: Lightweight storage for testing
Value Type Casting
The workflow engine supports automatic type casting between Value types. The graph below shows all available casting paths:
Value types serialize to JSON Schema for validation and type resolution. See Values for how the schema system works and limitations around deeply nested generics.
Architecture
src/workflow_engine/
├── contexts/ # Storage backend implementations
│ ├── in_memory.py # In-memory storage
│ └── local.py # Local file system storage
├── core/ # Core workflow components
│ ├── context.py # Execution context
│ ├── data.py # Data handling
│ ├── edge.py # Edge definitions
│ ├── execution.py # Execution logic
│ ├── file.py # File handling
│ ├── node.py # Node base classes
│ └── workflow.py # Workflow definitions
├── execution/ # Execution strategies
│ └── topological.py # DAG-based execution
├── nodes/ # Node implementations
│ ├── arithmetic.py # Math operations
│ ├── constant.py # Constant values
│ ├── json.py # JSON operations
│ └── text.py # Text operations
└── utils/ # Helper utilities
Development
Setup
# Using uv (recommended)
uv sync
# Using pip
pip install -e .
Testing
uv run pytest # Runs both unit and integration tests
Documentation
- Getting Started - Installation and first workflow
- Architecture - Module structure and design decisions
- Nodes - Built-in node reference
- Values - Value type system and casting rules
- Execution - Execution algorithms, retry, and rate limiting
- Contexts - Storage backends and lifecycle hooks
- Migrations - Node versioning and migration system
- Workflow Loading - Safe workflow loading with migration
- Examples
- Changelog
- Contributing
Contributing
We welcome contributions! Please see our Contributing Guide for details.
License
About
This workflow engine is developed and maintained by Adanomad Consulting and powers the workflow functionality in Aceteam.ai. For commercial support or consulting, please contact us at contact@adanomad.com.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file aceteam_workflow_engine-2.0.0rc2.tar.gz.
File metadata
- Download URL: aceteam_workflow_engine-2.0.0rc2.tar.gz
- Upload date:
- Size: 160.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
34ee9f88985ce48a1679eff4a49b465c7acf9e14d06db355b418d1d53672453e
|
|
| MD5 |
1b1b2631e10cf0ade0a91c4dba60226d
|
|
| BLAKE2b-256 |
7b6545e4d3c8fba76d029e3b8fda9bf32ea160938a71c3d830f2c8ef22b2a92c
|
Provenance
The following attestation bundles were made for aceteam_workflow_engine-2.0.0rc2.tar.gz:
Publisher:
publish.yml on aceteam-ai/workflow-engine
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
aceteam_workflow_engine-2.0.0rc2.tar.gz -
Subject digest:
34ee9f88985ce48a1679eff4a49b465c7acf9e14d06db355b418d1d53672453e - Sigstore transparency entry: 947006831
- Sigstore integration time:
-
Permalink:
aceteam-ai/workflow-engine@7fb9943d6d314be172ca959b9ba3778dfe3dbcfc -
Branch / Tag:
refs/tags/v2.0.0rc2 - Owner: https://github.com/aceteam-ai
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@7fb9943d6d314be172ca959b9ba3778dfe3dbcfc -
Trigger Event:
release
-
Statement type:
File details
Details for the file aceteam_workflow_engine-2.0.0rc2-py3-none-any.whl.
File metadata
- Download URL: aceteam_workflow_engine-2.0.0rc2-py3-none-any.whl
- Upload date:
- Size: 78.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6abb7eed4e26184419514571b285b21057cbdfe77801749a06762b84c1527bf0
|
|
| MD5 |
33ed78108f0fc63943ea7050668e3879
|
|
| BLAKE2b-256 |
5442bc6820f3d2c7dc2bf6d59fe674a8bcc2ca8290a806fba0ac9e9c7b54fb4d
|
Provenance
The following attestation bundles were made for aceteam_workflow_engine-2.0.0rc2-py3-none-any.whl:
Publisher:
publish.yml on aceteam-ai/workflow-engine
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
aceteam_workflow_engine-2.0.0rc2-py3-none-any.whl -
Subject digest:
6abb7eed4e26184419514571b285b21057cbdfe77801749a06762b84c1527bf0 - Sigstore transparency entry: 947006833
- Sigstore integration time:
-
Permalink:
aceteam-ai/workflow-engine@7fb9943d6d314be172ca959b9ba3778dfe3dbcfc -
Branch / Tag:
refs/tags/v2.0.0rc2 - Owner: https://github.com/aceteam-ai
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@7fb9943d6d314be172ca959b9ba3778dfe3dbcfc -
Trigger Event:
release
-
Statement type: