On-prem agent for the Mars® Operating System


Causum Agent

On-premises worker for the Mars Operating System. Runs inside your network, connects to Causum's SaaS control plane over Redis Streams, and executes database queries with automatic data desensitization. Credentials never leave your infrastructure.

Database connections are handled by the pymnemon library.

How it works

Your network                          Causum SaaS
+--------------------------+          +------------------+
|  Agent                   |  Redis   |                  |
|  - materializes secrets  | <------> |  Orchestrator    |
|  - executes DB queries   |  Streams |                  |
|  - desensitizes results  |          +------------------+
|  - publishes results     |
+--------------------------+
  1. The agent boots with a connection token (base64-encoded Redis URL + agent key).
  2. It registers with the orchestrator and receives runtime configuration over Redis Streams.
  3. Secrets are resolved locally through your chosen provider (environment variables, HashiCorp Vault, file system, or Kubernetes secrets).
  4. The agent executes database tasks (queries, metadata introspection, sampling) and desensitizes results through the customer's LLM before returning them.

Secrets are never sent to the SaaS. Config payloads carry secret mappings (references), not values.
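The boot step above starts from a connection token. A minimal sketch of decoding one, assuming the token is base64 over a JSON object with `redis_url` and `agent_key` fields (the real payload shape may differ; `agent debug decode-token` shows the actual contents):

```python
import base64
import json

def decode_connection_token(token: str) -> dict:
    """Decode a connection token into its Redis URL and agent key.

    Assumes base64-wrapped JSON; field names here are illustrative.
    """
    payload = json.loads(base64.b64decode(token))
    return {"redis_url": payload["redis_url"], "agent_key": payload["agent_key"]}

# Building a token the same way round-trips:
token = base64.b64encode(
    json.dumps({"redis_url": "redis://example:6379/0", "agent_key": "agt_123"}).encode()
).decode()
print(decode_connection_token(token)["agent_key"])
```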

Config modes

Mode                  When                                           db_query behavior
DB + LLM              Both connectors configured                     Results desensitized through customer's LLM before returning.
DB only, passthrough  LLM absent, llm_passthrough: true (default)    Raw rows returned to SaaS for managed desensitization.
DB only, blocked      LLM absent, llm_passthrough: false             Agent refuses to return data. Error raised.
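The mode table above reduces to a small decision function. A sketch (names are illustrative, not the agent's actual API):

```python
def resolve_query_mode(llm_configured: bool, llm_passthrough: bool = True) -> str:
    """Map connector availability to db_query behavior, per the mode table."""
    if llm_configured:
        return "desensitize"   # results pass through the customer's LLM
    if llm_passthrough:
        return "passthrough"   # raw rows returned for managed desensitization
    return "blocked"           # agent refuses to return data

assert resolve_query_mode(True) == "desensitize"
assert resolve_query_mode(False) == "passthrough"
assert resolve_query_mode(False, llm_passthrough=False) == "blocked"
```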

Desensitization pipeline

When the LLM is active, db_query results pass through a multi-stage pipeline:

  1. Classify -- LLM classifies each field as sensitive or safe using table/field descriptions from the SaaS + sample values from the query result.
  2. Mask -- Sensitive values are replaced with deterministic SHA256-based tokens (e.g. NAM_a1b2c3d4). Entity ID fields are preserved for joining.
  3. Chunk -- Masked data is split into token-bounded chunks sized to the model's context window.
  4. Transform -- Each chunk is sent through the LLM for final transformation into a clean text format.
  5. Return -- Concatenated desensitized text is returned to the SaaS.
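The masking stage can be sketched as follows; the `NAM_a1b2c3d4` token shape mirrors the example above, while the exact prefix scheme and digest length are assumptions:

```python
import hashlib

def mask_value(value: str, category: str = "NAM") -> str:
    """Replace a sensitive value with a deterministic SHA256-based token."""
    digest = hashlib.sha256(value.encode("utf-8")).hexdigest()[:8]
    return f"{category}_{digest}"

# Deterministic: the same input always yields the same token, so masked
# values can still be grouped and joined across rows.
assert mask_value("Alice Smith") == mask_value("Alice Smith")
assert mask_value("Alice Smith") != mask_value("Bob Jones")
```

Determinism is what lets entity ID fields remain joinable after masking: equal plaintext values always map to equal tokens within a result set.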

Source layout

actor/
  __init__.py                 Package root; exposes __version__
  identity.py                 Stable instance ID generation and persistence
  schemas.py                  Shared JSON schema loader

  authentication/
    secrets.py                SecretProvider implementations (env, vault, file, k8s)
    config.py                 Pydantic models for secret store configuration
    database.py               Database credential builders (20+ auth types)
    llm.py                    LLM credential builders
    manager.py                ConnectionManager: high-level build + validate
    exceptions.py             Typed exception hierarchy (see below)

  cli/
    main.py                   Click CLI: start, validate, health, config, debug
    utils.py                  Shared helpers: redaction, error sanitization, CLIContext

  connectors/
    db.py                     SQLAlchemy client with read-only enforcement + dialect queries
    llm.py                    OpenAI-compatible chat client with token limit cascade
    desensitize.py            Desensitization pipeline (classify, mask, chunk, transform)
    _openai_common.py         Shared OpenAI client construction

  queue/
    service.py                QueueService facade (preserves public API)
    connection.py             Redis connection management with reconnect/backoff
    consumer.py               Task consumption loop with dedup and timeout
    dedup.py                  Deduplication tracker with at-least-once semantics
    publisher.py              All publish methods for the results stream
    config_listener.py        Config and registration ack listener

  runtime/
    config_store.py           RuntimeConfig, config parsing, schema + business validation
    task_router.py            Routes tasks to connectors, tracks metrics
    db_tasks.py               Database task executor with desensitization integration

  observability/
    service.py                Heartbeat, validation, and metrics publishing
    metrics.py                In-process counters and histograms
    models.py                 Health status models

  logging/
    service.py                JSON/text log buffer with optional file output

schemas/                      JSON Schema contracts for all Redis message types
docs/                         Operator guides (installation, operations, scaling)
tests/                        Unit and integration tests

Task types

Task               Purpose                                                              Requires LLM
db_validate        Database connectivity check (SELECT 1).                              No
db_schemas         List all schemas. Uses mnemon dialect queries.                       No
db_tables          List tables in a schema. Uses mnemon dialect queries.                No
db_columns         Get column metadata for a table.                                     No
db_sample          Sample rows from a table for classification or preview.              No
db_query           Execute a read-only query. Returns desensitized text or raw rows.   Optional
classify_fields    Classify field sensitivity using LLM + table context.                Yes
llm_validate       LLM connectivity + latency check.                                    Yes
pipeline_validate  End-to-end DB + LLM validation with desensitization status.          No

Secret policy

The agent enforces a strict separation between configuration and credentials:

  1. Config payloads must not contain raw secrets. Validation rejects direct api_key, password, and url fields.
  2. Configs carry secret_mapping references (e.g. {"api_key": "OPENAI_API_KEY"}).
  3. Secrets are resolved at runtime by a SecretProvider (env, vault, file, k8s) and held only in process memory.
  4. Secrets are excluded from logs, Redis messages, config cache, and CLI output.
  5. Debug commands redact sensitive values by default (--show-secrets to reveal).

Exception hierarchy

Authentication errors are structured, not stringly typed. All inherit from _AuthBaseError and accept optional component and detail keyword arguments for programmatic inspection:

Exception                 Raised when                                                     Pipeline stage
ConfigurationError        Static validation fails (bad auth_type, missing mapping keys)   Before any network call
SecretProviderError       Secret store is unreachable or returns an error                 Secret fetch
AuthMaterializationError  Secrets fetched OK but credentials cannot be assembled          Credential construction
ValidationError           Live connectivity check fails (SELECT 1, LLM ping)              Outbound validation

Redis protocol

Two streams per agent:

Stream               Direction      Purpose
agent:{key}:tasks    SaaS to agent  Task messages (consumer group workers)
agent:{key}:results  Agent to SaaS  Results, heartbeats, config requests, metrics

Message types and their schemas are in schemas/. Key types:

  • config / config_request / config_applied -- configuration lifecycle
  • instance_register / instance_register_ack -- identity handshake
  • task_result -- success or sanitized error for each task
  • heartbeat / validation_report / metrics_report -- observability
  • backlog_report -- pending message audit

Task deduplication uses an in-memory set of the last 1,000 message IDs plus a persisted last_ack in Redis. Messages with IDs at or below last_ack are skipped on restart. Delivery is at-least-once; callbacks must be idempotent.
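The dedup logic can be sketched as below. Redis stream IDs have the form `ms-seq` and compare numerically; the class and method names here are assumptions (the real implementation is actor/queue/dedup.py):

```python
from collections import deque

def _id_key(msg_id: str):
    """Redis stream IDs are 'milliseconds-sequence'; compare numerically, not lexically."""
    ms, seq = msg_id.split("-")
    return (int(ms), int(seq))

class DedupTracker:
    """Bounded set of recent message IDs plus a persisted last_ack watermark (sketch)."""

    def __init__(self, last_ack: str = "0-0", capacity: int = 1000):
        self.last_ack = last_ack
        self._recent = deque(maxlen=capacity)   # last N message IDs, in order
        self._seen = set()

    def should_process(self, msg_id: str) -> bool:
        if _id_key(msg_id) <= _id_key(self.last_ack):
            return False                        # replayed message, skipped on restart
        if msg_id in self._seen:
            return False                        # duplicate delivery within the window
        if len(self._recent) == self._recent.maxlen:
            self._seen.discard(self._recent[0]) # keep set in sync with evicted ID
        self._recent.append(msg_id)
        self._seen.add(msg_id)
        return True

t = DedupTracker(last_ack="1700000000000-5")
assert not t.should_process("1700000000000-5")  # at or below last_ack: skip
assert t.should_process("1700000000001-0")      # new message: process
assert not t.should_process("1700000000001-0")  # redelivery: skip
```

Because the window is bounded and delivery is at-least-once, a duplicate older than the last 1,000 IDs can still slip through, which is why callbacks must be idempotent.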

Query safety

The database connector enforces read-only access. _assert_read_only() strips both inline (--, #) and block (/* */) comments before checking that:

  • The first token is one of SELECT, WITH, SHOW, DESCRIBE, EXPLAIN, or PRAGMA.
  • No DML keywords (INSERT, UPDATE, DELETE, ALTER, DROP, TRUNCATE, CREATE, REPLACE, GRANT, REVOKE) appear in the normalized query.

Table and schema names used in introspection tasks (db_sample, db_columns, etc.) are validated against ^[a-zA-Z0-9_.]+$ to prevent SQL injection.
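A simplified sketch of these checks, strip comments, validate the first token, scan for DML keywords, and validate identifiers (the real `_assert_read_only` also handles string literals and dialect edge cases):

```python
import re

_ALLOWED_FIRST = {"SELECT", "WITH", "SHOW", "DESCRIBE", "EXPLAIN", "PRAGMA"}
_DML = {"INSERT", "UPDATE", "DELETE", "ALTER", "DROP", "TRUNCATE",
        "CREATE", "REPLACE", "GRANT", "REVOKE"}
_IDENT_RE = re.compile(r"^[a-zA-Z0-9_.]+$")

def assert_read_only(sql: str) -> None:
    """Reject anything that is not a read-only statement (simplified sketch)."""
    stripped = re.sub(r"/\*.*?\*/", " ", sql, flags=re.S)        # block comments
    stripped = re.sub(r"(--|#).*?$", " ", stripped, flags=re.M)  # inline comments
    tokens = stripped.upper().split()
    if not tokens or tokens[0] not in _ALLOWED_FIRST:
        raise ValueError("query must start with a read-only statement")
    if _DML & set(tokens):
        raise ValueError("DML keyword found in query")

def assert_valid_identifier(name: str) -> None:
    """Table/schema names for introspection tasks must match the allowlist pattern."""
    if not _IDENT_RE.match(name):
        raise ValueError(f"invalid identifier: {name!r}")

assert_read_only("/* audit */ SELECT id FROM users -- trailing comment")
assert_valid_identifier("public.users")
```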

Installation

Requires Python 3.9+.

# Core install
pip install .

# With all optional providers (Vault, Kubernetes, AWS, cryptography)
pip install ".[all]"

Docker

# Build locally
docker build -t causum/actor:<version> -t causum/actor:latest .

# Run from local image (or from Docker Hub after push)
docker run --rm -e AGENT_CONNECTION_TOKEN=<token> causum/actor:latest

# Publish to Docker Hub
docker login
docker push causum/actor:<version>
docker push causum/actor:latest

The image runs as non-root (UID 1000).

For multi-architecture publishing (amd64 + arm64), use:

scripts/dockerhub_publish.sh <version>

Kubernetes

A Helm chart is provided in chart/. See docs/installation.md and chart/values.yaml.

CLI

The agent command is the primary interface. All commands read configuration from the AGENT_CONNECTION_TOKEN environment variable or a --config YAML file.

agent version                       Show version
agent start [--config FILE]         Start the worker
agent validate [--component X]      Validate connections (redis, database, llm, pipeline)
agent health [--watch]              Show latest heartbeat from Redis
agent logs [--log-dir DIR]          Tail file logs

agent config generate               Print a blank YAML template
agent config generate --with-examples
agent config show [--show-secrets]  Display loaded config (redacted by default)
agent config test                   Decode and validate connection token

agent debug redis                   Ping Redis
agent debug streams                 Inspect stream lengths and recent messages
agent debug decode-token [TOKEN]    Decode connection token (redacted by default)
  --show-secrets                    Show unredacted values

Diagnosis

Use these commands to inspect the state of the agent:

# One-shot health check (includes source stream in output)
agent health --format json

# Live health check every 5s
agent health --watch

# Inspect stream lengths + last messages from tasks/results/telemetry
agent debug streams --show-messages 20

Expected signals after successful startup:

  • instance_register, config_request, backlog_report on agent:{key}:results
  • config_applied, validation_report, heartbeat on agent:{key}:telemetry

Running the tests

# Create a virtualenv and install
python3 -m venv venv
source venv/bin/activate
pip install -e ".[all]"
pip install pytest

# Unit tests (no external services required)
pytest tests/ --ignore=tests/integration/

# Integration tests (require a running Redis instance)
pytest tests/integration/

Unit tests cover:

  • All 20+ database credential builders (tests/test_builders.py)
  • Runtime config update, llm_passthrough, and token_limit validation (tests/test_connectors.py)
  • Queue consume, publish, wait, trim, and consumer group init (tests/test_queue.py)
  • Desensitization pipeline: hashing, chunking, classification, masking, end-to-end (tests/test_desensitize.py)
  • Task router: old task rejection, passthrough behavior, pipeline validation (tests/test_desensitize.py)
  • Log buffering and file output (tests/test_logging.py)
  • Component health tracking, metrics snapshots, and validation reports (tests/test_observability.py)

Configuration

Runtime config is delivered by the SaaS over Redis (schema: schemas/config_message.schema.json).

Example config payload:

{
  "type": "config",
  "secret_store": { "provider": "env" },
  "database": {
    "enabled": true,
    "type": "postgresql",
    "auth_type": "password",
    "secret_mapping": {
      "username": "DB_USER",
      "password": "DB_PASS",
      "host": "DB_HOST",
      "database": "DB_NAME"
    },
    "port": 5432,
    "schema": "public"
  },
  "llm": {
    "enabled": true,
    "endpoint": "https://api.openai.com/v1",
    "auth_type": "api_key",
    "model": "gpt-4o-mini",
    "secret_mapping": {
      "endpoint": "LLM_ENDPOINT",
      "api_key": "OPENAI_API_KEY",
      "model": "LLM_MODEL"
    },
    "llm_passthrough": true,
    "token_limit": 128000
  }
}

Supported secret providers: env, vault, file, k8s. Supported LLM auth types: api_key, basic_auth, bearer, azure_key, no_auth. Supported database auth types: password, ssl, ssl_cert, ssl_verify, scram, service_account, iam_credentials, iam_role, token, oauth_m2m, jwt, certificate, wallet, ldap, local_file, motherduck, windows_auth, key_pair, none.

Endpoint patterns and model allowlists can be enforced via environment variables (LLM_ENDPOINT_PATTERN, LLM_MODEL_ALLOWLIST).
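One plausible shape for that enforcement, assuming the pattern is a regex and the allowlist is comma-separated (the actual semantics may differ):

```python
import os
import re

def check_llm_target(endpoint: str, model: str) -> None:
    """Validate an LLM endpoint/model against environment-driven policy (sketch)."""
    pattern = os.environ.get("LLM_ENDPOINT_PATTERN")
    if pattern and not re.fullmatch(pattern, endpoint):
        raise ValueError(f"endpoint {endpoint!r} does not match policy pattern")
    allowlist = os.environ.get("LLM_MODEL_ALLOWLIST")
    if allowlist and model not in {m.strip() for m in allowlist.split(",")}:
        raise ValueError(f"model {model!r} is not in the allowlist")

os.environ["LLM_ENDPOINT_PATTERN"] = r"https://api\.openai\.com/.*"
os.environ["LLM_MODEL_ALLOWLIST"] = "gpt-4o-mini, gpt-4o"
check_llm_target("https://api.openai.com/v1", "gpt-4o-mini")   # passes policy
```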

Further documentation

  • docs/overview.md -- Architecture and data flow
  • docs/installation.md -- Deployment guide
  • docs/operations.md -- Operator runbook
  • docs/horizontal_scaling.md -- Multi-instance setup
  • docs/identity.md -- Instance identity and persistence
  • docs/data_api.md -- Complete task API reference

License

Proprietary, source-available. See LICENSE.
