Deterministic Python runtime for relational execution lineage DAGs.
Project description
Musubito
Musubito records execution lineage so agentic LLM workflows can safely skip redundant expensive calls instead of recomputing the same DAG steps.
Why Musubito?
LLM-heavy pipelines often re-run the same expensive steps because the runtime has no durable memory of what was executed, which inputs were used, and which upstream results contributed to the output.
Musubito gives each execution node a deterministic identity derived from the operation name, canonical input hash, and sorted upstream node IDs. If the same logical node is reached again and its replay policy allows reuse, Musubito returns the stored artifact instead of executing the function again.
Lineage is stored locally in SQLite, so replay decisions are fast, deterministic, and inspectable without requiring a remote service.
Fan-in DAG patterns are first-class: musubito_merge() lets an aggregate step explicitly depend on multiple upstream MusubitoResult[T] values.
Install
pip install musubito
Core Concepts
A node is one recorded execution of a decorated function. Its identity is computed from three things: the operation name, the canonical hash of the function inputs, and the sorted set of upstream node IDs. This makes node identity stable across runs when the logical computation is the same.
Replay means Musubito returns a previously stored artifact instead of calling the decorated function again. Replay is allowed when the stored node is successful, not stale, not forced to re-execute, and any configured TTL has not expired.
StepType.DETERMINISTIC marks work that is safe to replay freely, such as pure transformations or deterministic parsers.
StepType.STOCHASTIC marks work that may produce different outputs, such as LLM calls. It can still be replayed intentionally, often with a TTL to bound how long the stored result remains valid.
StepType.EXTERNAL_EFFECT marks work with side effects, such as network calls or tool invocations. When replay is allowed, Musubito returns the stored artifact; it does not re-run the external effect. Use force_reexecution=True when the effect must be issued again.
An Artifact is the persisted output of a node. MusubitoResult[T] points to that artifact and carries both the current DAG node ID and the historical producer node ID.
musubito_merge() declares an explicit multi-parent context. It is used when one step depends on multiple previous MusubitoResult[T] values, making fan-in DAG edges visible to the lineage engine.
Quickstart
import asyncio
from musubito import MusubitoResult, musubito_merge, musubito_step
calls = {
"fetch_context": 0,
"fetch_policy": 0,
"summarize": 0,
}
@musubito_step()
async def fetch_context(topic: str) -> dict[str, str]:
calls["fetch_context"] += 1
return {"topic": topic, "source": "local"}
@musubito_step()
async def fetch_policy(topic: str) -> dict[str, str]:
calls["fetch_policy"] += 1
return {"topic": topic, "policy": "strict-replay"}
@musubito_step()
def summarize(
context: MusubitoResult[dict[str, str]],
policy: MusubitoResult[dict[str, str]],
) -> dict[str, str]:
calls["summarize"] += 1
return {
"topic": context.value["topic"],
"source": context.value["source"],
"policy": policy.value["policy"],
}
async def main(label: str) -> None:
context, policy = await asyncio.gather(
fetch_context("lineage"),
fetch_policy("lineage"),
)
with musubito_merge(context, policy):
result = summarize(context, policy)
print(label, result.value)
print(label, calls)
asyncio.run(main("first run"))
asyncio.run(main("second run"))
# Assumes a fresh .musubito/musubito.db in the current working directory.
# The second asyncio.run(...) replays from SQLite in the same Python process
# and does not re-execute the decorated functions, so each counter remains at 1.
Decorated functions return MusubitoResult[T]. The raw user value is available through .value; the wrapper is not unwrapped automatically.
Step Configuration
The default configuration is deterministic and requires no extra setup:
from musubito import musubito_step
@musubito_step()
def normalize(text: str) -> str:
return text.strip().lower()
For stochastic work, such as an LLM call, use a TTL when the cached answer should only be reused for a bounded time:
from musubito import StepConfiguration, StepType, musubito_step
llm_semantics = StepConfiguration(
step_type=StepType.STOCHASTIC,
ttl_seconds=3600,
)
@musubito_step(semantics=llm_semantics)
def draft_answer(prompt: str) -> str:
return prompt.upper()
To force a stochastic step to run again, set force_reexecution=True:
from musubito import StepConfiguration, StepType, musubito_step
fresh_semantics = StepConfiguration(
step_type=StepType.STOCHASTIC,
force_reexecution=True,
)
@musubito_step(semantics=fresh_semantics)
def generate_fresh_answer(prompt: str) -> str:
return prompt.upper()
For external effects, Musubito stores and returns the artifact when replay is allowed. The side effect itself is not repeated unless force_reexecution=True is used:
from musubito import StepConfiguration, StepType, musubito_step
external_semantics = StepConfiguration(
step_type=StepType.EXTERNAL_EFFECT,
)
@musubito_step(semantics=external_semantics)
def call_external_tool(payload: dict[str, str]) -> dict[str, str]:
return {"status": "recorded", "id": payload["id"]}
Using a Custom Engine
Use use_musubito_engine() when you want explicit control over the storage path or engine instance:
from musubito import (
MusubitoEngine,
SQLiteStorage,
musubito_step,
use_musubito_engine,
)
@musubito_step()
def summarize_text(text: str) -> str:
return text.upper()
with SQLiteStorage(db_path=".my_run/run.db") as storage:
engine = MusubitoEngine(storage)
with use_musubito_engine(engine):
result = summarize_text("Musubito records deterministic lineage.")
print(result.value)
Storage
Musubito uses SQLite as its local relational storage layer. By default, it stores runtime data under:
.musubito/musubito.db
The path can be customized with SQLiteStorage(db_path=...).
SQLite is opened in WAL mode and uses short BEGIN IMMEDIATE write transactions for node, artifact, edge, and invalidation updates. This keeps local concurrent writes predictable while still allowing normal reads.
Downstream invalidation is performed in place with a recursive CTE. When a node output changes, dependent downstream nodes can be marked stale so future runs recompute only the affected part of the DAG.
Use Cases
- Agentic LLM pipelines with repeated planning, tool, or synthesis steps.
- Multi-step RAG workflows where retrieval, filtering, and summarization can be replayed.
- CI regression workflows that call expensive tools or model-based checks.
- Long-running research agents that need durable local execution memory.
- Reproducible evaluation pipelines with explicit lineage and cache boundaries.
License
Musubito is dual-licensed:
- Open Source: AGPL-3.0-or-later — free for open-source projects. See LICENSE.
- Commercial: closed-source or proprietary use requires a separate license. See COMMERCIAL_LICENSE.md or contact softwaretamrsv@gmail.com.
Open-source projects and personal use: AGPL-3.0. Closed-source or commercial products: commercial license required.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file musubito-0.1.1.tar.gz.
File metadata
- Download URL: musubito-0.1.1.tar.gz
- Upload date:
- Size: 27.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.16 {"installer":{"name":"uv","version":"0.11.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8821a084f66db7103b73cdca00aff3c373e5c02286a3aab58f0b26aa3589a8a3
|
|
| MD5 |
5a13c908e8033bcb731cdbc924f12d00
|
|
| BLAKE2b-256 |
77a55916c63724f1b1cd1005d938dc08034958d4a427e302524760fc0ab59402
|
File details
Details for the file musubito-0.1.1-py3-none-any.whl.
File metadata
- Download URL: musubito-0.1.1-py3-none-any.whl
- Upload date:
- Size: 28.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.16 {"installer":{"name":"uv","version":"0.11.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
78dbd3f91dad0dc84a7f13f2c76df1d4e6ab90905616423689a97cc0ff5d16c5
|
|
| MD5 |
d831f3386fb0258b55c36c75277b6316
|
|
| BLAKE2b-256 |
cd890df9979eff9fc79e2865e1cfbae4365cf5219f8aaa58181e4c6c10fee453
|