Skip to main content

一个简洁的 Python 智能体工作流引擎,支持同步与异步节点、分支、循环、流程图渲染。

Project description

agnflow

A concise Python workflow agentic engine

License Docs PyPI Python Version

中文 | English

agnflow pursues simplicity, ease of use, and extensibility, suitable for rapid prototyping, customized LLM workflows, and Agent task flows.

🎯 Core Features Showcase

Agent Type Code Example Flowchart
Complex Node Connection n1 >> [n2 >> n3, n3 >> n4] >> n5 Node Connection Flowchart
Complex Workflow Connection f1[n1 >> n2 >> f2[n3]] >> f3[n4] Workflow Connection Flowchart
Supervisor Agent
First node bidirectionally connected to other nodes
s1[n1, n2, n3] >> n4 Supervisor Agent Flowchart
Basic Swarm Connection
Arbitrary nodes bidirectionally connected
s1[n1, n2, n3, n4] Basic Swarm Connection Flowchart
Node-Swarm Connection n1 >> s1[n2, n3] >> n4 Node-Swarm Connection Flowchart
Multiple Swarm Connection s1[n1, n2] >> s2[n3, n4] Multiple Swarm Connection Flowchart

1. TODO (Future Extension Directions)

  • llm (supporting stream, multimodal, async, structured output)
  • memory
  • rag
  • mcp tool
  • ReAct (reasoning + action)
  • TAO (thought + action + observation)
  • ToT (Tree of Thought)
  • CoT (Chain of Thought)
  • hitl (human in the loop)
  • 👏🏻 supervisor swarm

The above are future extensible intelligent agent/reasoning/tool integration directions. Contributions and suggestions are welcome.

2. Features

  • Node-based workflows with support for branching, loops, and sub-flows
  • Support for synchronous and asynchronous execution
  • Support for flowchart rendering (dot/mermaid)
  • Clean code, easy to extend

3. Installation

3.1 Install from PyPI (Recommended)

# Install using pip
pip install agnflow

# Install using rye
rye add agnflow

# Install using poetry
poetry add agnflow

# Install specific version
pip install agnflow==0.1.4

# Install latest development version
pip install --upgrade agnflow

3.2 Install from Source

Recommended to use rye for dependency and virtual environment management:

# Clone repository
git clone https://github.com/jianduo1/agnflow.git
cd agnflow

# Install dependencies
rye sync

# Development mode installation
rye sync --dev

3.3 Flowchart Rendering Tools (Optional)

Note: Generating images requires additional tools

Dot format image generation (Recommended):

# macOS
brew install graphviz

# Ubuntu/Debian
sudo apt-get install graphviz

# CentOS/RHEL
sudo yum install graphviz

# Windows
# Download and install: https://graphviz.org/download/

Mermaid format image generation:

# Install mermaid-cli
npm install -g @mermaid-js/mermaid-cli

# Install puppeteer browser (for rendering)
npx puppeteer browsers install chrome-headless-shell

3.4 Development Environment

Use rye to manage development environment:

# Install dependencies
rye sync

# Run tests
rye run test

# Code formatting
rye run format

# Code linting
rye run lint

# Run examples
rye run example

3.5 Publish to PyPI

# Clean previous builds
rye run clean

# Build package
rye run build

# Upload to test PyPI (recommended to test first)
rye run upload-test

# Upload to official PyPI
rye run upload

Note: First upload to PyPI requires:

  1. Register account on PyPI
  2. Register account on TestPyPI
  3. Configure ~/.pypirc file or use environment variables

4. Quick Start

4.1 Define Nodes

from agnflow import Node, Flow

def hello_exec(state):
    print("hello", state)
    return {"msg": "world"}

def world_exec(state):
    print("world", state)

n1 = Node("hello", exec=hello_exec)
n2 = Node("world", exec=world_exec)
n1 >> n2

4.2 Build and Run Workflow

