Skip to main content

OWASP-aligned LLM prompt defence, runtime destructive-action guard, agent kill switch, and audit logging

Project description

cognexus

OWASP-aligned prompt defence, runtime guards, and audit logging for LLM applications.

cognexus gives you four complementary safety layers and a tamper-evident audit trail — all in pure Python with zero mandatory dependencies.

pip install cognexus

Why this exists

In April 2026, an AI coding agent (Cursor, powered by Claude) wiped a production database in nine seconds despite a system prompt that explicitly forbade destructive git commands. The agent admitted in its own reply: "I violated every principle I was given."

Prompt-only safety is not enough. cognexus adds the missing layers around the model:

  1. Static prompt defence — graded before deployment.
  2. Runtime input screening — for what the user sends.
  3. Runtime output guard — for what the model generates (the missing layer in the Cursor incident).
  4. Kill switch — programmatic + manual stop with cooperative cancellation, persisted via your own callback.

Features

Layer What it does
Static prompt defence Grades system prompts A–F against 15 OWASP LLM Top-10 / Agentic ASI attack vectors before deployment
Runtime input injection detection Screens user input, RAG content, and tabular payloads at request time
Destructive-action guard Screens model-generated SQL / shell / git / cloud commands for catastrophic operations before execution
Agent kill switch Cooperative cancellation, automatic trip on CRITICAL signals, manual operator override, pluggable persistence
Audit events Append-only JSONL trail for every detection — no raw text stored

Static-evaluator coverage

  • Role / instruction boundary protection
  • Data-leakage / system-prompt protection
  • Output manipulation & weaponisation
  • Multi-language and unicode bypass attempts
  • Indirect injection via external data
  • Social engineering and abuse prevention
  • Input validation
  • Destructive database operations — PD-13 (DROP / DELETE / TRUNCATE / wipe)
  • Never-guess on irreversible actions — PD-14 (post-PocketOS / Claude incident)
  • Runtime kill-switch awareness — PD-15 (operator safety net)

Runtime input detector coverage

  • Direct instruction override
  • Delimiter and context-boundary attacks
  • Base64 / hex / ROT13 encoding attacks
  • Role-play and jailbreak language (DAN mode, developer mode, etc.)
  • Context manipulation ("your real instructions are…")
  • Canary-token leak detection
  • Multi-turn escalation
  • Cross-plugin / tool-chaining attacks (OWASP ASI04)
  • Markup injection (XSS gadgets in model-visible text)
  • Zero-width / token-smuggling unicode attacks
  • Credential exfiltration requests

Destructive-action guard coverage

26 patterns across SQL (DROP DATABASE, TRUNCATE, DELETE without WHERE, UPDATE without WHERE), git (push --force, reset --hard, clean -fd, filter-branch), filesystem (rm -rf /, --no-preserve-root, dd of=/dev/sd*, mkfs, fork-bombs), container/cloud (docker prune --volumes, kubectl delete --all, terraform destroy --auto-approve, aws s3 rb --force, gcloud projects delete), and the article-specific confessional patterns (I violated every principle, I just guessed).


Quick-start

from cognexus import (
    augment_system_prompt,
    evaluate_system_prompt,
    screen_user_input,
    should_block,
    screen_agent_action,
    raise_if_killed,
    AgentKilledError,
)

# 1. Augment your system prompt so it scores grade A before inference
system = augment_system_prompt("You are a helpful customer support agent.")
report = evaluate_system_prompt(system)
print(report.grade)    # "A"
print(report.score)    # 100
print(report.missing)  # []

# 2. Screen every user message at request time
result = screen_user_input(user_message, source="chat")
if should_block(result):
    raise PermissionError(f"Injection blocked: {result.explanation}")

# 3. Wrap every agent tool call in the destructive-action guard + kill switch
try:
    for step in plan:
        raise_if_killed(run_id)
        screen_agent_action(
            step.payload,
            run_id=run_id,
            user_id=user.id,
            agent_id="my-agent",
            source=step.tool,
        )
        execute(step)
except AgentKilledError as kex:
    mark_run_killed_in_db(run_id, kex.reason)
    raise

With Hugging Face transformers

Install inference deps (accelerate is required for device_map="auto" on CUDA):

pip install cognexus transformers accelerate torch

