LLM agent workflow orchestration with DAG

Project description

LangDAG

Introduction

Work in progress. LangDAG is still experimental.

LangDAG is a specialized orchestration framework, written in Python, for building LLM (Large Language Model) agent workflows using DAGs (Directed Acyclic Graphs).

DAGs, widely used in data orchestration tools like Airflow, can be effectively applied to build LLM agent workflows. Despite their simplicity, DAGs are both expressive and powerful.

LangDAG demonstrates how DAGs can be utilized to build LLM agents. It provides several advantages:

  • Package Agnostic: Plain functions first and not tied to any other specific LLM tool framework or package.
  • Friendly Syntax: Features an intuitive DAG syntax that minimizes unnecessary complexity.
  • Stateful: Allows a shared DAG state among steps or nodes in a DAG.
  • Concurrent Execution: Identifies steps that can run concurrently and enables concurrent execution without requiring manual concurrent logic.
  • Conditional Routing: Allows using conditional edges to route queries or outputs to subsequent steps.
  • Hooks: Provides customizable hook functions for executing additional tasks (e.g., logging, state reporting) before and after node execution.
  • Observability: Offers a tree-structure diagram (Execution Visualization) for inspecting the DAG execution process directly in your console, making state, output, and condition tracking easier.
  • Uniform Architecture: Ensures a consistent unit structure for all steps (nodes).
  • Reusable Units: Enables nodes to be reused across different workflows.

Motivation

Creating workflows for constructing LLM agents can be manageable when there are only a few steps. However, as the complexity of the workflow increases, several challenges arise:

  • Navigation and Modification: Finding or modifying specific elements requires navigating through numerous if and for statements.
  • Logging: Manually logging the intermediate states of variables and tracking which conditions are met becomes cumbersome.
  • Execution Path: Determining the execution path and understanding how an input query triggered the relevant processes can be difficult.
  • Reusability: Reusing parts of the workflow often involves copying, pasting, and modifying code snippets from old repositories.
  • Hook Functions and Callbacks: Adding hook functions or triggering callback functions in any step necessitates finding and modifying specific code sections.
  • Concurrent Execution: Implementing multithreading manually for concurrent execution of steps is time-consuming.
  • Flexibility: Being stuck in inflexible frameworks limits your ability to adapt and extend your workflows.

LangDAG aims to address these challenges, providing a solution for constructing and managing complex LLM agent workflows.

📑 Contents

  1. 💬 Concepts

  2. 🔧 Setup

  3. 💻 Usage

  4. 📕 API Reference

💬 Concepts

Before you start, it's helpful to learn some concepts.

Node

A node, also referred to as a vertex, is a fundamental unit in a DAG. In LangDAG, a node is instantiated with a unique node_id (required) and an optional prompt.

The prompt serves as a fixed input in the context of LLMs, acting as predefined input for the node.

A node can have a predefined prompt and also receive outputs from upstream nodes (a dict with upstream node_ids as keys and their respective outputs as values; it is an empty dict {} if there are no upstream outputs). It treats both the prompt and the upstream outputs as inputs, processes them, and produces an output that is passed to downstream nodes.

An LLM node treats the upstream output as external input and its own prompt as an internal predefined input. It uses a transforming function (the func_transform parameter of the Node class) to convert these inputs and generate the output.
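Because a transforming function is just a callable that takes prompt, upstream_output, and dag_state, it can be written and tried out on its own. The following is a minimal sketch that does not import LangDAG; the upstream node id "node_1" and the prompt template are made up for illustration.

```python
# A plain function with the (prompt, upstream_output, dag_state) signature
# described above. The node id "node_1" and the template are illustrative only.
def greet_city(prompt, upstream_output, dag_state):
    # upstream_output maps upstream node_ids to their outputs ({} if none).
    city = upstream_output.get("node_1", "an unknown city")
    return prompt.replace("#CITY", city)


# Call the function directly, passing the same three arguments a node would.
result = greet_city(
    prompt="Welcome to #CITY!",
    upstream_output={"node_1": "NY"},
    dag_state={},  # not used by this function
)
print(result)
```

Keeping the logic in a plain function like this is what makes it easy to unit-test before wiring it into a node.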

Edge

An edge represents a connection between nodes, directed from an upstream node to a downstream node. A node can have multiple upstream and downstream nodes. It's crucial to avoid creating cycles, as they can lead to infinite loops, which are generally undesirable and forbidden in a DAG. As a side note, you can still use loops outside a DAG, and doing so is common.

In the context of LLM agents, specific tasks often require multiple steps to execute sequentially.

For example, in an LLM DAG, an upstream Node_A can point to a downstream Node_B with a directed edge A >> B (LangDAG uses >> as notation for a directed edge). This means Node_A executes first, and its output is passed along the edge A >> B to Node_B. The output of Node_A is typically referred to as the upstream output. Node_B can then use this upstream output in its execution.

Please note that upstream output in LangDAG refers to the outputs of the upstream nodes that have explicit connections to the node. For example, in A >> B >> C, the output of A will not be accessible to node C unless we explicitly add another edge A >> C.

Node_A can also be called a parent node or a dependency of Node_B.

Node_B can have multiple upstream nodes, such as Node_A1, Node_A2, etc. By default, Node_B will execute only after all upstream nodes have finished (later we will see how to change this default behavior).

Conversely, Node_B can also have multiple downstream nodes.

Conditional Edge

We just talked about simple edges, but LangDAG also allows you to use conditional edges, which means you can decide whether or not to execute a downstream node based on whether the outputs from upstream nodes meet a certain condition.

As a pseudo example,

A >> 1 >> B

creates a conditional edge, which means that if A outputs 1, B will execute.

Input and Output of a Node

As mentioned, nodes are connected by directed edges. They receive upstream outputs as external inputs and then generate outputs for downstream nodes.

The format of the upstream output for Node_B with two upstream nodes node_A1 and node_A2 is:

{node_A1: Any, node_A2: Any, ...}

Here, node_A1 and node_A2 are the node_ids of upstream nodes, and Any represents any type of Python object.

These keys identify the source of the output from different upstream nodes.

For a node with a single upstream node, the upstream output format is:

{node_A1: Any}

The output format of a node, say Node_B, is:

{node_B: Any}

where node_B is the node_id of the current node.
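To make these formats concrete, here is a small sketch in plain Python (no LangDAG import). The output values and the summarize function are hypothetical; only the dict shapes follow the description above.

```python
# Upstream output as seen by a node with two upstream nodes.
# Keys are upstream node_ids; values can be any Python object.
upstream_output = {"node_A1": "sunny", "node_A2": ["museum", "park"]}


def summarize(prompt, upstream_output, dag_state):
    # Walk over (node_id, output) pairs so the source of each value stays visible.
    parts = [f"{node_id}={value!r}" for node_id, value in sorted(upstream_output.items())]
    return "; ".join(parts)


node_b_value = summarize(None, upstream_output, {})

# What a node downstream of node_B would then receive, per the format above.
downstream_view = {"node_B": node_b_value}
print(downstream_view)
```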

DAG (Directed Acyclic Graphs)

In LangDAG, a DAG is simply a workflow graph of any size, made of nodes connected by edges.

LangDAG orchestrates all nodes (steps in a workflow) in such a graph structure.

DAG State (or dag_state)

As mentioned, nodes process information from upstream and pass their current output to downstream nodes. Unless we explicitly include upstream or further-upstream information in the current node's output (which can be verbose and unnecessary), a node cannot access information created by nodes not directly connected to it.

In LangDAG, we use a DAG State (dag_state), which is an info dict shared by all nodes in a DAG.

dag_state is initialized as

{
    "input": dag_input,
    "specs": {},
    "output": None
}

where dag_input is None if not specified when the DAG is created, specs is used to save the specs of all nodes, and output holds the output of the DAG. These keys are reserved; in most cases, please do not override them.

Aside from the reserved keys, you can set anything on the DAG State in a node's transformation, and subsequent nodes can access it during data transformation via the dag_state parameter of func_transform.

Please note that a DAG State is defined on a LangDAG instance and is accessible to nodes within that DAG during execution.
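The idea of a shared state can be sketched with an ordinary dict and two plain functions. This does not import LangDAG; the key "city" and both function names are invented for the example, and the functions are called by hand in the order a DAG would run them.

```python
# Initial layout as described above; "city" below is a made-up, non-reserved key.
dag_state = {"input": "Tell me more about SF.", "specs": {}, "output": None}


def extract_city(prompt, upstream_output, dag_state):
    # Save a value for later nodes under a non-reserved key.
    dag_state["city"] = "SF"
    return "SF"


def write_answer(prompt, upstream_output, dag_state):
    # A later node reads the shared value even though its upstream_output
    # does not contain it.
    return f"You asked: {dag_state['input']} (city: {dag_state['city']})"


extract_city(None, {}, dag_state)
answer = write_answer(None, {}, dag_state)
print(answer)
```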

Execution Behavior

In real scenarios, a node can have multiple incoming edges, some unconditional and others conditional. We need a way to control the execution of nodes in a DAG.

Let us first look at node execution states (node.execution_state) as well as an important LangDAG concept, "acceptable".

A node can have 3 states: initialized, finished, aborted.

  1. A node is initialized when it has been created and added to a DAG but has not yet run.

  2. A node is finished if it was allowed to run and has finished running.

  • Starting nodes in a DAG are always allowed to run.
  • For a node, an upstream node is "acceptable" if it is finished and its condition is met (if a conditional edge exists).
  • For a node, an upstream node is not "acceptable" if it is not finished, or if it is finished but its condition is not met.
  • By default, a node will execute and finish if all upstream nodes are "acceptable"; otherwise it will not finish.
  • The default behavior can be changed so that a node will execute and finish if any upstream node is "acceptable"; otherwise it will not finish.

  3. A node is aborted if it was not allowed to run. If no exceptions occurred, every node is either finished or aborted after a run. (You will need to handle exceptions yourself.)
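The "acceptable" rule and the all/any switch can be written down as a few lines of plain Python. This is only a sketch of the rules as stated above, not LangDAG's implementation; the Upstream record and both function names are hypothetical.

```python
from typing import Any, NamedTuple


class Upstream(NamedTuple):
    """Hypothetical record describing one incoming edge of a node."""
    state: str                 # "initialized", "finished", or "aborted"
    output: Any = None         # output of the upstream node
    conditional: bool = False  # True if the edge carries a condition
    condition: Any = None      # value the output is compared against


def is_acceptable(up: Upstream) -> bool:
    # Acceptable = finished, and the condition (if any) equals the output.
    if up.state != "finished":
        return False
    return up.condition == up.output if up.conditional else True


def should_run(upstreams: list, mode: str = "all") -> bool:
    # Starting nodes have no upstream nodes and are always allowed to run.
    if not upstreams:
        return True
    checks = [is_acceptable(up) for up in upstreams]
    return all(checks) if mode == "all" else any(checks)


incoming = [
    Upstream("finished", output=1, conditional=True, condition=1),         # acceptable
    Upstream("finished", output=False, conditional=True, condition=True),  # condition not met
    Upstream("aborted"),                                                   # not finished
]
run_if_all = should_run(incoming, mode="all")
run_if_any = should_run(incoming, mode="any")
print(run_if_all, run_if_any)
```

With these three incoming edges, the default "all" rule keeps the node from running, while the "any" rule lets it run because the first upstream node is acceptable.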

Putting It All Together

To summarize:

  • A node is a step for transforming data.
  • A DAG organizes these steps.
  • To create a DAG, we create nodes and connect them with (conditional) edges.
  • Nodes can process information and pass it to downstream nodes, enabling step-by-step data processing.
  • Shared information can be saved to the DAG State, which is accessible to any subsequent node.
  • Execution behavior depends on whether all/any upstream nodes are "acceptable".

🔧 Setup

Installation

Install LangDAG using pip:

$ pip install langdag

From Python 3.12, you will need to create & activate a virtual environment before using the command.

Import

Then import it:

from langdag import Node

To save some time, though it is not necessary, you may import all of these before trying out the examples.

from langdag import Node, LangDAG, run_dag
from langdag.processor import MultiThreadProcessor, SequentialProcessor
from langdag.selector import MaxSelector
from langdag.executor import LangExecutor
from langdag.utils import default, Emptyset, NonEmptyset, Superset, Subset
from langdag.decorator import make_node

💻 Usage

Defining Nodes

To create a node in LangDAG, use the Node class. Here's an example:

node_1 = Node(
    node_id="node_1",
    prompt="SOME PROMPT...",
    func_transform=lambda prompt, upstream_output, dag_state: ...,  # YOUR LOGIC HERE
)

Main Parameters:

  • node_id: (required), a unique identifier for each node instance.

  • prompt: (optional, defaults to None), a predefined prompt for the node.

  • func_transform: (optional), a function that takes prompt, upstream_output, and dag_state as inputs and generates the output of the current node. If not defined, the output will be None. You can use a Python lambda function for simplicity or a regular function for more flexibility. Please note that upstream output in LangDAG refers to the outputs of the upstream nodes that have explicit connections to the node. For example, in NodeA >> NodeB >> NodeC, the output of NodeA will not be accessible to NodeC unless we explicitly add another edge NodeA >> NodeC.

Here's an example in which node_2 generates an answer based on the city name extracted by node_1 from the user query.

node_2 = Node(
    node_id="node_2",
    prompt="The weather in #CITY is #WEATHER.",
    func_transform=lambda prompt, upstream_output, dag_state: 
        prompt.replace('#CITY', upstream_output['node_1'])
              .replace('#WEATHER', get_weather(upstream_output['node_1'])),
)

In this case, the output might be The weather in NY is sunny..

Execution Order: When a node executes, func_transform runs first

After execution, the node output can be accessed using node.node_output.

Additionally, when building function / tool calling agents, you may want to add a spec to the node, either by setting the spec parameter (the node.spec attribute) or by using the add_spec() method after creating the node. For example,

node_2 = Node(
    node_id="node_2",
    prompt="The weather in #CITY is #WEATHER.",
    spec={dummyNestDict...},
    func_transform=lambda prompt, upstream_output, dag_state: 
        prompt.replace('#CITY', upstream_output['node_1'])
              .replace('#WEATHER', get_weather(upstream_output['node_1'])),
)

or

node_2 = Node(
    node_id="node_2",
    prompt="The weather in #CITY is #WEATHER.",
    func_transform=lambda prompt, upstream_output, dag_state: 
        prompt.replace('#CITY', upstream_output['node_1'])
              .replace('#WEATHER', get_weather(upstream_output['node_1'])),
)

node_2.add_spec({dummyNestDict...})

After adding a spec to a node, you can access it with node.spec. You can also get a list of the specs of all nodes in a DAG dag (for nodes that have one) with dag.get_all_specs(); DAGs are explained later.

Define Nodes with Decorators

@make_node()

You can use the @make_node() decorator above a transforming function (i.e., func_transform) to create a node from that function. This is useful when the node_id should be the name of the function, and when the function is hard to define as a simple lambda.

For example, the following two methods of creating nodes are equivalent:

node_2 = Node(
    node_id="node_2",
    prompt = "some_prompt",
    func_transform=lambda prompt, upstream_output, dag_state: 
        prompt.replace('#CITY', upstream_output['node_1'])
              .replace('#WEATHER', get_weather(upstream_output['node_1'])),
)

and

from langdag.decorator import make_node

@make_node(prompt = "some_prompt")
def node_2(prompt, upstream_output, dag_state): 
    res = prompt.replace('#CITY', upstream_output['node_1']).replace('#WEATHER', get_weather(upstream_output['node_1']))
    return res

The @make_node() decorator offers a different way to create a node by directly associating the function with the node's transformation logic, but it has the same functionality as the Node() class. It accepts the same parameters as Node(), except that it uses the decorated function as func_transform, and node_id defaults to the name of the decorated function (you can still set the node_id parameter to an id different from the function name).

@make_node(spec={...})

Use the spec parameter of the @make_node() decorator to add a function / tool spec to the node. This is optional, but helpful if you are working on function calling or tool calling and want to define the function / tool spec on a Node. After adding a spec to a node, you can access it with node.spec, and you can also get a list of the specs of all nodes in a DAG dag (for nodes that have one) with dag.get_all_specs().

Default Upstream Output

If a node has only one upstream node, and you want to access its output directly in func_transform, you can use the default function from langdag.utils.

from langdag.utils import default

node_2 = Node(
    node_id="node_2",
    prompt="The weather in #CITY is #WEATHER.",
    func_transform=lambda prompt, upstream_output, dag_state: 
        prompt.replace('#CITY', default(upstream_output))
              .replace('#WEATHER', get_weather(default(upstream_output))),
)

The default function accepts a dict with a single item; otherwise, it raises an error.

To improve reusability of a Node, we encourage using default instead of using upstream node_id. If there are multiple upstream outputs, use list(upstream_output.values()) to get them as a whole.
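To illustrate why this helps reusability, here is a sketch with a stand-in helper. only_value is a hypothetical re-creation of the behavior described above, not the code of langdag.utils.default; in particular, the exception type it raises is an assumption.

```python
def only_value(upstream_output: dict):
    """Return the value of a dict that holds exactly one item."""
    if len(upstream_output) != 1:
        # The real helper's error type is not documented here; ValueError is a guess.
        raise ValueError(
            f"expected exactly one upstream output, got {len(upstream_output)}"
        )
    return next(iter(upstream_output.values()))


def shout(prompt, upstream_output, dag_state):
    # No upstream node_id is hard-coded, so this function can sit
    # behind any single upstream node.
    return only_value(upstream_output).upper()


from_a = shout(None, {"node_a": "sunny"}, {})
from_b = shout(None, {"some_other_node": "rainy"}, {})
print(from_a, from_b)
```

The same shout function works regardless of which node feeds it, which is the property that hard-coding upstream_output['node_1'] would lose.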

Define and Run a DAG (Syntax #1)

A Directed Acyclic Graph (DAG) represents a workflow with multiple steps and paths, unlike simple straight-line step-by-step workflows.

To add nodes to a DAG and connect them, use the following syntax:

from langdag import Node, LangDAG

user_question = "Tell me more about SF."

with LangDAG(dag_input=user_question) as dag:
    dag += node_1
    dag += node_2
    dag += node_3
    dag += node_4
    dag += node_5
    dag += node_6

    node_1 >> node_2
    node_2 >> node_3 >> node_6
    node_2 >> node_4 >> node_6
    node_2 >> node_5 >> node_6

This code defines a DAG and connects nodes as shown:

╭─────────────────────────────────────────────────────────────────╮
│                                                                 │
│     ┌──────┐    ┌──────┐   ┌────────┐       ┌──────┐            │
│     │node_1│ ──▷│node_2├──▷│ node_3 ├──────▷│node_6│            │
│     └──────┘    └──┬───┘   └────────┘       └───▲──┘            │
│                    │       ┌────────┐           │               │
│                    ├──────▷│ node_4 ├───────────┤               │
│                    │       └────────┘           │               │
│                    │       ┌────────┐           │               │
│                    └──────▶│ node_5 ├───────────┘               │
│                            └────────┘                           │
│                                                                 │
╰─────────────────────────────────────────────────────────────────╯

Example use case:

  1. node_1: Translates the user question to English.
  2. node_2: Extracts an English city name (with an LLM).
  3. node_3: Checks the city's introduction.
  4. node_4: Gets the city's best places to visit.
  5. node_5: Checks the city's weather.
  6. node_6: Combines all the information into a friendly response.

To execute the DAG, use the run_dag function:

from langdag import Node, LangDAG, run_dag
from langdag.processor import MultiThreadProcessor

user_question = "Tell me more about SF."
with LangDAG(dag_input=user_question) as dag:
    dag += node_1
    dag += node_2
    dag += node_3
    dag += node_4
    dag += node_5
    dag += node_6

    node_1 >> node_2
    node_2 >> node_3 >> node_6
    node_2 >> node_4 >> node_6
    node_2 >> node_5 >> node_6

    run_dag(dag)  # <--- THIS LINE
    # run_dag(dag, processor=MultiThreadProcessor())  # Or run concurrently

    print(dag.dag_state["output"])

Note: The run_dag function can be used either inside or outside the with LangDAG() context.

You can also run the DAG concurrently using MultiThreadProcessor. The final output will be accessible via dag.dag_state["output"].
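To see which steps of the six-node example could run at the same time, the standard library's graphlib can group nodes into batches whose upstream nodes are all done. This sketch is independent of LangDAG and is not how its processors are implemented; it only illustrates the ordering the graph implies.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of its upstream nodes (predecessors),
# mirroring the edges of the example DAG.
graph = {
    "node_2": {"node_1"},
    "node_3": {"node_2"},
    "node_4": {"node_2"},
    "node_5": {"node_2"},
    "node_6": {"node_3", "node_4", "node_5"},
}

sorter = TopologicalSorter(graph)
sorter.prepare()

batches = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # nodes whose upstream nodes are all done
    batches.append(ready)
    sorter.done(*ready)                 # mark the whole batch as finished

print(batches)
```

Here node_3, node_4, and node_5 land in the same batch, which is the part of the workflow that benefits from concurrent execution.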

Define and Run a DAG without Context Manager (Syntax #2)

Although the with context manager and the simplified += and >> syntax are recommended, you can also define the DAG from the example above without them.

from langdag import Node, LangDAG, run_dag
from langdag.processor import MultiThreadProcessor

user_question = "Tell me more about SF."

dag = LangDAG(dag_input=user_question)
dag.add_node(node_1, node_2, node_3, node_4, node_5, node_6 )
dag.add_edge(node_1, node_2)
dag.add_edge(node_2, node_3)
dag.add_edge(node_2, node_4)
dag.add_edge(node_2, node_5)
dag.add_edge(node_3, node_6)
dag.add_edge(node_4, node_6)
dag.add_edge(node_5, node_6)

run_dag(dag)

print(dag.dag_state["output"])

DAG Execution Observability

To improve observability and inspect the execution of a DAG, you can use the following method:

dag.inspect_execution()

This prints a tree-structure diagram. For example:

DAG INPUT: hello222
┗━━ node_1, desc1, (√), ## <- means node_execution_state=`finished`
    OUTPUT: False
    ┣━━ node_3, node_desc_of_node_3, (X), ## <- means node_execution_state=`aborted`
    ┃   OUTPUT: None, Condition not matched: {'node_1': True} ## output of node_1 is False but condition require True
    ┗━━ node_2, node_desc_of_node_2, (√),
        OUTPUT: False
        ┗━━ node_3, node_desc_of_node_3, (X),
            OUTPUT: None, Condition not matched: {'node_2': 1}

In this diagram:

  • (√) indicates that the node execution state is finished.
  • (X) indicates that the node execution state is aborted.
  • The output of each node is displayed along with conditions that were met or not met (conditions are explained later).

This visualization helps you understand the execution flow and identify any issues or unmet conditions in your DAG.

DAG Input or dag_input

As demonstrated in the previous example, you can use the dag_input parameter when instantiating a LangDAG to define an input for all nodes in the DAG:

with LangDAG(dag_input="SOMETHING") as dag:
    dag += node_1
    ...

All nodes can access the DAG input in their func_transform function via dag_state["input"].

Find Starts and Terminals in a DAG

To find all the starting and terminating nodes, use:

print(dag.all_starts())
print(dag.all_terminals())

Cycles in a Graph

Cycles are not allowed in a DAG as they create infinite loops. For example:

with LangDAG() as dag:
    dag += node_1
    dag += node_2
    dag += node_3_1
    dag += node_3_2
    dag += node_4
    dag += node_5

    node_1 >> node_2
    node_2 >> node_3_1 >> node_4 >> node_1
    node_2 >> node_3_2

    run_dag(dag)

This will cause an error:

paradag.error.DAGCycleError: Cycle if add edge from "node_4" to "node_1"
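If you want to check a planned set of edges for cycles before building a DAG, the standard library offers a quick way to do so. The sketch below is separate from LangDAG's own check (which, per the error above, fires when the offending edge is added) and simply validates the same edge list with graphlib.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps to the set of its upstream nodes, following the edges above.
graph = {
    "node_2": {"node_1"},
    "node_3_1": {"node_2"},
    "node_3_2": {"node_2"},
    "node_4": {"node_3_1"},
    "node_1": {"node_4"},  # node_4 >> node_1 closes the cycle
}

try:
    TopologicalSorter(graph).prepare()  # raises CycleError if a cycle exists
    has_cycle = False
except CycleError as exc:
    has_cycle = True
    # The second element of args holds the nodes that form the cycle.
    print("cycle detected:", exc.args[1])
```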

Conditional Edges

LangDAG supports conditional execution with Conditional Edges. Let us compare two examples to see the difference conditional edges make.

Without Conditional Edges:

# without conditional edges
with LangDAG("some_input") as dag:
    dag += node_input_clean
    dag += node_check_if_equation
    dag += node_eval
    dag += node_just_answer

    node_input_clean >> node_check_if_equation
    node_check_if_equation >> node_eval  # node_eval needs internal logic to first check whether there is an equation
    node_check_if_equation >> node_just_answer  # node_just_answer needs internal logic to first check whether there is an equation

    run_dag(dag)

With Conditional Edges:

# with conditional edges
with LangDAG("some_input") as dag:
    dag += node_input_clean
    dag += node_check_if_equation
    dag += node_eval
    dag += node_just_answer

    node_input_clean >> node_check_if_equation
    node_check_if_equation >> True >> node_eval  # node_eval needs no internal check; if the condition is not met, node_eval is aborted
    node_check_if_equation >> False >> node_just_answer  # node_just_answer needs no internal check; if the condition is not met, node_just_answer is aborted

    run_dag(dag)

Note: Conditional edges should be unique between two nodes and should only be used as follows:

node_1 >> [condition] >> node_2

Note: Conditional edges use == in Python to determine if the condition matches the upstream output. Consequently, the following boolean comparisons, for example, will also match:

True == 1  # returns True
False == 0  # returns True
{1:1} == {True:True} # returns True

This 'equal' property is also utilized in special condition defining, as we will see in "Conditions with Special Classes" later.
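A few more comparisons show what plain equality does and does not match. The helper condition_met is a hypothetical stand-in for the comparison described in the note above, not a LangDAG function.

```python
def condition_met(condition, upstream_output):
    # Plain equality between the edge condition and the upstream output.
    return condition == upstream_output


checks = {
    "True vs 1": condition_met(True, 1),                # bool is a subclass of int
    "'1' vs 1": condition_met("1", 1),                  # str never equals int
    "[1, 2] vs (1, 2)": condition_met([1, 2], (1, 2)),  # list never equals tuple
    "1 vs [1, 2, 3]": condition_met(1, [1, 2, 3]),      # equality, not membership
}
for label, matched in checks.items():
    print(f"{label}: {matched}")
```

If an upstream node may return either booleans or integers, returning clearly distinct values (for example, strings) is one way to avoid accidental matches.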

Note: To add conditional edges without the context manager (Syntax #2), use add_conditional_edge, for example:

dag.add_conditional_edge(node_1, True, node_2)

Multiple Conditions and Upstream Nodes

As we mentioned in "Execution Behavior":

  • For a node, an upstream node is "acceptable" if it is finished and its condition is met (if a conditional edge exists).
  • For a node, an upstream node is not "acceptable" if it is not finished, or if it is finished but its condition is not met.

By default, a node will execute and finish if all upstream nodes are "acceptable"; otherwise it will not finish (i.e., it will be aborted).

Consider the following example:

with LangDAG("my input") as dag:
    dag += node_1
    dag += node_2
    dag += node_3
    dag += node_4
    dag += node_5

    node_1 >> 1 >> node_5
    node_2 >> True >> node_5
    node_3 >> 3 >> node_4 >> node_5

    run_dag(dag)

In this case, by default, node_5 will execute only if

  • node_1 is finished (as a starting node, node_1 always finishes) and outputs 1,
  • and node_2 is finished (as a starting node, node_2 always finishes) and outputs True,
  • and node_4 is finished (i.e., node_3 outputs 3, so the condition 3 is met and node_4 is not aborted).

This behavior can be customized using the following node methods:

  • node.exec_if_any_upstream_acceptable()

For example, to make node_5 execute if any upstream node is acceptable, use the exec_if_any_upstream_acceptable() method on the node instance:

with LangDAG("my input") as dag:
    dag += node_1
    dag += node_2
    dag += node_3
    dag += node_4
    dag += node_5.exec_if_any_upstream_acceptable()

    node_1 >> 1 >> node_5
    node_2 >> True >> node_5
    node_3 >> 3 >> node_4 >> node_5

    run_dag(dag)

Alternatively:

with LangDAG("my input") as dag:
    dag += node_1
    dag += node_2
    dag += node_3
    dag += node_4
    dag += node_5
    node_5.exec_if_any_upstream_acceptable()

    node_1 >> 1 >> node_5
    node_2 >> True >> node_5
    node_3 >> 3 >> node_4 >> node_5

    run_dag(dag)

With this new behavior, node_5 will execute if any of node_1, node_2, or node_4 is "acceptable", that is, when

  • node_1 is finished (as a starting node, node_1 always finishes) and outputs 1,
  • or node_2 is finished (as a starting node, node_2 always finishes) and outputs True,
  • or node_4 is finished (i.e., node_3 outputs 3, so the condition 3 is met and node_4 is not aborted).

Conditions with Special Classes

In some scenarios, you may want to execute a downstream node if a condition is a specific element or subset of the upstream output. Conversely, you might want to execute the downstream node if the upstream output is a specific element or subset of the condition.

For instance, consider the conditional edge nodeA >> 1 >> nodeB. If the upstream output is [1, 2, 3] and the condition is 1, since 1 is part of the list [1, 2, 3], the condition is met, and the downstream nodeB will execute.

Similarly, for nodeA >> [2, 3] >> nodeB, if the upstream output is [1, 2, 3] and the condition is [2, 3], since [2, 3] is a subset of [1, 2, 3], the condition is met, and nodeB will execute.

However, the syntax above suggests evaluating 1 == [1, 2, 3] or [2, 3] == [1, 2, 3], both of which return False, making them unsuitable for this purpose.

To handle such cases, LangDAG provides special classes:

  • Superset(list)
  • Subset(list)
  • Emptyset()
  • NonEmptyset()
  • PretransformSet(func, res)
  • NotPretransformSet(func, res)

These can be imported from langdag.utils.

For example, Superset has a custom equality (__eq__) method, allowing Superset(a) == b to return True if b is a superset of a.

from langdag.utils import Superset

Superset([1, 2, 3]) == [1, 2, 3, 4]  # returns True

Here is an example using Superset:

from langdag import Node, LangDAG
from langdag.utils import Superset, default

nodeA = Node(
    node_id=1,
    func_transform=lambda prompt, upstream_output, dag_state: [1, 2]
)

nodeB = Node(
    node_id=2,
    func_transform=lambda prompt, upstream_output, dag_state: default(upstream_output)
)

with LangDAG() as dag:
    dag += nodeA
    dag += nodeB
    nodeA >> Superset([1]) >> nodeB
    # nodeA output [1, 2] is a superset of [1]
    # so [1, 2] == Superset([1]) returns True
    # when the condition is True, the downstream node will execute.
    ...

In this example, [1, 2] == Superset([1]) returns True, triggering the execution of nodeB.

Similarly, you can use Subset, Emptyset, and NonEmptyset from langdag.utils to represent a subset of a list, an empty set, and a non-empty set, respectively.
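The mechanism behind these classes is Python's customizable equality. The following sketch shows how such a class could be written; SupersetSketch is a hypothetical illustration and not the source of langdag.utils.Superset.

```python
class SupersetSketch:
    """Compares equal to any container that holds every item in `items`."""

    def __init__(self, items):
        self.items = list(items)

    def __eq__(self, other):
        try:
            return all(item in other for item in self.items)
        except TypeError:
            # `other` is not a container (for example, an int).
            return False


cond = SupersetSketch([1])

left_side = cond == [1, 2]     # calls SupersetSketch.__eq__ directly
right_side = [1, 2] == cond    # list.__eq__ declines, so Python falls back
                               # to the reflected SupersetSketch.__eq__
missing = SupersetSketch([3]) == [1, 2]
print(left_side, right_side, missing)
```

Because Python tries the right-hand operand's __eq__ when the left-hand one returns NotImplemented, the comparison works whichever side the special object is on.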

PretransformSet is a convenient class to create conditions that are transformed results from upstream outputs. This is particularly useful if the upstream output is complex and you do not want to add an auxiliary edge between two nodes already connected with a conditional edge.

For example, suppose nodeA's output is {1: {nestDict...}, 2: {nestDict...}}, and "nodeA must output a dict containing 1 as a key" is the condition to execute nodeB. Without PretransformSet, you would need to add another node to get a simpler output:

with LangDAG() as dag:
    dag += nodeA
    dag += nodeA_if_1_in_key
    dag += nodeB
    nodeA_if_1_in_key >> True >> nodeB
    nodeA >> nodeB

This is inconvenient, and since nodeB has two upstream nodes, you must specify the node_id key when using upstream_output[node_id] in func_transform.

With PretransformSet, you can do as follows instead:

with LangDAG() as dag:
    dag += nodeA
    dag += nodeB
    nodeA >> PretransformSet(lambda x: list(x.keys()), Superset([1])) >> nodeB

Here we use PretransformSet and Superset together, and it will evaluate

a == PretransformSet(func=lambda x: list(x.keys()), res=Superset([1]))

which returns True because func(a) == res, where func(a) returns [1, 2] and [1, 2] is a superset of [1].

Please note that func is a function with a single parameter.
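The same equality trick explains how a pre-transform condition can work. PretransformSketch below is a hypothetical illustration of the func(a) == res rule described above, not LangDAG's own class, and it uses a plain True as res to stay self-contained.

```python
class PretransformSketch:
    """Compares equal to `other` when func(other) == res."""

    def __init__(self, func, res):
        self.func = func  # single-parameter function applied to the upstream output
        self.res = res    # expected result of that function

    def __eq__(self, other):
        return self.func(other) == self.res


# Condition: "the upstream output is a dict containing 1 as a key".
has_key_1 = PretransformSketch(func=lambda d: 1 in d, res=True)

matches = {1: {"a": 0}, 2: {"b": 0}} == has_key_1  # dict.__eq__ declines, sketch decides
no_match = {2: {"b": 0}} == has_key_1
print(matches, no_match)
```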

Note: The condition on a conditional edge is only evaluated if the upstream node is finished, so when you define a special condition, you can assume that there is always an output from upstream without first checking for it.

Node execution_state

A node can be in one of three possible execution states (all represented as strings):

  • "initialized": The node is defined but not yet executed.
  • "finished": The node has executed successfully.
  • "aborted": The node was aborted due to unmet conditions.

Setting DAG Output

For terminating nodes, their outputs are saved to dag.dag_state["output"] as the final DAG output if their execution_state is not "aborted". In most LLM agent use cases, there should be only one terminating node with "finished" state after execution, so dag.dag_state["output"] will be set once.

However, in DAGs with multiple terminating nodes, the final output may be set multiple times in the order of node execution. This can add complexity and should be used cautiously.

Concurrent Execution

To enable concurrent execution of the DAG, use run_dag as shown below:

with LangDAG() as dag:
    ...
    run_dag(dag, processor=MultiThreadProcessor(), executor=LangExecutor())  # <--- THIS LINE

The default setting maximizes concurrent task execution. You can control this by setting the selector parameter to MaxSelector(N), where N is the maximum number of nodes that can run concurrently. For example, MaxSelector(4) allows up to 4 nodes to run concurrently:

with LangDAG() as dag:
    ...
    run_dag(
        dag,
        processor=MultiThreadProcessor(),
        selector=MaxSelector(4),
        executor=LangExecutor()
    )

Node Reset

When instantiated, a node has an internal state. To view this state, simply print the node:

print(node_1)

After a DAG run, a node's state may change. Nodes are reusable, but you often do not want to carry over the state from one DAG run to another. Reset the node's internal state before reusing it in a new DAG:

node_1.reset()

To reset all nodes in a previous DAG, use:

dag.reset_all_nodes()

Run DAG Silently

When running a DAG with run_dag, LangDAG prints useful logs.

If you would like to run the DAG without standard output (for example, in production), set the verbose parameter of LangExecutor to False:

with LangDAG() as dag:
    ...
    run_dag(
        dag,
        executor=LangExecutor(verbose=False) ## <---- Here
    )

Alternatively, set verbose=False directly in run_dag, which does the same thing for you:

with LangDAG() as dag:
    ...
    run_dag(
        dag,
        verbose=False ## <---- Here
    )

Similarly, to run the DAG without the progress bar, set the progressbar parameter of run_dag to False:

with LangDAG() as dag:
    ...
    run_dag(
        dag,
        progressbar=False ## <---- Here
    )

Node Hooks

You can set the parameters func_start_hook and func_finish_hook when instantiating a LangExecutor.

  • func_start_hook runs before node execution. It takes a function with two required positional parameters: node_id and node_desc.
  • func_finish_hook runs after node execution finishes. It takes a function with four required positional parameters: node_id, node_desc, execution_state, and node_output.

Example:

with LangDAG("some input") as dag:
    dag += node_input_clean
    dag += node_1
    dag += node_2
    dag += node_3.executeIfAnyMatch()

    node_input_clean >> node_1 >> node_2
    node_1 >> True >> node_3 
    node_2 >> 1 >> node_3

    myCustomExecutor = LangExecutor(
        verbose=False,
        func_start_hook=lambda node_id, node_desc: 
            print(f"----FAKE---- UI showing: starting `{node_id}` with desc `{node_desc}`"),
        func_finish_hook=lambda node_id, node_desc, execution_state, node_output: 
            print(f"----FAKE---- UI showing: finished `{node_desc}` with state `{execution_state}`")
    )

    run_dag(
        dag, 
        selector=MaxSelector(1),
        processor=SequentialProcessor(), 
        executor=myCustomExecutor
    )

print(dag.dag_state["output"])
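To make the hook call order concrete, here is a minimal standalone sketch that mimics the two hook signatures described above without importing LangDAG. The helper run_with_hooks and the "finished" state value are assumptions for illustration; LangDAG's actual execution states may differ.

```python
from typing import Any, Callable, Optional


def run_with_hooks(
    node_id: Any,
    node_desc: Any,
    work: Callable[[], Any],
    func_start_hook: Optional[Callable[[Any, Any], None]] = None,
    func_finish_hook: Optional[Callable[[Any, Any, Any, Any], None]] = None,
) -> Any:
    """Hypothetical helper: call the start hook, run the work, then call the finish hook."""
    if func_start_hook is not None:
        func_start_hook(node_id, node_desc)
    node_output = work()
    execution_state = "finished"  # assumed placeholder, not LangDAG's real state value
    if func_finish_hook is not None:
        func_finish_hook(node_id, node_desc, execution_state, node_output)
    return node_output


events = []
result = run_with_hooks(
    "node_1",
    "Doubling a number",
    work=lambda: 21 * 2,
    func_start_hook=lambda node_id, node_desc: events.append(("start", node_id, node_desc)),
    func_finish_hook=lambda node_id, node_desc, execution_state, node_output: events.append(
        ("finish", node_id, execution_state, node_output)
    ),
)
print(events)
```

Collecting events in a list instead of printing makes the hooks easy to check in tests, and the same pattern works for pushing status updates to a UI.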

Node Description

You can add a description to a Node by setting the node_desc parameter or the func_desc when creating a node.

node_1 = Node(
    node_id="node_1", 
    prompt="...",
    node_desc="THIS IS DESC",
    func_desc=lambda prompt, upstream_output, upstream_input: 
        f"THIS IS A DYNAMIC DESC FROM {prompt}",
    func_transform=...
)

node_desc is a static description, while func_desc dynamically creates a description from prompt, upstream_output, and upstream_input. If both are set, node_desc is overridden by the value that func_desc returns.
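The precedence rule above can be sketched as a small standalone function. resolve_desc is a hypothetical helper, not part of LangDAG, and its argument names simply follow the paragraph above.

```python
from typing import Any, Callable, Optional


def resolve_desc(
    node_desc: Any,
    func_desc: Optional[Callable[[Any, Any, Any], Any]],
    prompt: Any,
    upstream_output: Any,
    upstream_input: Any,
) -> Any:
    """Hypothetical: prefer the dynamic description when func_desc is set."""
    if func_desc is not None:
        return func_desc(prompt, upstream_output, upstream_input)
    return node_desc


static_only = resolve_desc("THIS IS DESC", None, "hello", {}, {})
both_set = resolve_desc(
    "THIS IS DESC",
    lambda prompt, upstream_output, upstream_input: f"DYNAMIC DESC FROM {prompt}",
    "hello",
    {},
    {},
)
print(static_only)
print(both_set)
```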

Though optional, node descriptions are beneficial. For example, before executing a node, you may want to send a status message using a node hook function. This message can inform the user about the current action, such as "Getting weather..." or "Getting weather for: New York on 2024-01-01...".

Example:

from datetime import date

# get_weather is assumed to be a user-defined function taking a location and a date.
node_1 = Node(
    node_id="node_get_weather", 
    prompt="Weather for %s on %s is %s",
    func_desc=lambda prompt, upstream_output, upstream_input: 
        f"Getting weather for: {upstream_output['node_0']} on {date.today().strftime('%d/%m/%Y')}",
    func_transform=lambda prompt, upstream_output, dag_state: 
        prompt % (
          upstream_output['node_0'], 
          date.today().strftime("%d/%m/%Y"), 
          get_weather(
                      upstream_output['node_0'], 
                      date.today()
                      )
        )
)

Setting a hook:

myCustomExecutor = LangExecutor(
    verbose=False,
    func_start_hook=lambda node_id, node_desc: 
        print(f"----FAKE---- UI showing: starting `{node_desc}`"),
    func_finish_hook=lambda node_id, node_desc, execution_state, node_output: 
        print(f"---FAKE----- UI showing: finished `{node_desc}`")
)

Function calling (to-do)

(see adapted openai function calling example)

📕 API Reference

Main Classes

Node (class)

The Node class in LangDAG represents a single unit of work or computation in the DAG.

from langdag import Node

Parameters:

  • node_id (Any, required):
    A unique identifier for the node.

  • node_desc (Any, optional, defaults to None):
    A description of the node, accessible via node.node_desc.

  • prompt (Any, optional, defaults to None):
    A predefined prompt for the node.

  • spec (Dict | Any, optional, defaults to None):
    An optional property for storing the node's specification as a tool (example spec: https://cookbook.openai.com/examples/how_to_call_functions_with_chat_models#basic-concepts ).

  • func_desc (Callable, optional, defaults to None):
    A function that generates a dynamic description from prompt, upstream_output, and dag_state.

  • func_transform (Callable, optional, defaults to None):
    A function that transforms prompt, upstream_output, and dag_state into the node's output.

  • func_set_dag_output_when (Callable, optional, defaults to None):
    A function that returns a boolean deciding whether node_output should be set as the final output of the DAG (dag.dag_state["output"]), based on prompt, upstream_output, node_output, and execution_state.

Instance Methods:

  • reset() -> None: Resets the node to its original state as when instantiated.

  • get_info() -> Dict: Returns a dict containing attributes of the node.

  • add_spec(spec_dict: Dict) -> None: Saves the tool spec spec_dict to node.spec.

  • exec_if_any_condition_met():
    Configures the node to execute when any upstream output matches the required conditions on conditional edges.

  • exec_if_any_upstream_finished: Not the default behavior. Configures the node to execute when any upstream node has finished and all upstream outputs (or any, if exec_if_any_condition_met is also used) match the required conditions on conditional edges. If there are no conditional edges, the conditions are deemed met.

  • exec_if_any_upstream_acceptable: Not the default behavior. Configures the node to execute when any upstream node is acceptable. See the docs for the meaning of "acceptable".

LangDAG (class)

The LangDAG class defines the Directed Acyclic Graph (DAG) structure for managing nodes and their execution.

from langdag import LangDAG

Parameters:

  • dag_input (Any, optional, defaults to None):
    Input for the DAG, accessible to func_transform in every Node.

Instance Methods:

  • all_starts():
    Returns all the starting nodes in the DAG.

  • all_terminals():
    Returns all the terminating nodes in the DAG.

  • reset_all_nodes():
    Resets all nodes in this DAG (via node.reset) to their original state as when instantiated.

  • inspect_execution():
    Prints a rich.tree to the console showing the DAG execution.

LangExecutor (class)

The LangExecutor class handles the execution of nodes in the DAG, with optional hooks for custom behavior.

from langdag.executor import LangExecutor

Parameters:

  • verbose (boolean, optional, defaults to True):
    When verbose=True, execution information is printed to the console.

  • func_start_hook (Callable, optional, defaults to None):
    A function that takes node_id and node_desc, executing custom actions before the node executes.

  • func_finish_hook (Callable, optional, defaults to None):
    A function that takes node_id, node_desc, execution_state, and node_output, executing custom actions after the node finishes executing.

Functions

run_dag(dag, processor, selector, executor, verbose, slower, progressbar) (function)

Executes the DAG with various configurations for processing and execution.

from langdag import run_dag
# Optional imports for processors:
from langdag.processor import SequentialProcessor, MultiThreadProcessor
# Optional imports for selectors:
from langdag.selector import FullSelector, MaxSelector

Parameters:

  • dag (LangDAG, required):
    The DAG to run.

  • processor (optional, defaults to SequentialProcessor()):
    Can be set to SequentialProcessor() for sequential execution or MultiThreadProcessor() for concurrent execution.

  • selector (optional, defaults to FullSelector()):
    When using MultiThreadProcessor(), set to FullSelector() for unlimited concurrent execution, or use MaxSelector(max_no) to limit the maximum number of nodes executing concurrently to max_no.

  • executor (optional, defaults to LangExecutor()):
    An instance of LangExecutor for executing the DAG.

  • verbose (Boolean, optional, defaults to True):
    When set to False, disables verbose logging.

  • slower (Boolean or number, optional, defaults to False):
    When set to True, slows down every node execution by 1 second. When set to a number N, slows down every node execution by N seconds.

  • progressbar (Boolean, optional, defaults to True):
    When True (the default), a progress bar is shown while running a DAG. When set to False, the progress bar is disabled.
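The slower parameter accepts either a boolean or a number, which is easy to get wrong in Python because bool is a subclass of int. The sketch below shows one way such a value could be turned into a delay in seconds. resolve_delay is a hypothetical helper for illustration and is not taken from LangDAG's source.

```python
from typing import Union


def resolve_delay(slower: Union[bool, int, float]) -> float:
    """Hypothetical: map a `slower`-style value to a delay in seconds."""
    # Check booleans first: True == 1 in Python, but False must mean "no delay".
    if slower is True:
        return 1.0
    if slower is False:
        return 0.0
    return float(slower)


for value in (False, True, 2.5):
    print(value, "->", resolve_delay(value))
```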

default(dict) (function)

Retrieves the default value from a dictionary containing a single item. If the dictionary does not have exactly one item, it raises an error.

from langdag.utils import default

Parameters:

  • dict (Dict, required):
    A dictionary with a single item.

Usage Example:

upstream_output = {'key_1': 'value_1'}
default(upstream_output)  # returns 'value_1'
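For readers who want to see the behavior spelled out, here is a minimal stand-in with the same documented contract. default_sketch is a hypothetical name, and raising ValueError is an assumption, since the text above only says that an error is raised.

```python
from typing import Any, Dict


def default_sketch(d: Dict[Any, Any]) -> Any:
    """Return the only value of a single-item dict; fail otherwise."""
    if len(d) != 1:
        # The exception type is assumed; langdag.utils.default may raise a different one.
        raise ValueError(f"expected exactly one item, got {len(d)}")
    return next(iter(d.values()))


single = default_sketch({"key_1": "value_1"})
print(single)

try:
    default_sketch({"key_1": "value_1", "key_2": "value_2"})
    raised = False
except ValueError:
    raised = True
print("raised on two items:", raised)
```

This is convenient when a node has exactly one upstream node, so the upstream key does not need to be named.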

Customization with paradag

langdag uses paradag as the DAG engine. This means you can customize processors, selectors, and executors according to your specific needs with paradag.
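As an illustration of what a custom selector could look like, the sketch below defines a duck-typed class that limits how many vertices run at once. It assumes that paradag selectors expose a select(running, idle) method returning the idle vertices to start next; check paradag's own documentation for the actual interface. CappedSelector is a hypothetical name, and the snippet deliberately does not import paradag or langdag.

```python
from typing import Any, Iterable, List


class CappedSelector:
    """Sketch of a selector allowing at most `max_running` vertices at a time."""

    def __init__(self, max_running: int) -> None:
        if max_running < 1:
            raise ValueError("max_running must be at least 1")
        self.max_running = max_running

    def select(self, running: Iterable[Any], idle: Iterable[Any]) -> List[Any]:
        # Assumed interface: `running` are vertices in progress, `idle` are ready to start.
        free_slots = max(0, self.max_running - len(list(running)))
        return list(idle)[:free_slots]


selector = CappedSelector(max_running=2)
print(selector.select(running=["a"], idle=["b", "c", "d"]))
print(selector.select(running=["a", "b"], idle=["c"]))
```

If the assumed interface holds, an instance of such a class could be passed as the selector argument of run_dag in place of MaxSelector.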

Contributing

Contributions are welcome! Please submit a pull request or open an issue to discuss your ideas.

License

This project is licensed under the MIT License.
