Deterministic Python runtime for relational execution lineage DAGs.

Musubito

License: AGPL v3 | Python 3.10+

Musubito records execution lineage so agentic LLM workflows can safely skip redundant expensive calls instead of recomputing the same DAG steps.

Why Musubito?

LLM-heavy pipelines often re-run the same expensive steps because the runtime has no durable memory of what was executed, which inputs were used, and which upstream results contributed to the output.

Musubito gives each execution node a deterministic identity derived from the operation name, canonical input hash, and sorted upstream node IDs. If the same logical node is reached again and its replay policy allows reuse, Musubito returns the stored artifact instead of executing the function again.

Lineage is stored locally in SQLite, so replay decisions are fast, deterministic, and inspectable without requiring a remote service.

Fan-in DAG patterns are first-class: musubito_merge() lets an aggregate step explicitly depend on multiple upstream MusubitoResult[T] values.

Install

pip install musubito

Core Concepts

A node is one recorded execution of a decorated function. Its identity is computed from three things: the operation name, the canonical hash of the function inputs, and the sorted set of upstream node IDs. This makes node identity stable across runs when the logical computation is the same.
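
The identity rule above can be sketched with the standard library alone. This is an illustration under stated assumptions, not Musubito's actual implementation: the helper names `canonical_input_hash` and `node_id`, the use of SHA-256, and the JSON canonicalization are all hypothetical choices made for the example.

```python
import hashlib
import json


def canonical_input_hash(inputs: dict) -> str:
    """Hash inputs as JSON with sorted keys, so key order does not matter.

    Assumption: inputs are JSON-serializable. Real canonicalization may differ.
    """
    payload = json.dumps(inputs, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def node_id(operation: str, inputs: dict, upstream_ids: list[str]) -> str:
    """Combine the operation name, the input hash, and the sorted upstream IDs."""
    parts = [operation, canonical_input_hash(inputs), *sorted(upstream_ids)]
    return hashlib.sha256("\n".join(parts).encode("utf-8")).hexdigest()


# Same logical computation: key order and upstream order differ, the ID does not.
a = node_id("summarize", {"topic": "lineage", "k": 3}, ["n2", "n1"])
b = node_id("summarize", {"k": 3, "topic": "lineage"}, ["n1", "n2"])

# Different input: the ID changes.
c = node_id("summarize", {"topic": "other", "k": 3}, ["n1", "n2"])

print(a == b)  # True
print(a == c)  # False
```

Sorting the upstream IDs before hashing is what makes a fan-in node's identity independent of the order in which its parents are listed.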

Replay means Musubito returns a previously stored artifact instead of calling the decorated function again. Replay is allowed when the stored node is successful, not stale, not forced to re-execute, and any configured TTL has not expired.
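
The replay conditions listed above can be read as a single predicate. The following is a minimal sketch, assuming a hypothetical `StoredNode` record and a hypothetical `can_replay` helper; neither is part of Musubito's API, and the exact TTL boundary behavior is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StoredNode:
    # Hypothetical record shape, for illustration only.
    succeeded: bool
    stale: bool
    created_at: float                    # seconds since the epoch
    ttl_seconds: Optional[float] = None  # None means no TTL is configured


def can_replay(
    node: Optional[StoredNode],
    now: float,
    force_reexecution: bool = False,
) -> bool:
    """Return True only when every replay condition holds."""
    if node is None:
        return False  # nothing stored for this identity yet
    if force_reexecution or not node.succeeded or node.stale:
        return False
    # Assumption: a result is treated as expired once its age reaches the TTL.
    if node.ttl_seconds is not None and now - node.created_at >= node.ttl_seconds:
        return False
    return True


stored = StoredNode(succeeded=True, stale=False, created_at=1000.0, ttl_seconds=3600)

print(can_replay(stored, now=1500.0))                          # True
print(can_replay(stored, now=5000.0))                          # False, TTL expired
print(can_replay(stored, now=1500.0, force_reexecution=True))  # False, forced
```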

StepType.DETERMINISTIC marks work that is safe to replay freely, such as pure transformations or deterministic parsers.

StepType.STOCHASTIC marks work that may produce different outputs, such as LLM calls. It can still be replayed intentionally, often with a TTL to bound how long the stored result remains valid.

StepType.EXTERNAL_EFFECT marks work with side effects, such as network calls or tool invocations. When replay is allowed, Musubito returns the stored artifact; it does not re-run the external effect. Use force_reexecution=True when the effect must be issued again.

An Artifact is the persisted output of a node. MusubitoResult[T] points to that artifact and carries both the current DAG node ID and the historical producer node ID.

musubito_merge() declares an explicit multi-parent context. It is used when one step depends on multiple previous MusubitoResult[T] values, making fan-in DAG edges visible to the lineage engine.

Quickstart

import asyncio

from musubito import MusubitoResult, musubito_merge, musubito_step

calls = {
    "fetch_context": 0,
    "fetch_policy": 0,
    "summarize": 0,
}


@musubito_step()
async def fetch_context(topic: str) -> dict[str, str]:
    calls["fetch_context"] += 1
    return {"topic": topic, "source": "local"}


@musubito_step()
async def fetch_policy(topic: str) -> dict[str, str]:
    calls["fetch_policy"] += 1
    return {"topic": topic, "policy": "strict-replay"}


@musubito_step()
def summarize(
    context: MusubitoResult[dict[str, str]],
    policy: MusubitoResult[dict[str, str]],
) -> dict[str, str]:
    calls["summarize"] += 1
    return {
        "topic": context.value["topic"],
        "source": context.value["source"],
        "policy": policy.value["policy"],
    }


async def main(label: str) -> None:
    context, policy = await asyncio.gather(
        fetch_context("lineage"),
        fetch_policy("lineage"),
    )

    with musubito_merge(context, policy):
        result = summarize(context, policy)

    print(label, result.value)
    print(label, calls)


asyncio.run(main("first run"))
asyncio.run(main("second run"))

# Assumes a fresh .musubito/musubito.db in the current working directory.
# The second asyncio.run(...) replays from SQLite in the same Python process
# and does not re-execute the decorated functions, so each counter remains at 1.

Decorated functions return MusubitoResult[T]. The raw user value is available through .value; the wrapper is not unwrapped automatically.

Step Configuration

The default configuration is deterministic and requires no extra setup:

from musubito import musubito_step


@musubito_step()
def normalize(text: str) -> str:
    return text.strip().lower()

For stochastic work, such as an LLM call, use a TTL when the cached answer should only be reused for a bounded time:

from musubito import StepConfiguration, StepType, musubito_step

llm_semantics = StepConfiguration(
    step_type=StepType.STOCHASTIC,
    ttl_seconds=3600,
)


@musubito_step(semantics=llm_semantics)
def draft_answer(prompt: str) -> str:
    return prompt.upper()

To force a stochastic step to run again, set force_reexecution=True:

from musubito import StepConfiguration, StepType, musubito_step

fresh_semantics = StepConfiguration(
    step_type=StepType.STOCHASTIC,
    force_reexecution=True,
)


@musubito_step(semantics=fresh_semantics)
def generate_fresh_answer(prompt: str) -> str:
    return prompt.upper()

For external effects, Musubito stores and returns the artifact when replay is allowed. The side effect itself is not repeated unless force_reexecution=True is used:

from musubito import StepConfiguration, StepType, musubito_step

external_semantics = StepConfiguration(
    step_type=StepType.EXTERNAL_EFFECT,
)


@musubito_step(semantics=external_semantics)
def call_external_tool(payload: dict[str, str]) -> dict[str, str]:
    return {"status": "recorded", "id": payload["id"]}

Using a Custom Engine

Use use_musubito_engine() when you want explicit control over the storage path or engine instance:

from musubito import (
    MusubitoEngine,
    SQLiteStorage,
    musubito_step,
    use_musubito_engine,
)


@musubito_step()
def summarize_text(text: str) -> str:
    return text.upper()


with SQLiteStorage(db_path=".my_run/run.db") as storage:
    engine = MusubitoEngine(storage)

    with use_musubito_engine(engine):
        result = summarize_text("Musubito records deterministic lineage.")

print(result.value)

Storage

Musubito uses SQLite as its local relational storage layer. By default, it stores runtime data under:

.musubito/musubito.db

The path can be customized with SQLiteStorage(db_path=...).

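SQLite is opened in WAL mode and uses short BEGIN IMMEDIATE write transactions for node, artifact, edge, and invalidation updates. BEGIN IMMEDIATE takes the write lock when the transaction starts, so concurrent local writers contend for the lock up front rather than partway through a transaction, and WAL mode lets readers keep reading while a write is in progress.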
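
The WAL plus BEGIN IMMEDIATE pattern can be tried directly with Python's `sqlite3` module. This is a sketch, not Musubito's storage code: the `nodes` table, its columns, and the `record_node` helper are hypothetical, and the database lives in a temporary directory.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "sketch.db")

# isolation_level=None puts the connection in autocommit mode, so the
# transaction boundaries below are issued explicitly.
writer = sqlite3.connect(db_path, isolation_level=None)
writer.execute("PRAGMA journal_mode=WAL")
writer.execute("CREATE TABLE nodes (id TEXT PRIMARY KEY, status TEXT)")


def record_node(node_id: str, status: str) -> None:
    """Write one row inside a short BEGIN IMMEDIATE transaction."""
    writer.execute("BEGIN IMMEDIATE")  # take the write lock up front
    try:
        writer.execute(
            "INSERT OR REPLACE INTO nodes (id, status) VALUES (?, ?)",
            (node_id, status),
        )
        writer.execute("COMMIT")
    except Exception:
        writer.execute("ROLLBACK")
        raise


record_node("n1", "success")

# A second connection can read while a write transaction is still open;
# it sees only committed rows.
reader = sqlite3.connect(db_path)
writer.execute("BEGIN IMMEDIATE")
writer.execute("INSERT INTO nodes (id, status) VALUES ('n2', 'success')")
during = reader.execute("SELECT id FROM nodes ORDER BY id").fetchall()
writer.execute("COMMIT")
after = reader.execute("SELECT id FROM nodes ORDER BY id").fetchall()

print(during)  # [('n1',)]
print(after)   # [('n1',), ('n2',)]

reader.close()
writer.close()
```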

Downstream invalidation is performed in place with a recursive CTE. When a node output changes, dependent downstream nodes can be marked stale so future runs recompute only the affected part of the DAG.
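
Downstream invalidation with a recursive CTE can be sketched against an in-memory SQLite database. The schema here (`nodes`, `edges`, a `stale` flag) and the `mark_downstream_stale` helper are assumptions made for the example, not Musubito's actual tables, and the sketch assumes that only descendants of the changed node are marked, not the node itself.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE nodes (id TEXT PRIMARY KEY, stale INTEGER NOT NULL DEFAULT 0);
    CREATE TABLE edges (parent_id TEXT NOT NULL, child_id TEXT NOT NULL);

    INSERT INTO nodes (id) VALUES
        ('fetch_context'), ('fetch_policy'), ('summarize'), ('report');

    -- Fan-in: summarize depends on both fetch steps; report depends on summarize.
    INSERT INTO edges (parent_id, child_id) VALUES
        ('fetch_context', 'summarize'),
        ('fetch_policy', 'summarize'),
        ('summarize', 'report');
    """
)


def mark_downstream_stale(conn: sqlite3.Connection, changed_id: str) -> None:
    """Mark every descendant of changed_id as stale in one statement."""
    with conn:  # commit on success, roll back on error
        conn.execute(
            """
            WITH RECURSIVE downstream(id) AS (
                SELECT child_id FROM edges WHERE parent_id = ?
                UNION
                SELECT e.child_id
                FROM edges AS e
                JOIN downstream AS d ON e.parent_id = d.id
            )
            UPDATE nodes SET stale = 1
            WHERE id IN (SELECT id FROM downstream)
            """,
            (changed_id,),
        )


mark_downstream_stale(conn, "fetch_policy")

stale = [
    row[0]
    for row in conn.execute("SELECT id FROM nodes WHERE stale = 1 ORDER BY id")
]
print(stale)  # ['report', 'summarize']
```

Using UNION rather than UNION ALL deduplicates nodes reached through more than one path, which matters for fan-in graphs. The untouched `fetch_context` node stays fresh, so only the affected part of the DAG would be recomputed.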

Use Cases

  • Agentic LLM pipelines with repeated planning, tool, or synthesis steps.
  • Multi-step RAG workflows where retrieval, filtering, and summarization can be replayed.
  • CI regression workflows that call expensive tools or model-based checks.
  • Long-running research agents that need durable local execution memory.
  • Reproducible evaluation pipelines with explicit lineage and cache boundaries.

License

Musubito is dual-licensed:

  • Open-source projects and personal use: AGPL-3.0.
  • Closed-source or commercial products: commercial license required.
