Skip to main content

A DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation

Project description

LLM Flow Engine

๐Ÿ‡จ๐Ÿ‡ณ ไธญๆ–‡็‰ˆๆœฌ | ๐Ÿ‡บ๐Ÿ‡ธ English

A DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation. Define complex AI workflows through YAML configuration files and enable collaborative work between multiple LLM models.

โœจ Key Features

  • ๐Ÿ”ง DSL Workflow Definition - Define complex LLM workflows using YAML format
  • ๐Ÿ“Š DAG Dependency Management - Support directed acyclic graph node dependencies and parallel execution
  • ๐Ÿ”— Placeholder Resolution - Use ${node.output} syntax for inter-node data passing
  • ๐Ÿค– Multi-Model Support - Support calling different LLM models and result aggregation
  • โš™๏ธ Flexible Configuration - Custom model configuration and parameter management
  • โšก Async Execution - Efficient asynchronous task processing and error retry
  • ๐Ÿ“ˆ Result Aggregation - Built-in various result merging and analysis functions
  • ๐Ÿ”ง Extensible Architecture - Support custom functions and model adapters

๐Ÿš€ Quick Start

Prerequisites

  • Python 3.8+
  • aiohttp >= 3.8.0
  • pyyaml >= 6.0
  • loguru >= 0.7.0

Installation

pip install llm-flow-engine

Basic Usage

import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider

async def main():
    # 1. Configure models (auto-discovery)
    provider = await ModelConfigProvider.from_host_async(
        api_host="http://127.0.0.1:11434", 
        platform="ollama"
    )
    
    # 2. Create engine
    engine = FlowEngine(provider)
    
    # 3. Execute workflow
    dsl_content = """
    metadata:
      version: "1.0"
      description: "Simple Q&A workflow"
    
    input:
      type: "start"
      name: "workflow_input"
      data:
        question: ""
    
    executors:
      - name: answer_step
        type: task
        func: llm_simple_call
        custom_vars:
          user_input: "${workflow_input.question}"
          model: "llama2"
    
    output:
      type: "end"
      name: "workflow_output"
      data:
        answer: "${answer_step.output}"
    """
    
    result = await engine.execute_dsl(
        dsl_content, 
        inputs={"workflow_input": {"question": "What is AI?"}}
    )
    
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

๐Ÿ“‹ Project Structure

llm_flow_engine/
โ”œโ”€โ”€ __init__.py           # Main package initialization
โ”œโ”€โ”€ flow_engine.py        # Main engine entry point
โ”œโ”€โ”€ dsl_loader.py         # DSL parser
โ”œโ”€โ”€ workflow.py           # Unified workflow management
โ”œโ”€โ”€ executor.py           # Task executor
โ”œโ”€โ”€ executor_result.py    # Execution result wrapper
โ”œโ”€โ”€ builtin_functions.py  # Built-in function library
โ”œโ”€โ”€ model_config.py       # Model configuration management
โ””โ”€โ”€ utils.py             # Utility functions

examples/
โ”œโ”€โ”€ demo_example.py       # Complete example demo
โ”œโ”€โ”€ demo_qa.yaml          # Workflow DSL example
โ””โ”€โ”€ model_config_demo.py  # Model configuration demo

๐Ÿ”ง Model Configuration

Method 1: Auto-Discovery (Recommended)

# Auto-discover Ollama models
provider = await ModelConfigProvider.from_host_async(
    api_host="http://127.0.0.1:11434",
    platform="ollama"
)

Method 2: Manual Configuration

# Create provider and add models manually
provider = ModelConfigProvider()

# Add OpenAI model
provider.add_single_model(
    model_name="gpt-4",
    platform="openai",
    api_url="https://api.openai.com/v1/chat/completions",
    api_key="your-api-key",
    max_tokens=4096
)

# Add custom model
provider.add_single_model(
    model_name="custom-llm",
    platform="openai_compatible",
    api_url="https://your-api.com/v1/chat/completions",
    api_key="your-api-key",
    max_tokens=2048
)