flow = Flow(n1, name="demo")
flow.run({"msg": "hi"})

4.3 Asynchronous Execution

import asyncio
async def ahello(state):
    print("async hello", state)
    return {"msg": "async world"}
n1 = Node("hello", aexec=ahello)
flow = Flow(n1)
asyncio.run(flow.arun({"msg": "hi"}))

4.4 Render Flowchart

print(flow.render_dot())      # Output dot format
print(flow.render_mermaid())  # Output mermaid format

# Save as image file
flow.render_dot(saved_file="./flow.png")      # Save dot format image
flow.render_mermaid(saved_file="./flow.png")  # Save mermaid format image

5. Node Function Details

5.1 Function Input Parameter Methods

agnflow supports multiple function input parameter methods and automatically retrieves parameters from the state based on function signatures:

Method 1: Receive the entire state

def my_node(state):
    """Receive the entire state dictionary"""
    print(f"Received state: {state}")
    return {"result": "processed"}

n1 = Node("my_node", exec=my_node)

Method 2: Automatic injection by parameter name

def my_node(user_id, message, data):
    """Automatically get values from state by parameter names"""
    print(f"User ID: {user_id}")
    print(f"Message: {message}")
    print(f"Data: {data}")
    return {"processed": True}

# Pass state containing these fields when calling
flow.run({
    "user_id": "123",
    "message": "hello",
    "data": {"key": "value"}
})

Method 3: Mixed approach

def my_node(user_id, state):
    """Mixed approach: partial parameters + entire state"""
    print(f"User ID: {user_id}")
    print(f"Complete state: {state}")
    return {"user_processed": True}

5.2 Function Return Value Methods

Node functions support multiple return value formats:

Method 1: Return only new state

def my_node(state):
    """Only update state, use default action"""
    return {"new_data": "value", "timestamp": time.time()}

Method 2: Return action and new state

def my_node(state):
    """Return action and updated state"""
    if state.get("condition"):
        return "success", {"result": "success"}
    else:
        return "error", {"result": "error"}

Method 3: Return only action

def my_node(state):
    """Return only action, don't update state"""
    if state.get("condition"):
        return "success"
    else:
        return "error"

Method 4: Return None (end workflow)

def my_node(state):
    """Return None to end workflow"""
    if state.get("should_stop"):
        return None
    return "continue", {"step": "completed"}

5.3 Asynchronous Node Functions

Asynchronous node functions use the aexec parameter and support all synchronous function features:

import asyncio

async def async_node(state):
    """Asynchronous node function"""
    await asyncio.sleep(0.1)  # Simulate async operation
    return {"async_result": "done"}

async def async_node_with_action(user_id, state):
    """Asynchronous node function - mixed parameters + action"""
    await asyncio.sleep(0.1)
    return "next", {"user_id": user_id, "processed": True}

# Create asynchronous nodes
n1 = Node("async_node", aexec=async_node)
n2 = Node("async_node_with_action", aexec=async_node_with_action)

# Asynchronous execution
asyncio.run(flow.arun({"user_id": "123"}))

5.4 Node Class Inheritance Method

Besides function methods, you can also create nodes by inheriting from the Node class:

class MyNode(Node):
    def exec(self, state):
        """Synchronous execution method"""
        print(f"Executing node: {self.name}")
        return {"class_result": "success"}
    
    async def aexec(self, state):
        """Asynchronous execution method"""
        print(f"Asynchronously executing node: {self.name}")
        return {"async_class_result": "success"}

# Use class node
n1 = MyNode("my_class_node")

5.5 Error Handling and Retry

Nodes support error handling and retry mechanisms:

def risky_node(state):
    """Node that might fail"""
    if random.random() < 0.5:
        raise Exception("Random error")
    return {"success": True}

# Create node with retry support
n1 = Node("risky_node", exec=risky_node, max_retries=3, wait=1)