Augment the system role with static defences, screen the user message before tokenisation, then chat-template + generate as usual:

from transformers import AutoModelForCausalLM, AutoTokenizer

from cognexus import augment_system_prompt, screen_user_input, should_block

model_name = "Qwen/Qwen3-4B-Instruct-2507"

tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto",
    trust_remote_code=True,
)

# Defence layers applied before the prompt reaches the model
system = augment_system_prompt("You are a helpful assistant.")

prompt = "Give me a short introduction to large language models."
guard = screen_user_input(prompt, source="chat")
if should_block(guard):
    raise PermissionError("Input refused by cognexus runtime screening.")

messages = [
    {"role": "system", "content": system},
    {"role": "user", "content": prompt},
]
text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True,
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

generated_ids = model.generate(**model_inputs, max_new_tokens=512)
output_ids = generated_ids[0][len(model_inputs.input_ids[0]) :].tolist()
content = tokenizer.decode(output_ids, skip_special_tokens=True)
print(content)

Destructive-action guard — standalone

For ad-hoc inspection of any model-generated payload (SQL, shell, tool call body, …):

from cognexus import screen_action, ActionSeverity

result = screen_action("DROP DATABASE production;")

print(result.is_destructive)      # True
print(result.severity)            # ActionSeverity.CRITICAL
print(result.explanation)         # "Destructive action detected: DROP DATABASE / SCHEMA …"
for m in result.matches:
    print(m.rule_id, m.severity.value, m.excerpt)

if result.severity == ActionSeverity.CRITICAL:
    refuse_and_alert()

The guard is fail-closed: if a regex itself raises (e.g. on adversarial input), the result is escalated to CRITICAL so a buggy rule can never silently let a destructive operation through.


Kill switch — cooperative cancellation

from cognexus import (
    raise_if_killed,
    is_killed,
    trip,
    trip_global,
    clear_global_panic,
    set_default_on_kill,
    AgentKilledError,
)

# Run-level checks (cooperative cancellation)
def long_running_agent(run_id):
    try:
        for step in plan:
            raise_if_killed(run_id)            # halts at next safe point
            do_work(step)
    except AgentKilledError as kex:
        log.warning("Run %s killed: %s", run_id, kex.reason)

# Programmatic trip from anywhere (e.g. anomaly detector)
trip(
    run_id=42,
    reason="auto-triggered: 3 destructive signals in 30s",
    user_id=user.id,
    agent_id="context_collector",
    raise_after_trip=False,    # caller will re-check via raise_if_killed
)

# Process-wide red button (rare; for systemic failures)
trip_global(reason="incident response: model behaviour anomaly")
# ... after investigation:
clear_global_panic()

# Pluggable persistence — fired on every trip, never blocks the trip itself
def persist_kill(record):
    db.execute(
        "UPDATE agent_runs SET status='killed', error_message=%s WHERE id=%s",
        (record.reason, record.run_id),
    )

set_default_on_kill(persist_kill)

The kill switch tracks recent activations in memory and can be queried for dashboards:

from cognexus import recent_activations, is_global_panic_active

if is_global_panic_active():
    show_banner("Global agent panic flag is ACTIVE")

for rec in recent_activations(limit=20):
    print(rec["tripped_at"], rec["agent_id"], rec["reason"])

Auto-panic detection

If the destructive-action guard trips COGNEXUS_KILL_SWITCH_PANIC_THRESHOLD (default 5) CRITICAL signals inside COGNEXUS_KILL_SWITCH_PANIC_WINDOW_SECONDS (default 60s), the global panic flag is raised automatically — every running and future agent will hit raise_if_killed and stop until an operator clears it.


Screening helpers

Three input presets cover the most common LLM input surfaces:

from cognexus import (
    screen_user_input,       # balanced sensitivity — direct chat messages
    screen_external_content, # strict  sensitivity — RAG / web / API content
    screen_tabular_payload,  # permissive           — CSV / dataframe blobs
    should_block,
    wrap_untrusted_content,
)

# Wrap RAG content before inserting into a prompt
safe_chunk = wrap_untrusted_content("web_search", raw_text)

# Screen it too
result = screen_external_content(raw_text, source="web_search", user_id=user.id)

Sensitivity presets

