A powerful workflow orchestration engine for composing complex computational tasks from modular, type-safe nodes

Aceteam Workflow Engine

A powerful, modular workflow orchestration system designed for composing complex computational tasks from smaller, configurable steps. This engine powers the workflow functionality in Aceteam.ai and is now available as an open-source package.

Overview

The Workflow Engine enables you to:

  • Define workflows as directed acyclic graphs (DAGs)
  • Chain node-based tasks with type-safe data passing
  • Persist and retrieve node outputs using various storage backends
  • Execute workflows programmatically or via API

Installation

pip install aceteam-workflow-engine

Example

import asyncio

from workflow_engine import IntegerValue, Workflow
import workflow_engine.nodes
from workflow_engine.contexts import LocalContext
from workflow_engine.execution import TopologicalExecutionAlgorithm

context = LocalContext()
algorithm = TopologicalExecutionAlgorithm()

# Load and run a workflow
with open("examples/addition.json") as f:
    workflow = Workflow.model_validate_json(f.read())

result = asyncio.run(algorithm.execute(
    context=context,
    workflow=workflow,
    input={"c": IntegerValue(-256)},
)) # {'sum': 1811}

See the examples directory for more sample workflows in JSON form.

Key Features

Core Functionality

  • Graph-Based Execution: Workflows are executed as DAGs with automatic dependency resolution
  • Type-Safe Data Flow: Data passing between nodes is validated using MIME types
  • Flexible Storage: Supports multiple storage backends (Supabase, Local, In-Memory)
  • Error Handling: Robust error propagation and logging system
  • Versioning: Built-in support for workflow versioning
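To make "automatic dependency resolution" concrete, here is a minimal, standalone sketch of running nodes in topological order. It uses only Python's standard-library `graphlib` and does not reflect the engine's internals; the node names, the `dependencies` and `operations` mappings, and `run_dag` are all hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical three-node workflow: each key maps to the set of nodes it depends on.
dependencies = {
    "a": set(),
    "b": set(),
    "sum": {"a", "b"},
}

# Hypothetical node implementations: each receives the outputs computed so far.
operations = {
    "a": lambda outputs: 2,
    "b": lambda outputs: 3,
    "sum": lambda outputs: outputs["a"] + outputs["b"],
}


def run_dag(dependencies, operations):
    """Run every node exactly once, after all of its dependencies have run."""
    outputs = {}
    # static_order() yields each node only after its predecessors,
    # and raises graphlib.CycleError if the graph is not acyclic.
    for node_id in TopologicalSorter(dependencies).static_order():
        outputs[node_id] = operations[node_id](outputs)
    return outputs


results = run_dag(dependencies, operations)
print(results["sum"])  # 5
```

Because the order is derived from the graph, nodes can be declared in any order; only the edges determine when each one runs.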

Node Types

  • Input Nodes: Accept workflow inputs with type constraints
  • Processing Nodes: Execute computational tasks with configurable parameters
  • Output Nodes: Format and return workflow results
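The three roles above can be illustrated with a small standalone sketch. The class names (`InputNode`, `ScaleNode`, `OutputNode`) and the `run` method are hypothetical and are not the engine's node API; the sketch only shows how an input constraint, a configurable processing step, and an output formatter fit together.

```python
from dataclasses import dataclass


@dataclass
class InputNode:
    """Hypothetical input node: accepts a value only if it has the expected type."""
    expected_type: type

    def run(self, value):
        if not isinstance(value, self.expected_type):
            raise TypeError(
                f"expected {self.expected_type.__name__}, got {type(value).__name__}"
            )
        return value


@dataclass
class ScaleNode:
    """Hypothetical processing node with one configurable parameter."""
    factor: int

    def run(self, value):
        return value * self.factor


@dataclass
class OutputNode:
    """Hypothetical output node: returns the result under a named key."""
    key: str

    def run(self, value):
        return {self.key: value}


def run_pipeline(value):
    """Chain the three hypothetical nodes: validate, process, format."""
    checked = InputNode(expected_type=int).run(value)
    scaled = ScaleNode(factor=3).run(checked)
    return OutputNode(key="result").run(scaled)


print(run_pipeline(4))  # {'result': 12}
```

Passing a value of the wrong type, such as `run_pipeline("four")`, fails at the input node with a `TypeError` before any processing runs.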

Storage Backends

  • Supabase: Primary storage backend for production use
  • Local: File-system based storage for development
  • In-Memory: Lightweight storage for testing
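As a rough illustration of how interchangeable backends can work, the sketch below defines one storage interface with an in-memory and a local file-system implementation. `OutputStore`, `InMemoryStore`, `LocalStore`, and the one-JSON-file-per-node layout are assumptions made for this example, not the engine's context classes.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Protocol


class OutputStore(Protocol):
    """Hypothetical interface: persist and retrieve one node's output."""

    def save(self, node_id: str, output: dict[str, Any]) -> None: ...
    def load(self, node_id: str) -> dict[str, Any]: ...


class InMemoryStore:
    """Keeps outputs in a dict; everything is lost when the process exits."""

    def __init__(self) -> None:
        self._outputs: dict[str, dict[str, Any]] = {}

    def save(self, node_id: str, output: dict[str, Any]) -> None:
        self._outputs[node_id] = output

    def load(self, node_id: str) -> dict[str, Any]:
        return self._outputs[node_id]


class LocalStore:
    """Writes each node's output to <root>/<node_id>.json."""

    def __init__(self, root: Path) -> None:
        self._root = root
        self._root.mkdir(parents=True, exist_ok=True)

    def save(self, node_id: str, output: dict[str, Any]) -> None:
        (self._root / f"{node_id}.json").write_text(json.dumps(output))

    def load(self, node_id: str) -> dict[str, Any]:
        return json.loads((self._root / f"{node_id}.json").read_text())


def round_trip(store: OutputStore) -> dict[str, Any]:
    """Save and reload an output; the caller does not care which backend it gets."""
    store.save("add", {"sum": 5})
    return store.load("add")


memory_result = round_trip(InMemoryStore())
with tempfile.TemporaryDirectory() as tmp:
    local_result = round_trip(LocalStore(Path(tmp)))
```

Both calls return `{"sum": 5}`, which is the point of the shared interface: tests can use the in-memory store while development uses files on disk.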

Value Type Casting

The workflow engine supports automatic type casting between Value types. The graph below shows all available casting paths:

[Figure: Value Typecasting Graph]
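One way to think about casting paths is as a search over a directed graph of types. The sketch below is a generic breadth-first search over a made-up graph; the type names and edges in `CASTS` are hypothetical and do not reproduce the figure, and whether the engine chains casts transitively is an assumption here.

```python
from collections import deque

# Hypothetical casting edges, for illustration only.
CASTS = {
    "Integer": ["Float"],
    "Float": ["String"],
    "String": [],
}


def find_cast_path(source, target, casts=CASTS):
    """Return the shortest chain of types from source to target, or None."""
    queue = deque([[source]])
    seen = {source}
    while queue:
        path = queue.popleft()
        if path[-1] == target:
            return path
        for next_type in casts.get(path[-1], []):
            if next_type not in seen:
                seen.add(next_type)
                queue.append(path + [next_type])
    return None


print(find_cast_path("Integer", "String"))  # ['Integer', 'Float', 'String']
print(find_cast_path("String", "Integer"))  # None
```

In this toy graph, casting is directional: an Integer can reach String through Float, but nothing casts back from String.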

Architecture

src/workflow_engine/
├── contexts/          # Storage backend implementations
│   ├── in_memory.py   # In-memory storage
│   └── local.py       # Local file system storage
├── core/              # Core workflow components
│   ├── context.py     # Execution context
│   ├── data.py        # Data handling
│   ├── edge.py        # Edge definitions
│   ├── execution.py   # Execution logic
│   ├── file.py        # File handling
│   ├── node.py        # Node base classes
│   └── workflow.py    # Workflow definitions
├── execution/         # Execution strategies
│   └── topological.py # DAG-based execution
├── nodes/             # Node implementations
│   ├── arithmetic.py  # Math operations
│   ├── constant.py    # Constant values
│   ├── json.py        # JSON operations
│   └── text.py        # Text operations
└── utils/             # Helper utilities

Development

Setup

# Using uv (recommended)
uv sync

# Using pip
pip install -e .

Testing

uv run pytest  # Runs both unit and integration tests

Documentation

Available test suites:

  • test_type_checking.py: Type system validation
  • test_workflow_validation.py: Workflow validation tests

Future Enhancements

  • Support for iterative workflows and sub-workflows
  • Enhanced parallel execution capabilities
  • Additional storage backend implementations
  • Improved error recovery and retry mechanisms
  • Real-time workflow monitoring

Contributing

We welcome contributions! Please see our Contributing Guide for details.

License

MIT License

About

This workflow engine is developed and maintained by Adanomad Consulting and powers the workflow functionality in Aceteam.ai. For commercial support or consulting, please contact us at contact@adanomad.com.