# Custom error handling
class SafeNode(Node):
    def exec_fallback(self, state, exc):
        """Custom error handling"""
        return "error", {"error": str(exc), "recovered": True}
    
    async def aexec_fallback(self, state, exc):
        """Custom asynchronous error handling"""
        return "error", {"error": str(exc), "recovered": True}

5.6 Complete Example

from agnflow import Node, Flow
import time

# Define different types of node functions
def start_node(user_id, message):
    """Receive specific parameters"""
    return "n2", {"user_id": user_id, "message": message}

def process_node(state):
    """Receive entire state"""
    processed = f"Processed: {state['message']}"
    return "n3", {"processed": processed, "timestamp": time.time()}

def complete_node(result, state):
    """Mixed parameters"""
    print(f"Result: {result}")
    print(f"State: {state}")
    return {"final_result": "success"}

# Create nodes
n1 = Node("start", exec=start_node)
n2 = Node("process", exec=process_node)
n3 = Node("complete", exec=complete_node)

# Connect nodes
n1 >> n2 >> n3

# Create workflow
flow = Flow(n1, name="example_flow")

# Run workflow
result = flow.run({
    "user_id": "123",
    "message": "Hello agnflow!"
})

print(f"Workflow result: {result}")

6. Node Connection Syntax

agnflow provides multiple flexible node connection methods:

6.1 Linear Connection

# Method 1: Forward connection
a >> b >> c

# Method 2: Reverse connection  
c << b << a

6.2 Branch Connection

# Branch based on node return value
a >> [b, c]

6.3 Sub-flow Connection

# Connect sub-flows
d1 >> flow >> d2

7. Complex Workflow Example

After running the example code src/agnflow/example.py, the following flowcharts will be generated:

Workflow definition:

a >> [b >> flow, c >> a]
d1 >> flow >> d2

7.1 Dot Format Flowchart

Dot Flow

7.2 Mermaid Format Flowchart

Mermaid Flow

These flowcharts illustrate:

  • Connections between nodes
  • Branching and looping structures
  • Nesting of subprocesses
  • Overall execution paths of workflows

8. Reference Frameworks

agnflow references and benchmarks against the following mainstream intelligent agent/workflow frameworks:

LangGraph LlamaIndex AutoGen Haystack CrewAI FastGPT PocketFlow

9. Project Status

📦 Release Status

🔄 Version Information

  • Current Version: 0.1.4
  • Python Support: 3.8+
  • License: MIT
  • Status: Beta

10. License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agnflow-0.1.4.tar.gz (901.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agnflow-0.1.4-py3-none-any.whl (42.4 kB view details)

Uploaded Python 3

File details

Details for the file agnflow-0.1.4.tar.gz.

File metadata

  • Download URL: agnflow-0.1.4.tar.gz
  • Upload date:
  • Size: 901.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for agnflow-0.1.4.tar.gz
Algorithm Hash digest
SHA256 b23efcf30557b0e7240737c327b526da28bb386a483d4d834d730a4fcb016d84
MD5 180b65b444cc0979425e01941ec7d792
BLAKE2b-256 c8f6c374c43753c0bc407391b423629ee3757cd0875c408562dbf800dd2c86dc

See more details on using hashes here.

Provenance

The following attestation bundles were made for agnflow-0.1.4.tar.gz:

Publisher: python-publish.yml on jianduo1/agnflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file agnflow-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: agnflow-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 42.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for agnflow-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 48d7d523c103e1cbffb67eff8016fc2ec377842109532824be981e0833c999ba
MD5 155d512f7a1db3632fd80633ca372858
BLAKE2b-256 e4e58066b022d559a84b97ba47e79029720cb415a3366b2775cf9ce747325c2e

See more details on using hashes here.

Provenance

The following attestation bundles were made for agnflow-0.1.4-py3-none-any.whl:

Publisher: python-publish.yml on jianduo1/agnflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page