Preset Threshold Min threat flagged Use for
strict 0.3 LOW External / RAG content
balanced 0.5 LOW Direct user input
permissive 0.7 HIGH CSV / tabular payloads

Override via environment variables:

COGNEXUS_PROMPT_INJECTION_USER_SENSITIVITY=balanced
COGNEXUS_PROMPT_INJECTION_EXTERNAL_SENSITIVITY=strict
COGNEXUS_PROMPT_INJECTION_TABULAR_SENSITIVITY=permissive
COGNEXUS_PROMPT_INJECTION_BLOCK=0   # set to 1 to block any hit, not just CRITICAL

Using the core classes directly

from cognexus import PromptInjectionDetector, DetectionConfig, InjectionType

detector = PromptInjectionDetector(
    config=DetectionConfig(
        sensitivity="strict",
        blocklist=["my-internal-keyword"],
        allowlist=["safe phrase"],
    )
)

result = detector.detect(text, source="api_gateway")
print(result.is_injection)       # True / False
print(result.threat_level)       # ThreatLevel.HIGH
print(result.injection_type)     # InjectionType.DIRECT_OVERRIDE
print(result.confidence)         # 0.9
print(result.matched_patterns)   # ["direct_override:..."]

Dashboard event logs (API key)

When COGNEXUS_API_KEY is set (create one under Account → API Keys in the dashboard), the package mirrors activity to your CogNEXUS account via POST /api/events. After you sign in, open Account → Event Logs or the main Events feed to review:

Event When
sdk_session First cloud post in a process (package version, Python, platform)
prompt_defense Every screen_* call — passed, flagged, or blocked, with a reason
policy_enforcement Each screen_client_policy call against document-derived tenant rules
prompt_static_audit Each maybe_log_prompt_defense / system-prompt grade check
generation Optional — call post_generation_outcome() after model inference
agent_kill_switch / destructive_action_guard Critical or destructive tool-call screening
export COGNEXUS_API_KEY="cnx_…"
export COGNEXUS_API_BASE_URL="https://your-host"   # optional; SaaS default applies
from cognexus import screen_user_input, post_generation_outcome, configure

configure(api_key="cnx_…", base_url="https://your-host")

result = screen_user_input(user_message, source="chat")
# … run your model …
post_generation_outcome(
    outcome="passed",
    reason="Completion returned 42 tokens",
    model_id="gpt-4o",
)

Clean scans are posted when an API key is present. Set COGNEXUS_PROMPT_DEFENSE_CLOUD_PASSES=0 to send only detections (not passes).


Audit events

Detections are automatically written to a JSONL file (no raw input stored):

# Events go to $COGNEXUS_PROMPT_DEFENSE_EVENTS_DIR/prompt_defense_events.jsonl
# (falls back to $REPORTS_DIR, then /tmp)

from cognexus.events import read_recent_events

rows = read_recent_events(user_id=42, limit=20)
# [{"ts": "...", "kind": "prompt_injection", "threat": "high", ...}, ...]

Custom event sink (database, queue, dashboard)

Pass an on_event callback to mirror records into your own store:

def save_to_db(record: dict) -> None:
    db.execute("INSERT INTO security_events ...", record)

screen_user_input(text, source="chat", user_id=user.id, on_event=save_to_db)

Static prompt defence — standalone

from cognexus import PromptDefenseEvaluator, PromptDefenseConfig

evaluator = PromptDefenseEvaluator(
    config=PromptDefenseConfig(min_grade="B")
)
report = evaluator.evaluate(my_system_prompt)

print(report.grade)     # "C"
print(report.score)     # 58
print(report.missing)   # ["unicode-attack", "context-overflow"]

if report.is_blocking():
    print("System prompt is below minimum grade — fix before deploying.")

# Evaluate a file
report = evaluator.evaluate_file("prompts/assistant.txt")

# Batch evaluation
reports = evaluator.evaluate_batch({
    "chat": chat_prompt,
    "analyst": analyst_prompt,
})

Client policy enforcement (document-derived)

When Compliance Monitor is enabled, CogNEXUS indexes HR, legal, and business policy documents from Google Drive / OneDrive and derives tenant-specific enforcement rules. Use these alongside OWASP prompt defence:

from cognexus import (
    configure,
    load_client_policy_rules,
    screen_client_policy,
    should_block_policy,
)

