
Context engineering library for AI agents: decorator-based workspace storage for large tool results


ctxtual

Python 3.11+ · MIT License

The context engineering library for AI agents.

Stop truncating tool results. Start engineering context.

pip install ctxtual



The Problem You Already Have

Your agent calls a tool. The tool returns 10,000 results. The LLM context window fits 200. You truncate to 200, and the agent misses the answer buried in result #8,432.

Every production agent team builds the same workaround: store the data somewhere, give the agent tools to explore it. ctxtual is that workaround, extracted into a library.

# Before: raw data floods the context window
def search_papers(query: str) -> list[dict]:
    return database.search(query)  # 10,000 results → LLM chokes

# After: data is stored, agent gets a map
@forge.producer(workspace_type="papers", toolsets=[paginator(forge), search, filters])
def search_papers(query: str) -> list[dict]:
    return database.search(query)  # 10,000 results → stored in workspace
    # Agent receives:
    # {
    #   "workspace_id": "papers_f3a8bc12",
    #   "item_count": 10000,
    #   "data_shape": "list",
    #   "available_tools": ["papers_paginate(...)", "papers_search(...)"],
    #   "next_steps": ["• papers_paginate: Return a page of items...", ...]
    # }

The agent then explores with surgical precision — paginating, searching, filtering — pulling only what it needs into the context window. The data stays server-side. The agent stays smart.


Why ctxtual

| Problem | Without ctxtual | With ctxtual |
| --- | --- | --- |
| Tool returns 10K items | Truncate and hope | Store, paginate, search on demand |
| Agent needs to filter by field | Load everything, pray it fits | filter_by(field="year", value=2024, operator="gte") |
| Agent needs one specific item | Return entire list for index lookup | get_item(index=42) |
| Tool returns large text (HTML, PDF) | Dump entire string into context | text_content — char-based pagination, in-text search |
| Multiple tools return overlapping data | Duplicate data across messages | Workspaces with deterministic IDs, deduplication built-in |
| Agent crashes mid-conversation | All data lost | SQLiteStore persists across restarts |
| LLM calls wrong tool name | Unhandled KeyError, agent loop crashes | Structured error dict: {"error": ..., "available_tools": [...], "suggested_action": ...} |
| Multi-agent sharing data | Custom IPC | Shared store, workspace-level isolation |
| Framework lock-in | Rewrite for every framework | Adapters for OpenAI, Anthropic, LangChain |

ctxtual is not a framework. It's a library. It doesn't own your agent loop, your LLM calls, or your data. It's the plumbing between "tool returns data" and "agent explores data."


Quick Start

from ctxtual import Forge, MemoryStore
from ctxtual.utils import paginator, text_search, filter_set, pipeline

forge = Forge(store=MemoryStore())

# Wrap your data-fetching function — toolsets get their name from workspace_type
@forge.producer(workspace_type="papers", toolsets=[
    paginator(forge),
    text_search(forge, fields=["title", "abstract"]),
    filter_set(forge),
    pipeline(forge),
])
def search_papers(query: str, limit: int = 10_000) -> list[dict]:
    return database.search(query, limit)

# Agent calls the producer → gets a notification, not 10K items
ref = search_papers("machine learning")
ws_id = ref["workspace_id"]

# Agent explores using consumer tools:
forge.dispatch_tool_call("papers_paginate",  {"workspace_id": ws_id, "page": 0, "size": 10})
forge.dispatch_tool_call("papers_search",    {"workspace_id": ws_id, "query": "transformer"})
forge.dispatch_tool_call("papers_filter_by", {"workspace_id": ws_id, "field": "year", "value": 2024, "operator": "gte"})

# Or: compound operations in ONE call (no round-trips, no intermediate context)
forge.dispatch_tool_call("papers_pipe", {
    "workspace_id": ws_id,
    "steps": [
        {"search": {"query": "transformer"}},
        {"filter": {"year": {"$gte": 2024}}},
        {"sort": {"field": "citations", "order": "desc"}},
        {"limit": 5},
    ],
})

Install

pip install ctxtual     # or: uv add ctxtual

Zero required dependencies. The core library uses only the Python standard library.

Optional extras:

pip install ctxtual[dev]   # pytest, ruff, coverage

Integration adapters (OpenAI, Anthropic, LangChain) are zero-dependency by default — they duck-type against SDK objects. Install the SDK you need separately:

pip install openai              # for ctxtual.integrations.openai
pip install anthropic           # for ctxtual.integrations.anthropic
pip install langchain-core      # for ctxtual.integrations.langchain

Architecture

Your Agent Loop
  LLM ←→ tool_calls ←→ forge.dispatch_tool_call()

Forge (orchestrator)
  @producer / @consumer decorators
  Schema export (OpenAI, Anthropic, LangChain format)
  Dispatch routing — one method handles all tool calls
  Thread-safe (RLock)

ToolSets (consumer tools)
  paginator · text_search · filter_set
  kv_reader · text_content · pipeline
  + your custom domain tools

Store (pluggable backend)
  MemoryStore — fast, in-process, LRU eviction
  SQLiteStore — persistent, FTS5 search, WAL mode
  BaseStore   — subclass for Redis, Postgres, S3, etc.

Data flow:

  1. Producer runs your function, stores the result in a workspace, returns a WorkspaceRef notification to the LLM.
  2. LLM reads the notification, sees available tools, calls them.
  3. Consumer tools (paginate, search, filter, etc.) read from the store and return only the requested slice.
  4. The LLM never sees the full dataset. It sees pages, search results, filtered subsets — exactly what it needs.

Core API

Forge

The central orchestrator. One per application (or per agent session).

from ctxtual import Forge, MemoryStore, SQLiteStore

# In-memory (default) — fast, test-friendly, process-scoped
forge = Forge(store=MemoryStore())

# Persistent — survives process restarts
forge = Forge(store=SQLiteStore("agent.db"))

# With configuration
forge = Forge(
    store=MemoryStore(max_workspaces=500),  # LRU eviction when exceeded
    default_ttl=3600,     # Workspaces expire after 1 hour
    max_items=100_000,    # Reject payloads larger than 100K items
    default_notify=True,  # Producers return dicts (not WorkspaceRef objects)
)

@forge.producer — Store Data, Return a Map

Wraps any function so its return value is stored in a workspace. The agent gets a self-describing notification instead of raw data.

@forge.producer(
    workspace_type="papers",           # Logical data category
    toolsets=[pager, search, filters], # Consumer tools for this data
    key="papers_{query}",              # Deterministic workspace ID (optional)
    transform=lambda r: r[:5000],      # Pre-store transform (optional)
    meta={"source": "arxiv"},          # Metadata attached to workspace
    ttl=1800,                          # Time-to-live in seconds
    notify=True,                       # True=dict, False=WorkspaceRef object
)
def search_papers(query: str) -> list[dict]:
    return external_api.search(query)

key parameter controls workspace identity:

| Key | Behavior | Use case |
| --- | --- | --- |
| None (default) | Auto UUID: papers_f3a8bc12a0 | Every call creates a new workspace |
| "papers_{query}" | Templated from kwargs | Idempotent — same args = same workspace (overwritten) |
| lambda kw: f"ws_{kw['user_id']}" | Custom callable | Full control over deduplication logic |
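The templated form behaves like standard str.format applied to the producer's keyword arguments. A minimal sketch of how such key resolution could work (resolve_key is a hypothetical helper written for illustration, not part of ctxtual's API):

```python
import uuid

def resolve_key(key, kwargs: dict, default_prefix: str = "papers") -> str:
    """Resolve a workspace ID from a key spec: None, a template string, or a callable."""
    if key is None:
        # No key: a fresh workspace on every call
        return f"{default_prefix}_{uuid.uuid4().hex[:10]}"
    if callable(key):
        # Callable: full control over deduplication logic
        return key(kwargs)
    # Template string: same kwargs -> same ID -> same workspace, overwritten
    return key.format(**kwargs)

print(resolve_key("papers_{query}", {"query": "ml"}))                 # papers_ml
print(resolve_key(lambda kw: f"ws_{kw['user_id']}", {"user_id": 7}))  # ws_7
```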

@forge.consumer — Transform and Derive

Consumers read from one workspace and optionally produce a new one. This enables multi-hop agent pipelines.

@forge.consumer(
    workspace_type="raw_data",               # Input workspace type
    produces="cleaned_data",                 # Output workspace type
    produces_toolsets=[clean_pager],          # Tools for the output
)
def clean_and_filter(workspace_id: str, forge_ctx: ConsumerContext):
    raw = forge_ctx.get_items()
    cleaned = [normalize(item) for item in raw if item["quality"] > 0.8]
    return forge_ctx.emit(cleaned, meta={"derived_from": workspace_id})

ConsumerContext is injected automatically. It provides:

| Method | Description |
| --- | --- |
| get_items(workspace_id=None) | Read items from the input workspace (or any workspace) |
| emit(payload, *, workspace_type, meta, ttl) | Store derived data as a new workspace, return WorkspaceRef dict |
| store | Direct access to the store backend |

forge.dispatch_tool_call() — Single Entry Point

Route any tool call — producers and consumers — through one method. This is what your agent loop calls.

result = forge.dispatch_tool_call("papers_search", {"workspace_id": ws_id, "query": "attention"})

Always returns a value, never raises on tool errors:

# Success → tool result (any type)
{"matches": [...], "total_matches": 42}

# Unknown tool → structured error dict
{"error": "Tool 'bad_name' not found.", "available_tools": [...], "suggested_action": "..."}

# Workspace not found → structured error with available workspaces
{"error": "Workspace 'ws_bad' not found.", "available_workspaces": ["ws_real"], "suggested_action": "..."}

This means your agent loop needs zero try/except for tool calls. The LLM reads the error and self-corrects.
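The errors-as-data pattern is easy to see with a stubbed dispatcher. This is a generic sketch of the idea, not ctxtual's internals; `dispatch` and `registry` are illustrative names:

```python
def dispatch(tool_name: str, args: dict, registry: dict) -> dict:
    """Minimal errors-as-data dispatcher: unknown tools return a dict, never raise."""
    fn = registry.get(tool_name)
    if fn is None:
        return {
            "error": f"Tool '{tool_name}' not found.",
            "available_tools": sorted(registry),
            "suggested_action": "Use one of the available tools.",
        }
    return fn(**args)

registry = {"papers_count": lambda workspace_id: {"count": 42}}

ok = dispatch("papers_count", {"workspace_id": "ws1"}, registry)
bad = dispatch("papers_serch", {}, registry)  # typo: the LLM reads the error and retries
```

Because both branches return plain dicts, the agent loop just forwards whatever comes back to the LLM as the tool result.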

Schema Export

Export tool schemas in OpenAI function-calling format. Pass them to any LLM that supports tool use.

# All tools (producers + consumers)
tools = forge.get_tools()

# Only producer tools
producers = forge.get_producer_schemas()

# All consumer tools (optionally scoped to a workspace)
consumers = forge.get_all_tool_schemas(workspace_id="papers_abc")

Schemas include:

  • Rich parameter descriptions extracted from docstrings (Google-style and Sphinx-style)
  • Proper JSON Schema types for Optional, Union, Literal, list[X], dict[K,V], enums
  • Well-known parameter descriptions for common names like workspace_id, page, query
  • Examples on complex parameters — pipeline steps and metrics include concrete JSON examples that help LLMs construct valid queries
  • schema_extra support — pass schema_extra={"param": {"minimum": 0}} to @ts.tool() to enrich any parameter's schema

Built-in ToolSets

These cover 90% of what agents need. Import them, pass to @producer, done. The name parameter is optional — when omitted, the toolset inherits its name from the producer's workspace_type:

# Simple — name inferred from workspace_type:
@forge.producer(workspace_type="papers", toolsets=[paginator(forge), text_search(forge)])
def fetch(query): ...

# Explicit — still works for advanced use:
pager = paginator(forge, "papers")

paginator(forge, name) — List Navigation

from ctxtual.utils import paginator
pager = paginator(forge, "papers")  # data_shape="list"

| Tool | Description |
| --- | --- |
| {name}_paginate(workspace_id, page=0, size=10) | Page of items + metadata (total, has_next, has_prev) |
| {name}_count(workspace_id) | Total item count |
| {name}_get_item(workspace_id, index) | Single item by zero-based index |
| {name}_get_slice(workspace_id, start=0, end=20) | Arbitrary slice |

text_search(forge, name, *, fields=None) — Full-Text Search

from ctxtual.utils import text_search
search = text_search(forge, "papers", fields=["title", "abstract"])  # data_shape="list"

| Tool | Description |
| --- | --- |
| {name}_search(workspace_id, query, max_results=20, case_sensitive=False) | BM25-ranked full-text search (FTS5 on SQLiteStore, TF scoring on MemoryStore) |
| {name}_field_values(workspace_id, field, max_values=50) | Distinct values for a field (facet discovery) |

filter_set(forge, name) — Structured Filtering

from ctxtual.utils import filter_set
filters = filter_set(forge, "papers")  # data_shape="list"

| Tool | Description |
| --- | --- |
| {name}_filter_by(workspace_id, field, value, operator="eq") | Filter by field. Operators: eq, ne, lt, lte, gt, gte, contains, startswith |
| {name}_sort_by(workspace_id, field, descending=False, limit=100) | Sort by field |

kv_reader(forge, name) — Dict Workspaces

For single-document workspaces (config, metadata, API responses that are dicts, not lists).

from ctxtual.utils import kv_reader
kv = kv_reader(forge, "config")  # data_shape="dict"

| Tool | Description |
| --- | --- |
| {name}_get_keys(workspace_id) | List top-level keys |
| {name}_get_value(workspace_id, key) | Read value at key |

text_content(forge, name) — Raw Text / Scalar Navigation

For producers that return a string (HTML page, PDF text, log file, API response body). Instead of stuffing the entire string into context, the agent navigates it with character-based pagination and search.

from ctxtual.utils import text_content
reader = text_content(forge, "page")  # data_shape="scalar"

| Tool | Description |
| --- | --- |
| {name}_read_page(workspace_id, page=0, chars_per_page=4000) | Character-based page with has_next/has_prev, word count, navigation hints |
| {name}_search_in_text(workspace_id, query, context_chars=100, max_results=20) | Find occurrences with surrounding context, character offsets |
| {name}_get_length(workspace_id) | Character, word, and line counts |
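Character-based pagination is essentially string slicing plus navigation metadata. A rough sketch of the idea (the field names here mirror the table above but this is not the library's exact return shape):

```python
def read_page(text: str, page: int = 0, chars_per_page: int = 4000) -> dict:
    """Return one character-based page of a long string plus navigation hints."""
    start = page * chars_per_page
    chunk = text[start:start + chars_per_page]
    total_pages = max(1, -(-len(text) // chars_per_page))  # ceiling division
    return {
        "page": page,
        "text": chunk,
        "total_pages": total_pages,
        "has_next": start + chars_per_page < len(text),
        "has_prev": page > 0,
    }

first = read_page("x" * 10_000, page=0)  # 4000 chars, more pages ahead
last = read_page("x" * 10_000, page=2)   # final page holds the remaining 2000 chars
```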

Example — webpage producer:

@forge.producer(workspace_type="page", toolsets=[reader])
def read_webpage(url: str) -> str:
    return requests.get(url).text  # Could be 100KB of HTML

# Agent receives WorkspaceRef with item_count=1, data_shape="scalar"
# and tools: page_read_page, page_search_in_text, page_get_length

Text Transforms — Convert Strings to Structured Data

When you want to use list-based tools (paginator, filter, pipeline) with text content, transform the string before storing:

from ctxtual import chunk_text, split_sections, split_markdown_sections

# Fixed-size overlapping chunks (good for embeddings, RAG)
@forge.producer(workspace_type="chunks", toolsets=[pager, search])
def ingest_document(path: str) -> list[dict]:
    text = open(path).read()
    return chunk_text(text, chunk_size=2000, overlap=200)
    # → [{"chunk_index": 0, "text": "...", "char_offset": 0}, ...]

# Split by blank lines (paragraphs)
@forge.producer(workspace_type="paragraphs", toolsets=[pager])
def split_doc(text: str) -> list[dict]:
    return split_sections(text, separator="\n\n")
    # → [{"section_index": 0, "text": "First paragraph..."}, ...]

# Split by Markdown headers
@forge.producer(workspace_type="sections", toolsets=[pager, search])
def parse_markdown(content: str) -> list[dict]:
    return split_markdown_sections(content)
    # → [{"section_index": 0, "heading": "Introduction", "level": 1, "text": "..."}, ...]

All transforms pass non-strings through unchanged, so they're safe in pipelines that might receive mixed data.
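For intuition, fixed-size overlapping chunking works roughly like this. This is a sketch of the semantics under the stated defaults, not ctxtual's chunk_text implementation (chunk_text_sketch is a made-up name):

```python
def chunk_text_sketch(text, chunk_size: int = 2000, overlap: int = 200):
    """Split text into fixed-size chunks; consecutive chunks share `overlap` chars."""
    if not isinstance(text, str):
        return text  # pass non-strings through unchanged, as the transforms do
    step = chunk_size - overlap
    chunks = []
    for i, offset in enumerate(range(0, max(len(text), 1), step)):
        piece = text[offset:offset + chunk_size]
        if not piece:
            break
        chunks.append({"chunk_index": i, "text": piece, "char_offset": offset})
    return chunks

parts = chunk_text_sketch("a" * 5000, chunk_size=2000, overlap=200)
# offsets advance by step=1800: 0, 1800, 3600 -> three chunks, last one partial
```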

pipeline(forge, name) — Declarative Data Pipelines

The most powerful built-in. Instead of the LLM making 4+ round-trips (search → filter → sort → limit), it describes the entire chain in one tool call. Intermediate data stays server-side — only the final result enters context.

This is ctxtual's answer to programmatic tool calling: compound operations in a single step, framework-agnostic, works with any LLM.

from ctxtual.utils import pipeline
pipe = pipeline(forge, "papers")  # data_shape="list"

| Tool | Description |
| --- | --- |
| {name}_pipe(workspace_id, steps, save_as=None) | Chain operations declaratively. save_as stores the result as a new browsable workspace. |
| {name}_aggregate(workspace_id, group_by=None, metrics=None) | Group-by aggregation with computed statistics |

Pipeline operations (each step is {op_name: params}):

| Operation | Example | Description |
| --- | --- | --- |
| filter | {"filter": {"year": {"$gte": 2023}}} | MongoDB-style conditions: $gt, $gte, $lt, $lte, $ne, $in, $nin, $contains, $startswith, $regex, $exists. Logical: $or, $and, $not. Dot notation: "author.name" |
| search | {"search": {"query": "ML", "fields": ["title"]}} | Case-insensitive text search across all or specified fields |
| sort | {"sort": {"field": "score", "order": "desc"}} | Single or multi-field sort. Multi: [{"field": "year"}, {"field": "score", "order": "desc"}] |
| select | {"select": ["title", "author"]} | Keep only these fields |
| exclude | {"exclude": ["raw_blob"]} | Remove these fields |
| limit | {"limit": 10} | First N items |
| skip | {"skip": 5} | Skip first N items |
| slice | {"slice": [5, 15]} | Items [5:15] |
| sample | {"sample": {"n": 10, "seed": 42}} | Random sample (reproducible with seed) |
| unique | {"unique": "author"} | Deduplicate by field, keep first |
| flatten | {"flatten": "tags"} | Expand list fields into multiple items |
| group_by | {"group_by": {"field": "category", "metrics": {"n": "count", "avg": "mean:score"}}} | Aggregation. Metrics: count, sum:f, mean:f, min:f, max:f, median:f, stddev:f, values:f |
| count | {"count": true} | Terminal: return {"count": N} |
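To make the MongoDB-style condition syntax concrete, here is a condensed evaluator covering a few of the operators and dot notation. It is a simplified illustration of the semantics, not the pipeline engine itself:

```python
import re

OPS = {
    "$gt":       lambda v, a: v > a,
    "$gte":      lambda v, a: v >= a,
    "$lt":       lambda v, a: v < a,
    "$lte":      lambda v, a: v <= a,
    "$ne":       lambda v, a: v != a,
    "$in":       lambda v, a: v in a,
    "$contains": lambda v, a: a in v,
    "$regex":    lambda v, a: re.search(a, v) is not None,
}

def matches(item: dict, cond: dict) -> bool:
    """True if item satisfies every field condition (implicit $and)."""
    for field, expected in cond.items():
        value = item
        for part in field.split("."):   # dot notation: "author.name"
            value = value.get(part) if isinstance(value, dict) else None
        if isinstance(expected, dict):  # operator form: {"$gte": 2023}
            if not all(OPS[op](value, arg) for op, arg in expected.items()):
                return False
        elif value != expected:         # bare value means equality
            return False
    return True

papers = [{"year": 2024, "author": {"name": "Alice"}},
          {"year": 2021, "author": {"name": "Bob"}}]
recent = [p for p in papers if matches(p, {"year": {"$gte": 2023}})]
```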

Example — what would take 4 tool calls in 1:

result = forge.dispatch_tool_call("papers_pipe", {
    "workspace_id": wid,
    "steps": [
        {"search": {"query": "neural networks"}},
        {"filter": {"year": {"$gte": 2023}}},
        {"sort": {"field": "citations", "order": "desc"}},
        {"limit": 5},
        {"select": ["title", "author", "citations"]},
    ],
})
# → {"items": [...], "count": 5}  — one call, zero intermediate context

Example — tag frequency analysis (flatten → group → sort):

result = forge.dispatch_tool_call("papers_pipe", {
    "workspace_id": wid,
    "steps": [
        {"flatten": "tags"},
        {"group_by": {"field": "tags", "metrics": {"count": "count"}}},
        {"sort": {"field": "count", "order": "desc"}},
        {"limit": 10},
    ],
})

Example — save pipeline result as a new workspace:

result = forge.dispatch_tool_call("papers_pipe", {
    "workspace_id": wid,
    "steps": [{"filter": {"author": "Alice"}}, {"sort": {"field": "year"}}],
    "save_as": "alice_papers",  # stored as a new workspace
})
# LLM can then paginate "alice_papers" with papers_paginate

Custom ToolSets

When built-ins aren't enough, create domain-specific tools:

analytics = forge.toolset("transactions")
analytics.data_shape = "list"  # Validates payload shape at both produce-time and tool-time

@analytics.tool(
    name="transactions_anomalies",
    output_hint="Investigate flagged transactions with transactions_get_item(workspace_id='{workspace_id}', index=INDEX).",
)
def detect_anomalies(workspace_id: str, std_threshold: float = 2.0) -> dict:
    """Flag transactions with amounts that deviate significantly from the mean.

    Args:
        workspace_id: The transactions workspace to analyze.
        std_threshold: Number of standard deviations to flag as anomaly.
    """
    items = analytics.store.get_items(workspace_id)
    # ... your domain logic ...
    return {"anomalies": flagged, "count": len(flagged)}

output_hint is appended to the tool result as a _hint field, making every tool self-describing. The {workspace_id} placeholder is replaced at runtime. The result envelope is always {"result": <original>, "_hint": "<hint>"} — the original return shape is never mutated.

data_shape validation:

  • Set data_shape="list" on toolsets that expect list data (paginator, search, filter, pipeline)
  • Set data_shape="dict" on toolsets that expect dict data (kv_reader)
  • Set data_shape="scalar" on toolsets that expect raw strings (text_content)
  • At producer time: logs a warning if the payload shape doesn't match
  • At tool-call time: returns a structured error dict with expected_shape, actual_shape, suggested_action
  • Shape-aware WorkspaceRef: incompatible tools are automatically filtered from available_tools — if a producer returns a string, the LLM only sees text_content tools, not paginator tools
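The shape classification itself is simple. A plausible sketch of how a payload's shape might be inferred and how a mismatch error is shaped (infer_shape and shape_error are illustrative names, not ctxtual functions):

```python
def infer_shape(payload) -> str:
    """Classify a producer's payload as 'list', 'dict', or 'scalar'."""
    if isinstance(payload, list):
        return "list"
    if isinstance(payload, dict):
        return "dict"
    return "scalar"  # strings, numbers, anything else

def shape_error(expected: str, actual: str) -> dict:
    """Structured mismatch error in the style described above."""
    return {
        "error": f"This tool expects '{expected}' data but workspace contains '{actual}' data.",
        "expected_shape": expected,
        "actual_shape": actual,
        "suggested_action": f"Use a toolset built for '{actual}' data instead.",
    }

assert infer_shape([{"a": 1}]) == "list"
assert infer_shape("<html>...</html>") == "scalar"
```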

Storage Backends

MemoryStore

from ctxtual import MemoryStore

store = MemoryStore(max_workspaces=500)  # Optional LRU eviction

Fast, thread-safe (RLock), zero dependencies. Data lives in process memory.
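The LRU eviction policy can be sketched with an OrderedDict. This illustrates the policy behind max_workspaces, not MemoryStore's actual code:

```python
from collections import OrderedDict

class LRUWorkspaces:
    """Evict the least recently used workspace once max_workspaces is exceeded."""
    def __init__(self, max_workspaces: int):
        self.max = max_workspaces
        self.data: OrderedDict[str, object] = OrderedDict()

    def put(self, workspace_id: str, payload) -> None:
        self.data[workspace_id] = payload
        self.data.move_to_end(workspace_id)   # newest = most recently used
        while len(self.data) > self.max:
            self.data.popitem(last=False)     # drop the oldest entry

    def get(self, workspace_id: str):
        payload = self.data[workspace_id]
        self.data.move_to_end(workspace_id)   # reading refreshes recency
        return payload

lru = LRUWorkspaces(max_workspaces=2)
lru.put("a", 1); lru.put("b", 2)
lru.get("a")      # "a" is now the most recently used
lru.put("c", 3)   # evicts "b", the least recently used
```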

SQLiteStore

from ctxtual import SQLiteStore

store = SQLiteStore("agent.db")          # Persistent file
store = SQLiteStore(":memory:")          # In-memory SQLite (for testing)

Per-row storage in cf_items table enables SQL-pushed queries — LIMIT/OFFSET, json_extract WHERE, ORDER BY all happen at the SQL level, not in Python. Full-text search uses FTS5 with Porter stemming and bm25() ranking.

Schema:

  • cf_meta — workspace metadata (one row per workspace)
  • cf_items — per-item storage (one row per item per workspace)
  • cf_fts — FTS5 virtual table for full-text search
  • Automatic migration on open (adds new columns to old databases)

Custom Backends

Implement BaseStore for Redis, Postgres, S3, or any storage:

from ctxtual import BaseStore
from ctxtual.types import WorkspaceMeta

class RedisStore(BaseStore):
    # Required — implement these 8 methods:
    def init_workspace(self, meta: WorkspaceMeta) -> None: ...
    def drop_workspace(self, workspace_id: str) -> None: ...
    def get_meta(self, workspace_id: str) -> WorkspaceMeta | None: ...
    def list_workspaces(self, workspace_type: str | None = None) -> list[str]: ...
    def set_items(self, workspace_id: str, payload: Any) -> None: ...
    def get_items(self, workspace_id: str) -> Any: ...
    def set(self, workspace_id: str, key: str, value: Any) -> None: ...
    def get(self, workspace_id: str, key: str, default: Any = None) -> Any: ...

    # Optional — override for performance (defaults iterate over get_items):
    def count_items(self, workspace_id: str) -> int: ...
    def get_page(self, workspace_id: str, offset: int, limit: int) -> list: ...
    def search_items(self, workspace_id, query, *, fields=None, max_results=20, case_sensitive=False) -> list: ...
    def filter_items(self, workspace_id, field, value, operator="eq") -> list: ...
    def sort_items(self, workspace_id, field, *, descending=False, limit=100) -> list: ...

    # Optional — override for mutation support:
    def append_items(self, workspace_id: str, new_items: list) -> int: ...
    def update_item(self, workspace_id: str, index: int, item: Any) -> None: ...
    def patch_item(self, workspace_id: str, index: int, fields: dict) -> None: ...
    def delete_items(self, workspace_id: str, indices: list[int]) -> int: ...

All query and mutation methods have default implementations that work on any store — override them only for performance.
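Such derived defaults typically wrap get_items. A sketch of how a base class can supply count_items and get_page generically (a toy class under that assumption, not ctxtual's BaseStore source):

```python
class SketchStore:
    """Toy store where only get_items is 'native'; the rest is derived from it."""
    def __init__(self, items):
        self._items = list(items)

    def get_items(self, workspace_id: str) -> list:
        return self._items

    # Derived defaults: correct on any backend, but a SQL store would
    # push these down as COUNT(*) and LIMIT/OFFSET instead.
    def count_items(self, workspace_id: str) -> int:
        return len(self.get_items(workspace_id))

    def get_page(self, workspace_id: str, offset: int, limit: int) -> list:
        return self.get_items(workspace_id)[offset:offset + limit]

store = SketchStore(range(100))
page = store.get_page("ws", offset=20, limit=5)  # [20, 21, 22, 23, 24]
```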


Workspace Mutations

Workspaces are not read-only. Agents can modify data in place:

store = forge.store

# Append new items to an existing workspace
store.append_items("tasks_sprint_1", [{"title": "New task", "status": "todo"}])

# Update an item by index (full replacement)
store.update_item("tasks_sprint_1", 0, {"title": "Updated", "status": "done"})

# Patch specific fields on an item (merge, not replace)
store.patch_item("tasks_sprint_1", 2, {"status": "in_progress"})

# Delete items by indices
store.delete_items("tasks_sprint_1", [5, 6, 7])

All mutations maintain FTS index consistency on SQLiteStore.


Framework Integrations

OpenAI

from ctxtual.integrations.openai import to_openai_tools, has_tool_calls, handle_tool_calls

# Get tool schemas
tools = to_openai_tools(forge)

# In your agent loop
response = client.chat.completions.create(model="gpt-5-mini", messages=messages, tools=tools)

if has_tool_calls(response):
    # Dispatch all tool calls, get tool-result messages
    tool_messages = handle_tool_calls(forge, response)
    messages.append(response.choices[0].message)
    messages.extend(tool_messages)

Anthropic

from ctxtual.integrations.anthropic import to_anthropic_tools, has_tool_use, handle_tool_use

tools = to_anthropic_tools(forge)  # Anthropic's flat schema format

response = client.messages.create(model="claude-sonnet-4.6", tools=tools, messages=messages)

if has_tool_use(response):
    tool_results = handle_tool_use(forge, response)  # Returns tool_result content blocks
    messages.append({"role": "assistant", "content": response.content})
    messages.append({"role": "user", "content": tool_results})

LangChain

from ctxtual.integrations.langchain import to_langchain_tools

# Returns list of StructuredTool instances — plug into any LangChain agent
tools = to_langchain_tools(forge)
agent = create_react_agent(llm, tools)

All adapters are zero-hard-dependency — they duck-type against SDK objects and raw dicts. Install the SDK you need; the adapter works with whatever version you have.


Concurrency & Thread Safety

ctxtual is designed for production web servers (FastAPI, Django, Flask) serving concurrent agent sessions:

  • Forge — threading.RLock protects all registration dicts
  • MemoryStore — threading.RLock protects all data access; get_meta() returns deep copies
  • SQLiteStore — per-thread connections, threading.RLock, WAL journal mode
  • Transactions — store.transaction() context manager for atomic multi-step operations (nesting supported in SQLiteStore)

# Safe: one Forge instance, many concurrent requests
forge = Forge(store=MemoryStore(max_workspaces=1000), default_ttl=1800)

@app.post("/search")
async def search(query: str):
    return search_papers(query=query)  # Thread-safe

@app.get("/explore/{workspace_id}")
async def explore(workspace_id: str, page: int = 0):
    return forge.dispatch_tool_call("papers_paginate", {"workspace_id": workspace_id, "page": page})

Error Recovery

LLMs make mistakes. ctxtual never crashes — it teaches the LLM to self-correct.

| LLM Mistake | Response |
| --- | --- |
| Calls unknown tool | {"error": "Tool 'bad_name' not found.", "available_tools": [...], "suggested_action": "Use one of the available tools."} |
| Wrong workspace_id | {"error": "Workspace 'ws_bad' not found.", "available_workspaces": ["ws_real"], "suggested_action": "Use one of the available workspaces."} |
| Workspace expired | {"error": "Workspace 'ws_old' has expired.", "suggested_action": "Call load_papers() to create a fresh workspace."} |
| Type mismatch | {"error": "...", "expected_type": "papers", "actual_type": "employees", "workspaces_of_correct_type": [...]} |
| Shape mismatch | {"error": "This tool expects 'list' data but workspace contains 'dict' data.", "suggested_action": "Use get_keys/get_value instead."} |
| Index out of range | {"error": "Index 999 out of range.", "valid_range": "0–42", "total_items": 43, "suggested_action": "..."} |
| Missing dict key | {"error": "Key 'bad' not found.", "available_keys": ["host", "port"], "suggested_action": "..."} |

Every error includes suggested_action — the LLM reads it and knows exactly what to do next.


Advanced Patterns

Deterministic Workspace IDs (Idempotency)

@forge.producer(workspace_type="inventory", toolsets=[pager], key="inv_{warehouse}")
def sync_inventory(warehouse: str) -> list:
    return fetch_from_wms(warehouse)

sync_inventory(warehouse="us-east")  # Creates workspace "inv_us-east"
sync_inventory(warehouse="us-east")  # Overwrites same workspace — no duplicates

Multi-Hop Pipelines

# Step 1: Load raw data
@forge.producer(workspace_type="raw", toolsets=[raw_pager])
def ingest(source: str) -> list: ...

# Step 2: Filter and enrich
@forge.consumer(workspace_type="raw", produces="clean", produces_toolsets=[clean_pager])
def clean(workspace_id: str, forge_ctx: ConsumerContext):
    data = forge_ctx.get_items()
    return forge_ctx.emit([normalize(d) for d in data if d["quality"] > 0.5])

# Step 3: Aggregate into a report
@forge.consumer(workspace_type="clean", produces="report", produces_toolsets=[report_kv])
def summarize(workspace_id: str, forge_ctx: ConsumerContext):
    items = forge_ctx.get_items()
    return forge_ctx.emit({"total": len(items), "summary": aggregate(items)})

Each step creates a new workspace. The agent can explore any level.

Multi-Agent Collaboration

Multiple agents share one store. Each agent reads/writes workspaces. Workspace metadata tracks lineage.

shared_store = SQLiteStore("shared.db")
forge = Forge(store=shared_store)

# Agent A: collect data
@forge.producer(workspace_type="raw", toolsets=[...], meta={"agent": "collector"})
def collect(topic: str) -> list: ...

# Agent B: analyze
@forge.consumer(workspace_type="raw", produces="analysis", meta={"agent": "analyst"})
def analyze(workspace_id: str, forge_ctx: ConsumerContext): ...

# Agent C: write report from analysis
@forge.consumer(workspace_type="analysis", produces="report", meta={"agent": "writer"})
def report(workspace_id: str, forge_ctx: ConsumerContext): ...

BoundToolSet (Fixed Workspace)

When an agent is focused on one workspace, bind it once:

bound = pager.bind("papers_abc123")
bound.papers_paginate(page=0)    # No workspace_id needed
bound.papers_get_item(index=5)   # Cleaner API for sub-agents

Result Transformation

Pre-process data before storage — normalize, deduplicate, trim:

@forge.producer(
    workspace_type="papers",
    toolsets=[pager],
    transform=lambda papers: [
        {k: v for k, v in p.items() if k in ("title", "abstract", "year")}
        for p in papers
    ],
)
def fetch_papers(query: str) -> list: ...

TTL & Automatic Cleanup

forge = Forge(store=MemoryStore(), default_ttl=3600)  # 1 hour default

# Override per-producer
@forge.producer(workspace_type="cache", toolsets=[pager], ttl=300)  # 5 min
def fetch_live_prices() -> list: ...

# Sweep expired workspaces (call periodically in production)
expired_ids = forge.sweep_expired()
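TTL expiry reduces to a timestamp comparison at read or sweep time. A minimal sketch of the policy (is_expired and sweep are illustrative helpers, not ctxtual's implementation):

```python
import time

def is_expired(created_at: float, ttl, now=None) -> bool:
    """A workspace with ttl=None never expires; otherwise it expires ttl seconds after creation."""
    if ttl is None:
        return False
    now = time.time() if now is None else now
    return now > created_at + ttl

def sweep(workspaces: dict, now: float) -> list:
    """Drop expired workspaces, return the IDs that were removed."""
    expired = [wid for wid, (created, ttl) in workspaces.items()
               if is_expired(created, ttl, now)]
    for wid in expired:
        del workspaces[wid]
    return expired

ws = {"cache_a": (1000.0, 300), "report_b": (1000.0, None)}
removed = sweep(ws, now=1400.0)  # cache_a expired at t=1300; report_b has no TTL
```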

System Prompt Generation

# Auto-generate a system prompt from registered producers and tools.
# Includes: workspace pattern, producer descriptions, pipeline syntax
# (if registered), exploration tools, and error recovery guidance.
system = forge.system_prompt(preamble="You are a research assistant.")

The generated prompt adapts to your forge configuration — if you register pipeline tools, it includes pipeline syntax; if you register search, it mentions search. Each producer is listed by name with its docstring summary.

Item Schema in Notifications

Producer tool calls return a self-describing WorkspaceRef that includes a JSON Schema for the item structure, so the LLM can construct filter/sort/pipeline queries immediately without paginating first to discover the data:

ref = search_papers("transformers")
# ref["item_schema"]     → {"type": "object", "properties": {"title": {"type": "string"},
#                            "year": {"type": "integer"}, "citations": {"type": "integer"}, ...},
#                            "required": ["title", "year", ...]}
# ref["available_tools"] → ["papers_paginate(...)", "papers_pipe(...)", ...]
# ref["next_steps"]      → ["• papers_paginate: Return a page of items...", ...]

The schema tells the LLM types — it knows year is an integer (so $gte/$lte work), tags is an array (so $contains works), name is a string (so $startswith works). Fields not present in every item are omitted from required.
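Inferring such an item schema amounts to unioning key and type observations across items, with required limited to keys present in every item, as described above. A rough sketch of that rule (infer_item_schema is a hypothetical function):

```python
def infer_item_schema(items: list) -> dict:
    """Build a minimal JSON Schema: properties from observed value types,
    required = keys present in every item."""
    type_map = {str: "string", int: "integer", float: "number",
                bool: "boolean", list: "array", dict: "object"}
    props = {}
    for item in items:
        for key, value in item.items():
            props.setdefault(key, {"type": type_map.get(type(value), "string")})
    required = sorted(k for k in props if all(k in item for item in items))
    return {"type": "object", "properties": props, "required": required}

schema = infer_item_schema([
    {"title": "A", "year": 2024, "tags": ["ml"]},
    {"title": "B", "year": 2021},   # no tags, so tags is not required
])
```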


Workspace Introspection

forge.list_workspaces()                    # All workspace IDs
forge.list_workspaces("papers")            # Filtered by type

meta = forge.workspace_meta("papers_abc")
meta.workspace_id          # "papers_abc"
meta.workspace_type        # "papers"
meta.data_shape            # "list", "dict", or "scalar"
meta.item_count            # 8234
meta.producer_fn           # "search_papers"
meta.producer_kwargs       # {"query": "ml", "limit": 10000}
meta.created_at            # Unix timestamp
meta.last_accessed_at      # Unix timestamp
meta.ttl                   # 3600.0 or None
meta.is_expired            # True/False
meta.extra                 # {"source": "arxiv"}

forge.drop_workspace("papers_abc")
forge.clear()                              # Drop all workspaces

Examples

The examples/ directory contains production-pattern examples organized from beginner to advanced:

| # | Example | What It Shows |
| --- | --- | --- |
| 01 | quickstart.py | The 20-line pattern — copy and customize |
| 02 | rag_support_agent.py | Search → filter → read knowledge base articles |
| 03 | data_pipeline.py | Producer → Consumer → Consumer with workspace lineage |
| 04 | custom_tools.py | Financial analytics: anomaly detection, aggregation, date ranges |
| 05 | pipelines.py | Compound operations in one call: filter→sort→group→save |
| 06 | persistence.py | SQLite + mutations: patch_item, append_items, process restart |
| 07 | error_handling.py | Every failure mode and how the LLM self-corrects |
| 08 | multi_agent.py | Collector → Analyst → Writer sharing one store |
| 09 | openai_agent.py | Full OpenAI agent loop with schema export and tool dispatch |
| 10 | anthropic_agent.py | Anthropic Claude code review agent with dict + list workspaces |
| 11 | concurrent_server.py | FastAPI server with 20 concurrent sessions, thread safety, TTL |

Run any example:

uv run python examples/01_quickstart.py

Design Principles

1. The LLM never sees bulk data. Producers store data and return a map. The agent pulls what it needs.

2. Tools are self-describing. Every WorkspaceRef includes tool names, descriptions, and call examples. Every error includes suggested_action. The agent doesn't need system prompt instructions to use the tools.

3. No framework lock-in. ctxtual is a library, not a framework. It doesn't own your agent loop. Use it with OpenAI, Anthropic, LangChain, or raw HTTP — the adapters are thin and optional.

4. Errors are data, not exceptions. dispatch_tool_call() returns structured error dicts. Your agent loop doesn't need try/except. The LLM reads the error and self-corrects.

5. Zero dependencies. The core library uses only the Python standard library. You don't inherit someone else's dependency tree.

6. Thread-safe by default. One Forge instance handles concurrent requests. No external locking required.
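Principle 4 in miniature: the stub below stands in for forge.dispatch_tool_call, and the error fields shown (error, suggested_action) are illustrative of the shape described above, not the library's exact schema:

```python
def dispatch_stub(tool_name, arguments):
    """Stand-in for forge.dispatch_tool_call: bad input comes back as a
    structured error dict rather than a raised exception. Sketch only."""
    if arguments.get("field") != "year":
        return {
            "error": "unknown_field",
            "suggested_action": "Call papers_distinct to list valid fields.",
        }
    return {"items": [{"title": "AlphaFold", "year": 2021}]}

# The agent loop needs no try/except: both shapes go straight back to
# the model, which reads suggested_action and self-corrects.
first = dispatch_stub("papers_filter", {"field": "yr", "value": 2020})
retry = dispatch_stub("papers_filter", {"field": "year", "value": 2020})
```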


Testing

uv run pytest tests/ -v          # 456 tests
uv run ruff check src/ tests/    # Lint

Test coverage includes:

  • Core forge operations (producers, consumers, dispatch)
  • Both storage backends (MemoryStore, SQLiteStore)
  • All built-in toolsets (paginator, search, filter, kv_reader, pipeline)
  • All integration adapters (OpenAI, Anthropic, LangChain)
  • Concurrency (11 threading tests with parallel sessions)
  • Schema quality (43 tests for type mapping and docstring extraction)
  • Search quality (17 tests for relevance ranking and FTS5)
  • Error recovery (12 tests for LLM-friendly error dicts)
  • Data shape validation (13 tests for producer-consumer contracts)
  • Pipeline engine (73 tests for all 13 operators and compound pipelines)
  • LLM interface quality (36 tests for data preview, system prompt, schema examples, end-to-end workflows)

API Reference

Forge

| Method | Description |
|----|----|
| `Forge(store, *, default_notify, default_ttl, max_items)` | Create orchestrator |
| `@forge.producer(workspace_type, toolsets, key, transform, meta, notify, ttl)` | Producer decorator |
| `@forge.consumer(workspace_type, produces, produces_toolsets)` | Consumer decorator |
| `forge.toolset(name, *, enforce_type)` | Create/get a ToolSet |
| `forge.dispatch_tool_call(tool_name, arguments)` | Route a tool call |
| `forge.get_tools(workspace_id=None)` | All tool schemas (OpenAI format) |
| `forge.get_producer_schemas()` | Producer-only schemas |
| `forge.get_all_tool_schemas(workspace_id=None)` | Consumer-only schemas |
| `forge.system_prompt(preamble="")` | Auto-generated system prompt |
| `forge.list_workspaces(workspace_type=None)` | List workspace IDs |
| `forge.workspace_meta(workspace_id)` | Get workspace metadata |
| `forge.drop_workspace(workspace_id)` | Delete a workspace |
| `forge.sweep_expired()` | Delete expired workspaces |
| `forge.clear()` | Delete all workspaces |

ToolSet

| Method | Description |
|----|----|
| `ToolSet(name, *, enforce_type, safe, data_shape)` | Create tool group |
| `@toolset.tool(name, validate_workspace, output_hint)` | Register a tool |
| `toolset.bind(workspace_id)` | Create BoundToolSet with fixed workspace |
| `toolset.tools` | Dict of `{name: callable}` |
| `toolset.tool_names` | List of tool names |
| `toolset.to_tool_schemas(workspace_id=None)` | OpenAI schemas for this toolset |
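The registration flow behind a ToolSet (a named registry of callables) can be sketched generically; this toy class is not ctxtual's implementation, just an illustration of the decorator-based pattern:

```python
class ToyToolSet:
    """Toy registry illustrating decorator-based tool registration.
    Not ctxtual's ToolSet class."""

    def __init__(self, name):
        self.name = name
        self.tools = {}  # {tool_name: callable}

    def tool(self, name=None):
        def register(fn):
            # Default the tool name to the function name, as most
            # decorator-based registries do.
            self.tools[name or fn.__name__] = fn
            return fn
        return register

ts = ToyToolSet("papers")

@ts.tool()
def paginate(workspace_id, offset=0, limit=50):
    return {"workspace_id": workspace_id, "offset": offset, "limit": limit}
```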

Store

| Method | Description |
|----|----|
| `init_workspace(meta)` | Create/update workspace |
| `drop_workspace(workspace_id)` | Delete workspace and all data |
| `get_meta(workspace_id)` | Get metadata (or None) |
| `list_workspaces(workspace_type=None)` | List workspace IDs |
| `set_items(workspace_id, payload)` | Store payload (list, dict, or scalar) |
| `get_items(workspace_id)` | Read full payload |
| `count_items(workspace_id)` | Item count |
| `get_page(workspace_id, offset, limit)` | Paginated read |
| `search_items(workspace_id, query, *, fields, max_results, case_sensitive)` | Full-text search |
| `filter_items(workspace_id, field, value, operator)` | Structured filter |
| `sort_items(workspace_id, field, *, descending, limit)` | Sorted read |
| `distinct_field_values(workspace_id, field, *, max_values)` | Facet values |
| `append_items(workspace_id, new_items)` | Append to list workspace |
| `update_item(workspace_id, index, item)` | Replace item at index |
| `patch_item(workspace_id, index, fields)` | Merge fields into item |
| `delete_items(workspace_id, indices)` | Remove items by index |
| `transaction()` | Atomic operation context manager |
| `sweep_expired()` | Delete expired workspaces |
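As a rough sketch of what a custom backend satisfies, here is a toy in-memory store covering a few of the methods above (a duck-typed illustration; check the library's actual base class and signatures before implementing a real backend):

```python
class ToyStore:
    """Minimal in-memory sketch of part of the Store contract above.
    Illustration only, not a drop-in MemoryStore replacement."""

    def __init__(self):
        self._meta, self._data = {}, {}

    def init_workspace(self, meta):
        # Metadata shown here as a plain dict for brevity.
        self._meta[meta["workspace_id"]] = meta

    def set_items(self, workspace_id, payload):
        self._data[workspace_id] = payload

    def count_items(self, workspace_id):
        payload = self._data[workspace_id]
        # Lists and dicts count their members; a scalar counts as one.
        return len(payload) if isinstance(payload, (list, dict)) else 1

    def get_page(self, workspace_id, offset, limit):
        return self._data[workspace_id][offset:offset + limit]

    def drop_workspace(self, workspace_id):
        self._meta.pop(workspace_id, None)
        self._data.pop(workspace_id, None)
```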

Contributing

Contributions are welcome. Please open an issue to discuss before submitting large changes.

git clone https://github.com/banda-larga/ctxtual.git
cd ctxtual
uv sync --dev
uv run pytest tests/ -v
uv run ruff check src/ tests/

License

MIT
