
LLM Flow Engine

🇨🇳 中文版本 (Chinese) | 🇺🇸 English

A DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation. Define complex AI workflows through YAML configuration files and enable collaborative work between multiple LLM models.

✨ Key Features

  • 🔧 DSL Workflow Definition - Define complex LLM workflows in YAML
  • 📊 DAG Dependency Management - Directed-acyclic-graph node dependencies with parallel execution
  • 🔗 Placeholder Resolution - Pass data between nodes with ${node.output} syntax
  • 🤖 Multi-Model Support - Call different LLM models and aggregate their results
  • ⚙️ Flexible Configuration - Custom model configuration and parameter management
  • ⚡ Async Execution - Efficient asynchronous task processing with error retry
  • 📈 Result Aggregation - Built-in functions for merging and analyzing results
  • 🔧 Extensible Architecture - Support for custom functions and model adapters
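The placeholder mechanism above can be illustrated with a minimal, self-contained sketch. This is not the engine's actual implementation; the `resolve` helper and the `outputs` dict are hypothetical stand-ins for the engine's internal state:

```python
import re

# Matches ${node.key}-style placeholders, e.g. ${answer_step.output}
PLACEHOLDER = re.compile(r"\$\{([\w.]+)\}")

def resolve(template, outputs):
    """Substitute ${node.key} placeholders with values from node outputs."""
    def lookup(match):
        node, _, key = match.group(1).partition(".")
        value = outputs[node]
        return str(value[key] if key else value)
    return PLACEHOLDER.sub(lookup, template)

outputs = {"workflow_input": {"question": "What is AI?"}}
print(resolve("Q: ${workflow_input.question}", outputs))  # Q: What is AI?
```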

🚀 Quick Start

Prerequisites

  • Python 3.8+
  • aiohttp >= 3.8.0
  • pyyaml >= 6.0
  • loguru >= 0.7.0

Installation

pip install llm-flow-engine

Basic Usage

import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider

async def main():
    # 1. Configure models (auto-discovery)
    provider = await ModelConfigProvider.from_host_async(
        api_host="http://127.0.0.1:11434", 
        platform="ollama"
    )
    
    # 2. Create engine
    engine = FlowEngine(provider)
    
    # 3. Execute workflow
    dsl_content = """
    metadata:
      version: "1.0"
      description: "Simple Q&A workflow"
    
    input:
      type: "start"
      name: "workflow_input"
      data:
        question: ""
    
    executors:
      - name: answer_step
        type: task
        func: llm_simple_call
        custom_vars:
          user_input: "${workflow_input.question}"
          model: "llama2"
    
    output:
      type: "end"
      name: "workflow_output"
      data:
        answer: "${answer_step.output}"
    """
    
    result = await engine.execute_dsl(
        dsl_content, 
        inputs={"workflow_input": {"question": "What is AI?"}}
    )
    
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

📋 Project Structure

llm_flow_engine/
├── __init__.py           # Main package initialization
├── flow_engine.py        # Main engine entry point
├── dsl_loader.py         # DSL parser
├── workflow.py           # Unified workflow management
├── executor.py           # Task executor
├── executor_result.py    # Execution result wrapper
├── builtin_functions.py  # Built-in function library
├── model_config.py       # Model configuration management
└── utils.py              # Utility functions

examples/
├── demo_example.py       # Complete example demo
├── demo_qa.yaml          # Workflow DSL example
└── model_config_demo.py  # Model configuration demo

🔧 Model Configuration

Method 1: Auto-Discovery (Recommended)

# Auto-discover Ollama models
provider = await ModelConfigProvider.from_host_async(
    api_host="http://127.0.0.1:11434",
    platform="ollama"
)

Method 2: Manual Configuration

# Create provider and add models manually
provider = ModelConfigProvider()

# Add OpenAI model
provider.add_single_model(
    model_name="gpt-4",
    platform="openai",
    api_url="https://api.openai.com/v1/chat/completions",
    api_key="your-api-key",
    max_tokens=4096
)

# Add custom model
model_provider = ModelConfigProvider()
platform = "openai"
demo_host = "https://ai-proxy.4ba-cn.co/openrouter/v1/chat/completions"
demo_free_key = "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"

model_list = [
    "openai/gpt-oss-20b:free",
    "moonshotai/kimi-k2:free",
    "google/gemma-3-12b-it:free",
    "z-ai/glm-4.5-air:free",
]
for model in model_list:
    model_provider.add_single_model(
        model_name=model,
        platform=platform,
        api_url=demo_host,
        api_key=demo_free_key,
    )

๐Ÿ“ DSL Workflow Format

Basic Structure

metadata:
  version: "1.0"
  description: "Workflow description"

input:
  type: "start"
  name: "workflow_input"
  data:
    key: "value"

executors:
  - name: task1
    type: task
    func: function_name
    custom_vars:
      param1: "${input.key}"
      param2: "static_value"
    depends_on: []  # Dependencies
    timeout: 30     # Timeout in seconds
    retry: 2        # Retry count

output:
  type: "end"
  name: "workflow_output"
  data:
    result: "${task1.output}"
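Under the hood, a DAG scheduler only needs the name and depends_on fields to compute an execution order. A minimal sketch using the standard library (graphlib requires Python 3.9+; the engine's own scheduler may differ):

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Executor entries as they would come out of the parsed YAML
executors = [
    {"name": "task1", "depends_on": []},
    {"name": "task2", "depends_on": ["task1"]},
    {"name": "task3", "depends_on": ["task1"]},
    {"name": "merge", "depends_on": ["task2", "task3"]},
]

# Map each node to the set of nodes it depends on
graph = {e["name"]: set(e["depends_on"]) for e in executors}
order = list(TopologicalSorter(graph).static_order())
print(order)  # task1 first, merge last; task2/task3 have no ordering between them
```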

Multi-Model Workflow Example

metadata:
  version: "1.0"
  description: "Multi-model Q&A with analysis"

input:
  type: "start"
  name: "workflow_input"
  data:
    question: ""

executors:
  # Parallel model calls
  - name: model1_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "llama2"
    timeout: 30

  - name: model2_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "mistral"
    timeout: 30

  # Analysis step (depends on both models)
  - name: analysis
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "Compare these answers: 1) ${model1_answer.output} 2) ${model2_answer.output}"
      model: "llama2"
    depends_on: ["model1_answer", "model2_answer"]

output:
  type: "end"
  name: "workflow_output"
  data:
    original_question: "${workflow_input.question}"
    model1_response: "${model1_answer.output}"
    model2_response: "${model2_answer.output}"
    analysis: "${analysis.output}"
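Because model1_answer and model2_answer share no dependency, an engine can run them concurrently and start analysis only once both finish. A sketch of that scheduling pattern with asyncio (llm_call here is a stub, not the engine's API):

```python
import asyncio

async def llm_call(model: str, prompt: str) -> str:
    # Stub standing in for a real LLM request; simulates network latency.
    await asyncio.sleep(0.05)
    return f"[{model}] answer to: {prompt}"

async def run_workflow(question: str) -> dict:
    # No mutual dependency -> both model calls run concurrently
    a1, a2 = await asyncio.gather(
        llm_call("llama2", question),
        llm_call("mistral", question),
    )
    # analysis depends on both outputs, so it awaits them first
    analysis = await llm_call("llama2", f"Compare: 1) {a1} 2) {a2}")
    return {"model1_response": a1, "model2_response": a2, "analysis": analysis}

result = asyncio.run(run_workflow("What is AI?"))
print(result["analysis"])
```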

🔌 Built-in Functions

  • llm_simple_call - Basic LLM model call
  • text_process - Text preprocessing and formatting
  • result_summary - Multi-result summarization
  • data_transform - Data format transformation
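As an illustration of what an aggregator in this family might do, here is a minimal result_summary-style function. The actual built-in's signature is not documented here, so treat this as a hypothetical sketch:

```python
def result_summary(*results: str) -> str:
    """Merge multiple upstream outputs into one numbered summary block."""
    return "\n".join(f"{i}. {r}" for i, r in enumerate(results, start=1))

print(result_summary("llama2 says X", "mistral says Y"))
```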

🧪 Running Examples

# Basic usage demo
python examples/demo_example.py

# Model configuration demo  
python examples/model_config_demo.py

# Package usage demo
python examples/package_demo.py

📊 Supported Platforms

  • Ollama - Local LLM models
  • OpenAI - GPT series models
  • OpenAI Compatible - Any OpenAI-compatible API
  • Anthropic - Claude series models
  • Custom - Custom API endpoints
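The OpenAI-compatible platforms above all accept the same chat-completions request shape. A minimal sketch of that payload (the model name is a placeholder; exact optional fields vary by provider):

```python
import json

# Minimal chat-completions payload accepted by OpenAI-compatible endpoints
payload = {
    "model": "llama2",  # placeholder model name
    "messages": [
        {"role": "user", "content": "What is AI?"},
    ],
    "max_tokens": 256,
}
body = json.dumps(payload)
print(body)
```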

๐Ÿ› ๏ธ Development

Setup Development Environment

git clone https://github.com/liguobao/llm-flow-engine.git
cd llm-flow-engine

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black .

Project Validation

# Validate project structure and configuration
python validate_project.py

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📞 Support

🌟 Star History

If you find this project helpful, please consider giving it a star! ⭐


Download files

Source Distribution

llm_flow_engine-0.7.4.tar.gz (59.4 kB)

Built Distribution

llm_flow_engine-0.7.4-py3-none-any.whl (42.9 kB)

File details

Details for the file llm_flow_engine-0.7.4.tar.gz.

File metadata

  • Size: 59.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.7

File hashes

Hashes for llm_flow_engine-0.7.4.tar.gz
Algorithm Hash digest
SHA256 79e029c20321a9f3f4ffa4799b7d7bf811693e53f557aaeb44e2bcbd9b4045e0
MD5 f2f8bf05d05aacdcf047a606c22181de
BLAKE2b-256 ec757dddfa74fdb44aac03ec21a1fdcb8d4bb43f2b3931c09f225edbce352ed4

File details

Details for the file llm_flow_engine-0.7.4-py3-none-any.whl.

File hashes

Hashes for llm_flow_engine-0.7.4-py3-none-any.whl
Algorithm Hash digest
SHA256 3ccfbc86363ea1f166ddc22d4500ffe7fb5c3626c20b8ce442c41a5e7b59ae09
MD5 62c2c4b3faff5de952e1653dd4e09693
BLAKE2b-256 d69aaf69991b8a5ac4f801ccd1a7fa5955c4dec74648ce4c07eb66074570695c
