Skip to main content

A tiny async task-tree orchestrator for Python, behavior-tree inspired and LLM-ready.

Project description

tinytasktree

CI

A tiny async task-tree orchestrator library for Python, behavior-tree inspired and LLM-ready.

Why tinytasktree?

  • Modular, composable task graph building blocks
  • Behavior-tree inspired control flow with explicit success/failure semantics
  • Async-first execution with local trace visualization

Hello World

from tinytasktree import Tree

tree = (
    Tree("HelloWorld")
    .Sequence()
    ._().Function(lambda: "hello")
    .End()
)

LLM Example

from dataclasses import dataclass
from tinytasktree import Context, JSON, LLMModel, LLMProvider, Tree

@dataclass
class Blackboard:
    prompt: str
    response: str = ""


def make_messages(b: Blackboard) -> list[JSON]:
    return [{"role": "user", "content": b.prompt}]


provider = LLMProvider(base_url="https://llm.example/v1", api_key="...")
model = LLMModel(
    "gpt-4.1-mini",
    provider=provider,
    input_price_per_m=0.15,
    output_price_per_m=0.60,
)

tree = (
    Tree[Blackboard]("HelloWorld")
    .Sequence()
    ._().LLM(model, make_messages)
    ._().WriteBlackboard("response")
    .End()
)

async def main():
    context = Context()
    blackboard = Blackboard(prompt="Say hello in JSON.")
    async with context.using_blackboard(blackboard):
        result = await tree(context)

    print(result)
    print(blackboard.response)

Requirements

  • Python 3.13+
  • openai-python (only needed for LLM nodes)
  • A cache store backend is only needed for Cacher and Terminable nodes

Features

  • Minimal, expressive tree builder API
  • Async-first execution model
  • Leaf / Composite / Decorator nodes built-in
  • LLM integration via openai-python
  • Store-backed caching and termination signaling
  • Trace collection and optional trace storage
  • UI trace viewer with HTTP server

API Stability

Alpha. Expect breaking changes until the API is stabilized.

⚠️ Warning: This is currently only at ALPHA STAGE, and future API changes may introduce breaking changes.

Installation

uv add tinytasktree

or

pip install tinytasktree

UI Trace Server

Save traces into the same directory that the backend serves, for example:

from tinytasktree import Context, FileTraceStorageHandler

storage = FileTraceStorageHandler(".traces")

context = Context()
async with context.using_blackboard(blackboard):
    result = await tree(context)

trace_id = await storage.save(context.trace_root())
print("Trace URL:", f"http://127.0.0.1:8000/{trace_id}")

Then start the backend and UI:

python -m tinytasktree --httpserver --host 127.0.0.1 --port 8000 --trace-dir .traces
# Visit http://127.0.0.1:8000

Notes:

  • --trace-dir .traces must point to the same directory used by FileTraceStorageHandler(".traces")
  • Opening http://127.0.0.1:8000/ lists saved traces in the current trace directory, newest first
  • Opening http://127.0.0.1:8000/<trace_id> loads a specific trace directly

Design Notes

  • Execution model: nodes are awaited; composite nodes control child ordering and concurrency
  • Results: nodes return OK(data) or FAIL(data) and composites propagate or short-circuit
  • Blackboard: a per-run data object passed through the tree via Context

Table of Contents

Node Reference [↑]

Node Result & Status [↑]

  • Every node returns a Result with a status (OK or FAIL) and an optional data payload
  • Composite nodes typically short-circuit on FAIL (e.g., Sequence) or on OK (e.g., Selector)
  • Decorators can override or invert status while preserving or transforming data

Leaf Nodes [↑]

Function [↑]

Runs a sync/async function. Returns OK(data) for non-Result return values, or passes through a Result.

Usage:

  • Accepts 0/1/2 params: (), (blackboard), (blackboard, tracer)
  • Sync or async functions are supported
  • Returning Result bypasses wrapping; otherwise OK(value)

Supported function forms:

  • () -> Any or () -> Result
  • (blackboard) -> Any or (blackboard) -> Result
  • (blackboard, tracer) -> Any or (blackboard, tracer) -> Result
  • Async variants of all the above
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: "ok1")
    ._().Function(lambda blackboard: "ok2")
    ._().Function(lambda blackboard, tracer: "ok3")
    .End()
)

