Mycelium runtime — failure prevention for AI agents (AF-006, AF-004, AF-002)

Mycelium runtime

Runtime failure prevention for AI agents.

PyPI: pip install mycelium-runtime
Import: from mycelium import ...

This directory is the publishable package. Only mycelium/ ships on PyPI; everything else here is for development and docs.

Path                      In PyPI wheel?
mycelium/                 Yes (code + templates/*.yaml)
mycelium/templates/       Yes; use mycelium init to write into your project
tests/                    No; run with pytest tests/
examples/                 No; see mycelium init
README.md                 PyPI project page only (not inside wheel)
pyproject.toml, uv.lock   No

v1.1 ships guards for three failure modes: context corruption (AF-006), tool boundary violations (AF-004), and untraceable agent actions (AF-002).

AF-002 is not an observability platform. The failure mode is called "observability black hole" because agents act without durable records. Mycelium prevents that — via idempotency ledgers, state flush on cancel, and signed receipts — not via traces, spans, or dashboards. For post-hoc tracing, use Langfuse, Helicone, or Opik alongside Mycelium.

Install

Requires Python 3.10+ (3.11+ recommended).

pip install mycelium-runtime
mycelium init              # writes ./mycelium.yaml (full annotated template)
mycelium init --minimal    # smaller starter config
# local dev from repo:
pip install -e ./sdk

Quickstart — AF-006

from mycelium import protect, Session

@protect(entity_param="customer_id", ttl=60)
async def fetch_customer(customer_id: str) -> dict:
    return await db.get(customer_id)

async def handle_request(customer_id: str):
    async with Session():
        return await fetch_customer(customer_id=customer_id)

Sync tools (CrewAI, Smolagents):

from mycelium import protect_sync, Session

@protect_sync(entity_param="customer_id", ttl=60)
def fetch_customer(customer_id: str) -> dict:
    return db.get(customer_id)

with Session():
    customer = fetch_customer(customer_id="c1")

What @protect / protect_sync / Session do

  • @protect / protect_sync — TTL cache with per-entity keys; auto-refetch when stale; clear on error
  • Session — one cache per agent run; use in production to prevent cross-request leakage
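To make the caching behavior above concrete, here is a minimal stdlib-only sketch of a per-entity TTL cache that refetches stale entries and clears an entry when the wrapped call fails. It is an illustration, not Mycelium's implementation: the name ttl_cache_sync and the clock parameter are hypothetical, and the sketch only handles keyword-argument calls.

```python
import time
from functools import wraps


def ttl_cache_sync(entity_param: str, ttl: float, clock=time.monotonic):
    """Hypothetical stand-in for a per-entity TTL cache (not Mycelium's code)."""

    def decorator(fn):
        cache = {}  # entity id -> (expires_at, value)

        @wraps(fn)
        def wrapper(**kwargs):
            key = kwargs[entity_param]
            hit = cache.get(key)
            if hit is not None and clock() < hit[0]:
                return hit[1]  # fresh entry: serve from cache
            try:
                value = fn(**kwargs)  # missing or stale: refetch
            except Exception:
                cache.pop(key, None)  # clear on error so stale data is not served
                raise
            cache[key] = (clock() + ttl, value)
            return value

        return wrapper

    return decorator


calls = []


@ttl_cache_sync(entity_param="customer_id", ttl=60)
def fetch_customer(customer_id: str) -> dict:
    calls.append(customer_id)
    return {"customer_id": customer_id}


fetch_customer(customer_id="c1")
fetch_customer(customer_id="c1")  # served from cache, no second fetch
fetch_customer(customer_id="c2")  # different entity, separate cache key
```

A Session-style scope would amount to creating one such cache per agent run instead of one per decorated function, so entries never outlive the request that produced them.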

MessageValidator

Run before each LLM call to catch broken transcripts:

from mycelium import MessageValidator

messages = MessageValidator().repair(messages)  # auto-fix what it can
# or
messages = MessageValidator().validate(messages)  # raise on first issue

Catches orphan tool results, duplicate tool-call IDs, invalid roles, and related serialization bugs.
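The checks listed above can be sketched in a few lines. This example assumes OpenAI-style message dicts (assistant messages carry tool_calls with an id, tool messages carry tool_call_id); that shape and the function name find_transcript_issues are assumptions for illustration, not Mycelium's API.

```python
VALID_ROLES = {"system", "user", "assistant", "tool"}


def find_transcript_issues(messages: list[dict]) -> list[str]:
    """Return human-readable issues found in a chat transcript."""
    issues = []
    seen_call_ids = set()
    for index, message in enumerate(messages):
        role = message.get("role")
        if role not in VALID_ROLES:
            issues.append(f"message {index}: invalid role {role!r}")
            continue
        if role == "assistant":
            for call in message.get("tool_calls", []):
                call_id = call["id"]
                if call_id in seen_call_ids:
                    issues.append(f"message {index}: duplicate tool-call id {call_id!r}")
                seen_call_ids.add(call_id)
        elif role == "tool":
            call_id = message.get("tool_call_id")
            # A tool result must answer a tool call that appeared earlier.
            if call_id not in seen_call_ids:
                issues.append(f"message {index}: orphan tool result {call_id!r}")
    return issues


transcript = [
    {"role": "user", "content": "look up c1"},
    {"role": "assistant", "tool_calls": [{"id": "call_1"}]},
    {"role": "tool", "tool_call_id": "call_1", "content": "{}"},
]
issues = find_transcript_issues(transcript)
```

A validate-style entry point would raise on the first issue, while a repair-style one would drop or rewrite the offending messages; which repairs Mycelium applies is not specified here.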

HistoryGuard

Run before each LLM call to catch oversized or corrupted history:

from mycelium import HistoryGuard

guard = HistoryGuard(max_tokens=100_000)
messages = guard.validate(messages)
guard.check_for_drops(processed_messages)  # after framework trimming

Raises on token overflow, message count limits, duplicate turns, and silent message drops.
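As a rough illustration of two of these checks, the sketch below enforces a token budget and reports messages that disappeared between two versions of the history. The function names are hypothetical, and the default token counter (whitespace-separated words) is a deliberately crude stand-in for a real tokenizer.

```python
def check_history(messages, max_tokens, count_tokens=lambda text: len(text.split())):
    """Raise if the history exceeds the token budget; otherwise return it unchanged."""
    total = sum(count_tokens(m.get("content") or "") for m in messages)
    if total > max_tokens:
        raise ValueError(f"history uses {total} tokens, limit is {max_tokens}")
    return messages


def find_dropped(before, after):
    """Return messages present before trimming but missing afterwards."""
    remaining = [(m["role"], m.get("content")) for m in after]
    dropped = []
    for message in before:
        key = (message["role"], message.get("content"))
        if key in remaining:
            remaining.remove(key)  # consume one match so duplicates are counted
        else:
            dropped.append(message)
    return dropped


history = [
    {"role": "user", "content": "a b c"},
    {"role": "assistant", "content": "d e"},
]
check_history(history, max_tokens=5)
dropped = find_dropped(history, history[1:])  # the first message was trimmed
```

Whether a drop is an error depends on who did the trimming: an intentional truncation policy is fine, while a framework silently discarding turns is the case worth raising on.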

Quickstart — AF-004

from mycelium import bounded, ToolRegistry, ToolRunner

FETCH_CUSTOMER_SCHEMA = {
    "customer_id": {"type": "string", "required": True, "pattern": r"^c\d+$"},
}

CUSTOMER_RECORD_SCHEMA = {
    "customer_id": {"type": "string", "required": True},
    "name": {"type": "string", "required": True},
}

registry = ToolRegistry(allowed=["fetch_customer"])

@registry.register
@bounded(
    schema=FETCH_CUSTOMER_SCHEMA,
    output_schema=CUSTOMER_RECORD_SCHEMA,
    allowed_paths=["/workspace/src/"],
)
async def fetch_customer(customer_id: str) -> dict:
    return await db.get(customer_id)

runner = ToolRunner(registry=registry)
result = await runner.call(fetch_customer, customer_id="c1")

Sync tools:

from mycelium import bounded_sync

@bounded_sync(schema=FETCH_CUSTOMER_SCHEMA)
def fetch_customer(customer_id: str) -> dict:
    return db.get(customer_id)

Field spec keys: type (string, integer, number, boolean), required, pattern, min_length, max_length. You pass plain dicts — Mycelium validates internally; no Pydantic imports in your code.

What @bounded / bounded_sync do

  • @bounded / bounded_sync — validate tool args against your field spec before the function runs
  • output_schema — validate the return value after the function runs; bad results are not propagated
  • allowed_paths / entity_pattern — user-defined scope gates (path prefixes, entity ID format)
  • On failure, raises ToolBoundaryError with llm_message for the agent loop — does not retry by itself
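To show what validating against a plain-dict field spec can look like, here is a small sketch that supports the keys listed above (type, required, pattern, min_length, max_length). It is not Mycelium's validator: the function name validate_fields, the error strings, and the use of re.search for pattern are assumptions.

```python
import re

TYPES = {"string": str, "integer": int, "number": (int, float), "boolean": bool}


def validate_fields(spec: dict, values: dict) -> list[str]:
    """Check values against a field spec and return a list of error strings."""
    errors = []
    for name, rules in spec.items():
        if name not in values:
            if rules.get("required"):
                errors.append(f"{name}: required field is missing")
            continue
        value = values[name]
        type_name = rules["type"]
        # bool is a subclass of int in Python, so reject it for numeric types.
        is_stray_bool = isinstance(value, bool) and type_name != "boolean"
        if is_stray_bool or not isinstance(value, TYPES[type_name]):
            errors.append(f"{name}: expected {type_name}")
            continue
        if isinstance(value, str):
            if "pattern" in rules and not re.search(rules["pattern"], value):
                errors.append(f"{name}: does not match pattern")
            if len(value) < rules.get("min_length", 0):
                errors.append(f"{name}: shorter than min_length")
            if "max_length" in rules and len(value) > rules["max_length"]:
                errors.append(f"{name}: longer than max_length")
    return errors


FETCH_CUSTOMER_SCHEMA = {
    "customer_id": {"type": "string", "required": True, "pattern": r"^c\d+$"},
}
ok = validate_fields(FETCH_CUSTOMER_SCHEMA, {"customer_id": "c1"})
bad = validate_fields(FETCH_CUSTOMER_SCHEMA, {"customer_id": "x1"})
```

The same function can be applied to a tool's return value to get output-schema checking, and the error list is a natural source for the message handed back to the agent loop.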

ToolRegistry

Run before dispatch to enforce which tools this agent may call:

from mycelium import ToolRegistry

registry = ToolRegistry(allowed=["search_docs", "summarize"])
registry.validate_call("fetch_customer")  # raises ToolBoundaryError

Blocks calls to tools outside the developer-defined allowlist.
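The allowlist idea itself is small enough to sketch in full. The class and error names below are hypothetical and only mirror the shape of the call shown above; they are not Mycelium's types.

```python
class ToolNotAllowedError(Exception):
    """Hypothetical error type for this sketch."""


class Allowlist:
    """Reject any tool name that the developer did not explicitly allow."""

    def __init__(self, allowed):
        self._allowed = frozenset(allowed)

    def validate_call(self, tool_name: str) -> None:
        if tool_name not in self._allowed:
            raise ToolNotAllowedError(
                f"tool {tool_name!r} is not in the allowlist {sorted(self._allowed)}"
            )


allowlist = Allowlist(["search_docs", "summarize"])
allowlist.validate_call("search_docs")  # allowed, returns None

try:
    allowlist.validate_call("fetch_customer")
except ToolNotAllowedError as exc:
    blocked_message = str(exc)
```

Checking the name before dispatch means a hallucinated or injected tool name fails fast, before any arguments are parsed or any side effect runs.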

ToolRunner

Run around @bounded tools when you want automatic retries:

from mycelium import ToolRunner

runner = ToolRunner(registry=registry, max_llm_retries=2, max_tool_retries=3)

result, messages = await runner.run_with_llm_retry(
    fetch_customer,
    messages=messages,
    tool_call_id="call_1",
    kwargs={"customer_id": "c1"},
    invoke_llm=llm.ainvoke,
    parse_tool_kwargs=extract_tool_args,
)

  • Input, allowlist, and scope failures → append tool error to messages → LLM retry
  • Output failures → retry the tool up to max_tool_retries → then LLM retry
  • Raises ToolBoundaryExhaustedError when retries are used up
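The retry policy in the bullets above can be sketched as two nested loops. This is a synchronous illustration with hypothetical names (InputError, OutputError, RetriesExhausted, run_with_retries); it also assumes that "retries" means attempts after the first one, which may differ from how Mycelium counts them.

```python
class InputError(Exception):
    """Arguments were rejected before the tool ran (hypothetical)."""


class OutputError(Exception):
    """The tool ran but returned an invalid result (hypothetical)."""


class RetriesExhausted(Exception):
    """Both retry budgets were used up (hypothetical)."""


def run_with_retries(tool, kwargs, ask_llm_for_kwargs,
                     max_llm_retries=2, max_tool_retries=3):
    error = None
    for llm_attempt in range(max_llm_retries + 1):
        for _ in range(max_tool_retries + 1):
            try:
                return tool(**kwargs)
            except InputError as exc:
                error = exc
                break  # bad arguments: re-running the same call cannot help
            except OutputError as exc:
                error = exc  # bad result: the same call may succeed on retry
        if llm_attempt < max_llm_retries:
            # Hand the error text back to the model and get corrected arguments.
            kwargs = ask_llm_for_kwargs(str(error))
    raise RetriesExhausted(str(error))


attempts = []


def fetch_customer(customer_id):
    attempts.append(customer_id)
    if not customer_id.startswith("c"):
        raise InputError("customer_id must match ^c\\d+$")
    return {"customer_id": customer_id}


result = run_with_retries(
    fetch_customer, {"customer_id": "x1"}, lambda error_text: {"customer_id": "c1"}
)
```

Separating the two budgets keeps transient output problems from consuming LLM round trips, which are usually the slower and more expensive resource.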

Quickstart — AF-002 (action traceability prevention)

AF-002 prevents the "observability black hole" failure class: duplicate side effects, lost state on cancel, and actions that can't be audited. This is not distributed tracing — see the note at the top of this README.

Tool-level idempotency

from mycelium import ledger_sync

@ledger_sync()
def send_payment(amount: float, recipient: str) -> dict:
    return gateway.charge(amount, recipient)

# Same logical call executes only once.
send_payment(amount=100.0, recipient="acct_123", request_id="invoice-42")
send_payment(amount=100.0, recipient="acct_123", request_id="invoice-42")

Async tools:

from mycelium import ledger

@ledger()
async def send_payment(amount: float, recipient: str) -> dict:
    return await gateway.charge(amount, recipient)

What @ledger / ledger_sync do

  • Record every tool invocation in a durable ActionLedger
  • Deduplicate retries and redispatches by request_id or LLM tool_call_id
  • Allow legitimate repeats when the request id differs
  • Persist failed attempts for audit and debugging
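A minimal sketch of the deduplication idea, using an in-memory dict as the ledger. The decorator name idempotent and the record layout are hypothetical, and the choice to re-execute after a failed attempt is an assumption of this sketch rather than documented Mycelium behavior.

```python
from functools import wraps


def idempotent(ledger: dict):
    """Run each (function, request_id) pair at most once successfully."""

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, request_id: str, **kwargs):
            records = ledger.setdefault((fn.__name__, request_id), [])
            for record in records:
                if record["status"] == "ok":
                    return record["result"]  # duplicate call: reuse stored result
            try:
                result = fn(*args, **kwargs)
            except Exception as exc:
                # Keep the failed attempt so it can be audited later.
                records.append({"status": "failed", "error": repr(exc)})
                raise
            records.append({"status": "ok", "result": result})
            return result

        return wrapper

    return decorator


ledger = {}
charges = []


@idempotent(ledger)
def send_payment(amount: float, recipient: str) -> dict:
    charges.append((amount, recipient))  # stands in for the real side effect
    return {"charged": amount, "recipient": recipient}


first = send_payment(amount=100.0, recipient="acct_123", request_id="invoice-42")
again = send_payment(amount=100.0, recipient="acct_123", request_id="invoice-42")
other = send_payment(amount=100.0, recipient="acct_123", request_id="invoice-43")
```

A dict only protects a single process. Surviving restarts or coordinating several workers requires a durable store with atomic check-and-record semantics.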

Storage backends:

Backend    Use case                              YAML storage
memory     Single process, tests                 memory (default)
file       Local dev, single host (fcntl lock)   file + path
redis      Multi-worker, in-flight TTL           redis + url or url_env
postgres   Audit/compliance, durable SQL         postgres + dsn or dsn_env

from mycelium import ActionLedger, FileLedgerStorage, InMemoryLedgerStorage
from mycelium import RedisLedgerStorage, PostgresLedgerStorage

ledger = ActionLedger(storage=InMemoryLedgerStorage())
ledger = ActionLedger(storage=FileLedgerStorage("./mycelium-ledger.json"))
ledger = ActionLedger(storage=RedisLedgerStorage("redis://localhost:6379/0"))
ledger = ActionLedger(storage=PostgresLedgerStorage("postgresql://localhost/mycelium"))

Optional extras: pip install 'mycelium-runtime[redis]' or pip install 'mycelium-runtime[postgres]'.

Quickstart — AF-002 task-level ledger

Stop entire tasks from re-running on framework-level retries:

from mycelium import task_ledger_sync

@task_ledger_sync()
def process_invoice(invoice_id: str) -> dict:
    customer = fetch_customer(customer_id=...)
    payment = send_payment(...)
    return {"invoice_id": invoice_id, "status": "paid"}

# Framework retries the task with the same task_id
process_invoice(invoice_id="inv-42", task_id="invoice-42")  # executes
process_invoice(invoice_id="inv-42", task_id="invoice-42")  # returns stored result

Use id_from to derive the task id from business keys automatically:

@task_ledger_sync(id_from=["invoice_id"])
def process_invoice(invoice_id: str, amount: float) -> dict:
    ...

# Both calls map to the same task id because invoice_id is the same.
process_invoice(invoice_id="inv-42", amount=100.0)
process_invoice(invoice_id="inv-42", amount=200.0)  # returns first result
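One way to picture id_from is as a deterministic hash over the selected arguments only. The helper below is a hypothetical sketch; the hashing scheme and the 16-character id length are assumptions, not how Mycelium derives its ids.

```python
import hashlib
import json


def derive_task_id(task_name: str, kwargs: dict, id_from: list[str]) -> str:
    """Build a stable task id from the business keys named in id_from."""
    business_keys = {name: kwargs[name] for name in id_from}
    payload = json.dumps([task_name, business_keys], sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


first = derive_task_id(
    "process_invoice", {"invoice_id": "inv-42", "amount": 100.0}, ["invoice_id"]
)
second = derive_task_id(
    "process_invoice", {"invoice_id": "inv-42", "amount": 200.0}, ["invoice_id"]
)
other = derive_task_id(
    "process_invoice", {"invoice_id": "inv-43", "amount": 100.0}, ["invoice_id"]
)
# first == second because amount is not part of id_from; other differs.
```

The consequence shown in the example above follows directly: any argument left out of id_from cannot distinguish two calls, so list every field that makes a task logically different.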

Correction retries

If a completed task produced a bad result and the LLM/agent needs to re-attempt it, use a new task id. The framework will normally generate fresh tool call ids for the new attempt, so the task re-executes cleanly.

r1 = process_invoice(invoice_id="inv-42", task_id="invoice-42-attempt-1")  # bad result
r2 = process_invoice(invoice_id="inv-42", task_id="invoice-42-attempt-2")  # fresh attempt

YAML configuration

Separate sections per failure mode. Global AF-002 settings inherit into tools/tasks so you do not repeat storage paths on every function.

Minimum integration (3 steps):

# mycelium.yaml — global sections (configure once)
action_ledger:
  storage: file
  path: ./mycelium-ledger.json
  tools: [send_payment]          # auto-ledger side-effect tools

task_ledger:
  storage: file
  path: ./mycelium-task-ledger.json
  tasks: [process_invoice]

state_flush:
  storage: file
  path: ./mycelium-state.json

audit_receipt:
  agent_id: my-agent
  signing_key_env: MYCELIUM_SIGNING_KEY
  storage: file
  path: ./mycelium-receipts.jsonl

# Per-tool: only what differs (schemas, cache, etc.)
tools:
  fetch_customer:
    protect: {entity_param: customer_id, ttl: 60}
    bounded:
      schema:
        customer_id: {type: string, required: true, pattern: "^c\\d+$"}

  send_payment:
    bounded:
      schema:
        amount: {type: number, required: true}
        recipient: {type: string, required: true}

tasks:
  process_invoice:
    ledger: true
    id_from: [invoice_id]

registry:
  auto: true                     # allowlist = all configured tools

history_guard:
  max_tokens: 100000

message_validator:
  enabled: true

from mycelium import load_config
import my_tools

config = load_config("mycelium.yaml")
tools = config.instrument(my_tools)   # one call wraps tools + tasks

with config.run(thread_id):
    messages = config.prepare_messages(messages)  # AF-006 + auto state flush
    ...

ledger: true inherits from action_ledger / task_ledger. When audit_receipt is configured with auto: true (default), all ledgered tools/tasks get signed receipts automatically.
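For readers unfamiliar with signed receipts, the sketch below shows one common approach: an HMAC-SHA256 signature over a canonical JSON encoding, with the key read from the environment variable named in the config. The receipt fields, function names, and signing scheme are all assumptions for illustration; the actual receipt format is not described in this README.

```python
import hashlib
import hmac
import json
import os


def sign_receipt(receipt: dict, key: bytes) -> dict:
    """Return a copy of the receipt with an HMAC-SHA256 signature attached."""
    body = json.dumps(receipt, sort_keys=True, separators=(",", ":")).encode("utf-8")
    signature = hmac.new(key, body, hashlib.sha256).hexdigest()
    return {**receipt, "signature": signature}


def verify_receipt(signed: dict, key: bytes) -> bool:
    """Recompute the signature and compare it in constant time."""
    receipt = {k: v for k, v in signed.items() if k != "signature"}
    expected = sign_receipt(receipt, key)["signature"]
    return hmac.compare_digest(expected, signed.get("signature", ""))


# The fallback key is for local experiments only; never use it in production.
key = os.environ.get("MYCELIUM_SIGNING_KEY", "dev-only-key").encode("utf-8")

signed = sign_receipt(
    {"agent_id": "my-agent", "tool": "send_payment", "request_id": "invoice-42"},
    key,
)
tampered = {**signed, "request_id": "invoice-99"}
```

Serializing with sorted keys and fixed separators matters: the verifier must reproduce the exact bytes that were signed, regardless of dict ordering.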

Legacy per-tool style still works — run mycelium init for the full annotated template.
