
LLM Flow Engine

🇨🇳 中文版本 | 🇺🇸 English

A DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation. Define complex AI workflows in YAML configuration files and let multiple LLM models work together.

✨ Key Features

  • 🔧 DSL Workflow Definition - Define complex LLM workflows in YAML format
  • 📊 DAG Dependency Management - Model node dependencies as a directed acyclic graph, with parallel execution of independent nodes
  • 🔗 Placeholder Resolution - Pass data between nodes with the ${node.output} syntax (see the sketch after this list)
  • 🤖 Multi-Model Support - Call different LLM models and aggregate their results
  • ⚙️ Flexible Configuration - Custom model configuration and parameter management
  • ⚡ Async Execution - Efficient asynchronous task processing with retry on errors
  • 📈 Result Aggregation - Built-in functions for merging and analyzing results
  • 🔧 Extensible Architecture - Support for custom functions and model adapters
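
Conceptually, placeholder resolution is a substitution pass over the outputs of nodes that have already completed. A minimal sketch of the idea (illustrative only, not the engine's actual implementation):

import re

def resolve_placeholders(template, outputs):
    """Replace each ${node.field} reference with the value produced by that node."""
    def lookup(match):
        node, field = match.group(1).split(".", 1)
        return str(outputs[node][field])
    return re.sub(r"\$\{([^}]+)\}", lookup, template)

# After 'answer_step' finishes, its output can be spliced into later templates.
outputs = {"answer_step": {"output": "AI is the simulation of human intelligence."}}
print(resolve_placeholders("Result: ${answer_step.output}", outputs))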

🚀 Quick Start

Prerequisites

  • Python 3.8+
  • aiohttp >= 3.8.0
  • pyyaml >= 6.0
  • loguru >= 0.7.0

Installation

pip install llm-flow-engine

Basic Usage

import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider

async def main():
    # 1. Configure models (auto-discovery)
    provider = await ModelConfigProvider.from_host_async(
        api_host="http://127.0.0.1:11434", 
        platform="ollama"
    )
    
    # 2. Create engine
    engine = FlowEngine(provider)
    
    # 3. Execute workflow
    dsl_content = """
    metadata:
      version: "1.0"
      description: "Simple Q&A workflow"
    
    input:
      type: "start"
      name: "workflow_input"
      data:
        question: ""
    
    executors:
      - name: answer_step
        type: task
        func: llm_simple_call
        custom_vars:
          user_input: "${workflow_input.question}"
          model: "llama2"
    
    output:
      type: "end"
      name: "workflow_output"
      data:
        answer: "${answer_step.output}"
    """
    
    result = await engine.execute_dsl(
        dsl_content, 
        inputs={"workflow_input": {"question": "What is AI?"}}
    )
    
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

📋 Project Structure

llm_flow_engine/
├── __init__.py           # Main package initialization
├── flow_engine.py        # Main engine entry point
├── dsl_loader.py         # DSL parser
├── workflow.py           # Unified workflow management
├── executor.py           # Task executor
├── executor_result.py    # Execution result wrapper
├── builtin_functions.py  # Built-in function library
├── model_config.py       # Model configuration management
└── utils.py              # Utility functions

examples/
├── demo_example.py       # Complete example demo
├── demo_qa.yaml          # Workflow DSL example
└── model_config_demo.py  # Model configuration demo

🔧 Model Configuration

Method 1: Auto-Discovery (Recommended)

# Auto-discover Ollama models
provider = await ModelConfigProvider.from_host_async(
    api_host="http://127.0.0.1:11434",
    platform="ollama"
)
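
Auto-discovery presumably queries the platform's model listing endpoint; for Ollama that is GET /api/tags. A rough stand-alone equivalent using aiohttp, for illustration only:

import asyncio
import aiohttp

async def list_ollama_models(api_host):
    """Fetch the names of locally installed models from Ollama's /api/tags endpoint."""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{api_host}/api/tags") as resp:
            resp.raise_for_status()
            data = await resp.json()
    return [model["name"] for model in data.get("models", [])]

print(asyncio.run(list_ollama_models("http://127.0.0.1:11434")))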

Method 2: Manual Configuration

# Create provider and add models manually
provider = ModelConfigProvider()

# Add OpenAI model
provider.add_single_model(
    model_name="gpt-4",
    platform="openai",
    api_url="https://api.openai.com/v1/chat/completions",
    api_key="your-api-key",
    max_tokens=4096
)

# Add custom model
provider.add_single_model(
    model_name="custom-llm",
    platform="openai_compatible",
    api_url="https://your-api.com/v1/chat/completions",
    api_key="your-api-key",
    max_tokens=2048
)
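
Either way, the resulting provider is passed to FlowEngine exactly as in the Quick Start example above.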

๐Ÿ“ DSL Workflow Format

Basic Structure

metadata:
  version: "1.0"
  description: "Workflow description"

input:
  type: "start"
  name: "workflow_input"
  data:
    key: "value"

executors:
  - name: task1
    type: task
    func: function_name
    custom_vars:
      param1: "${input.key}"
      param2: "static_value"
    depends_on: []  # Dependencies
    timeout: 30     # Timeout in seconds
    retry: 2        # Retry count

output:
  type: "end"
  name: "workflow_output"
  data:
    result: "${task1.output}"
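
The depends_on edges determine execution order: a node runs once all of its dependencies have completed, and nodes with no unmet dependencies run in parallel. A minimal topological-layering sketch of that scheduling idea (illustrative, not the engine's actual scheduler):

def execution_layers(executors):
    """Group executor names into layers; all nodes in one layer can run concurrently."""
    remaining = {e["name"]: set(e.get("depends_on", [])) for e in executors}
    layers = []
    while remaining:
        ready = [name for name, deps in remaining.items() if not deps]
        if not ready:
            raise ValueError("Cycle detected: executors must form a DAG")
        layers.append(ready)
        for name in ready:
            del remaining[name]
        for deps in remaining.values():
            deps.difference_update(ready)
    return layers

# The multi-model example below yields two layers:
# [['model1_answer', 'model2_answer'], ['analysis']]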

Multi-Model Workflow Example

metadata:
  version: "1.0"
  description: "Multi-model Q&A with analysis"

input:
  type: "start"
  name: "workflow_input"
  data:
    question: ""

executors:
  # Parallel model calls
  - name: model1_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "llama2"
    timeout: 30

  - name: model2_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "mistral"
    timeout: 30

  # Analysis step (depends on both models)
  - name: analysis
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "Compare these answers: 1) ${model1_answer.output} 2) ${model2_answer.output}"
      model: "llama2"
    depends_on: ["model1_answer", "model2_answer"]

output:
  type: "end"
  name: "workflow_output"
  data:
    original_question: "${workflow_input.question}"
    model1_response: "${model1_answer.output}"
    model2_response: "${model2_answer.output}"
    analysis: "${analysis.output}"
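
A workflow like this can also be executed from a file instead of an inline string; execute_dsl takes the DSL text either way. The path below assumes the DSL is saved as examples/demo_qa.yaml:

import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider

async def run_from_file():
    provider = await ModelConfigProvider.from_host_async(
        api_host="http://127.0.0.1:11434",
        platform="ollama"
    )
    engine = FlowEngine(provider)

    # Read the DSL text from a YAML file and execute it as before.
    with open("examples/demo_qa.yaml", "r", encoding="utf-8") as f:
        dsl_content = f.read()

    result = await engine.execute_dsl(
        dsl_content,
        inputs={"workflow_input": {"question": "What is AI?"}}
    )
    print(result)

if __name__ == "__main__":
    asyncio.run(run_from_file())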

🔌 Built-in Functions

  • llm_simple_call - Basic LLM model call
  • text_process - Text preprocessing and formatting
  • result_summary - Multi-result summarization
  • data_transform - Data format transformation
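
For example, an aggregation step could feed two model outputs into result_summary. The parameter name below is illustrative; check builtin_functions.py for the actual signatures:

  - name: summary_step
    type: task
    func: result_summary
    custom_vars:
      inputs: "1) ${model1_answer.output} 2) ${model2_answer.output}"
    depends_on: ["model1_answer", "model2_answer"]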

🧪 Running Examples

# Basic usage demo
python examples/demo_example.py

# Model configuration demo  
python examples/model_config_demo.py

# Package usage demo
python examples/package_demo.py

📊 Supported Platforms

  • Ollama - Local LLM models
  • OpenAI - GPT series models
  • OpenAI Compatible - Any OpenAI-compatible API
  • Anthropic - Claude series models
  • Custom - Custom API endpoints
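
Platforms other than Ollama and OpenAI follow the same add_single_model pattern shown above. For instance, an Anthropic model might be registered like this (the platform identifier "anthropic" is an assumption; verify the supported values in model_config.py):

# Add Anthropic model (platform string assumed; see model_config.py)
provider.add_single_model(
    model_name="claude-3-5-sonnet",
    platform="anthropic",
    api_url="https://api.anthropic.com/v1/messages",
    api_key="your-api-key",
    max_tokens=4096
)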

๐Ÿ› ๏ธ Development

Setup Development Environment

git clone https://github.com/liguobao/llm-flow-engine.git
cd llm-flow-engine

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black .

Project Validation

# Validate project structure and configuration
python validate_project.py

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📞 Support

🌟 Star History

If you find this project helpful, please consider giving it a star! ⭐


Made with ❤️ by the LLM Flow Engine Team
