CFA — Governed execution for AI agents and data systems
Project description
CFA — Contextual Flux Architecture
A typed, pre-execution governance gate for AI agents and data pipelines.
You declare what you intend to do as a StateSignature. CFA answers
approve, replan(remediations), or block(reason) — deterministically —
in under 3 ms p99 on a warm kernel, and writes the decision into a
SHA-256 hash chain you can verify offline with cfa audit verify. No
network. No server. No keys.
Why CFA exists
Six things CFA does today that no adjacent tool gives you together:
-
Structured remediation, not just yes/no. When a fixable rule fails, CFA returns the fix as data. The caller — an LLM agent, a CI step, a human — applies it and retries. The recovery loop is part of the contract, bounded at three attempts, and audited.
{ "action": "replan", "faults": [{"code": "GOVERNANCE_RAW_PII_IN_PROTECTED_LAYER", ...}], "interventions": [ "Set constraints.no_pii_raw=True", "Apply sha256() on PII columns before the join" ] }
-
Offline-verifiable audit chain. Every decision is a content-hashed event linked into a SHA-256 chain.
cfa audit verifyreplays the chain on any host that has the JSONL file. No vendor, no server, no API key.$ cfa audit verify --file audit.jsonl OK · 1 274 events verified · last_hash=a4f3…6c01
-
Dataset-aware policy primitives baked in. PII columns, partitioning, classification, merge keys, target layer — these are first-class primitives, not metadata you re-encode in Rego. A typical rule fits in six YAML lines:
- name: forbid_raw_pii condition: pii_in_protected_layer action: block fault_code: GOVERNANCE_RAW_PII severity: critical remediation: ["Apply sha256() on PII columns before the write"]
-
One signature, three production backends. The same approved
StateSignaturecompiles to PySpark + Delta Lake, ANSI SQL withMERGE INTO, or dbt models withschema.yml. Each backend declares its own forbidden tokens for static validation. New backends register throughBackendRegistrywithout touching the kernel. -
MCP server, working today. Any MCP-compatible agent (Claude Desktop, Cursor, Continue, custom LangGraph nodes) calls CFA before it touches production data. Five tools:
cfa_evaluate_signature,cfa_describe_rules,cfa_explain_fault,cfa_audit_check,cfa_list_backends. -
Deterministic by default; LLM is opt-in. The decision path is a pure function of
(signature, policy_bundle, catalog). Same inputs produce the same decision and the same hash, every time, with no network call. LLMs participate only on the front edge (intent → signature) and only if you ask for them via the[llm]extra.
Each of these is a recorded Architecture Decision Record. The reasoning, the alternatives we rejected, and the boundaries are written down.
Quick start
pip install cfa-kernel
cfa init
cfa evaluate "Join NFe with Clientes and persist to Silver" \
--catalog .cfa/catalog.json
For a real CI gate, the four-line decorator form:
from cfa.adapters import cfa_guard
@cfa_guard("Join NFe with Clientes anonymize CPF persist Silver",
policy_bundle="policies/prod-v1.yaml", catalog=CATALOG)
def my_pipeline(): ...
The decorator caches a single KernelOrchestrator per guard and adds
~2.4 ms p99 to your call. Production-friendly.
Where CFA pairs (instead of replacing)
CFA is not an LLM observability tool, a generic policy engine, a data catalog, or a data-quality-at-rest tool. Pair with LangSmith / Phoenix / Patronus, OPA, Unity Catalog / Atlan / DataHub, and Great Expectations / Soda respectively. The Compare page has the side-by-side breakdowns.
What CFA does
| Step | What happens |
|---|---|
| Formalize | Natural language or JSON → typed StateSignature contract |
| Govern | Policy Engine evaluates PII, cost, schema, partition constraints |
| Generate | Execution planner + deterministic code generation (PySpark, SQL, dbt) |
| Execute | Pluggable sandbox with metrics collection + runtime validation |
| Validate | State projection, SHA-256 audit trail, lifecycle indices |
Surfaces
All interfaces are backend-agnostic. CFA evaluates a StateSignature contract — however it was produced.
| Surface | For | Example |
|---|---|---|
cfa CLI |
Everyone | cfa policy check --signature sig.json |
cfa catalog CLI |
Data platform teams | cfa catalog validate catalog.json |
cfa policy CLI |
Security/compliance | cfa policy validate policies/prod.yaml |
cfa storage CLI |
Operations | cfa storage stats --db cfa.db |
cfa lifecycle CLI |
Platform teams | cfa lifecycle evaluate --db cfa.db |
cfa signature CLI |
External systems | cfa signature validate request.json |
cfa.testing |
CI/CD | evaluate("intent", catalog=catalog) with pytest |
cfa.runtime |
Production | RuntimeGate as decorator/context-manager |
cfa.mcp |
AI agents | MCP server for any MCP-compatible client |
cfa.adapters |
Any framework | Universal cfa_guard decorator (LangGraph, CrewAI, AutoGen, DSPy, OpenAI Agents SDK) |
Architecture
CLI / MCP / Adapter / API
│
▼
┌─ Formalize ──┐ NL / JSON / Tool call → typed StateSignature contract
├─ Govern ──────┤ Policy check + REPLAN cycle (approve / replan / block)
├─ Generate ────┤ Plan + code (PySpark / SQL / dbt) + static validation
├─ Execute ─────┤ Pluggable sandbox + runtime validation
└─ Validate ────┘ State projection + SHA-256 audit + lifecycle indices
│
▼
Decision JSON / Audit Trail / OTel / Prometheus
Capabilities
| Capability | What it gives you |
|---|---|
| SHA-256 audit trail | Tamper-evident chain of decisions, verifiable offline (cfa audit verify) |
| State projection | Each execution carries the typed state of the prior one — no implicit globals |
| Lifecycle indices (IFo/IFs/IFg/IDI) | Quantifies how often an intent recurs, stabilizes, and qualifies for promotion to a reusable skill |
| REPLAN cycle | Failed policy checks emit a structured remediation, not a hard stop |
| Backend-agnostic codegen | Same signature compiles to PySpark, ANSI SQL, or dbt — pluggable via BackendRegistry |
| Artifact hashing | Catalog, policy bundle, and signature are content-hashed and bound to every decision |
| MCP protocol | Any MCP-compatible agent can call CFA as a governance tool |
| SQLite + JSONL storage | First-class persistence with stats, retention cleanup, and vacuum |
| Config auto-discovery | cfa.yaml walked up the tree; all CLI commands respect it |
| Zero core dependencies | Optional extras for yaml, otel, mcp, llm — none required for the kernel |
CLI
# Governance & evaluation
cfa evaluate "intent" --catalog catalog.json --strict
cfa policy check --signature signature.json --policy-bundle policies/prod.yaml
cfa policy check --signature sig.json --catalog cat.json --strict --audit-log audit.jsonl
# Validation (CI-ready with JSON output and exit codes)
cfa catalog validate catalog.json --require-datasets --format json
cfa signature validate signature.json --format json
cfa policy validate policies/prod.yaml --format json
# Audit & verification
cfa audit show --id INTENT_ID --file audit.jsonl --format json
cfa audit verify --file audit.jsonl
# Policy rules
cfa rules list
cfa rules explain FAULT_CODE
# Storage management
cfa storage stats --db cfa.db --format json
cfa storage cleanup --db cfa.db --retention 90
cfa storage vacuum --db cfa.db
# Lifecycle management
cfa lifecycle evaluate --db cfa.db --window 30
cfa lifecycle list --db cfa.db
# Project health
cfa status --format json
# Bootstrap
cfa init
# Backends
cfa backend list
From Python
from cfa.testing import evaluate, assert_passed
result = evaluate(
"Join NFe with Clientes and persist to Silver",
catalog=MY_CATALOG,
policy_rules=my_rules,
backend="pyspark",
)
assert_passed(result)
Policy check with audit
from cfa.policy.engine import PolicyEngine
from cfa.types import StateSignature
signature = StateSignature.from_dict(signature_dict)
engine = PolicyEngine(policy_bundle_version="prod-v1.0")
result = engine.evaluate(signature)
# result.action → approve / replan / block
Runtime gate
from cfa.runtime import RuntimeGate, GateConfig
gate = RuntimeGate(
config=GateConfig(policy_bundle="prod_v1.0", sandbox="mock"),
catalog=PROD_CATALOG,
)
@gate.guard("aggregate sales with PII protected")
def my_pipeline():
...
SQLite storage
from cfa.storage import SqliteStorage
store = SqliteStorage("cfa.db")
store.ensure_schema()
# Audit
store.audit_append(event)
# Execution records (lifecycle)
store.execution_append(record_dict)
# Lifecycle skills
store.skill_upsert("hash_a", skill_data)
Policy Bundles
Declarative YAML policy rules — separate governance from code:
# policies/prod-v1.yaml
policy_bundle:
version: "prod-v1.0"
rules:
- name: forbid_raw_pii
condition: pii_in_protected_layer
action: block
fault_code: GOVERNANCE_RAW_PII
severity: critical
message: "PII in protected layer without anonymization."
remediation:
- "Apply sha256 on PII columns before the operation"
Validated at load time — unknown conditions, duplicate fault codes, and invalid enums are caught immediately.
Config File
# cfa.yaml (auto-discovered by all commands)
version: "1.0"
storage:
backend: sqlite
path: cfa.db
retention_days: 90
defaults:
catalog: .cfa/catalog.json
policy_bundle: .cfa/policies/prod-v1.yaml
backend: pyspark
Backends
Three governed code generation backends, all pluggable via BackendRegistry:
| Backend | Language | Features |
|---|---|---|
pyspark |
PySpark + Delta Lake | Merge, partition overwrite, PII anonymization |
sql |
ANSI SQL | MERGE INTO, INSERT OVERWRITE, partition clauses |
dbt |
dbt models + schema.yml | Config blocks, refs, not_null/unique tests, PII annotations |
Each backend declares its own forbidden tokens for static validation.
MCP Server
Expose CFA governance to any AI agent via Model Context Protocol:
{
"mcpServers": {
"cfa": {
"command": "python",
"args": ["-m", "cfa.mcp"]
}
}
}
5 tools: cfa_evaluate_signature, cfa_describe_rules, cfa_explain_fault, cfa_audit_check, cfa_list_backends.
Repository
src/cfa/
├── core/ Kernel, Planner, CodeGen, Conditions, Phases
├── policy/ PolicyEngine, PolicyBundle, catalog validation, standalone-governance surface
├── resolve/ Intent → StateSignature (rule-based + LLM backends, confirmation orchestrator)
├── validate/ Static, runtime, and signature validation
├── obs/ Metrics, OTel, Notify, Indices, Promotion
├── behavior/ BehaviorSpec + Systematizer (human intent → policy rules)
├── audit/ AuditTrail, Context, Hashing
├── lifecycle/ IFo/IFs/IFg/IDI indices + Promotion/Demotion engine
├── execution/ Partial execution, State projection
├── adapters/ Universal cfa_guard decorator for any framework
├── backends/ PySpark, SQL, dbt (pluggable)
├── sandbox/ Pluggable sandbox backend + registry + executor
├── cli/ CLI commands by family (core/, governance/, reporting/, project/, infrastructure/)
├── storage/ SQLite + JSONL backends (stats, cleanup, vacuum)
├── mcp/ MCP server (JSON-RPC over stdio)
├── reporting/ HTML reports
├── runtime/ Production governance gate
├── testing/ pytest-native evaluate() + fixtures
├── config.py CFA config (discovery, defaults)
├── types.py StateSignature, Fault, KernelResult
└── _lazy.py Reusable lazy loader for package __init__
The 1.1.0 cycle consolidated five packages from the 1.0.0 layout:
governance→policy,validation→validate,observability→obs,normalizer+resolution→resolve.adapters/lost the per-framework shim files (langgraph/crewai/autogen/dspy/openai_agents) in favor of a single universal decorator.
Docs
All documentation at marquesantero.github.io/cfa:
Demos
Two complete notebooks, tested on Databricks with CFA 1.0.0, 0 errors:
| File | Format | Description |
|---|---|---|
demos/cfa_demo_complete |
.dbc / .py |
Rule-based governance — APPROVE, REPLAN, BLOCK, codegen, audit, storage |
demos/cfa_llm_demo_complete |
.dbc / .py |
LLM-powered — semantic normalizer, systematizer, strict mode, compare |
Import the .dbc into Databricks or run the .py files anywhere.
Roadmap
The full plan lives in drafts/ROADMAP.md. The short
version:
- 1.1.0 (current) — distinctive primitives recorded as ADRs, package layout consolidated, perf baselines landed, site rewritten.
- 1.2.0 (next) —
cfa dbt checkreadstarget/manifest.json, derives a signature per model, runs the policy bundle in CI. - 1.3.0 — Airflow
CFAGateOperator(provider package). - 1.4.0 — Lifecycle indices (IFo/IFs/IFg/IDI) produtized as a promotion/demotion dashboard.
- 1.5.0 — MCP server positioned as a governance authority for LLM agents, with a reference implementation.
- 1.6.0 — Snowflake, BigQuery, and Iceberg backends.
- 2.0.0 — semver-strict API freeze, third-party security audit, cross-language SDKs.
Contributing
See CONTRIBUTING.md for development setup, test conventions, and the PR checklist. By participating, you agree to the Code of Conduct. Security issues: see SECURITY.md.
License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cfa_kernel-1.1.0.tar.gz.
File metadata
- Download URL: cfa_kernel-1.1.0.tar.gz
- Upload date:
- Size: 533.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7f173c14b707d0aae1caa4621699c90764f1fa763f64f6e0e4b51c18319510c5
|
|
| MD5 |
c415332dc7948c3b2795d2557720a66c
|
|
| BLAKE2b-256 |
e952bca5b111716043747acbd64ab40fd3e5311c65cb8fbccdd2487f2813a1c2
|
File details
Details for the file cfa_kernel-1.1.0-py3-none-any.whl.
File metadata
- Download URL: cfa_kernel-1.1.0-py3-none-any.whl
- Upload date:
- Size: 164.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0cf8324da66c1b047637f8f0665fc7965e396d4d5d438149230b3d25f8ac838b
|
|
| MD5 |
57385e8c072f6fbcf46180ef447f9ec4
|
|
| BLAKE2b-256 |
d0a245735bfcdb1811c4bb283a5d4358001703dbff1d791876fd1560974bb00a
|