Highway Core

Highway Core is a workflow engine for defining, executing, and managing complex workflows. It is built with resilience and scalability in mind and supports persistence, conditional flows, loops, and parallel execution.

Features

  • Declarative Workflows: Define workflows in YAML format
  • Task Execution: Execute functions with dependency management
  • Conditional Logic: Support for if/else branches
  • Parallel Execution: Run multiple tasks concurrently with bulkhead isolation
  • Looping Constructs: foreach and while loop support
  • State Management: Variables, results, and memory management
  • Persistence: Save and restore workflow state for resumability
  • Bulkhead Pattern: Isolate different workflows and operations
  • Pydantic Integration: Strong typing and validation

Installation

Install Highway Core using pip:

pip install highway-core

Or install the development version:

pip install git+https://github.com/rodmena-limited/highway_core.git

Quick Start

Define a Workflow

Create a YAML file (simple_workflow.yaml):

name: simple_example
version: 1.0.0
description: A simple example workflow

start_task: log_start

variables:
  message: "Hello from Highway Core!"

tasks:
  log_start:
    task_id: log_start
    operator_type: task
    function: tools.log.info
    args: ["{{variables.message}}"]
    dependencies: []
    result_key: "start_result"

  process_data:
    task_id: process_data
    operator_type: task
    function: tools.memory.set
    args: ["processed_value", "Data processed successfully"]
    dependencies: ["log_start"]
    result_key: "process_result"

  log_end:
    task_id: log_end
    operator_type: task
    function: tools.log.info
    args: ["Workflow completed with result: {{results.process_result}}"]
    dependencies: ["process_data"]

Execute the Workflow

from highway_core.engine.engine import run_workflow_from_yaml

# Execute the workflow
run_workflow_from_yaml("simple_workflow.yaml")
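
The workflow above passes values between tasks with placeholders such as {{variables.message}} and {{results.process_result}}. The following is a minimal sketch of how such placeholders could be resolved against nested dictionaries. The render helper and the context layout are assumptions made for illustration, not Highway Core's actual resolver, and the value stored under process_result is a made-up placeholder because the return value of tools.memory.set is not documented here.

```python
import re

# Hypothetical helper: illustrates the {{dotted.path}} syntax only.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template, context):
    """Replace each {{dotted.path}} with the value found in the nested context dict."""

    def lookup(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part]  # raises KeyError if the path does not exist
        return str(value)

    return _PLACEHOLDER.sub(lookup, template)


# Assumed shape of the state the placeholders refer to.
context = {
    "variables": {"message": "Hello from Highway Core!"},
    "results": {"process_result": "ok"},  # illustrative value only
}

print(render("{{variables.message}}", context))
print(render("Workflow completed with result: {{results.process_result}}", context))
```

Raising KeyError on an unknown path is a choice made in this sketch so that a misspelled placeholder fails loudly instead of rendering as empty text.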

Workflow Definition

A Highway Core workflow is defined in YAML format with the following structure:

name: workflow_name
version: 1.x.x
description: Optional description of the workflow
variables: # Initial variables for the workflow
  key: value
start_task: task_id_to_start_with
tasks: # Dictionary of tasks
  task_id:
    task_id: task_id
    operator_type: task | condition | parallel | wait | while | foreach
    function: tools.module.function
    args: [list, of, arguments]
    dependencies: [list, of, task, ids]
    result_key: optional_key_to_store_result
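
Because every task lists its dependencies by task ID, a workflow loaded into a dictionary can be checked and ordered before anything runs. The sketch below uses the standard library graphlib module, which the Acknowledgments section names for topological sorting. The execution_order function and its error messages are hypothetical and are not part of the Highway Core API.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(workflow):
    """Return task IDs in an order that respects each task's dependencies."""
    tasks = workflow["tasks"]
    if workflow["start_task"] not in tasks:
        raise ValueError(f"start_task {workflow['start_task']!r} is not defined")

    graph = {}
    for key, task in tasks.items():
        deps = task.get("dependencies", [])
        unknown = [dep for dep in deps if dep not in tasks]
        if unknown:
            raise ValueError(f"task {key!r} depends on unknown tasks: {unknown}")
        graph[key] = deps  # graphlib expects node -> predecessors

    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # graphlib reports the detected cycle as the second element of args
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc


# The quick-start workflow, reduced to the fields this check needs.
workflow = {
    "start_task": "log_start",
    "tasks": {
        "log_start": {"dependencies": []},
        "process_data": {"dependencies": ["log_start"]},
        "log_end": {"dependencies": ["process_data"]},
    },
}
print(execution_order(workflow))
```

Checking for unknown dependencies first matters because TopologicalSorter would otherwise silently add an undefined ID to the graph as a new node.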

Operators

Task Operator

Executes a function with provided arguments.

my_task:
  task_id: my_task
  operator_type: task
  function: tools.log.info
  args: ["Hello World"]
  dependencies: []

Condition Operator

Executes different branches based on a condition.

conditional_task:
  task_id: conditional_task
  operator_type: condition
  condition: "{{variables.some_value}} == true"
  if_true: task_if_true
  if_false: task_if_false
  dependencies: []
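
To make the branching concrete, here is a small sketch of how a condition string of this form could be evaluated and mapped to the next task. It is an assumption-laden illustration, not the engine's evaluator: the evaluate and next_task helpers are hypothetical, only a single "left operator right" comparison is supported, and values are read as JSON literals so that true, false, and numbers behave as expected.

```python
import json
import operator
import re

# Comparison operators supported by this sketch.
_OPS = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}
_PLACEHOLDER = re.compile(r"\{\{\s*variables\.(\w+)\s*\}\}")


def evaluate(condition, variables):
    """Evaluate a single 'left op right' comparison after substituting variables."""
    # Render each placeholder as a JSON literal (Python True becomes true).
    rendered = _PLACEHOLDER.sub(
        lambda match: json.dumps(variables[match.group(1)]), condition
    )
    # Limitation: operands must not contain spaces.
    left, op, right = rendered.split()
    return _OPS[op](json.loads(left), json.loads(right))


def next_task(task, variables):
    """Pick the branch named by if_true or if_false."""
    return task["if_true"] if evaluate(task["condition"], variables) else task["if_false"]


task = {
    "condition": "{{variables.some_value}} == true",
    "if_true": "task_if_true",
    "if_false": "task_if_false",
}
print(next_task(task, {"some_value": True}))
print(next_task(task, {"some_value": False}))
```

Parsing operands as literals and looking the operator up in a fixed table avoids calling eval on workflow text, at the cost of supporting only simple comparisons.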

Parallel Operator

Executes multiple tasks in parallel.

parallel_task:
  task_id: parallel_task
  operator_type: parallel
  tasks: ["task1", "task2", "task3"]
  dependencies: []
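
As a rough illustration of what running a group of tasks concurrently involves, the sketch below submits three callables to a bounded thread pool and collects their results by task ID. A bounded pool per group is one common way to approximate bulkhead isolation. Whether Highway Core uses threads internally is not stated here, and run_parallel is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(task_functions, max_workers=3):
    """Run each callable in a worker thread and return results keyed by task ID."""
    # The pool size caps how many tasks from this group run at once.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {task_id: pool.submit(fn) for task_id, fn in task_functions.items()}
        # result() blocks until the task finishes and re-raises any exception.
        return {task_id: future.result() for task_id, future in futures.items()}


results = run_parallel(
    {
        "task1": lambda: "one",
        "task2": lambda: "two",
        "task3": lambda: "three",
    }
)
print(results)
```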

While Operator

Repeats execution while a condition is true.

while_task:
  task_id: while_task
  operator_type: while
  condition: "{{variables.counter}} < 10"
  body: task_to_repeat
  dependencies: []

ForEach Operator

Iterates over a collection and executes a task for each item.

foreach_task:
  task_id: foreach_task
  operator_type: foreach
  items: "{{variables.list_of_items}}"
  body: task_to_repeat
  dependencies: []
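
The two looping operators can be read as the following plain Python loops. This is only a sketch of the semantics described above: run_while and run_foreach are hypothetical helpers, the condition and body are ordinary callables rather than task IDs, and the max_iterations guard is an addition of this sketch, not a documented Highway Core setting.

```python
def run_while(condition, body, variables, max_iterations=1000):
    """Call body(variables) repeatedly while condition(variables) is true."""
    iterations = 0
    while condition(variables):
        if iterations >= max_iterations:
            raise RuntimeError("while loop exceeded max_iterations")
        body(variables)
        iterations += 1
    return iterations


def run_foreach(items, body):
    """Call body(item) once per item and collect the results in order."""
    return [body(item) for item in items]


def increment(variables):
    variables["counter"] += 1


state = {"counter": 0}
loops = run_while(lambda v: v["counter"] < 10, increment, state)
print(loops, state["counter"])
print(run_foreach(["a", "b", "c"], str.upper))
```

The body of a while loop has to change the state the condition reads, otherwise the loop never ends. The iteration cap in this sketch turns that mistake into an error.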

Persistence and Resumability

Highway Core can persist workflow state after each task execution, so an interrupted run can resume from where it left off.

from highway_core.engine.engine import run_workflow_from_yaml

# Run workflow with a specific ID to enable persistence
run_workflow_from_yaml("workflow.yaml", workflow_run_id="my-run-123")

# Later, resume the same workflow with the same ID
run_workflow_from_yaml("workflow.yaml", workflow_run_id="my-run-123")

On resume, the engine detects tasks that have already completed and skips them instead of executing them again.
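
The general checkpoint-and-resume idea can be sketched with a JSON file per run ID. This is not Highway Core's storage layer: run_with_checkpoints, the file layout, and the simulated crash are all assumptions made for illustration.

```python
import json
import os
import tempfile


def run_with_checkpoints(run_id, tasks, state_dir):
    """Run (task_id, callable) pairs in order, skipping IDs already recorded for run_id."""
    path = os.path.join(state_dir, f"{run_id}.json")
    completed = {}
    if os.path.exists(path):
        with open(path) as handle:
            completed = json.load(handle)

    executed = []
    for task_id, fn in tasks:
        if task_id in completed:
            continue  # finished in an earlier run
        completed[task_id] = fn()
        executed.append(task_id)
        with open(path, "w") as handle:
            json.dump(completed, handle)  # checkpoint after every task
    return executed


attempts = {"count": 0}


def flaky():
    """Fail on the first call to simulate an interrupted run."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("simulated crash")
    return "processed"


state_dir = tempfile.mkdtemp()
tasks = [("log_start", lambda: "started"), ("process_data", flaky)]

try:
    run_with_checkpoints("my-run-123", tasks, state_dir)
except RuntimeError:
    pass  # the first run stops after log_start has been checkpointed

resumed = run_with_checkpoints("my-run-123", tasks, state_dir)
print(resumed)
```

Writing the checkpoint after each task, rather than once at the end, is what allows the second call to skip log_start and rerun only the task that failed.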

Testing

Run the test suite:

pytest

Run with coverage:

pytest --cov=highway_core

Development

Setup

git clone https://github.com/rodmena-limited/highway_core.git
cd highway_core
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
pip install -e ".[dev]"  # editable install, including the development extras

Running Type Checks

mypy .

Project Structure

highway_core/
├── engine/           # Core execution engine
│   ├── state.py      # Workflow state management
│   ├── orchestrator.py # Task orchestration
│   ├── engine.py     # Main engine entry point
│   └── models.py     # Data models
├── tools/            # Available tools and functions
│   ├── registry.py   # Tool registry
│   ├── memory.py     # Memory operations
│   ├── log.py        # Logging operations
│   └── ...           # Other tools
└── persistence/      # Persistence implementations
    ├── manager.py    # Persistence interface
    └── db_storage.py # File-based persistence

Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite (pytest)
  6. Run type checks (mypy .)
  7. Commit your changes (git commit -m 'Add amazing feature')
  8. Push to the branch (git push origin feature/amazing-feature)
  9. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

If you encounter any issues, please file them in the Issues section of the GitHub repository.

Acknowledgments

  • Built with Pydantic for robust data validation
  • Uses graphlib for topological sorting of task dependencies
  • Follows the bulkhead pattern for isolation