๐Ÿ“ DSL Workflow Format

Basic Structure

metadata:
  version: "1.0"
  description: "Workflow description"

input:
  type: "start"
  name: "workflow_input"
  data:
    key: "value"

executors:
  - name: task1
    type: task
    func: function_name
    custom_vars:
      param1: "${input.key}"
      param2: "static_value"
    depends_on: []  # Dependencies
    timeout: 30     # Timeout in seconds
    retry: 2        # Retry count

output:
  type: "end"
  name: "workflow_output"
  data:
    result: "${task1.output}"

Multi-Model Workflow Example

metadata:
  version: "1.0"
  description: "Multi-model Q&A with analysis"

input:
  type: "start"
  name: "workflow_input"
  data:
    question: ""

executors:
  # Parallel model calls
  - name: model1_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "llama2"
    timeout: 30

  - name: model2_answer
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "${workflow_input.question}"
      model: "mistral"
    timeout: 30

  # Analysis step (depends on both models)
  - name: analysis
    type: task
    func: llm_simple_call
    custom_vars:
      user_input: "Compare these answers: 1) ${model1_answer.output} 2) ${model2_answer.output}"
      model: "llama2"
    depends_on: ["model1_answer", "model2_answer"]

output:
  type: "end"
  name: "workflow_output"
  data:
    original_question: "${workflow_input.question}"
    model1_response: "${model1_answer.output}"
    model2_response: "${model2_answer.output}"
    analysis: "${analysis.output}"

๐Ÿ”Œ Built-in Functions

  • llm_simple_call - Basic LLM model call
  • text_process - Text preprocessing and formatting
  • result_summary - Multi-result summarization
  • data_transform - Data format transformation

๐Ÿงช Running Examples

# Basic usage demo
python examples/demo_example.py

# Model configuration demo  
python examples/model_config_demo.py

# Package usage demo
python examples/package_demo.py

๐Ÿ“Š Supported Platforms

  • Ollama - Local LLM models
  • OpenAI - GPT series models
  • OpenAI Compatible - Any OpenAI-compatible API
  • Anthropic - Claude series models
  • Custom - Custom API endpoints

๐Ÿ› ๏ธ Development

Setup Development Environment

git clone https://github.com/liguobao/llm-flow-engine.git
cd llm-flow-engine

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black .

Project Validation

# Validate project structure and configuration
python validate_project.py

๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

๐Ÿ“ž Support

๐ŸŒŸ Star History

If you find this project helpful, please consider giving it a star! โญ


Made with โค๏ธ by the LLM Flow Engine Team

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

llm_flow_engine-0.7.1.tar.gz (33.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

llm_flow_engine-0.7.1-py3-none-any.whl (25.7 kB view details)

Uploaded Python 3

File details

Details for the file llm_flow_engine-0.7.1.tar.gz.

File metadata

  • Download URL: llm_flow_engine-0.7.1.tar.gz
  • Upload date:
  • Size: 33.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.7

File hashes

Hashes for llm_flow_engine-0.7.1.tar.gz
Algorithm Hash digest
SHA256 ec39ca819bb089e41a7617a93eb095ac9c28234393ed4d95ca5fcb7e2b82734d
MD5 7afee44d47d85f05741fc465f5734c9d
BLAKE2b-256 b67696cd0b9eb4005aa2d58478d7fd0bc8f39131073040f5f98a26e91c6a5751

See more details on using hashes here.

File details

Details for the file llm_flow_engine-0.7.1-py3-none-any.whl.

File metadata

File hashes

Hashes for llm_flow_engine-0.7.1-py3-none-any.whl
Algorithm Hash digest
SHA256 6380db72b9e6ef5c2a3b6c89b921f6341f2954842fc178178d23d27ee1d6a947
MD5 72d8c6f2f4499eee55e49b433b7087e2
BLAKE2b-256 6f0b4eb88c8ec06f878b443198628c9926aa14e20ff41fe40281a6c65849659c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page