Log [↑]

Logs a message into the trace. Always returns OK(None).

Usage:

  • msg_or_factory: string or (blackboard) -> str
  • level: trace level (default: info)
  • Emits a trace log entry and continues
tree = (
    Tree()
    .Sequence()
    ._().Log("hello step1")
    ._().Log(lambda b: f"hello, step2: job={b.job_id}", level="debug")
    .End()
)

TODO [↑]

A placeholder node that always returns OK(None).

Usage:

  • No-op leaf for scaffolding or TODO spots
  • Replace with real nodes later
tree = (
    Tree()
    .Sequence()
    ._().TODO("Prepare the Params")
    ._().TODO("Call the LLM")
    ._().Function(real_step)
    ._().TODO("Collect the result")
    .End()
)

ShowBlackboard [↑]

Returns the current blackboard in OK(b).

Usage:

  • Helpful for debugging or inspection
  • Downstream nodes can consume the returned blackboard
tree = (
    Tree()
    .ShowBlackboard()
    .End()
)

WriteBlackboard [↑]

Writes the previous node's result into the blackboard, and returns OK(data).

Usage:

  • attr_or_func: attribute name or (blackboard, data) -> None
  • Reads last_result.data; warns if no last result
  • Returns OK(data) (or OK(None) if missing)
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: 123)
    ._().WriteBlackboard("value")
    .End()
)

or:

def _set_value(b: Blackboard, v: int) -> None:
    b.double_value = v * 2

tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: 7)
    ._().WriteBlackboard(_set_value)
    .End()
)

Assert [↑]

Checks a boolean condition and returns OK(True) or FAIL(False).

Usage:

  • Condition can be attr name or function
  • Sync/async; params 0/1/2: (), (blackboard), (blackboard, tracer)
  • AssertionError is treated as false
tree = (
    Tree()
    .Sequence()
    ._().Assert(lambda: True)
    ._().Assert("is_ready") # checks `blackboard.is_ready`
    ._().Function(run_job)
    .End()
)

Failure [↑]

Always returns FAIL(None).

Usage:

  • Useful for tests, guards, or forcing failures
tree = (
    Tree()
    .Selector()
    ._().Assert("has_cache")
    ._().Failure()
    .End()
)

Subtree [↑]

Runs another tree, optionally with a custom blackboard factory.

Usage:

  • subtree_blackboard_factory: (parent_blackboard) -> child_blackboard
  • Result is the subtree's result
sub = (
    Tree()
    .Sequence()
    ._().Function(lambda: "x")
    .End()
)

tree = (
    Tree()
    .Sequence()
    ._().Subtree(sub) # or _().Subtree(sub, lambda b: SubBlackboard(b.text))
    .End()
)

ParseJSON [↑]

Parses JSON from the last result or from a blackboard source, and returns the parsed object.