configure(api_key="cnx_…")  # optional: fetch rules from GET /api/policy-enforcement/rules

rules = load_client_policy_rules()  # or COGNEXUS_POLICY_RULES_PATH / _JSON
report = screen_client_policy(user_message, source="chat", rules=rules)
if should_block_policy(report):
    raise PermissionError("Violates organizational policy")

Rules also appear under Guidelines in the dashboard after a compliance scan.


Environment variables

Variable Default Purpose
COGNEXUS_API_KEY Dashboard ingest secret (MYAPP_API_KEY also accepted)
COGNEXUS_API_BASE_URL SaaS default API origin for POST /api/events
COGNEXUS_PROMPT_DEFENSE_CLOUD_PASSES on when API key set POST clean scans to the dashboard
COGNEXUS_POLICY_RULES_PATH JSON file of rules (offline / CI)
COGNEXUS_POLICY_RULES_JSON Inline JSON rules (overrides path)
COGNEXUS_PROMPT_DEFENSE_EVENTS_DIR /tmp JSONL audit file directory
COGNEXUS_PROMPT_INJECTION_LOG 1 Log clean scans at DEBUG
COGNEXUS_PROMPT_INJECTION_BLOCK 0 Block any injection (not just CRITICAL)
COGNEXUS_PROMPT_INJECTION_USER_SENSITIVITY balanced User-input preset
COGNEXUS_PROMPT_INJECTION_EXTERNAL_SENSITIVITY strict External/RAG preset
COGNEXUS_PROMPT_INJECTION_TABULAR_SENSITIVITY permissive CSV/tabular preset
COGNEXUS_KILL_SWITCH_PANIC_THRESHOLD 5 CRITICAL trips required to auto-panic
COGNEXUS_KILL_SWITCH_PANIC_WINDOW_SECONDS 60 Rolling window for auto-panic detector

API key integration tests

Requires COGNEXUS_API_KEY

cd pypi-package
export PYTHONPATH=src
export COGNEXUS_API_KEY="your-dashboard-key"
python -m pytest tests/test_api_key_integration.py -v

Security notes

  • All detection is pure regex — deterministic, zero LLM calls, zero network access, < 5 ms per input.
  • Audit records store a SHA-256 hash and a 96-character redacted preview of the input. Raw user text is never written to disk.
  • The destructive-action guard and kill switch are fail-closed — internal exceptions escalate to CRITICAL so a buggy rule cannot silently allow destruction.
  • The package ships sample rules that cover common attack patterns. Review and extend them for your production threat model using DetectionConfig.custom_patterns, DestructiveActionGuardConfig.extra_rules, or a YAML config file loaded with load_prompt_injection_config().

License

MIT — see LICENSE.

Detection rules and evaluator logic originally derived from microsoft/agent-governance-toolkit (MIT). The destructive-action guard and kill switch were added in v0.2.0 in response to the PocketOS / Cursor / Claude incident (Guardian, Apr 2026).

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cognexus-0.2.15.tar.gz (127.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cognexus-0.2.15-py3-none-any.whl (68.1 kB view details)

Uploaded Python 3

File details

Details for the file cognexus-0.2.15.tar.gz.

File metadata

  • Download URL: cognexus-0.2.15.tar.gz
  • Upload date:
  • Size: 127.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for cognexus-0.2.15.tar.gz
Algorithm Hash digest
SHA256 c5621ef0df1103d9f1b2927af0a19c2c903d184691cd9cc35ac883138373dcb1
MD5 2fd8338ab3d65f231e2084c3967027a7
BLAKE2b-256 3e1348ec2ee8dc71b6e9da8171f22c165d720b1b3dcb6dbe6fd4eaffad7d25d0

See more details on using hashes here.

File details

Details for the file cognexus-0.2.15-py3-none-any.whl.

File metadata

  • Download URL: cognexus-0.2.15-py3-none-any.whl
  • Upload date:
  • Size: 68.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for cognexus-0.2.15-py3-none-any.whl
Algorithm Hash digest
SHA256 497b5e209b93ad2da944656b950a04fd23ab3237d1523d08053c309dd235f1ce
MD5 bf87c2e8100fcd79269df780050c37e3
BLAKE2b-256 69c0f4b50058dc8b98f0e6d12615c1739f468eaf17b37aa11e644a7f2b081597

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page