ctxtual
The context engineering library for AI agents.
Stop truncating tool results. Start engineering context.
pip install ctxtual
Table of Contents
- The Problem You Already Have
- Why ctxtual
- Quick Start
- Install
- Architecture
- Core API — Forge, @producer, @consumer, dispatch, schema export
- Built-in ToolSets — paginator, text_search, filter_set, kv_reader, text_content, pipeline
- Custom ToolSets
- Storage Backends
- Workspace Mutations
- Framework Integrations — OpenAI, Anthropic, LangChain
- Concurrency & Thread Safety
- Error Recovery
- Advanced Patterns
- Workspace Introspection
- Examples
- Design Principles
- Testing
- API Reference
- Contributing
- License
The Problem You Already Have
Your agent calls a tool. The tool returns 10,000 results. The LLM context window fits 200. You truncate to 200, and the agent misses the answer buried in result #8,432.
Every production agent team builds the same workaround: store the data somewhere, give the agent tools to explore it. ctxtual is that workaround, extracted into a library.
# Before: raw data floods the context window
def search_papers(query: str) -> list[dict]:
return database.search(query) # 10,000 results → LLM chokes
# After: data is stored, agent gets a map
@forge.producer(workspace_type="papers", toolsets=[paginator(forge), search, filters])
def search_papers(query: str) -> list[dict]:
return database.search(query) # 10,000 results → stored in workspace
# Agent receives:
# {
# "workspace_id": "papers_f3a8bc12",
# "item_count": 10000,
# "data_shape": "list",
# "available_tools": ["papers_paginate(...)", "papers_search(...)"],
# "next_steps": ["• papers_paginate: Return a page of items...", ...]
# }
The agent then explores with surgical precision — paginating, searching, filtering — pulling only what it needs into the context window. The data stays server-side. The agent stays smart.
Why ctxtual
| Problem | Without ctxtual | With ctxtual |
|---|---|---|
| Tool returns 10K items | Truncate and hope | Store, paginate, search on demand |
| Agent needs to filter by field | Load everything, pray it fits | filter_by(field="year", value=2024, operator="gte") |
| Agent needs one specific item | Return entire list for index lookup | get_item(index=42) |
| Tool returns large text (HTML, PDF) | Dump entire string into context | text_content — char-based pagination, in-text search |
| Multiple tools return overlapping data | Duplicate data across messages | Workspaces with deterministic IDs, deduplication built-in |
| Agent crashes mid-conversation | All data lost | SQLiteStore persists across restarts |
| LLM calls wrong tool name | Unhandled KeyError, agent loop crashes | Structured error dict: {"error": ..., "available_tools": [...], "suggested_action": ...} |
| Multi-agent sharing data | Custom IPC | Shared store, workspace-level isolation |
| Framework lock-in | Rewrite for every framework | Adapters for OpenAI, Anthropic, LangChain |
ctxtual is not a framework. It's a library. It doesn't own your agent loop, your LLM calls, or your data. It's the plumbing between "tool returns data" and "agent explores data."
Quick Start
from ctxtual import Forge, MemoryStore
from ctxtual.utils import paginator, text_search, filter_set, pipeline
forge = Forge(store=MemoryStore())
# Wrap your data-fetching function — toolsets get their name from workspace_type
@forge.producer(workspace_type="papers", toolsets=[
paginator(forge),
text_search(forge, fields=["title", "abstract"]),
filter_set(forge),
pipeline(forge),
])
def search_papers(query: str, limit: int = 10_000) -> list[dict]:
return database.search(query, limit)
# Agent calls the producer → gets a notification, not 10K items
ref = search_papers("machine learning")
ws_id = ref["workspace_id"]
# Agent explores using consumer tools:
forge.dispatch_tool_call("papers_paginate", {"workspace_id": ws_id, "page": 0, "size": 10})
forge.dispatch_tool_call("papers_search", {"workspace_id": ws_id, "query": "transformer"})
forge.dispatch_tool_call("papers_filter_by", {"workspace_id": ws_id, "field": "year", "value": 2024, "operator": "gte"})
# Or: compound operations in ONE call (no round-trips, no intermediate context)
forge.dispatch_tool_call("papers_pipe", {
"workspace_id": ws_id,
"steps": [
{"search": {"query": "transformer"}},
{"filter": {"year": {"$gte": 2024}}},
{"sort": {"field": "citations", "order": "desc"}},
{"limit": 5},
],
})
Install
pip install ctxtual # or: uv add ctxtual
Zero required dependencies. The core library uses only the Python standard library.
Optional extras:
pip install ctxtual[dev] # pytest, ruff, coverage
Integration adapters (OpenAI, Anthropic, LangChain) are zero-dependency by default — they duck-type against SDK objects. Install the SDK you need separately:
pip install openai # for ctxtual.integrations.openai
pip install anthropic # for ctxtual.integrations.anthropic
pip install langchain-core # for ctxtual.integrations.langchain
Architecture
Your Agent Loop
LLM ←→ tool_calls ←→ forge.dispatch_tool_call()
Forge (orchestrator)
@producer / @consumer decorators
Schema export (OpenAI, Anthropic, LangChain format)
Dispatch routing — one method handles all tool calls
Thread-safe (RLock)
ToolSets (consumer tools)
paginator · text_search · filter_set
kv_reader · text_content · pipeline
+ your custom domain tools
Store (pluggable backend)
MemoryStore — fast, in-process, LRU eviction
SQLiteStore — persistent, FTS5 search, WAL mode
BaseStore — subclass for Redis, Postgres, S3, etc.
Data flow:
- Producer runs your function, stores the result in a workspace, and returns a WorkspaceRef notification to the LLM.
- LLM reads the notification, sees available tools, calls them.
- Consumer tools (paginate, search, filter, etc.) read from the store and return only the requested slice.
- The LLM never sees the full dataset. It sees pages, search results, filtered subsets — exactly what it needs.
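The data flow above can be sketched without the library at all. The following is a minimal, self-contained illustration of the producer/notification/consumer pattern — the dict keys mirror the docs, but this is not ctxtual's implementation:

```python
# Minimal sketch: store the payload server-side, hand the LLM a map instead.
import uuid

STORE: dict[str, list] = {}

def produce(workspace_type: str, payload: list) -> dict:
    """Store the payload and return a small notification, not the data."""
    ws_id = f"{workspace_type}_{uuid.uuid4().hex[:8]}"
    STORE[ws_id] = payload
    return {
        "workspace_id": ws_id,
        "item_count": len(payload),
        "data_shape": "list",
        "available_tools": [f"{workspace_type}_paginate(...)"],
    }

def paginate(workspace_id: str, page: int = 0, size: int = 10) -> dict:
    """Return one page of items — the full payload never enters context."""
    items = STORE[workspace_id]
    start = page * size
    return {
        "items": items[start : start + size],
        "total": len(items),
        "has_next": start + size < len(items),
    }

ref = produce("papers", [{"title": f"Paper {i}"} for i in range(10_000)])
page = paginate(ref["workspace_id"], page=0, size=10)
# The LLM sees 10 items plus a total count — not 10,000 items.
```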
Core API
Forge
The central orchestrator. One per application (or per agent session).
from ctxtual import Forge, MemoryStore, SQLiteStore
# In-memory (default) — fast, test-friendly, process-scoped
forge = Forge(store=MemoryStore())
# Persistent — survives process restarts
forge = Forge(store=SQLiteStore("agent.db"))
# With configuration
forge = Forge(
store=MemoryStore(max_workspaces=500), # LRU eviction when exceeded
default_ttl=3600, # Workspaces expire after 1 hour
max_items=100_000, # Reject payloads larger than 100K items
default_notify=True, # Producers return dicts (not WorkspaceRef objects)
)
@forge.producer — Store Data, Return a Map
Wraps any function so its return value is stored in a workspace. The agent gets a self-describing notification instead of raw data.
@forge.producer(
workspace_type="papers", # Logical data category
toolsets=[pager, search, filters], # Consumer tools for this data
key="papers_{query}", # Deterministic workspace ID (optional)
transform=lambda r: r[:5000], # Pre-store transform (optional)
meta={"source": "arxiv"}, # Metadata attached to workspace
ttl=1800, # Time-to-live in seconds
notify=True, # True=dict, False=WorkspaceRef object
)
def search_papers(query: str) -> list[dict]:
return external_api.search(query)
key parameter controls workspace identity:
| Key | Behavior | Use case |
|---|---|---|
| None (default) | Auto UUID: papers_f3a8bc12a0 | Every call creates a new workspace |
| "papers_{query}" | Templated from kwargs | Idempotent — same args = same workspace (overwritten) |
| lambda kw: f"ws_{kw['user_id']}" | Custom callable | Full control over deduplication logic |
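How the three key styles could resolve to a workspace ID, as a stdlib-only sketch (illustrative — ctxtual's internals may differ):

```python
# Sketch: resolve a workspace ID from the producer's `key` setting.
import uuid

def resolve_key(key, workspace_type: str, kwargs: dict) -> str:
    if key is None:                      # auto UUID: new workspace per call
        return f"{workspace_type}_{uuid.uuid4().hex[:10]}"
    if callable(key):                    # custom callable: full control
        return key(kwargs)
    return key.format(**kwargs)          # template filled from call kwargs

assert resolve_key("papers_{query}", "papers", {"query": "ml"}) == "papers_ml"
assert resolve_key(lambda kw: f"ws_{kw['user_id']}", "papers", {"user_id": 7}) == "ws_7"
```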
@forge.consumer — Transform and Derive
Consumers read from one workspace and optionally produce a new one. This enables multi-hop agent pipelines.
@forge.consumer(
workspace_type="raw_data", # Input workspace type
produces="cleaned_data", # Output workspace type
produces_toolsets=[clean_pager], # Tools for the output
)
def clean_and_filter(workspace_id: str, forge_ctx: ConsumerContext):
raw = forge_ctx.get_items()
cleaned = [normalize(item) for item in raw if item["quality"] > 0.8]
return forge_ctx.emit(cleaned, meta={"derived_from": workspace_id})
ConsumerContext is injected automatically. It provides:
| Method | Description |
|---|---|
| get_items(workspace_id=None) | Read items from the input workspace (or any workspace) |
| emit(payload, *, workspace_type, meta, ttl) | Store derived data as a new workspace, return WorkspaceRef dict |
| store | Direct access to the store backend |
forge.dispatch_tool_call() — Single Entry Point
Route any tool call — producers and consumers — through one method. This is what your agent loop calls.
result = forge.dispatch_tool_call("papers_search", {"workspace_id": ws_id, "query": "attention"})
Always returns a value, never raises on tool errors:
# Success → tool result (any type)
{"matches": [...], "total_matches": 42}
# Unknown tool → structured error dict
{"error": "Tool 'bad_name' not found.", "available_tools": [...], "suggested_action": "..."}
# Workspace not found → structured error with available workspaces
{"error": "Workspace 'ws_bad' not found.", "available_workspaces": ["ws_real"], "suggested_action": "..."}
This means your agent loop needs zero try/except for tool calls. The LLM reads the error and self-corrects.
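A bare-bones sketch of a dispatcher with this contract — return error dicts, never raise. The tool table and shapes are illustrative stand-ins, not ctxtual source:

```python
# Sketch: dispatch that answers mistakes with structured error dicts.
TOOLS = {
    "papers_search": lambda workspace_id, query: {"matches": [], "total_matches": 0},
}

def dispatch_tool_call(name: str, arguments: dict):
    tool = TOOLS.get(name)
    if tool is None:
        return {
            "error": f"Tool '{name}' not found.",
            "available_tools": sorted(TOOLS),
            "suggested_action": "Use one of the available tools.",
        }
    try:
        return tool(**arguments)
    except TypeError as exc:             # malformed arguments from the LLM
        return {"error": str(exc), "suggested_action": "Check the tool's parameters."}

result = dispatch_tool_call("bad_name", {})
# The LLM reads available_tools from the error and retries with a valid name.
```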
Schema Export
Export tool schemas in OpenAI function-calling format. Pass them to any LLM that supports tool use.
# All tools (producers + consumers)
tools = forge.get_tools()
# Only producer tools
producers = forge.get_producer_schemas()
# All consumer tools (optionally scoped to a workspace)
consumers = forge.get_all_tool_schemas(workspace_id="papers_abc")
Schemas include:
- Rich parameter descriptions extracted from docstrings (Google-style and Sphinx-style)
- Proper JSON Schema types for Optional, Union, Literal, list[X], dict[K,V], and enums
- Well-known parameter descriptions for common names like workspace_id, page, query
- Examples on complex parameters — pipeline steps and metrics include concrete JSON examples that help LLMs construct valid queries
- schema_extra support — pass schema_extra={"param": {"minimum": 0}} to @ts.tool() to enrich any parameter's schema
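The Python-type → JSON Schema mapping can be sketched with the typing module's introspection helpers. This is a simplified stand-in — the real exporter covers many more cases:

```python
# Sketch: map a handful of Python annotations to JSON Schema fragments.
import typing
from typing import Literal, Optional, get_args, get_origin

def to_json_schema(tp) -> dict:
    if tp is str:
        return {"type": "string"}
    if tp is int:
        return {"type": "integer"}
    if tp is bool:
        return {"type": "boolean"}
    origin = get_origin(tp)
    if origin is list:
        return {"type": "array", "items": to_json_schema(get_args(tp)[0])}
    if origin is Literal:
        return {"enum": list(get_args(tp))}
    if origin is typing.Union:           # Optional[X] is Union[X, None]
        args = [a for a in get_args(tp) if a is not type(None)]
        return to_json_schema(args[0])
    return {}

assert to_json_schema(list[int]) == {"type": "array", "items": {"type": "integer"}}
assert to_json_schema(Optional[str]) == {"type": "string"}
```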
Built-in ToolSets
These cover 90% of what agents need. Import them, pass to @producer, done. The name parameter is optional — when omitted, the toolset inherits its name from the producer's workspace_type:
# Simple — name inferred from workspace_type:
@forge.producer(workspace_type="papers", toolsets=[paginator(forge), text_search(forge)])
def fetch(query): ...
# Explicit — still works for advanced use:
pager = paginator(forge, "papers")
paginator(forge, name) — List Navigation
from ctxtual.utils import paginator
pager = paginator(forge, "papers") # data_shape="list"
| Tool | Description |
|---|---|
| {name}_paginate(workspace_id, page=0, size=10) | Page of items + metadata (total, has_next, has_prev) |
| {name}_count(workspace_id) | Total item count |
| {name}_get_item(workspace_id, index) | Single item by zero-based index |
| {name}_get_slice(workspace_id, start=0, end=20) | Arbitrary slice |
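When the LLM asks for an index past the end, get_item can answer with a structured error instead of raising — a sketch matching the error shape shown in the Error Recovery section (illustrative, not ctxtual source):

```python
# Sketch: bounds-checked item lookup that returns an error dict, not a raise.
def get_item(items: list, index: int):
    if not 0 <= index < len(items):
        return {
            "error": f"Index {index} out of range.",
            "valid_range": f"0-{len(items) - 1}",
            "total_items": len(items),
            "suggested_action": "Pick an index inside valid_range.",
        }
    return items[index]

items = [{"id": i} for i in range(43)]
ok = get_item(items, 42)       # last valid index
bad = get_item(items, 999)     # structured error with the valid range
```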
text_search(forge, name, *, fields=None) — Full-Text Search
from ctxtual.utils import text_search
search = text_search(forge, "papers", fields=["title", "abstract"]) # data_shape="list"
| Tool | Description |
|---|---|
| {name}_search(workspace_id, query, max_results=20, case_sensitive=False) | BM25-ranked full-text search (FTS5 on SQLiteStore, TF scoring on MemoryStore) |
| {name}_field_values(workspace_id, field, max_values=50) | Distinct values for a field (facet discovery) |
filter_set(forge, name) — Structured Filtering
from ctxtual.utils import filter_set
filters = filter_set(forge, "papers") # data_shape="list"
| Tool | Description |
|---|---|
| {name}_filter_by(workspace_id, field, value, operator="eq") | Filter by field. Operators: eq, ne, lt, lte, gt, gte, contains, startswith |
| {name}_sort_by(workspace_id, field, descending=False, limit=100) | Sort by field |
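The operator table above maps naturally onto the stdlib operator module. A reduced sketch of the dispatch (field access and missing-value handling simplified; not ctxtual source):

```python
# Sketch: filter a list of dicts by field, value, and a named operator.
import operator

OPS = {
    "eq": operator.eq, "ne": operator.ne,
    "lt": operator.lt, "lte": operator.le,
    "gt": operator.gt, "gte": operator.ge,
    "contains": lambda a, b: b in a,
    "startswith": lambda a, b: str(a).startswith(str(b)),
}

def filter_by(items: list[dict], field: str, value, op: str = "eq") -> list[dict]:
    pred = OPS[op]
    # Items missing the field are excluded rather than compared against None.
    return [it for it in items if field in it and pred(it[field], value)]

papers = [{"year": 2022}, {"year": 2024}, {"year": 2025}]
recent = filter_by(papers, "year", 2024, "gte")
```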
kv_reader(forge, name) — Dict Workspaces
For single-document workspaces (config, metadata, API responses that are dicts, not lists).
from ctxtual.utils import kv_reader
kv = kv_reader(forge, "config") # data_shape="dict"
| Tool | Description |
|---|---|
| {name}_get_keys(workspace_id) | List top-level keys |
| {name}_get_value(workspace_id, key) | Read value at key |
text_content(forge, name) — Raw Text / Scalar Navigation
For producers that return a string (HTML page, PDF text, log file, API response body). Instead of stuffing the entire string into context, the agent navigates it with character-based pagination and search.
from ctxtual.utils import text_content
reader = text_content(forge, "page") # data_shape="scalar"
| Tool | Description |
|---|---|
| {name}_read_page(workspace_id, page=0, chars_per_page=4000) | Character-based page with has_next/has_prev, word count, navigation hints |
| {name}_search_in_text(workspace_id, query, context_chars=100, max_results=20) | Find occurrences with surrounding context, character offsets |
| {name}_get_length(workspace_id) | Character, word, and line counts |
Example — webpage producer:
@forge.producer(workspace_type="page", toolsets=[reader])
def read_webpage(url: str) -> str:
return requests.get(url).text # Could be 100KB of HTML
# Agent receives WorkspaceRef with item_count=1, data_shape="scalar"
# and tools: page_read_page, page_search_in_text, page_get_length
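Character-based pagination itself is just slice arithmetic. A sketch of the read_page behavior described above (simplified — the real tool also reports word counts and navigation hints):

```python
# Sketch: page through a long string by character offset.
def read_page(text: str, page: int = 0, chars_per_page: int = 4000) -> dict:
    start = page * chars_per_page
    return {
        "text": text[start : start + chars_per_page],
        "page": page,
        "has_prev": page > 0,
        "has_next": start + chars_per_page < len(text),
    }

doc = "x" * 10_000               # stands in for 100KB of fetched HTML
first = read_page(doc, page=0)
last = read_page(doc, page=2)    # final partial page
```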
Text Transforms — Convert Strings to Structured Data
When you want to use list-based tools (paginator, filter, pipeline) with text content, transform the string before storing:
from ctxtual import chunk_text, split_sections, split_markdown_sections
# Fixed-size overlapping chunks (good for embeddings, RAG)
@forge.producer(workspace_type="chunks", toolsets=[pager, search])
def ingest_document(path: str) -> list[dict]:
text = open(path).read()
return chunk_text(text, chunk_size=2000, overlap=200)
# → [{"chunk_index": 0, "text": "...", "char_offset": 0}, ...]
# Split by blank lines (paragraphs)
@forge.producer(workspace_type="paragraphs", toolsets=[pager])
def split_doc(text: str) -> list[dict]:
return split_sections(text, separator="\n\n")
# → [{"section_index": 0, "text": "First paragraph..."}, ...]
# Split by Markdown headers
@forge.producer(workspace_type="sections", toolsets=[pager, search])
def parse_markdown(content: str) -> list[dict]:
return split_markdown_sections(content)
# → [{"section_index": 0, "heading": "Introduction", "level": 1, "text": "..."}, ...]
All transforms pass non-strings through unchanged, so they're safe in pipelines that might receive mixed data.
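A self-contained sketch of fixed-size overlapping chunking in the output shape the docs show (chunk_index/text/char_offset). ctxtual's chunk_text may differ in edge-case handling:

```python
# Sketch: split text into overlapping chunks; pass non-strings through.
def chunk_text(text, chunk_size: int = 2000, overlap: int = 200):
    if not isinstance(text, str):        # non-strings pass through unchanged
        return text
    step = chunk_size - overlap
    return [
        {"chunk_index": i, "text": text[off : off + chunk_size], "char_offset": off}
        for i, off in enumerate(range(0, max(len(text) - overlap, 1), step))
    ]

chunks = chunk_text("a" * 5000, chunk_size=2000, overlap=200)
# Offsets advance by chunk_size - overlap: 0, 1800, 3600.
```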
pipeline(forge, name) — Declarative Data Pipelines
The most powerful built-in. Instead of the LLM making 4+ round-trips (search → filter → sort → limit), it describes the entire chain in one tool call. Intermediate data stays server-side — only the final result enters context.
This is ctxtual's answer to programmatic tool calling: compound operations in a single step, framework-agnostic, works with any LLM.
from ctxtual.utils import pipeline
pipe = pipeline(forge, "papers") # data_shape="list"
| Tool | Description |
|---|---|
| {name}_pipe(workspace_id, steps, save_as=None) | Chain operations declaratively. save_as stores the result as a new browsable workspace. |
| {name}_aggregate(workspace_id, group_by=None, metrics=None) | Group-by aggregation with computed statistics |
Pipeline operations (each step is {op_name: params}):
| Operation | Example | Description |
|---|---|---|
| filter | {"filter": {"year": {"$gte": 2023}}} | MongoDB-style conditions: $gt, $gte, $lt, $lte, $ne, $in, $nin, $contains, $startswith, $regex, $exists. Logical: $or, $and, $not. Dot notation: "author.name" |
| search | {"search": {"query": "ML", "fields": ["title"]}} | Case-insensitive text search across all or specified fields |
| sort | {"sort": {"field": "score", "order": "desc"}} | Single or multi-field sort. Multi: [{"field": "year"}, {"field": "score", "order": "desc"}] |
| select | {"select": ["title", "author"]} | Keep only these fields |
| exclude | {"exclude": ["raw_blob"]} | Remove these fields |
| limit | {"limit": 10} | First N items |
| skip | {"skip": 5} | Skip first N items |
| slice | {"slice": [5, 15]} | Items [5:15] |
| sample | {"sample": {"n": 10, "seed": 42}} | Random sample (reproducible with seed) |
| unique | {"unique": "author"} | Deduplicate by field, keep first |
| flatten | {"flatten": "tags"} | Expand list fields into multiple items |
| group_by | {"group_by": {"field": "category", "metrics": {"n": "count", "avg": "mean:score"}}} | Aggregation. Metrics: count, sum:f, mean:f, min:f, max:f, median:f, stddev:f, values:f |
| count | {"count": true} | Terminal: return {"count": N} |
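The MongoDB-style condition matching behind the filter step can be sketched with a reduced subset — comparison operators plus dot notation, without the logical operators (illustrative, not ctxtual's engine):

```python
# Sketch: match one item against MongoDB-style filter conditions.
def get_path(item, path: str):
    """Resolve dot notation like "author.name" against nested dicts."""
    for part in path.split("."):
        item = item.get(part) if isinstance(item, dict) else None
    return item

CMP = {
    "$gt": lambda a, b: a > b, "$gte": lambda a, b: a >= b,
    "$lt": lambda a, b: a < b, "$lte": lambda a, b: a <= b,
    "$ne": lambda a, b: a != b, "$in": lambda a, b: a in b,
    "$contains": lambda a, b: b in a,
}

def matches(item: dict, conditions: dict) -> bool:
    for field, cond in conditions.items():
        value = get_path(item, field)
        if isinstance(cond, dict):       # operator form: {"$gte": 2023}
            if not all(CMP[op](value, arg) for op, arg in cond.items()):
                return False
        elif value != cond:              # shorthand equality
            return False
    return True

papers = [
    {"year": 2022, "author": {"name": "Alice"}},
    {"year": 2024, "author": {"name": "Bob"}},
]
recent = [p for p in papers if matches(p, {"year": {"$gte": 2023}})]
```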
Example — what would take 4 tool calls in 1:
result = forge.dispatch_tool_call("papers_pipe", {
"workspace_id": wid,
"steps": [
{"search": {"query": "neural networks"}},
{"filter": {"year": {"$gte": 2023}}},
{"sort": {"field": "citations", "order": "desc"}},
{"limit": 5},
{"select": ["title", "author", "citations"]},
],
})
# → {"items": [...], "count": 5} — one call, zero intermediate context
Example — tag frequency analysis (flatten → group → sort):
result = forge.dispatch_tool_call("papers_pipe", {
"workspace_id": wid,
"steps": [
{"flatten": "tags"},
{"group_by": {"field": "tags", "metrics": {"count": "count"}}},
{"sort": {"field": "count", "order": "desc"}},
{"limit": 10},
],
})
Example — save pipeline result as a new workspace:
result = forge.dispatch_tool_call("papers_pipe", {
"workspace_id": wid,
"steps": [{"filter": {"author": "Alice"}}, {"sort": {"field": "year"}}],
"save_as": "alice_papers", # stored as a new workspace
})
# LLM can then paginate "alice_papers" with papers_paginate
Custom ToolSets
When built-ins aren't enough, create domain-specific tools:
analytics = forge.toolset("transactions")
analytics.data_shape = "list" # Validates payload shape at both produce-time and tool-time
@analytics.tool(
name="transactions_anomalies",
output_hint="Investigate flagged transactions with transactions_get_item(workspace_id='{workspace_id}', index=INDEX).",
)
def detect_anomalies(workspace_id: str, std_threshold: float = 2.0) -> dict:
"""Flag transactions with amounts that deviate significantly from the mean.
Args:
workspace_id: The transactions workspace to analyze.
std_threshold: Number of standard deviations to flag as anomaly.
"""
items = analytics.store.get_items(workspace_id)
# ... your domain logic ...
return {"anomalies": flagged, "count": len(flagged)}
output_hint is appended to the tool result as a _hint field, making every tool self-describing. The {workspace_id} placeholder is replaced at runtime. The result envelope is always {"result": <original>, "_hint": "<hint>"} — the original return shape is never mutated.
data_shape validation:
- Set data_shape="list" on toolsets that expect list data (paginator, search, filter, pipeline)
- Set data_shape="dict" on toolsets that expect dict data (kv_reader)
- Set data_shape="scalar" on toolsets that expect raw strings (text_content)
- At producer time: logs a warning if the payload shape doesn't match
- At tool-call time: returns a structured error dict with expected_shape, actual_shape, suggested_action
- Shape-aware WorkspaceRef: incompatible tools are automatically filtered from available_tools — if a producer returns a string, the LLM only sees text_content tools, not paginator tools
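The shape check boils down to a three-way classification plus a structured error, which can be sketched like this (illustrative mapping — ctxtual may classify edge cases differently):

```python
# Sketch: classify a payload's shape and reject mismatches with an error dict.
def detect_shape(payload) -> str:
    if isinstance(payload, list):
        return "list"
    if isinstance(payload, dict):
        return "dict"
    return "scalar"                      # strings, numbers, anything else

def check_shape(payload, expected: str):
    actual = detect_shape(payload)
    if actual != expected:
        return {
            "error": f"This tool expects '{expected}' data but workspace contains '{actual}' data.",
            "expected_shape": expected,
            "actual_shape": actual,
            "suggested_action": "Use a toolset that matches the workspace's data shape.",
        }
    return None                          # shape OK, proceed with the tool

err = check_shape({"host": "localhost"}, "list")  # kv payload, list tool
```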
Storage Backends
MemoryStore
from ctxtual import MemoryStore
store = MemoryStore(max_workspaces=500) # Optional LRU eviction
Fast, thread-safe (RLock), zero dependencies. Data lives in process memory.
SQLiteStore
from ctxtual import SQLiteStore
store = SQLiteStore("agent.db") # Persistent file
store = SQLiteStore(":memory:") # In-memory SQLite (for testing)
Per-row storage in cf_items table enables SQL-pushed queries — LIMIT/OFFSET, json_extract WHERE, ORDER BY all happen at the SQL level, not in Python. Full-text search uses FTS5 with Porter stemming and bm25() ranking.
Schema:
- cf_meta — workspace metadata (one row per workspace)
- cf_items — per-item storage (one row per item per workspace)
- cf_fts — FTS5 virtual table for full-text search
- Automatic migration on open (adds new columns to old databases)
Custom Backends
Implement BaseStore for Redis, Postgres, S3, or any storage:
from ctxtual import BaseStore
from ctxtual.types import WorkspaceMeta
class RedisStore(BaseStore):
# Required — implement these 7 methods:
def init_workspace(self, meta: WorkspaceMeta) -> None: ...
def drop_workspace(self, workspace_id: str) -> None: ...
def get_meta(self, workspace_id: str) -> WorkspaceMeta | None: ...
def list_workspaces(self, workspace_type: str | None = None) -> list[str]: ...
def set_items(self, workspace_id: str, payload: Any) -> None: ...
def get_items(self, workspace_id: str) -> Any: ...
def set(self, workspace_id: str, key: str, value: Any) -> None: ...
def get(self, workspace_id: str, key: str, default: Any = None) -> Any: ...
# Optional — override for performance (defaults iterate over get_items):
def count_items(self, workspace_id: str) -> int: ...
def get_page(self, workspace_id: str, offset: int, limit: int) -> list: ...
def search_items(self, workspace_id, query, *, fields=None, max_results=20, case_sensitive=False) -> list: ...
def filter_items(self, workspace_id, field, value, operator="eq") -> list: ...
def sort_items(self, workspace_id, field, *, descending=False, limit=100) -> list: ...
# Optional — override for mutation support:
def append_items(self, workspace_id: str, new_items: list) -> int: ...
def update_item(self, workspace_id: str, index: int, item: Any) -> None: ...
def patch_item(self, workspace_id: str, index: int, fields: dict) -> None: ...
def delete_items(self, workspace_id: str, indices: list[int]) -> int: ...
All query and mutation methods have default implementations that work on any store — override them only for performance.
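To see why only the required methods are mandatory, here is a sketch of how a default get_page can be derived from get_items — a toy in-memory backend, not BaseStore itself:

```python
# Sketch: a default get_page built on get_items; real backends override it
# to push the slice down (e.g. SQL LIMIT/OFFSET) for performance.
class ListBackedStore:
    def __init__(self) -> None:
        self._data: dict[str, list] = {}

    def set_items(self, workspace_id: str, payload: list) -> None:
        self._data[workspace_id] = payload

    def get_items(self, workspace_id: str) -> list:
        return self._data[workspace_id]

    def count_items(self, workspace_id: str) -> int:
        return len(self.get_items(workspace_id))

    # Default implementation: fetch everything, slice in Python.
    def get_page(self, workspace_id: str, offset: int, limit: int) -> list:
        return self.get_items(workspace_id)[offset : offset + limit]

store = ListBackedStore()
store.set_items("ws", list(range(100)))
page = store.get_page("ws", offset=20, limit=5)
```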
Workspace Mutations
Workspaces are not read-only. Agents can modify data in place:
store = forge.store
# Append new items to an existing workspace
store.append_items("tasks_sprint_1", [{"title": "New task", "status": "todo"}])
# Update an item by index (full replacement)
store.update_item("tasks_sprint_1", 0, {"title": "Updated", "status": "done"})
# Patch specific fields on an item (merge, not replace)
store.patch_item("tasks_sprint_1", 2, {"status": "in_progress"})
# Delete items by indices
store.delete_items("tasks_sprint_1", [5, 6, 7])
All mutations maintain FTS index consistency on SQLiteStore.
Framework Integrations
OpenAI
from ctxtual.integrations.openai import to_openai_tools, has_tool_calls, handle_tool_calls
# Get tool schemas
tools = to_openai_tools(forge)
# In your agent loop
response = client.chat.completions.create(model="gpt-5-mini", messages=messages, tools=tools)
if has_tool_calls(response):
# Dispatch all tool calls, get tool-result messages
tool_messages = handle_tool_calls(forge, response)
messages.append(response.choices[0].message)
messages.extend(tool_messages)
Anthropic
from ctxtual.integrations.anthropic import to_anthropic_tools, has_tool_use, handle_tool_use
tools = to_anthropic_tools(forge) # Anthropic's flat schema format
response = client.messages.create(model="claude-sonnet-4.6", tools=tools, messages=messages)
if has_tool_use(response):
tool_results = handle_tool_use(forge, response) # Returns tool_result content blocks
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
LangChain
from ctxtual.integrations.langchain import to_langchain_tools
# Returns list of StructuredTool instances — plug into any LangChain agent
tools = to_langchain_tools(forge)
agent = create_react_agent(llm, tools)
All adapters are zero-hard-dependency — they duck-type against SDK objects and raw dicts. Install the SDK you need; the adapter works with whatever version you have.
Concurrency & Thread Safety
ctxtual is designed for production web servers (FastAPI, Django, Flask) serving concurrent agent sessions:
- Forge — threading.RLock protects all registration dicts
- MemoryStore — threading.RLock protects all data access; get_meta() returns deep copies
- SQLiteStore — per-thread connections, threading.RLock, WAL journal mode
- Transactions — store.transaction() context manager for atomic multi-step operations (nesting supported in SQLiteStore)
# Safe: one Forge instance, many concurrent requests
forge = Forge(store=MemoryStore(max_workspaces=1000), default_ttl=1800)
@app.post("/search")
async def search(query: str):
return search_papers(query=query) # Thread-safe
@app.get("/explore/{workspace_id}")
async def explore(workspace_id: str, page: int = 0):
return forge.dispatch_tool_call("papers_paginate", {"workspace_id": workspace_id, "page": page})
Error Recovery
LLMs make mistakes. ctxtual never crashes — it teaches the LLM to self-correct.
| LLM Mistake | Response |
|---|---|
| Calls unknown tool | {"error": "Tool 'bad_name' not found.", "available_tools": [...], "suggested_action": "Use one of the available tools."} |
| Wrong workspace_id | {"error": "Workspace 'ws_bad' not found.", "available_workspaces": ["ws_real"], "suggested_action": "Use one of the available workspaces."} |
| Workspace expired | {"error": "Workspace 'ws_old' has expired.", "suggested_action": "Call load_papers() to create a fresh workspace."} |
| Type mismatch | {"error": "...", "expected_type": "papers", "actual_type": "employees", "workspaces_of_correct_type": [...]} |
| Shape mismatch | {"error": "This tool expects 'list' data but workspace contains 'dict' data.", "suggested_action": "Use get_keys/get_value instead."} |
| Index out of range | {"error": "Index 999 out of range.", "valid_range": "0–42", "total_items": 43, "suggested_action": "..."} |
| Missing dict key | {"error": "Key 'bad' not found.", "available_keys": ["host", "port"], "suggested_action": "..."} |
Every error includes suggested_action — the LLM reads it and knows exactly what to do next.
Advanced Patterns
Deterministic Workspace IDs (Idempotency)
@forge.producer(workspace_type="inventory", toolsets=[pager], key="inv_{warehouse}")
def sync_inventory(warehouse: str) -> list:
return fetch_from_wms(warehouse)
sync_inventory(warehouse="us-east") # Creates workspace "inv_us-east"
sync_inventory(warehouse="us-east") # Overwrites same workspace — no duplicates
Multi-Hop Pipelines
# Step 1: Load raw data
@forge.producer(workspace_type="raw", toolsets=[raw_pager])
def ingest(source: str) -> list: ...
# Step 2: Filter and enrich
@forge.consumer(workspace_type="raw", produces="clean", produces_toolsets=[clean_pager])
def clean(workspace_id: str, forge_ctx: ConsumerContext):
data = forge_ctx.get_items()
return forge_ctx.emit([normalize(d) for d in data if d["quality"] > 0.5])
# Step 3: Aggregate into a report
@forge.consumer(workspace_type="clean", produces="report", produces_toolsets=[report_kv])
def summarize(workspace_id: str, forge_ctx: ConsumerContext):
items = forge_ctx.get_items()
return forge_ctx.emit({"total": len(items), "summary": aggregate(items)})
Each step creates a new workspace. The agent can explore any level.
Multi-Agent Collaboration
Multiple agents share one store. Each agent reads/writes workspaces. Workspace metadata tracks lineage.
shared_store = SQLiteStore("shared.db")
forge = Forge(store=shared_store)
# Agent A: collect data
@forge.producer(workspace_type="raw", toolsets=[...], meta={"agent": "collector"})
def collect(topic: str) -> list: ...
# Agent B: analyze
@forge.consumer(workspace_type="raw", produces="analysis", meta={"agent": "analyst"})
def analyze(workspace_id: str, forge_ctx: ConsumerContext): ...
# Agent C: write report from analysis
@forge.consumer(workspace_type="analysis", produces="report", meta={"agent": "writer"})
def report(workspace_id: str, forge_ctx: ConsumerContext): ...
BoundToolSet (Fixed Workspace)
When an agent is focused on one workspace, bind it once:
bound = pager.bind("papers_abc123")
bound.papers_paginate(page=0) # No workspace_id needed
bound.papers_get_item(index=5) # Cleaner API for sub-agents
Result Transformation
Pre-process data before storage — normalize, deduplicate, trim:
@forge.producer(
workspace_type="papers",
toolsets=[pager],
transform=lambda papers: [
{k: v for k, v in p.items() if k in ("title", "abstract", "year")}
for p in papers
],
)
def fetch_papers(query: str) -> list: ...
TTL & Automatic Cleanup
forge = Forge(store=MemoryStore(), default_ttl=3600) # 1 hour default
# Override per-producer
@forge.producer(workspace_type="cache", toolsets=[pager], ttl=300) # 5 min
def fetch_live_prices() -> list: ...
# Sweep expired workspaces (call periodically in production)
expired_ids = forge.sweep_expired()
System Prompt Generation
# Auto-generate a system prompt from registered producers and tools.
# Includes: workspace pattern, producer descriptions, pipeline syntax
# (if registered), exploration tools, and error recovery guidance.
system = forge.system_prompt(preamble="You are a research assistant.")
The generated prompt adapts to your forge configuration — if you register pipeline tools, it includes pipeline syntax; if you register search, it mentions search. Each producer is listed by name with its docstring summary.
Item Schema in Notifications
Producer tool calls return a self-describing WorkspaceRef that includes a
JSON Schema for the item structure, so the LLM can construct filter/sort/pipeline
queries immediately without paginating first to discover the data:
ref = search_papers("transformers")
# ref["item_schema"] → {"type": "object", "properties": {"title": {"type": "string"},
# "year": {"type": "integer"}, "citations": {"type": "integer"}, ...},
# "required": ["title", "year", ...]}
# ref["available_tools"] → ["papers_paginate(...)", "papers_pipe(...)", ...]
# ref["next_steps"] → ["• papers_paginate: Return a page of items...", ...]
The schema tells the LLM types — it knows year is an integer (so $gte/$lte work),
tags is an array (so $contains works), name is a string (so $startswith works).
Fields not present in every item are omitted from required.
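Schema inference of this kind can be sketched from a sample of items — flat fields only, a handful of types, and required limited to fields present in every sampled item (a simplification of whatever ctxtual actually does):

```python
# Sketch: infer a per-item JSON Schema from the first N items.
def infer_item_schema(items: list[dict], sample: int = 100) -> dict:
    TYPES = {str: "string", int: "integer", float: "number", list: "array"}
    props: dict[str, dict] = {}
    seen_in_all = None
    for item in items[:sample]:
        for key, value in item.items():
            props.setdefault(key, {"type": TYPES.get(type(value), "object")})
        keys = set(item)
        seen_in_all = keys if seen_in_all is None else seen_in_all & keys
    return {
        "type": "object",
        "properties": props,
        "required": sorted(seen_in_all or []),
    }

schema = infer_item_schema([
    {"title": "A", "year": 2024, "tags": ["ml"]},
    {"title": "B", "year": 2023},          # no tags → not required
])
```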
Workspace Introspection
forge.list_workspaces() # All workspace IDs
forge.list_workspaces("papers") # Filtered by type
meta = forge.workspace_meta("papers_abc")
meta.workspace_id # "papers_abc"
meta.workspace_type # "papers"
meta.data_shape # "list", "dict", or "scalar"
meta.item_count # 8234
meta.producer_fn # "search_papers"
meta.producer_kwargs # {"query": "ml", "limit": 10000}
meta.created_at # Unix timestamp
meta.last_accessed_at # Unix timestamp
meta.ttl # 3600.0 or None
meta.is_expired # True/False
meta.extra # {"source": "arxiv"}
forge.drop_workspace("papers_abc")
forge.clear() # Drop all workspaces
Examples
The examples/ directory contains production-pattern examples organized from beginner to advanced:
| # | Example | What It Shows |
|---|---|---|
| 01 | quickstart.py | The 20-line pattern — copy and customize |
| 02 | rag_support_agent.py | Search → filter → read knowledge base articles |
| 03 | data_pipeline.py | Producer → Consumer → Consumer with workspace lineage |
| 04 | custom_tools.py | Financial analytics: anomaly detection, aggregation, date ranges |
| 05 | pipelines.py | Compound operations in one call: filter→sort→group→save |
| 06 | persistence.py | SQLite + mutations: patch_item, append_items, process restart |
| 07 | error_handling.py | Every failure mode and how the LLM self-corrects |
| 08 | multi_agent.py | Collector → Analyst → Writer sharing one store |
| 09 | openai_agent.py | Full OpenAI agent loop with schema export and tool dispatch |
| 10 | anthropic_agent.py | Anthropic Claude code review agent with dict + list workspaces |
| 11 | concurrent_server.py | FastAPI server with 20 concurrent sessions, thread safety, TTL |
Run any example:
uv run python examples/01_quickstart.py
Design Principles
1. The LLM never sees bulk data. Producers store data and return a map. The agent pulls what it needs.
2. Tools are self-describing. Every WorkspaceRef includes tool names, descriptions, and call examples. Every error includes suggested_action. The agent doesn't need system prompt instructions to use the tools.
3. No framework lock-in. ctxtual is a library, not a framework. It doesn't own your agent loop. Use it with OpenAI, Anthropic, LangChain, or raw HTTP — the adapters are thin and optional.
4. Errors are data, not exceptions. dispatch_tool_call() returns structured error dicts. Your agent loop doesn't need try/except. The LLM reads the error and self-corrects.
5. Zero dependencies. The core library uses only the Python standard library. You don't inherit someone else's dependency tree.
6. Thread-safe by default. One Forge instance handles concurrent requests. No external locking required.
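Principle 4 is the one that most changes your agent loop. A minimal sketch of the errors-as-data pattern (a toy dispatcher, not ctxtual's actual `dispatch_tool_call`): failures come back as dicts with a `suggested_action`, so the loop can hand them straight back to the LLM.

```python
def dispatch_tool_call(tools: dict, tool_name: str, arguments: dict) -> dict:
    """Route a tool call; return structured error dicts instead of raising."""
    fn = tools.get(tool_name)
    if fn is None:
        return {
            "error": "unknown_tool",
            "tool": tool_name,
            "suggested_action": f"Choose one of: {sorted(tools)}",
        }
    try:
        return {"result": fn(**arguments)}
    except TypeError as exc:
        # Wrong/missing arguments: describe the problem instead of crashing.
        return {
            "error": "bad_arguments",
            "detail": str(exc),
            "suggested_action": "Check the tool schema and retry.",
        }

tools = {"add": lambda a, b: a + b}
print(dispatch_tool_call(tools, "add", {"a": 2, "b": 3}))       # {'result': 5}
print(dispatch_tool_call(tools, "missing", {})["error"])        # unknown_tool
```

Because every branch returns a dict, the agent loop stays a straight line: call, append result to the conversation, repeat.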
Testing
uv run pytest tests/ -v # 456 tests
uv run ruff check src/ tests/ # Lint
Test coverage includes:
- Core forge operations (producers, consumers, dispatch)
- Both storage backends (MemoryStore, SQLiteStore)
- All built-in toolsets (paginator, search, filter, kv_reader, pipeline)
- All integration adapters (OpenAI, Anthropic, LangChain)
- Concurrency (11 threading tests with parallel sessions)
- Schema quality (43 tests for type mapping and docstring extraction)
- Search quality (17 tests for relevance ranking and FTS5)
- Error recovery (12 tests for LLM-friendly error dicts)
- Data shape validation (13 tests for producer-consumer contracts)
- Pipeline engine (73 tests for all 13 operators and compound pipelines)
- LLM interface quality (36 tests for data preview, system prompt, schema examples, end-to-end workflows)
API Reference
Forge
| Method | Description |
|---|---|
| `Forge(store, *, default_notify, default_ttl, max_items)` | Create orchestrator |
| `@forge.producer(workspace_type, toolsets, key, transform, meta, notify, ttl)` | Producer decorator |
| `@forge.consumer(workspace_type, produces, produces_toolsets)` | Consumer decorator |
| `forge.toolset(name, *, enforce_type)` | Create/get a ToolSet |
| `forge.dispatch_tool_call(tool_name, arguments)` | Route a tool call |
| `forge.get_tools(workspace_id=None)` | All tool schemas (OpenAI format) |
| `forge.get_producer_schemas()` | Producer-only schemas |
| `forge.get_all_tool_schemas(workspace_id=None)` | Consumer-only schemas |
| `forge.system_prompt(preamble="")` | Auto-generated system prompt |
| `forge.list_workspaces(workspace_type=None)` | List workspace IDs |
| `forge.workspace_meta(workspace_id)` | Get workspace metadata |
| `forge.drop_workspace(workspace_id)` | Delete a workspace |
| `forge.sweep_expired()` | Delete expired workspaces |
| `forge.clear()` | Delete all workspaces |
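To make the schema-export methods concrete, here is a simplified sketch of how something like `get_tools()` could derive an OpenAI-format function schema from a Python signature. This is illustrative only; `to_tool_schema`, `PY_TO_JSON`, and the example function are hypothetical, not ctxtual's internals.

```python
import inspect

# Minimal Python-type → JSON-Schema-type mapping (sketch).
PY_TO_JSON = {int: "integer", str: "string", float: "number", bool: "boolean"}

def to_tool_schema(fn) -> dict:
    """Build an OpenAI-format tool schema from a function's signature."""
    props = {}
    for name, p in inspect.signature(fn).parameters.items():
        props[name] = {"type": PY_TO_JSON.get(p.annotation, "string")}
    return {
        "type": "function",
        "function": {
            "name": fn.__name__,
            "description": (fn.__doc__ or "").strip(),
            "parameters": {"type": "object", "properties": props},
        },
    }

def papers_paginate(offset: int, limit: int):
    """Return a page of items from the workspace."""

schema = to_tool_schema(papers_paginate)
print(schema["function"]["name"])  # papers_paginate
```

The real library also extracts docstring parameter descriptions; the point here is just the shape of the exported schema.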
ToolSet
| Method | Description |
|---|---|
| `ToolSet(name, *, enforce_type, safe, data_shape)` | Create tool group |
| `@toolset.tool(name, validate_workspace, output_hint)` | Register a tool |
| `toolset.bind(workspace_id)` | Create BoundToolSet with fixed workspace |
| `toolset.tools` | Dict of `{name: callable}` |
| `toolset.tool_names` | List of tool names |
| `toolset.to_tool_schemas(workspace_id=None)` | OpenAI schemas for this toolset |
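The `bind(workspace_id)` idea can be sketched with `functools.partial`: every registered tool gets the workspace pre-filled so the LLM never has to pass it. `TinyToolSet` here is a hypothetical stand-in, not ctxtual's ToolSet.

```python
from functools import partial

class TinyToolSet:
    """Toy tool group: registers tools and binds them to one workspace."""
    def __init__(self, name: str):
        self.name = name
        self.tools = {}

    def tool(self, fn):
        # Namespace the tool as "<toolset>_<function>" (mirrors the docs).
        self.tools[f"{self.name}_{fn.__name__}"] = fn
        return fn

    def bind(self, workspace_id: str) -> dict:
        # Fix workspace_id as the first argument of every tool.
        return {n: partial(f, workspace_id) for n, f in self.tools.items()}

ts = TinyToolSet("papers")

@ts.tool
def paginate(workspace_id, offset=0, limit=10):
    return f"page of {workspace_id} from {offset}"

bound = ts.bind("papers_abc")
print(bound["papers_paginate"](offset=5))  # page of papers_abc from 5
```

Binding is what lets the exported schemas omit `workspace_id` entirely: the server closes over it, so the model's call payloads stay small.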
Store
| Method | Description |
|---|---|
| `init_workspace(meta)` | Create/update workspace |
| `drop_workspace(workspace_id)` | Delete workspace and all data |
| `get_meta(workspace_id)` | Get metadata (or None) |
| `list_workspaces(workspace_type=None)` | List workspace IDs |
| `set_items(workspace_id, payload)` | Store payload (list, dict, or scalar) |
| `get_items(workspace_id)` | Read full payload |
| `count_items(workspace_id)` | Item count |
| `get_page(workspace_id, offset, limit)` | Paginated read |
| `search_items(workspace_id, query, *, fields, max_results, case_sensitive)` | Full-text search |
| `filter_items(workspace_id, field, value, operator)` | Structured filter |
| `sort_items(workspace_id, field, *, descending, limit)` | Sorted read |
| `distinct_field_values(workspace_id, field, *, max_values)` | Facet values |
| `append_items(workspace_id, new_items)` | Append to list workspace |
| `update_item(workspace_id, index, item)` | Replace item at index |
| `patch_item(workspace_id, index, fields)` | Merge fields into item |
| `delete_items(workspace_id, indices)` | Remove items by index |
| `transaction()` | Atomic operation context manager |
| `sweep_expired()` | Delete expired workspaces |
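A custom backend only has to honor this contract. A minimal in-memory sketch of the read-side operations (illustrative; the real `MemoryStore`/`SQLiteStore` do more, including thread safety and TTL handling):

```python
class TinyStore:
    """Toy Store implementing a few of the contract's read operations."""
    def __init__(self):
        self._items = {}

    def set_items(self, workspace_id, payload):
        self._items[workspace_id] = payload

    def count_items(self, workspace_id):
        return len(self._items[workspace_id])

    def get_page(self, workspace_id, offset, limit):
        return self._items[workspace_id][offset:offset + limit]

    def filter_items(self, workspace_id, field, value, operator="eq"):
        ops = {"eq": lambda a, b: a == b, "gt": lambda a, b: a > b}
        return [i for i in self._items[workspace_id]
                if ops[operator](i.get(field), value)]

store = TinyStore()
store.set_items("papers_abc", [{"year": y} for y in range(2000, 2010)])
print(store.get_page("papers_abc", 2, 3))                    # years 2002-2004
print(len(store.filter_items("papers_abc", "year", 2005, "gt")))  # 4
```

Everything the built-in toolsets do (paginate, search, filter) reduces to calls like these, which is why swapping backends doesn't change agent-facing behavior.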
Contributing
Contributions are welcome. Please open an issue to discuss before submitting large changes.
git clone https://github.com/banda-larga/ctxtual.git
cd ctxtual
uv sync --dev
uv run pytest tests/ -v
uv run ruff check src/ tests/
License
MIT