Usage:

  • src: last result (default), blackboard attr, or (blackboard) -> str
  • dst: optional blackboard attr or (blackboard, data) -> None
  • Strips common ```json fences before parsing
  • Default loader tries json_repair if installed, otherwise orjson, otherwise the standard library json
  • Recommended: install json_repair when parsing LLM-generated or otherwise non-strict JSON
tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: '{"a":1}')
    ._().ParseJSON(dst="data")
    .End()
)

If json_repair is installed, no extra code is needed for tolerant parsing:

tree = (
    Tree()
    .Sequence()
    ._().Function(lambda: '{"a": 1')
    ._().ParseJSON(dst="data")
    .End()
)

Another example:

def set_parsed_value(b: Blackboard, d: JSON) -> None:
    b.parsed = d

tree = (
    Tree()
    .Sequence()
    ._().ParseJSON(src="raw_json", dst=set_parsed_value)
    .End()
)

LLM [↑]

Calls an LLM via openai-python and returns the output text. Supports streaming, API key factories, and OpenAI-compatible base URLs for custom LLM gateways.

Usage:

  • model / messages can be values or (blackboard) -> ... factories
  • stream: bool or (blackboard) -> bool; stream_on_delta supports sync/async callbacks
  • api_key: string or factory (blackboard) / (blackboard, model)
  • base_url: string or (blackboard) -> str | None for OpenAI-compatible providers
  • client_kwargs: explicit kwargs forwarded to AsyncOpenAI(...)
  • extra_body: explicit provider-specific request body fields merged into extra_body
  • **llm_call_kwargs: regular request kwargs forwarded to chat.completions.create(...)
  • LLMModel can optionally carry input_price_per_m / output_price_per_m in USD per 1M tokens
  • Tracer records tokens when the provider returns usage
  • Cost is taken from provider metadata when available; otherwise it falls back to token usage and the LLMModel prices
provider = LLMProvider(base_url="https://llm.example/v1", api_key="...")
model = LLMModel(
    "gpt-4.1-mini",
    provider=provider,
    extra_body={"reasoning": {"enabled": False}},
    input_price_per_m=0.15,
    output_price_per_m=0.60,
)

tree = (
    Tree()
    .Sequence()
    ._().LLM(model, [{"role": "user", "content": "hi"}])
    .End()
)

Streaming response example:

from tinytasktree import Tree

def on_delta(b, full, delta, done, reason=""):
    if delta:
        print(delta, end="")

tree = (
    Tree()
    .Sequence()
    ._().LLM(lambda b: b.model, lambda b: b.messages, stream=True, stream_on_delta=on_delta)
    .End()
)

Tool Call [↑]

LLMNode supports tool calls out of the box: when the LLM returns tool_calls, the node executes them via tool_executor, appends results to the conversation, and re-calls the LLM until the final answer is reached (up to max_iterations).

from tinytasktree import JSON, ToolCall, ToolDef, ToolFunctionDef, Tree

TOOLS = [
    ToolDef(
        type="function",
        function=ToolFunctionDef(
            name="get_weather",
            description="Get the current weather in a given location",
            parameters={"type": "object", "properties": {"location": {"type": "string"}}, "required": ["location"]},
        ),
    ),
]

def tool_executor(b: Blackboard, tool_call: ToolCall) -> JSON:
    return {"location": "Beijing", "weather": "sunny", "temperature": 25}

tree = (
    Tree[Blackboard]("WeatherApp")
    .Sequence()
    ._().LLM(model, make_messages, tools=TOOLS, tool_executor=tool_executor, max_iterations=5)
    ._().WriteBlackboard("weather_result")
    .End()
)

tool_executor supports sync/async and tools accepts ToolDef dataclasses (or raw dicts). Streaming tool calls are also supported -- deltas are accumulated and executed after the stream completes.

Composite Nodes [↑]

Sequence [↑]

Runs children in order. Returns FAIL on first failure, otherwise OK(last_child_data).

Usage:

  • Stops on first FAIL and returns last successful data
  • Empty sequence returns OK(None)
tree = (
    Tree()
    .Sequence()
    ._().Function(A)
    ._().Function(B)
    .End()
)

Selector [↑]

Runs children in order until one succeeds. Returns the first OK, else FAIL.

Usage:

  • Short-circuits on first success
  • Empty selector returns OK(None)
tree = (
    Tree()
    .Selector()
    ._().Failure()
    ._().Function(lambda: "ok")
    .End()
)

Selector is the typical choice for the fallback chain pattern:

provider = LLMProvider(base_url="https://llm.example/v1", api_key="...")
model1 = LLMModel("model1", provider=provider)
model2 = LLMModel("model2", provider=provider)
model3 = LLMModel("model3", provider=provider)

tree = (
    Tree()
    .Selector()
    ._().Timeout(20)
    ._()._().LLM(model1, llm_message)
    ._().Timeout(20)
    ._()._().LLM(model2, llm_message)
    ._().Timeout(20)
    ._()._().LLM(model3, llm_message)
    .End()
)

Parallel [↑]

Runs children concurrently. Returns OK only if all children succeed.

Usage:

  • concurrency_limit must be > 0
  • Result data is None
tree = (
    Tree()
    .Parallel(concurrency_limit=2)
    ._().Function(A)
    ._().Function(B)
    .End()
)

Gather [↑]

Runs multiple subtrees with their own blackboards and returns a list of results.

Usage:

  • params_factory: (blackboard) -> (trees, blackboards)
  • Runs each tree with its paired blackboard
  • Returns list of child data in tree order
tree = (
    Tree()
    .Gather(lambda b: (trees, blackboards))
    .End()
)

In a more detailed example:

def build_params(b):
    trees = [subtree1, subtree2]
    bbs = [BB(x=1), BB(x=2)]
    return trees, bbs

tree = (
    Tree()
    .Gather(build_params, concurrency_limit=2)
    .End()
)

RandomSelector [↑]

Randomizes the child order (optionally weighted) and returns the first OK.

Usage:

  • weights: list or (blackboard) -> list[float]
  • Weights must be positive and match child count
tree = (
    Tree()
    .RandomSelector(weights=[0.4, 0.4, 0.2]) # or weights from a factory: lambda b: b.route_weights
    ._().Function(A)
    ._().Function(B)
    ._().Function(C)
    .End()
)

If / Else [↑]

Conditional branch. If the condition is false and no else branch exists, returns OK(None).

Usage:

  • Condition supports attr name or function (sync/async)
  • 1 child (if) or 2 children (if + else)
  • Else node must be a child of If
tree = (
    Tree()
    .If(lambda b: b.flag)
    ._().Function(A)
    ._().Else()
    ._()._().Function(B)
    .End()
)

Or uses only If (without Else):

tree = (
    Tree()
    .If("is_admin")
    ._().Function(admin_flow)
    .End()
)

Decorator Nodes [↑]

ForceOk [↑]

Forces the result status to OK, optionally with a custom data factory.

Usage:

  • Optional result_factory(blackboard) -> data
  • If omitted, preserves child data
tree = (
    Tree()
    .ForceOk()
    ._().Failure()
    .End()
)

Or a ForceOk overriding the result:

tree = (
    Tree()
    .ForceOk(lambda b: {"skipped": True})
    ._().Function(best_effort)
    .End()
)

ForceFail [↑]

Forces the result status to FAIL, optionally with a custom data factory.

Usage:

  • Optional result_factory(blackboard) -> data
  • If omitted, preserves child data
tree = (
    Tree()
    .ForceFail()
    ._().Function(lambda: "x")
    .End()
)

Return [↑]

Preserves child status but replaces data with a factory result.

Usage:

  • result_factory(blackboard) -> data
  • Status is unchanged from child
tree = (
    Tree()
    .Return(lambda b: "data")
    ._().Function(A)
    .End()
)

Invert [↑]

Inverts child status while keeping data.

Usage:

  • OK becomes FAIL, FAIL becomes OK
  • Data is preserved
tree = (
    Tree()
    .Invert()
    ._().Failure()
    .End()
)

Retry [↑]

Retries a child on failure for up to max_tries with optional sleeps.

Usage:

  • sleep_secs: float or list per retry index
  • Returns first OK, else FAIL(None)
tree = (
    Tree()
    .Retry(max_tries=3, sleep_secs=0.1) # or usage: [0.1, 0.2, 0.5]
    ._().Function(A)
    .End()
)

While [↑]

Repeats child while condition is true, returns the last successful result.

Usage:

  • Condition supports attr name or function (sync/async)
  • max_loop_times guards infinite loops
tree = (
    Tree()
    .While(lambda b: b.count < 3)
    ._().Function(step)
    .End()
)

Timeout / Fallback [↑]

Runs a child with a time limit. On timeout, runs the fallback child if provided.

Usage:

  • Timeout has 1 child (main) or 2 (main + fallback)
  • On timeout, returns FAIL(None) or executes fallback
  • Fallback node must be a child of Timeout or Terminable
tree = (
    Tree()
    .Timeout(1.0)
    ._().Function(slow)
    ._().Function(on_timeout)
    .End()
)

With .Fallback():

tree = (
    Tree()
    .Timeout(2.0)
    ._().Function(main_job)
    ._().Fallback()
    ._()._().Function(fallback_job)
    .End()
)

Cacher [↑]

Caches child results in a key-value store. Optional value_validator invalidates stale cache.

Usage:

  • key_func(blackboard) -> str, required store
  • expiration: seconds, timedelta, or random (min, max)
  • value_validator: (blackboard) or (blackboard, tracer)
  • enabled: bool or (blackboard) -> bool
tree = (
    Tree()
    .Cacher(key_func=lambda b: b.key, store=store, enabled=lambda b: b.use_cache)
    ._().Function(expensive_call)
    .End()
)

Redis example:

tinytasktree itself does not depend on Redis, but redis.asyncio.Redis can be used directly as the store:

import redis.asyncio as redis

store = redis.Redis.from_url("redis://127.0.0.1:6379")

tree = (
    Tree()
    .Cacher(key_func=lambda b: f"user:{b.user_id}", store=store, expiration=60)
    ._().Function(fetch_user)
    .End()
)

With a value_validator, the cache is only considered a hit if this value matches the one stored during the cache set. This is useful for invalidating cache when dependent logic or state changes:

tree = (
    Tree()
    .Cacher(key_func=lambda b: f"user:{b.user_id}", store=store, value_validator=lambda b: b.version)
    ._().Function(fetch_user)
    .End()
)

Terminable [↑]

Runs a child while polling a store key for termination. Optionally runs a fallback.

Usage:

  • key_func(blackboard) -> signal_key, required store
  • Monitors until key exists; then cancels child
  • 1 child (main) or 2 (main + fallback)
tree = (
    Tree()
    .Terminable(lambda b: f"stop:{b.job_id}", store=store)
    ._().Function(A)
    ._().Fallback()
    ._()._().Function(B)
    .End()
)

# To trigger termination from an external script or process:
await store.set(f"stop:{job_id}", "1")

Wrapper [↑]

Wraps a child with a custom async context manager.

Usage:

  • func(child, context) -> async context manager yielding a Result
  • Useful for custom setup/teardown or instrumentation
from contextlib import asynccontextmanager

@asynccontextmanager
async def traced(child, context):
    try:
        print("before run")
        result = await child(context)
        yield result
    finally:
        print("after run")

tree = (
    Tree()
    .Wrapper(traced)
    ._().Function(run)
    .End()
)

Core APIs (Non-Node)

  • Context: runtime state (blackboard stack, trace root, path)
  • TraceRoot / TraceNode: structured trace tree
  • TraceStorageHandler / FileTraceStorageHandler: save and load traces
  • register_global_hook_after_spawned_task_finish(hook): hook for Parallel/Gather/Terminable tasks
  • run_httpserver(host, port, trace_dir) / create_http_app(...): built-in HTTP trace server

Contributing

  • Install dev dependencies: uv sync --dev
  • Build a wheel or sdist with bundled UI assets: uv build
  • Lint: uv run ruff check .
  • Test: uv run pytest

License

MIT. See LICENSE.txt.

TODO

  • Metrics Handler
  • Build Tasktree from json

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tinytasktree-0.1.5.tar.gz (401.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tinytasktree-0.1.5-py3-none-any.whl (401.8 kB view details)

Uploaded Python 3

File details

Details for the file tinytasktree-0.1.5.tar.gz.

File metadata

  • Download URL: tinytasktree-0.1.5.tar.gz
  • Upload date:
  • Size: 401.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for tinytasktree-0.1.5.tar.gz
Algorithm Hash digest
SHA256 16250136666d1d5ed25128af7369601da4cdb7874e4c831d91d4995686488c04
MD5 8dec1140a6f3cf8f5d9bfd30e8820d16
BLAKE2b-256 8fdbdaf565f384c66968ecc0f70d7183db153a367270db150db19f61167b3991

See more details on using hashes here.

File details

Details for the file tinytasktree-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: tinytasktree-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 401.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for tinytasktree-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 26320fd7e77d737959c766ce6740dcb24ec14445610aa2ddec24c1d0105d7e22
MD5 79efbae1f1f022572e7b492b5d2c2809
BLAKE2b-256 77acbc415a58e2b65e36a52dd3cbaee13819d6cce2c40b1b852680c523276f76

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page