Standalone policy evaluation engine for AI governance under the EU AI Act
Project description
kyvvu-engine
Runtime policy evaluation for AI agents — a stateful behavioural firewall as a Python library.
kyvvu-engine evaluates policies against the full execution path of an AI agent. Given an intended behaviour — the smallest atomic unit of what an agent is about to do — it decides allow, warn, or block, and explains why.
Governance is modelled as a pathwise problem: decisions depend not only on the step an agent is about to take, but on the full ordered history of what it has already done in the current task. This is "policies on paths," formalised in the paper Runtime Governance for AI Agents: Policies on Paths. kyvvu-engine is the reference implementation.
kyvvu-engine is used by the kyvvu SDK (Python agent integration), the Kyvvu platform, and is available as a standalone HTTP service via the SDK's kyvvu serve command for harnesses in other languages.
Contents
- Installation
- Quickstart
- Mental model
- Atomic behaviours
- Properties
- The evaluation lifecycle
- Agent registration
- Rule functions
- Writing your own rule function
- The two-tier API
- HTTP endpoints (the runner)
- Configuration
- Policy fetch resilience
- Debugging and explainability
- Performance
- Running as a standalone service
- Multi-agent and branching patterns
- Stability and versioning
Installation
# Most users: install the SDK (includes the engine)
pip install kyvvu
# Engine only (no SDK, no agent integration — for embedding)
pip install kyvvu-engine
# Standalone HTTP server (for non-Python harnesses)
pip install "kyvvu-engine[serve]"
Quickstart
from kyvvu_engine import KyvvuRunner
from kyvvu_engine.schemas import Behavior, EvalContext, StepType, Scope, Verb
runner = KyvvuRunner(
api_url="https://platform.kyvvu.com",
api_key="KvKey-…",
agent_key="customer-support-agent",
)
ctx = EvalContext(agent_id="agent-123", environment="production")
# 1. Preflight: evaluate the intended behaviour before executing.
intended = Behavior(
agent_id="agent-123",
task_id="task-abc",
scope=Scope.step,
step_type=StepType.step_model,
verb=Verb.POST,
step_name="chat_gpt-4o",
input={"user_message": "Hi, my SSN is 123-45-6789"},
)
result = runner.evaluate(intended, ctx)
# action == "allow" → returns normally
# action == "warn" → emits warnings.warn() and returns
# action == "block" → raises KyvvuBlockedError
# 2. Execute the step.
output = your_llm_call(intended.input)
# 3. Record the completed step. It becomes visible to future evaluate() calls.
runner.record(intended.model_copy(update={"output": output}))
# 4. Close the task when finished. No policies run here; this is cleanup.
runner.end_task("task-abc")
Mental model
The engine is a stateful decision machine.
┌──────────────────────────────────────────┐
policies ──▶ │ │
│ PolicyEngine │
intended ──▶ │ (zero-I/O, sub-ms decisions) │ ──▶ allow | warn | block
behaviour │ │ + per-policy outcomes
+ context │ internal state: per-task history │ + aggregate risk score
└──────────────────────────────────────────┘
Terminology
- A behaviour is an atomic action an agent takes. It is the smallest governable unit.
- A step is a triple
{input, behaviour, output}— an executed behaviour with its context. The completed history of a task is an ordered sequence of steps, also called a path. - An intended behaviour is
{input, behaviour, void}— a behaviour about to execute, with input but no output yet. This is whatevaluate()inspects. - A policy is an instantiation of a rule function with specific parameters, severity, and scope.
- The organisational context (
EvalContext) carries the agent's identity, classification, environment, user settings, and pre-fetched cross-task counts — everything rules may need that isn't in the behaviour or history itself.
What evaluate() does
- Reads the task's completed history from its internal tracker.
- Filters policies applicable to this agent and classification.
- Runs each applicable rule function, passing the flattened behaviour data, the policy's params, and a
RuleContextthat gives access to history and organisational context. - Each rule returns a boolean. Per-policy outcomes are weighted by severity and aggregated into a risk score
∈ [0.0, 1.0]. The default aggregator isaggregate_max(worst-case severity wins); this is pluggable. - The final risk score maps to an action:
0.0→allow,(0, 1)→warn,1.0→block.
Two evaluation points
- Agent registration — once, before the first task begins, via
evaluate_registration(). Agent-level policies run here (declared purpose, tool allowlist, classification). - Every atomic step — before each step executes, via
evaluate(). Step-level policies run here (path history, content, classification, rate limits).
Task completion is cleanup, not a decision point. end_task(task_id) evicts the task's history from memory and flushes buffered logs. It does not evaluate policies. To enforce task-end invariants, model them as policies on the task.end behaviour — templates emit a task.end Behavior that flows through the normal evaluate() path.
Runner action semantics
allow—evaluate()returns normally.warn—evaluate()emitswarnings.warn(...)and returns; incident webhook fires if configured.block—evaluate()raisesKyvvuBlockedError; incident webhook fires if configured. The caller can catch the exception to continue execution (retry, fall back, notify the user, abort the task).
Zero-I/O core
PolicyEngine never calls out, never queries a database, never fetches anything. KyvvuRunner is a wrapper that adds HTTP (fetching policies, flushing logs, firing incident webhooks). All network code is isolated in the io/ module; the core engine has no awareness of the network.
One engine per agent
Each KyvvuRunner is configured with a single agent_key and owns one PolicyEngine instance. Engines are per-agent by construction and are not designed to be shared across agents.
Atomic behaviours
Every action an agent takes is classified into one of 12 atomic behaviour types. Four describe the task lifecycle; eight describe the agent's moves within a task. Together with a scope (task or step) and an HTTP-style verb (GET, POST, PATCH, DELETE, or none), they form the canonical vocabulary the engine operates on.
| Step type | Scope | Valid verbs | What it represents |
|---|---|---|---|
task.start |
task |
— | A task begins. |
task.end |
task |
— | A task completes normally. |
task.error |
task |
— | A task terminates with an error. |
task.idle |
task |
— | The agent is idle within a task (heartbeat / keepalive). |
step.resource |
step |
GET/POST/PATCH/DELETE | Read or mutate an external resource (DB, file, API). |
step.message |
step |
GET/POST | Receive (GET) or send (POST) a message — user input, UI events, outbound communication. |
step.self |
step |
GET/POST/PATCH/DELETE | Read/write the agent's own internal state (memory, scratchpad, plan). |
step.model |
step |
POST | Send a prompt to an LLM and receive a completion. |
step.credential |
step |
GET | Retrieve a secret, token, or credential. |
step.exec |
step |
— | Execute code (run a script, call a function, shell out). |
step.gate |
step |
— | Cross a gate — a human approval, a policy check, a guardrail. |
step.unknown |
step |
— | Uncategorisable behaviour (template fallback). |
These combinations are enumerated in schemas.VALID_COMBINATIONS and enforced by Behavior's model validator. Any (step_type, scope, verb) tuple outside this set raises on construction.
task.* behaviours evaluate against step_execution-scoped policies, exactly like step.* behaviours. There is no separate "task_execution" policy scope. task.* behaviours are atomic steps that happen to sit at the boundaries of a task.
Task lifecycle events
task.start— emitted when the agent begins a task. Often the first behaviour in a path.task.end— normal completion.evaluate()runs as usual; a policy matching onstep_type == task.endcan check whole-task invariants.task.error— abnormal termination. History is evicted onend_task(task_id)identically totask.end. For forensic retention after errors, handle at the log-sink layer.task.idle— emitted periodically when the agent is idle but the task isn't over. Keeps rate-limit and working-hours rules accurate across long pauses. Does not trigger cleanup.
Properties
Everything beyond the (type, scope, verb) tuple lives in properties, a nested dict that policies can inspect. Properties distinguish a step.resource reading customer-data from one reading product-data — the type is the same, the property is different.
Standard property groups:
target— the thing being acted on (domain, resource URI, table name).auth— authentication scope (read, write, admin).data— payload classification (sensitive fields, size, schema).model— forstep.model: provider, model id, parameters.exec— forstep.exec: runtime, isolation level, side-effect class.guard— forstep.gate: gate type (human_approval, policy_check, static_check).message— forstep.message: channel, sender, recipient.usage— forstep.modeloutputs: prompt_tokens, completion_tokens, cost_usd.
Custom groups are permitted; the engine passes them through unchanged, and rule functions read them via dot-path accessors (_get_prop(data, "target.table")).
Worked example
A step.resource GET reading customer data with realistic properties:
Behavior(
agent_id="agent-123",
task_id="task-abc",
scope=Scope.step,
step_type=StepType.step_resource,
verb=Verb.GET,
step_name="read_customer_record",
input={"customer_id": "CUST-9981"},
properties={
"target": {
"system": "salesforce",
"table": "customer-data",
"object_id": "CUST-9981",
"domain": "internal.crm.acme.com",
},
"auth": {"scope": "read", "principal": "agent-123"},
"data": {"classification": "pii", "fields": ["name", "email", "phone"]},
},
)
The evaluation lifecycle
Three calls per step, one per task end.
1. evaluate(intended, context) → EvalResult
Called before a step executes. Reads the task's history, filters applicable policies, runs each rule, aggregates, returns. Does not modify history.
Outputs:
result.action—"allow","warn", or"block".result.risk_score— normalised[0.0, 1.0].result.policies— onePolicyResultper evaluated policy.
2. Execute the step
The caller runs the tool, the LLM, the database write. The engine has no opinion about execution.
3. record(step) → Behavior
Called after the step executes. Assigns a monotonic step number within the task and appends the completed Behavior (with output populated) to the task's history. Future evaluate() calls in the same task_id see this step.
4. end_task(task_id)
Called when the task terminates. Cleanup only — no policy evaluation. Evicts the task's history from memory and, via the runner, flushes any buffered step logs.
Calling end_task() for an unknown task_id is a no-op. A new task_id is a fresh history with no relationship to any previous task — histories are keyed by task_id.
Memory management
History lives in memory, keyed by task_id. Two mechanisms prevent unbounded growth:
- Normal termination:
end_task(task_id)evicts history explicitly. - Abandoned tasks:
runner.sweep_stale_tasks(), called periodically, evicts tasks older thanKV_TASK_MAX_AGE_SECONDS(default 3600s). Tasks that crash beforeend_task()are cleaned up this way.
Wire sweep_stale_tasks() into a background thread or scheduler in production.
Interaction diagram
agent: evaluate(intended) ───▶ engine: check policies against history + context
◀─── EvalResult{allow/warn/block, risk_score, policies}
agent: execute step, capture output
agent: record(completed_step) ─▶ engine: append to history, assign step number
◀─ Behavior{step=N, ...}
(repeat per step)
agent: end_task(task_id) ─────▶ engine: evict history, flush logs
Agent registration
Before the first task, agents register themselves with the Kyvvu platform. Registration is where agent-level policies are evaluated — declared purpose, tool allowlist, owner domain, classification consistency.
Registration policies have scope: "agent_registration" and run against the agent's metadata rather than a Behavior:
result = runner.evaluate_registration(
agent_data={
"name": "customer-support-agent",
"purpose": "Triage inbound customer tickets and draft responses",
"owner": "support-team@acme.com",
"declared_tools": ["zendesk_read", "llm_call"],
"risk_classification": "limited",
},
context=EvalContext(
agent_id="agent-123",
environment="production",
risk_classification="limited",
),
)
Semantics are identical to evaluate(): same EvalResult, same allow/warn/block, same runner behaviour (warn emits warnings.warn, block raises KyvvuBlockedError). The difference is which policies run — only those with scope=agent_registration.
Registration is typically called once at agent startup. A block at registration means the agent should not start at all — typically an illegally configured agent (empty purpose, disallowed tools, classification mismatch).
Rule functions
Rule functions are the unit of decidability. Each rule is a small pure Python function with the signature:
def rule(data: dict, params: dict, context: RuleContext) -> bool:
"""Return True if the policy passes; False if it is violated."""
A policy is an instantiation of a rule: policy = rule + params + (scope, severity, agent_id, risk_classification). The same rule backs many policies — field_matches_regex instantiated once for SSNs, once for credit cards, once for email domains.
Rule context
Every rule receives a RuleContext, the only surface through which rules read state beyond their own params:
context.agent_id,context.task_id,context.scope,context.now,context.hourcontext.get_current_agent() → AgentRecord | None— agent metadata.context.user_settings → dict | None— pre-fetched user preferences.context.get_previous_step() → Behavior | None— last completed step.context.get_all_steps_in_task() → List[Behavior]— full task history.context.count_steps_of_type(step_type: str) → int— counter helper.context.count_recent_nodes_across_executions(step_type, window_minutes, attribute_filter) → int— pre-fetched cross-task counts.
All surfaces are in-memory and pre-fetched. Rules perform no I/O.
Built-in rule functions
The engine ships with 26 built-in rules (as of v0.2.0) grouped into six categories. Each category lives in its own module (kyvvu_engine/rules/<category>.py) with a mirror test file.
Field rules (rules/field.py) — applicable to agent_registration and step_execution:
| Rule | What it checks |
|---|---|
field_not_empty |
Named field has a non-empty value. |
field_in_list |
Named field's value is in an allowlist. |
field_matches_regex |
Named field matches a regex pattern. |
Path rules (rules/path.py) — require history, step_execution only:
| Rule | What it checks |
|---|---|
step_directly_preceded_by |
Previous step in history has a given type. |
step_requires_predecessor |
Some earlier step in history has a given type. |
step_preceded_by_without_intervening |
A required predecessor exists with no forbidden steps between. |
step_requires_dedicated_predecessor |
Immediate predecessor matches type and property filter. |
step_requires_gate |
A step.gate precedes this step. |
sequence_forbidden |
A forbidden ordered sequence has not occurred. |
step_not_after |
This step type is forbidden once a specified predecessor has occurred (permanently tainted). |
history_contains |
The history contains a step matching type + optional verb + optional property filter. |
current_is |
The intended behaviour matches type + optional verb + optional property filter. |
Count rules (rules/count.py):
| Rule | What it checks |
|---|---|
execution_max_steps |
Task has not exceeded a maximum step count. |
max_consecutive_same_type |
No run of the same step type exceeds a limit. |
cross_execution_rate_limit |
This agent has not exceeded N of this step_type in the last M minutes across tasks. |
usage_budget |
Cumulative usage metric (tokens, cost) across task history has not exceeded budget. |
Classification rules (rules/classification.py):
| Rule | What it checks |
|---|---|
step_forbidden_for_classification |
This step type is not permitted for the agent's risk classification. |
working_hours_only |
Current time is within a permitted window. Supports overnight wraparound and timezones. |
step_name_in_allowlist |
This step's name is in the agent's declared tool allowlist. |
Content rules (rules/content.py):
| Rule | What it checks |
|---|---|
pii_in_request |
Step input does not contain PII matching configured regex patterns. Patterns are required; no defaults. |
domain_allowlist |
Step's target domain is in an allowlist. |
Flow rules (rules/flow.py):
| Rule | What it checks |
|---|---|
conditional_successor_required |
If a condition held at some prior step, a specific successor must eventually follow. |
tainted_path_block |
If any prior step is tainted, certain downstream steps are forbidden. |
all_of |
Compound: passes iff all sub-conditions pass. |
any_of |
Compound: passes iff any sub-condition passes. |
not |
Compound: passes iff the sub-condition fails. |
Each rule exposes a description, parameter schema, and example parameters programmatically:
from kyvvu_engine import PolicyRule
metadata = PolicyRule.get_all_rules(scope="step_execution")
# → {"field_not_empty": {"description": "...", "scopes": [...], "params_schema": {...}}, ...}
The table above is derived from this metadata.
Compound policies
The three compound rules accept sub-conditions as params. Compound rules recurse freely: all_of can contain any_of can contain not can contain a primitive.
Important: rule functions return True to pass and False to block. This means all_of returns True (passes) when all sub-conditions are met. If your intent is "block when conditions A, B, and C are all present," you need not(all_of(A, B, C)) — the all_of detects the dangerous combination, and the not inverts it into a block. Using bare all_of for a blocking trigger is a common authoring mistake: it would block every step where any condition is not met, which is the opposite of what you want.
Example: "If the agent has read customer-data AND product-data AND called a model, then POSTing a message requires a human-approval gate":
{
"name": "PII + product data + model requires human approval",
"rule_type": "all_of",
"params": {
"conditions": [
{"rule_type": "current_is",
"params": {"step_type": "step.message", "verb": "POST"}},
{"rule_type": "history_contains",
"params": {"step_type": "step.resource", "verb": "GET",
"property_filter": {"target.table": "customer-data"}}},
{"rule_type": "history_contains",
"params": {"step_type": "step.resource", "verb": "GET",
"property_filter": {"target.table": "product-data"}}},
{"rule_type": "history_contains",
"params": {"step_type": "step.model"}},
{"rule_type": "not",
"params": {"condition": {"rule_type": "step_requires_gate",
"params": {"target_step_types": ["step.message"],
"target_verb": "POST",
"gate_check_type": "human_approval"}}}}
]
},
"severity": "critical",
"scope": "step_execution"
}
Incidents from a failed compound policy carry one incident with the condition tree in violation_details.
Rule-specific notes
step_requires_gate— the gate may be any distance earlier in history; this rule does not enforce gate freshness. For fresh-approval semantics, compose withstep_directly_preceded_by.step_not_after— once any forbidden predecessor has occurred, the target is blocked for the rest of the task (tainted-path semantics).working_hours_only— acceptstimezone: str(IANA name); falls back to UTC. Supports overnight windows (start_hour=22,end_hour=6).pii_in_request—patternsparam is required. Step input is serialised viajson.dumpsso nested dicts are scanned correctly.usage_budget— sums a numeric property from completed steps in history and blocks when the cumulative value exceeds the budget. The first occurrence is always allowed; only subsequent steps see an accumulating total.
Writing your own rule function
Registering a rule
from kyvvu_engine import PolicyRule
@PolicyRule.register(
name="step_name_forbidden",
description="The step's name must not match a forbidden pattern.",
params_schema={
"patterns": {"type": "array", "required": True, "description": "Regex patterns"},
},
scopes=["step_execution"],
example_params={"patterns": ["dangerous_tool"]},
)
def check_step_name_forbidden(data, params, context):
import re
name = data.get("step_name", "")
for pattern in params["patterns"]:
if re.match(pattern, name):
return False
return True
The rule is immediately available as a rule_type in any policy definition. The Kyvvu platform UI discovers it via PolicyRule.get_all_rules() and renders a form from params_schema.
Rules must live in the appropriate module under kyvvu_engine/rules/ and must have a mirror test in tests/rules/. Tests use PolicyEngine directly:
# tests/rules/test_field_rules.py
from datetime import datetime
from kyvvu_engine import PolicyEngine
from kyvvu_engine.schemas import Behavior, EvalContext, Scope, StepType, Action
def test_step_name_forbidden_blocks_matching_name():
engine = PolicyEngine()
engine.load_policies([{
"id": 1, "name": "no-dangerous", "scope": "step_execution",
"rule_type": "step_name_forbidden",
"params": {"patterns": [r"^dangerous_tool"]},
"severity": "critical", "enabled": True,
}])
b = Behavior(
agent_id="a", task_id="t", timestamp=datetime(2026, 1, 1),
scope=Scope.step, step_type=StepType.step_exec,
step_name="dangerous_tool_v2",
)
result = engine.evaluate(b, EvalContext(agent_id="a", task_id="t", environment="prod"))
assert result.action == Action.block
New rules without matching tests fail CI.
Worked example: token-usage / cost budget
To block an agent once it has spent a budget on LLM calls within a task, use usage_budget:
{
"name": "Per-task $5 LLM budget",
"rule_type": "usage_budget",
"params": {"step_type": "step.model",
"property_path": "usage.cost_usd",
"budget": 5.0},
"severity": "high",
"scope": "step_execution"
}
This sums properties.usage.cost_usd across completed step.model behaviours in the current task; once the total exceeds 5.0, further model calls are blocked. The same pattern works for tokens (property_path: "usage.total_tokens", budget: 100000) or any numeric property templates emit.
The two-tier API
PolicyEngine — the pure core
Zero I/O. Zero logging config. Only dependency: Pydantic. For embedding, for running policies from an in-memory store, and for unit-testing policy logic.
| Method | Purpose |
|---|---|
load_policies(policies: List[dict]) → None |
Replace the active policy set. Idempotent. |
evaluate(intended: Behavior, context: EvalContext) → EvalResult |
Preflight a step. |
evaluate_registration(agent_data: dict, context: EvalContext) → EvalResult |
Evaluate agent-registration policies. |
record(step: Behavior) → Behavior |
Append a completed step to history; assigns step number. |
end_task(task_id: str) → None |
Evict a task's history from memory. |
get_history(task_id: str) → List[Behavior] |
Read the task's completed steps (snapshot). |
evaluate_and_record(intended, context, output=None) → EvalResult |
Convenience: evaluate; if not blocked, record with the given output. |
explain(intended, context) → str |
Human-readable per-policy evaluation trace. |
policy_count() → int |
Number of loaded policies (diagnostic). |
validate_rule_params(rule_type, params) → (bool, str | None) |
Check a rule name is registered. |
KyvvuRunner — the I/O wrapper
PolicyEngine + HTTP + log buffering. For use when policies come from the Kyvvu platform.
| Method | Purpose |
|---|---|
fetch_policies() → None |
Force a policy refresh (ignores TTL). |
sweep_stale_tasks(max_age_seconds=None) → int |
Evict abandoned task buffers. |
policy_status() → PolicyStatus |
Policy cache status: loaded, stale, source (api/disk_cache/none), timestamps, policy count, TTL remaining. |
settings (property) |
The resolved KyvvuSettings. |
All PolicyEngine methods are available on KyvvuRunner with the same names. KyvvuRunner.evaluate() additionally:
- Ensures policies are loaded (fetches if TTL expired).
- Emits
warnings.warn()onwarn. - Fires the incident webhook on
warnorblock(if configured). - Raises
KyvvuBlockedErroronblock.
HTTP endpoints (the runner)
KyvvuRunner makes up to three kinds of HTTP requests. All endpoints are configurable. The log endpoint defaults to stdout (JSON-line output to the terminal for development). The incident webhook is off by default. Set KV_LOG_ENDPOINT= (empty string) to disable log output entirely. Both endpoints accept stdout as a value for local debugging.
Authentication: all requests carry Authorization: Bearer <api_key>. The instance identifier is sent as both ?instance={instance_id} in the query string and X-Kyvvu-Instance-Id: {instance_id} in a header. Both carry the same value.
1. GET /api/v1/policies — policy fetch
Called on first use and whenever the policy TTL expires (default 300 seconds).
GET {api_url}/api/v1/policies?agent_key={agent_key}&instance={instance_id}&enabled=true&limit=1000
Authorization: Bearer {api_key}
X-Kyvvu-Instance-Id: {instance_id}
Response: JSON array of PolicyDefinition dicts:
[
{
"id": 1,
"name": "No PII to external LLMs",
"scope": "step_execution",
"rule_type": "pii_in_request",
"params": {"patterns": ["\\d{3}-\\d{2}-\\d{4}"]},
"severity": "critical",
"enabled": true,
"agent_id": null,
"risk_classification": null
}
]
Fields consumed at evaluation time: id, name, scope, rule_type, params, severity, enabled, agent_id, risk_classification.
Network failures are logged and swallowed. The runner falls back to the previously loaded policy set and retries after the TTL. The TTL clock is stamped on failure to prevent a down API from causing every evaluate() call to block on a re-fetch attempt.
HMAC verification (opt-in). When KV_POLICY_HMAC_SECRET is set on both the engine and the API, the API computes HMAC-SHA256(secret, response_body) and includes it in the X-Kyvvu-Policy-Signature header. The engine verifies the signature on receipt. If the signature is missing or invalid, the fetch is rejected and cached policies are kept. This prevents policy tampering by a compromised proxy or MITM within the internal network.
Disk cache (opt-in). When KV_POLICY_CACHE_PATH is set, the runner writes the fetched policies to disk after each successful fetch (atomic write via temp file + rename). On cold start, if the API is unreachable, the runner loads policies from this disk cache. A staleness warning is emitted if the cache exceeds KV_POLICY_CACHE_MAX_AGE_SECONDS (default 24h), but the cache is still used.
Fail-mode. When KV_POLICY_FAIL_MODE=closed, the runner blocks all step_execution behaviors if no policies could be loaded (from API or disk cache). Default is open (current behavior — allow all when no policies are available).
2. POST {log_endpoint} — step log flush
Called on end_task() when KV_LOG_ENDPOINT is configured and steps are buffered.
POST {log_endpoint}
Authorization: Bearer {api_key}
X-Kyvvu-Instance-Id: {instance_id}
Content-Type: application/json
{
"agent_id": "agent-123",
"task_id": "task-abc",
"steps": [
{
"step_type": "step.model",
"verb": "POST",
"step_name": "chat_gpt-4o",
"properties": {"model": {"provider": "openai", "name": "gpt-4o"},
"usage": {"total_tokens": 1250}},
"meta": null,
"input": {"user_message": "..."},
"output": {"response": "..."},
"timestamp": "2026-04-23T10:00:00+00:00"
}
]
}
Payload redaction. For GDPR-sensitive environments, set KV_LOG_PAYLOADS=metadata_only. In this mode, each step's input and output fields are replaced with {"redacted": true, "keys": [...], "length": N} — shape preserved, content stripped. Default is full.
Response: {"steps_logged": N, "hash_tail": "..."} — only these two fields are consumed. HTTP errors are logged at WARNING and swallowed.
3. POST {incident_endpoint} — incident webhook
Fired from evaluate() or evaluate_registration() when the action is warn or block. Off unless KV_INCIDENT_ENDPOINT is set.
Step-execution incident:
{
"agent_id": "agent-123",
"scope": "step_execution",
"task_id": "task-abc",
"step_name": "chat_gpt-4o",
"step_type": "step.model",
"action": "block",
"risk_score": 1.0,
"violations": [
{
"policy_name": "No PII to external LLMs",
"severity": "critical",
"details": {"matched_pattern": "\\d{3}-\\d{2}-\\d{4}"}
}
],
"timestamp": "2026-04-23T10:00:00+00:00"
}
Agent-registration incident: same shape with scope: "agent_registration" and no task_id / step_name / step_type.
Response: status code only; body is ignored. Errors are logged at WARNING and swallowed.
Configuration
KyvvuRunner is configured via KyvvuSettings. Three equivalent patterns:
# Explicit kwargs
runner = KyvvuRunner(api_url="…", api_key="…", agent_key="…")
# Shared settings object
settings = KyvvuSettings(api_url="…", api_key="…")
runner = KyvvuRunner(settings=settings)
# Pure env-var driven
# export KV_API_URL=… KV_API_KEY=… KV_AGENT_KEY=…
runner = KyvvuRunner()
Precedence (highest to lowest): explicit kwargs → environment variables → .env in cwd → built-in defaults.
Authentication and identity
| Setting | Env var | Default | Purpose |
|---|---|---|---|
api_url |
KV_API_URL |
http://localhost:8000 |
Base URL of the Kyvvu platform API. |
api_key |
KV_API_KEY |
— | Bearer API key. Required for policy fetch. |
agent_key |
KV_AGENT_KEY |
— | Stable agent identifier used to fetch policies. |
instance_id |
KV_INSTANCE_ID |
auto-generated | Identifier for this runner instance. |
Endpoints (output endpoints off by default)
| Setting | Env var | Default | Purpose |
|---|---|---|---|
log_endpoint |
KV_LOG_ENDPOINT |
stdout |
URL for HTTP batch logging, stdout for JSON-line logs, or empty string to disable. |
incident_endpoint |
KV_INCIDENT_ENDPOINT |
unset → disabled | URL for incident webhooks, or stdout for JSON-line incidents. |
Behaviour
| Setting | Env var | Default | Purpose |
|---|---|---|---|
environment |
KV_ENV |
production |
Forwarded to EvalContext.environment. |
log_payloads |
KV_LOG_PAYLOADS |
full |
full includes step input/output; metadata_only redacts them. |
Cache and limits
| Setting | Env var | Default | Purpose |
|---|---|---|---|
policy_ttl_seconds |
KV_POLICY_TTL_SECONDS |
300 |
How long to cache fetched policies. |
http_timeout_seconds |
KV_HTTP_TIMEOUT_SECONDS |
10 |
Per-request HTTP timeout. |
task_max_age_seconds |
KV_TASK_MAX_AGE_SECONDS |
3600 |
Abandoned-task eviction threshold for sweep_stale_tasks(). |
Resilience
| Setting | Env var | Default | Purpose |
|---|---|---|---|
fail_mode |
KV_POLICY_FAIL_MODE |
open |
open = allow all when no policies loaded; closed = block all step_execution behaviors. |
policy_cache_path |
KV_POLICY_CACHE_PATH |
empty (disabled) | File path for on-disk policy cache. Written after each successful fetch; loaded on cold start if API is down. |
policy_cache_max_age_seconds |
KV_POLICY_CACHE_MAX_AGE_SECONDS |
86400 |
Max age (seconds) of disk cache before a staleness warning. Cache is still used when stale. |
policy_hmac_secret |
KV_POLICY_HMAC_SECRET |
empty (disabled) | Shared secret for HMAC-SHA256 verification of the X-Kyvvu-Policy-Signature header on policy fetch responses. |
Logging
| Setting | Env var | Default | Purpose |
|---|---|---|---|
log_level |
KV_LOG_LEVEL |
WARNING |
Log level for kyvvu / kyvvu_engine loggers. |
Instance identification
Each runner instance gets a unique instance_id to disambiguate observability across horizontally scaled agents:
- If
KV_INSTANCE_IDis set (e.g. injected by Kubernetes as a pod name), a random 5-character suffix is appended to prevent collisions when orchestrators reuse names:KV_INSTANCE_ID=worker-3becomesworker-3-a8f92. - If
KV_INSTANCE_IDis unset, a random UUID is generated at runner construction time and remains stable for the runner's lifetime.
The instance_id is sent on every HTTP request as both a query parameter (?instance=...) and a header (X-Kyvvu-Instance-Id: ...).
Policy fetch resilience
The runner provides four opt-in mechanisms to harden policy delivery. All are backward compatible — when unconfigured, the runner behaves exactly as before.
Fail-open vs fail-closed
By default, the runner operates in fail-open mode: if no policies can be loaded, all steps are allowed. This keeps agents running during API outages.
For high-risk production deployments, set KV_POLICY_FAIL_MODE=closed. In this mode, if the engine has zero policies (no API, no disk cache), evaluate() raises KyvvuBlockedError with a synthetic no_policies_available violation. The agent must handle this — typically by pausing work until policies are restored.
Disk cache
Set KV_POLICY_CACHE_PATH=/var/lib/kyvvu/policy-cache.json to enable the on-disk policy cache.
- Write: After each successful API fetch, policies are written to disk atomically (temp file +
os.replace). Concurrent readers never see a partial file. - Read: On cold start, if the API fetch fails and the engine has zero policies, the runner loads from the disk cache. A staleness warning is emitted if the cache exceeds
KV_POLICY_CACHE_MAX_AGE_SECONDS(default 24 hours). - The disk cache is a fallback only — in-memory policies from the API always take precedence.
- When the API recovers, fresh policies replace the disk-cached set.
HMAC policy signing
Set KV_POLICY_HMAC_SECRET to the same value on both the API and the engine. The API computes HMAC-SHA256(secret, response_body) and sends it in the X-Kyvvu-Policy-Signature response header. The engine verifies the signature; if it is missing or invalid, the fetch is rejected and cached policies are kept.
This prevents a compromised proxy from silently modifying policies to weaken enforcement (e.g. disabling a critical rule) — even on networks where TLS is terminated upstream.
Policy status observability
runner.policy_status() returns a PolicyStatus object with programmatic fields:
| Field | Type | Meaning |
|---|---|---|
loaded |
bool |
True if policies have been loaded at least once. |
stale |
bool |
True if last fetch failed and cache has exceeded TTL. |
source |
str |
"api", "disk_cache", or "none". |
last_success |
datetime | None |
Wall-clock time of last successful fetch. |
last_attempt |
datetime | None |
Wall-clock time of last fetch attempt (success or failure). |
policy_count |
int |
Number of active policies. |
ttl_remaining_seconds |
float |
Seconds until cache expires and a re-fetch is triggered. |
Use this in health checks, observability dashboards, or agent startup logic to decide whether to proceed when policies are stale.
Debugging and explainability
Set KV_LOG_LEVEL=DEBUG for full per-evaluation traces:
kyvvu_engine.engine DEBUG load_policies(): loaded 8/8 policies (0 dropped)
kyvvu_engine.engine.load DEBUG policy id=1 name='no_pii' rule_type=pii_in_request severity=critical scope=step_execution
kyvvu_engine.engine.load DEBUG policy id=2 name='domain_allowlist' rule_type=domain_allowlist severity=medium scope=step_execution
...
kyvvu_engine.engine DEBUG evaluate(): agent_id=agent-123 task_id=task-abc step_type=step.model verb=POST
kyvvu_engine.engine.eval DEBUG policy 'no_pii': rule=pii_in_request → FAIL
kyvvu_engine.engine.eval DEBUG policy 'domain_allowlist': rule=domain_allowlist → pass
kyvvu_engine.engine DEBUG evaluate(): agent_id=agent-123 step_type=step.model → action=block risk_score=1.00 (2 policies)
DEBUG-level output includes every policy loaded (on load_policies) and every policy's result (on each evaluate). If a policy does not appear here, the platform did not send it.
For structured JSON logging:
from kyvvu_engine import setup_logging
setup_logging(level="DEBUG", json=True)
For human-readable per-evaluation traces:
print(engine.explain(intended, context))
Evaluated 8 policies for step.model/POST "chat_gpt-4o" (task=task-abc step=5):
✓ domain_allowlist (medium) passed
✗ pii_in_request (critical) FAILED: matched \d{3}-\d{2}-\d{4}
✓ step_requires_gate (high) passed
...
→ action=block (risk_score=1.00)
For compound rules, explain() renders the condition tree with pass/fail at each node.
Performance
The engine is designed for sub-millisecond evaluation on the hot path. Targets are indicative — actual numbers are machine-dependent and are measured per-release via tests/test_latency.py. Tests use absolute thresholds (e.g. p99 < 10 ms) as hard gates to catch catastrophic regressions while tolerating normal CI variance.
| Scenario | Target (p95) |
|---|---|
| Evaluate with 0 policies | < 50 µs |
| Evaluate with 10 policies, empty history | < 200 µs |
| Evaluate with 10 policies, 20-step history | < 500 µs |
End-to-end latency including KyvvuRunner.evaluate() is dominated by network I/O when a policy refresh or incident webhook fires; the engine-only numbers are the floor.
Run benchmarks locally:
pip install -e ".[dev]"
pytest tests/test_latency.py -v -s
Running as a standalone service
For callers that aren't Python, kyvvu-engine runs as a local HTTP server. Install the SDK (which includes the engine) and use the kyvvu serve command:
pip install kyvvu
kyvvu serve --host 127.0.0.1 --port 8080 --agent-key my-agent
CLI arguments:
| Flag | Default | Purpose |
|---|---|---|
--host |
127.0.0.1 |
Bind address. |
--port |
8080 |
Bind port. |
--agent-key |
from KV_AGENT_KEY |
Agent key for policy fetch. |
--api-url |
from KV_API_URL |
Kyvvu platform API URL. |
--api-key |
from KV_API_KEY |
Bearer API key. |
All KV_* environment variables and .env files work identically to KyvvuRunner.
Endpoints
| Method | Path | Wraps | Purpose |
|---|---|---|---|
GET |
/health |
policy_status() |
Liveness probe — returns PolicyStatus JSON. |
POST |
/evaluate |
evaluate() |
Preflight evaluation of an intended behaviour. |
POST |
/register_agent |
evaluate_registration() |
Evaluate agent-registration policies. |
POST |
/record |
record() |
Record a completed step. Returns {"step": <int>, "task_id": "<str>"}. |
POST |
/end_task |
end_task() |
Close a task — evict history and flush logs. Returns {"status": "ok", "task_id": "<str>"}. |
Example
curl http://127.0.0.1:8080/health
{"loaded": true, "stale": false, "source": "api",
"policy_count": 8, "last_success": "2026-04-24T10:00:00+00:00",
"last_attempt": "2026-04-24T10:00:00+00:00",
"last_fetch_at": "2026-04-24T10:00:00+00:00",
"last_fetch_succeeded": true, "instance_id": "worker-3-a8f92",
"ttl_remaining_seconds": 280.5}
curl -X POST http://127.0.0.1:8080/evaluate \
-H "Content-Type: application/json" \
-d '{
"intended": {
"agent_id": "agent-123",
"task_id": "task-abc",
"scope": "step",
"step_type": "step.model",
"verb": "POST",
"step_name": "chat_gpt-4o",
"input": {"user_message": "Hello"}
},
"context": {
"agent_id": "agent-123",
"task_id": "task-abc",
"environment": "production",
"risk_classification": "limited"
}
}'
Response:
{
"action": "allow",
"risk_score": 0.0,
"policies": [
{"policy_id": 1, "name": "pii_in_request", "severity": "critical",
"violated": false, "violation_details": null}
],
"blocked": false
}
When a policy blocks, blocked is true and action is "block". The server never returns a non-200 status for policy decisions — the caller reads blocked to decide whether to proceed.
The serve layer inherits the runner's sink configuration. Set KV_LOG_ENDPOINT=stdout and/or KV_INCIDENT_ENDPOINT=stdout to emit JSON-line output for local debugging without an API backend.
The Python SDK uses KyvvuRunner directly and does not need this server.
Multi-agent and branching patterns
The engine is framework-agnostic. It does not know LangGraph, AutoGen, or CrewAI exist. Multi-agent and branching patterns are handled entirely in the kyvvu SDK via behavioural templates — the engine evaluates policies against whatever Behavior objects templates emit.
The engine commits to two conventions for template authors:
Reserved meta keys. When a Behavior represents a step in a subtask, the template sets:
meta.parent_task_id— the task_id of the invoking parent task.meta.parent_agent_id— the agent_id of the invoking parent agent.
Rules can read these via dot-path accessors. No rule primitives specific to multi-agent reasoning are required — the generic compound rules (all_of / any_of / not) plus history_contains cover the cases.
Cross-subtask aggregation. Policies that reason across sibling branches or parent/child tasks use EvalContext.cross_execution_counts, pre-fetched by the platform aggregating over parent_task_id. This is the same mechanism cross_execution_rate_limit uses.
The engine does not track branching paths as a DAG; histories are linear per task_id. If a DAG-aware history model is needed, it is a future-version change — sibling-subtask modelling is sufficient in the cases encountered so far.
Stability and versioning
Semantic versioning. The public API surface is:
PolicyEngineand its documented methods.KyvvuRunnerand its documented methods.KyvvuSettingsand its documented fields.Behavior,EvalContext,EvalResult,PolicyResult,PolicyDefinition,PolicyStatus,AgentRecord,Action,Scope,StepType,Verb.PolicyRuleand the names of the 26 built-in rules.- Aggregators:
aggregate_max,aggregate_mean,aggregate_weighted_sum. KyvvuBlockedError,KyvvuConfigError.setup_logging.
Everything else (internal helpers, underscore-prefixed modules, deeper import paths) is private and may change between minor versions.
- Before 1.0: minor versions may introduce breaking changes with a CHANGELOG entry.
- From 1.0: breaking changes require a major-version bump.
See also
- Runtime Governance for AI Agents: Policies on Paths — formal model.
- docs.kyvvu.com — platform documentation.
- The
kyvvuSDK package — behavioural templates mapping framework events to theBehaviorvocabulary.
Licence
kyvvu-engine is source-available under the Business Source License 1.1
(BSL 1.1). It is not open source in the OSI sense.
- Free use is permitted for development, testing, research, evaluation, and personal non-commercial purposes.
- Production use requires a Kyvvu commercial subscription or a separate license agreement with Kyvvu B.V.
- Each release converts to Apache License 2.0 four years after its publication date.
See LICENSE in this directory for the full terms.
Commercial licences: licensing@kyvvu.com
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kyvvu_engine-0.3.0.tar.gz.
File metadata
- Download URL: kyvvu_engine-0.3.0.tar.gz
- Upload date:
- Size: 114.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cfa059b12417acb0c25addb5da03a951340d87da4eaa85a4ef02ff6c8f3baa17
|
|
| MD5 |
8c7c4d801aef73a46a0c3b5ff94e3593
|
|
| BLAKE2b-256 |
75bcec18722cab4dfceac1c5ed7314a50325900fd839efbefc08757d6eb68f42
|
File details
Details for the file kyvvu_engine-0.3.0-py3-none-any.whl.
File metadata
- Download URL: kyvvu_engine-0.3.0-py3-none-any.whl
- Upload date:
- Size: 78.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d097c9fe21c6df21bf270ce30c6e49d2edab0396ba6839a6a92ffd3894089f97
|
|
| MD5 |
01dc78dd7f744db75e64d4d5b25553ed
|
|
| BLAKE2b-256 |
afa718ec0a24ccf1e87b92f3099836fb79e64072660d456de0ca2d847e21c650
|