Project description

LLM Flow Engine

🇨🇳 中文版本 | 🇺🇸 English

A DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation. Define complex AI workflows through YAML configuration files and enable collaborative work between multiple LLM models.

✨ Key Features

  • 🔧 DSL Workflow Definition - Define complex LLM workflows in YAML format
  • 📊 DAG Dependency Management - Directed acyclic graph node dependencies with parallel execution
  • 🔗 Placeholder Resolution - Use ${node.output} syntax for inter-node data passing
  • 🤖 Multi-Model Support - Call different LLM models and aggregate their results
  • ⚙️ Flexible Configuration - Custom model configuration and parameter management
  • ⚡ Async Execution - Efficient asynchronous task processing with automatic retry on errors
  • 📈 Result Aggregation - Built-in functions for merging and analyzing results
  • 🔧 Extensible Architecture - Support for custom functions and model adapters
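
The placeholder mechanism can be illustrated with a few lines of plain Python: a regex substitutes each `${node.field}` reference with the corresponding output of an already-completed node. This is only a sketch of the idea, not the engine's actual resolver:

```python
import re

def resolve_placeholders(template: str, outputs: dict) -> str:
    """Replace ${node.field} references with values from completed nodes.

    Illustrative sketch only -- the engine's real resolver may differ.
    """
    def lookup(match: re.Match) -> str:
        node, field = match.group(1).split(".", 1)
        return str(outputs[node][field])
    return re.sub(r"\$\{([^}]+)\}", lookup, template)

outputs = {"answer_step": {"output": "AI is machine intelligence."}}
print(resolve_placeholders("Result: ${answer_step.output}", outputs))
# Result: AI is machine intelligence.
```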

🚀 Quick Start

Prerequisites

  • Python 3.8+
  • aiohttp >= 3.8.0
  • pyyaml >= 6.0
  • loguru >= 0.7.0

Installation

pip install llm-flow-engine

Basic Usage

import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider

async def main():
    # 1. Configure models (auto-discovery)
    provider = await ModelConfigProvider.from_host_async(
        api_host="http://127.0.0.1:11434", 
        platform="ollama"
    )
    
    # 2. Create engine
    engine = FlowEngine(provider)
    
    # 3. Execute workflow
    dsl_content = """
    metadata:
      version: "1.0"
      description: "Simple Q&A workflow"
    
    input:
      type: "start"
      name: "workflow_input"
      data:
        question: ""
    
    executors:
      - name: answer_step
        type: task
        func: llm_simple_call
        custom_vars:
          user_input: "${workflow_input.question}"
          model: "llama2"
    
    output:
      type: "end"
      name: "workflow_output"
      data:
        answer: "${answer_step.output}"
    """
    
    result = await engine.execute_dsl(
        dsl_content, 
        inputs={"workflow_input": {"question": "What is AI?"}}
    )
    
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

📋 Project Structure

llm_flow_engine/
├── __init__.py           # Main package initialization
├── flow_engine.py        # Main engine entry point
├── dsl_loader.py         # DSL parser
├── workflow.py           # Unified workflow management
├── executor.py           # Task executor
├── executor_result.py    # Execution result wrapper
├── builtin_functions.py  # Built-in function library
├── model_config.py       # Model configuration management
└── utils.py              # Utility functions

examples/
├── demo_example.py       # Complete example demo
├── demo_qa.yaml          # Workflow DSL example
└── model_config_demo.py  # Model configuration demo

🔧 Model Configuration

Method 1: Auto-Discovery (Recommended)

# Auto-discover Ollama models
provider = await ModelConfigProvider.from_host_async(
    api_host="http://127.0.0.1:11434",
    platform="ollama"
)
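
Under the hood, auto-discovery for a local Ollama server amounts to asking it which models are installed. Ollama exposes this via its GET /api/tags endpoint; the sketch below shows how such a discovery step could work. The `parse_ollama_tags` helper is illustrative, not the engine's code, and the response shape is an assumption based on Ollama's API:

```python
import json
from urllib.request import urlopen

def parse_ollama_tags(payload: dict) -> list:
    """Extract model names from an Ollama /api/tags response body."""
    return [m["name"] for m in payload.get("models", [])]

def discover_ollama_models(api_host: str) -> list:
    """Query a local Ollama server for its installed models."""
    with urlopen(f"{api_host}/api/tags") as resp:
        return parse_ollama_tags(json.load(resp))

# Example response shape (assumed from Ollama's API):
sample = {"models": [{"name": "llama2:latest"}, {"name": "mistral:latest"}]}
print(parse_ollama_tags(sample))  # ['llama2:latest', 'mistral:latest']
```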

Method 2: Manual Configuration

# Create provider and add models manually
provider = ModelConfigProvider()

# Add OpenAI model
provider.add_single_model(
    model_name="gpt-4",
    platform="openai",
    api_url="https://api.openai.com/v1/chat/completions",
    api_key="your-api-key",
    max_tokens=4096
)

# Add custom model
provider.add_single_model(
    model_name="custom-llm",
    platform="openai_compatible",
    api_url="https://your-api.com/v1/chat/completions",
    api_key="your-api-key",
    max_tokens=2048
)

๐Ÿ“ DSL Workflow Format

Basic Structure

metadata:
  version: "1.0"
  description: "Workflow description"

input:
  type: "start"
  name: "workflow_input"
  data:
    key: "value"

executors:
  - name: task1
    type: task
    func: function_name
    custom_vars:
      param1: "${input.key}"
      param2: "static_value"
    depends_on: []  # Dependencies
    timeout: 30     # Timeout in seconds
    retry: 2        # Retry count

output:
  type: "end"
  name: "workflow_output"
  data:
    result: "${task1.output}"
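
The `timeout` and `retry` fields map naturally onto a per-attempt `asyncio.wait_for` wrapped in a retry loop. Below is a minimal sketch of that pattern, not the engine's actual policy (which may add backoff or distinguish error types):

```python
import asyncio

async def run_with_retry(coro_factory, timeout: float, retries: int):
    """Run an async task with a per-attempt timeout, retrying on failure.

    Sketch of the `timeout`/`retry` executor fields; illustrative only.
    """
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(coro_factory(), timeout)
        except Exception:
            if attempt == retries:
                raise  # retries exhausted, surface the last error

calls = {"n": 0}

async def flaky():
    # Fails once, then succeeds -- stands in for an LLM call.
    calls["n"] += 1
    if calls["n"] < 2:
        raise RuntimeError("transient failure")
    return "ok"

print(asyncio.run(run_with_retry(flaky, timeout=30, retries=2)))  # ok
```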

Multi-Model Workflow Example

metadata:
  version: "1.0"
  description: "Multi-model Q&A with analysis"

input:
  type: "start"
  name: "workflow_input"
  data:
    question: ""

executors:
  # Parallel model calls
  - name: model1_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "llama2"
    timeout: 30

  - name: model2_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "mistral"
    timeout: 30

  # Analysis step (depends on both models)
  - name: analysis
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "Compare these answers: 1) ${model1_answer.output} 2) ${model2_answer.output}"
      model: "llama2"
    depends_on: ["model1_answer", "model2_answer"]

output:
  type: "end"
  name: "workflow_output"
  data:
    original_question: "${workflow_input.question}"
    model1_response: "${model1_answer.output}"
    model2_response: "${model2_answer.output}"
    analysis: "${analysis.output}"
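
The scheduling behind `depends_on` can be sketched as a level-by-level topological sort: at each step, every node whose dependencies are all finished forms the next batch, and nodes in the same batch can run in parallel. Illustrative only; the engine's scheduler may differ:

```python
def execution_levels(deps: dict) -> list:
    """Group nodes into levels: each node runs after its dependencies,
    and nodes within a level are independent and can run in parallel.

    deps maps node name -> list of dependency names.
    """
    levels, done = [], set()
    pending = dict(deps)
    while pending:
        ready = sorted(n for n, d in pending.items() if set(d) <= done)
        if not ready:
            raise ValueError("cycle detected in workflow dependencies")
        levels.append(ready)
        done.update(ready)
        for n in ready:
            del pending[n]
    return levels

deps = {
    "model1_answer": [],
    "model2_answer": [],
    "analysis": ["model1_answer", "model2_answer"],
}
print(execution_levels(deps))
# [['model1_answer', 'model2_answer'], ['analysis']]
```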

🔌 Built-in Functions

  • llm_simple_call - Basic LLM model call
  • text_process - Text preprocessing and formatting
  • result_summary - Multi-result summarization
  • data_transform - Data format transformation
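
To make the aggregation idea concrete, here is a sketch of what a result_summary-style merge step might do: collect several node outputs into one labeled, numbered block for a downstream step. The function name and behavior are illustrative assumptions, not the built-in's actual signature:

```python
def summarize_results(results: dict) -> str:
    """Merge several node outputs into one numbered text block.

    Illustrative sketch of a result_summary-style aggregation step.
    results maps node name -> that node's text output.
    """
    lines = [f"{i}. [{name}] {text}"
             for i, (name, text) in enumerate(sorted(results.items()), 1)]
    return "\n".join(lines)

merged = summarize_results({
    "model1_answer": "AI is machine intelligence.",
    "model2_answer": "AI means systems that learn.",
})
print(merged)
```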

🧪 Running Examples

# Basic usage demo
python examples/demo_example.py

# Model configuration demo  
python examples/model_config_demo.py

# Package usage demo
python examples/package_demo.py

📊 Supported Platforms

  • Ollama - Local LLM models
  • OpenAI - GPT series models
  • OpenAI Compatible - Any OpenAI-compatible API
  • Anthropic - Claude series models
  • Custom - Custom API endpoints

๐Ÿ› ๏ธ Development

Setup Development Environment

git clone https://github.com/liguobao/llm-flow-engine.git
cd llm-flow-engine

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black .

Project Validation

# Validate project structure and configuration
python validate_project.py

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📞 Support

🌟 Star History

If you find this project helpful, please consider giving it a star! ⭐


Made with ❤️ by the LLM Flow Engine Team

Project details


Download files

Download the file for your platform.

Source Distribution

llm_flow_engine-0.7.0.tar.gz (33.4 kB)

Uploaded Source

Built Distribution


llm_flow_engine-0.7.0-py3-none-any.whl (25.7 kB)

Uploaded Python 3

File details

Details for the file llm_flow_engine-0.7.0.tar.gz.

File metadata

  • Download URL: llm_flow_engine-0.7.0.tar.gz
  • Upload date:
  • Size: 33.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.7

File hashes

Hashes for llm_flow_engine-0.7.0.tar.gz

  • SHA256: ad482bac58e97cacdc71bf32b33200b09619373b0d25e7b6a4946f2a066cf448
  • MD5: 929aa6300a24822eee8168e65e9372eb
  • BLAKE2b-256: b607f6ff110e4c52dd615885b9a41b5452493d625e71680f6c08daf3307b579c


File details

Details for the file llm_flow_engine-0.7.0-py3-none-any.whl.

File metadata

File hashes

Hashes for llm_flow_engine-0.7.0-py3-none-any.whl

  • SHA256: e8d285b610b9f04b27fce490ee059ab3ea37105df4aca7bc0740eca00279130c
  • MD5: 57c1e348d125e140981d17bc3e773b61
  • BLAKE2b-256: 84d82b2ba02e59edc22ff49eb392783f4fb2c5e51791158d9c8362bc1a93002a

