Causum Agent
On-premises worker for the Mars Operating System. Runs inside your network, connects to Causum's SaaS control plane over Redis Streams, and executes database queries with automatic data desensitization. Credentials never leave your infrastructure.
How it works
```
Your network                             Causum SaaS

+--------------------------+            +------------------+
|  Agent                   |   Redis    |                  |
|  - materializes secrets  |  <------>  |   Orchestrator   |
|  - executes DB queries   |   Streams  |                  |
|  - desensitizes results  |            +------------------+
|  - publishes results     |
+--------------------------+
```
- The agent boots with a connection token (base64-encoded Redis URL + agent key).
- It registers with the orchestrator and receives runtime configuration over Redis Streams.
- Secrets are resolved locally through your chosen provider (environment variables, HashiCorp Vault, file system, or Kubernetes secrets).
- The agent executes database tasks (queries, metadata introspection, sampling) and desensitizes results through the customer's LLM before returning them.
Secrets are never sent to the SaaS. Config payloads carry secret mappings (references), not values.
Config modes
| Mode | When | db_query behavior |
|---|---|---|
| DB + LLM | Both connectors configured | Results desensitized through the customer's LLM before returning. |
| DB only, passthrough | LLM absent, `llm_passthrough: true` (default) | Raw rows returned to SaaS for managed desensitization. |
| DB only, blocked | LLM absent, `llm_passthrough: false` | Agent refuses to return data; an error is raised. |
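The mode selection above can be sketched as a small decision function. This is an illustrative sketch only; `db_query_mode` and its return labels are not the agent's actual internals:

```python
# Sketch of the db_query behavior table (hypothetical names, not the
# agent's real code): which path a query result takes depends on whether
# an LLM connector is configured and on the llm_passthrough flag.

def db_query_mode(llm_configured: bool, llm_passthrough: bool = True) -> str:
    if llm_configured:
        # DB + LLM: results are desensitized locally before returning.
        return "desensitize_locally"
    if llm_passthrough:
        # DB only, passthrough: raw rows go to the SaaS for managed desensitization.
        return "passthrough"
    # DB only, blocked: the agent refuses to return data.
    raise RuntimeError("db_query blocked: no LLM configured and llm_passthrough is false")
```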
Desensitization pipeline
When the LLM is active, db_query results pass through a multi-stage pipeline:
- Classify -- LLM classifies each field as sensitive or safe using table/field descriptions from the SaaS + sample values from the query result.
- Mask -- Sensitive values are replaced with deterministic SHA256-based tokens (e.g. `NAM_a1b2c3d4`). Entity ID fields are preserved for joining.
- Chunk -- Masked data is split into token-bounded chunks sized to the model's context window.
- Transform -- Each chunk is sent through the LLM for final transformation into a clean text format.
- Return -- Concatenated desensitized text is returned to the SaaS.
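The masking stage can be sketched as follows. The `NAM_a1b2c3d4` token format comes from the example above; the prefix argument and the 8-character truncation are assumptions, not confirmed details of the pipeline:

```python
import hashlib

# Hypothetical sketch of deterministic SHA256-based masking: equal inputs
# always map to the same token, so masked columns can still be joined.

def mask_value(value: str, field_prefix: str = "NAM") -> str:
    digest = hashlib.sha256(value.encode("utf-8")).hexdigest()
    # Truncation to 8 hex chars is an assumption matching the README example.
    return f"{field_prefix}_{digest[:8]}"
```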
Source layout
```
actor/
  __init__.py          Package root; exposes __version__
  identity.py          Stable instance ID generation and persistence
  schemas.py           Shared JSON schema loader
  authentication/
    secrets.py         SecretProvider implementations (env, vault, file, k8s)
    config.py          Pydantic models for secret store configuration
    database.py        Database credential builders (20+ auth types)
    llm.py             LLM credential builders
    manager.py         ConnectionManager: high-level build + validate
    exceptions.py      Typed exception hierarchy (see below)
  cli/
    main.py            Click CLI: start, validate, health, config, debug
    utils.py           Shared helpers: redaction, error sanitization, CLIContext
  connectors/
    db.py              SQLAlchemy client with read-only enforcement + dialect queries
    llm.py             OpenAI-compatible chat client with token limit cascade
    desensitize.py     Desensitization pipeline (classify, mask, chunk, transform)
    _openai_common.py  Shared OpenAI client construction
  queue/
    service.py         QueueService facade (preserves public API)
    connection.py      Redis connection management with reconnect/backoff
    consumer.py        Task consumption loop with dedup and timeout
    dedup.py           Deduplication tracker with at-least-once semantics
    publisher.py       All publish methods for the results stream
    config_listener.py Config and registration ack listener
  runtime/
    config_store.py    RuntimeConfig, config parsing, schema + business validation
    task_router.py     Routes tasks to connectors, tracks metrics
    db_tasks.py        Database task executor with desensitization integration
  observability/
    service.py         Heartbeat, validation, and metrics publishing
    metrics.py         In-process counters and histograms
    models.py          Health status models
  logging/
    service.py         JSON/text log buffer with optional file output
schemas/               JSON Schema contracts for all Redis message types
docs/                  Operator guides (installation, operations, scaling)
tests/                 Unit and integration tests
```
Task types
| Task | Purpose | Requires LLM |
|---|---|---|
| `db_validate` | Database connectivity check (`SELECT 1`). | No |
| `db_schemas` | List all schemas. Uses mnemon dialect queries. | No |
| `db_tables` | List tables in a schema. Uses mnemon dialect queries. | No |
| `db_columns` | Get column metadata for a table. | No |
| `db_sample` | Sample rows from a table for classification or preview. | No |
| `db_query` | Execute a read-only query. Returns desensitized text or raw rows. | Optional |
| `classify_fields` | Classify field sensitivity using LLM + table context. | Yes |
| `llm_validate` | LLM connectivity + latency check. | Yes |
| `pipeline_validate` | End-to-end DB + LLM validation with desensitization status. | No |
Secret policy
The agent enforces a strict separation between configuration and credentials:
- Config payloads must not contain raw secrets. Validation rejects direct `api_key`, `password`, and `url` fields.
- Configs carry `secret_mapping` references (e.g. `{"api_key": "OPENAI_API_KEY"}`).
- Secrets are resolved at runtime by a `SecretProvider` (env, vault, file, k8s) and held only in process memory.
- Secrets are excluded from logs, Redis messages, config cache, and CLI output.
- Debug commands redact sensitive values by default (`--show-secrets` to reveal).
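A minimal sketch of the env-provider half of this policy, assuming `secret_mapping` maps credential fields to environment variable names as in the example above. The real `SecretProvider` interface in authentication/secrets.py may differ:

```python
import os

# Hypothetical env-backed secret resolver: config carries only references
# (field -> env var name); values are read locally and never leave memory.

def resolve_secrets(secret_mapping: dict) -> dict:
    resolved = {}
    for field, env_var in secret_mapping.items():
        value = os.environ.get(env_var)
        if value is None:
            # Error names the reference, never a secret value.
            raise KeyError(f"secret not found for field '{field}': ${env_var}")
        resolved[field] = value
    return resolved
```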
Exception hierarchy
Authentication errors are structured, not stringly typed.
All inherit from `_AuthBaseError` and accept optional `component` and `detail` keyword arguments for programmatic inspection:
| Exception | Raised when | Pipeline stage |
|---|---|---|
| `ConfigurationError` | Static validation fails (bad `auth_type`, missing mapping keys) | Before any network call |
| `SecretProviderError` | Secret store is unreachable or returns an error | Secret fetch |
| `AuthMaterializationError` | Secrets fetched OK but credentials cannot be assembled | Credential construction |
| `ValidationError` | Live connectivity check fails (`SELECT 1`, LLM ping) | Outbound validation |
Redis protocol
Two streams per agent:
| Stream | Direction | Purpose |
|---|---|---|
| `agent:{key}:tasks` | SaaS to agent | Task messages (consumer group workers) |
| `agent:{key}:results` | Agent to SaaS | Results, heartbeats, config requests, metrics |
Message types and their schemas are in schemas/. Key types:
- `config` / `config_request` / `config_applied` -- configuration lifecycle
- `instance_register` / `instance_register_ack` -- identity handshake
- `task_result` -- success or sanitized error for each task
- `heartbeat` / `validation_report` / `metrics_report` -- observability
- `backlog_report` -- pending message audit
Task deduplication uses an in-memory set of the last 1,000 message IDs plus a persisted `last_ack` in Redis. Messages with IDs at or below `last_ack` are skipped on restart. Delivery is at-least-once; callbacks must be idempotent.
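The dedup rule can be sketched like this. Names and details are illustrative (the real tracker lives in queue/dedup.py); the only source-backed facts are the ID window, the `last_ack` watermark, and at-least-once delivery:

```python
from collections import deque

# Redis stream IDs look like "1700000000000-0" (milliseconds-sequence);
# parsing them into (ms, seq) tuples gives correct ordering comparisons.

def _parse(msg_id: str) -> tuple:
    ms, seq = msg_id.split("-")
    return (int(ms), int(seq))

class DedupTracker:
    """Hypothetical sketch: recent-ID window plus a last_ack watermark."""

    def __init__(self, last_ack: str = "0-0", window: int = 1000):
        self.last_ack = _parse(last_ack)
        self.seen = deque(maxlen=window)  # last N processed message IDs

    def should_process(self, msg_id: str) -> bool:
        parsed = _parse(msg_id)
        if parsed <= self.last_ack or parsed in self.seen:
            return False  # replayed after restart, or duplicate delivery
        self.seen.append(parsed)
        return True
```

Because delivery is at-least-once, a message can still slip through this filter twice in rare cases, which is why callbacks must be idempotent.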
Query safety
The database connector enforces read-only access. `_assert_read_only()` strips both inline (`--`, `#`) and block (`/* */`) comments before checking that:

- The first token is one of `SELECT`, `WITH`, `SHOW`, `DESCRIBE`, `EXPLAIN`, or `PRAGMA`.
- No DML keywords (`INSERT`, `UPDATE`, `DELETE`, `ALTER`, `DROP`, `TRUNCATE`, `CREATE`, `REPLACE`, `GRANT`, `REVOKE`) appear in the normalized query.
Table and schema names used in introspection tasks (`db_sample`, `db_columns`, etc.) are validated against `^[a-zA-Z0-9_.]+$` to prevent SQL injection.
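A rough sketch of the read-only guard described above; the actual `_assert_read_only()` in connectors/db.py may differ in detail:

```python
import re

# Allowed leading statements and forbidden DML/DDL keywords, per the
# lists above. Comment stripping happens before any token inspection.

_ALLOWED_FIRST = {"SELECT", "WITH", "SHOW", "DESCRIBE", "EXPLAIN", "PRAGMA"}
_FORBIDDEN = {"INSERT", "UPDATE", "DELETE", "ALTER", "DROP",
              "TRUNCATE", "CREATE", "REPLACE", "GRANT", "REVOKE"}

def assert_read_only(sql: str) -> None:
    # Strip block comments, then inline comments, so keywords can't hide.
    sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.S)
    sql = re.sub(r"(--|#).*?$", " ", sql, flags=re.M)
    tokens = re.findall(r"[A-Za-z_]+", sql.upper())
    if not tokens or tokens[0] not in _ALLOWED_FIRST:
        raise ValueError("query must start with a read-only statement")
    if _FORBIDDEN.intersection(tokens):
        raise ValueError("DML/DDL keywords are not allowed")
```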
Installation
Requires Python 3.9+.
```shell
# Core install
pip install .

# With all optional providers (Vault, Kubernetes, AWS, cryptography)
pip install ".[all]"
```
Docker
```shell
# Build locally
docker build -t causum/actor:<version> -t causum/actor:latest .

# Run from local image (or from Docker Hub after push)
docker run --rm -e AGENT_CONNECTION_TOKEN=<token> causum/actor:latest

# Publish to Docker Hub
docker login
docker push causum/actor:<version>
docker push causum/actor:latest
```
The image runs as non-root (UID 1000).
For multi-architecture publishing (amd64 + arm64), use:
```shell
scripts/dockerhub_publish.sh <version>
```
Kubernetes
A Helm chart is provided in chart/. See docs/installation.md and chart/values.yaml.
CLI
The `agent` command is the primary interface. All commands read configuration from the `AGENT_CONNECTION_TOKEN` environment variable or a `--config` YAML file.
```
agent version                          Show version
agent start [--config FILE]            Start the worker
agent validate [--component X]         Validate connections (redis, database, llm, pipeline)
agent health [--watch]                 Show latest heartbeat from Redis
agent logs [--log-dir DIR]             Tail file logs
agent config generate                  Print a blank YAML template
agent config generate --with-examples
agent config show [--show-secrets]     Display loaded config (redacted by default)
agent config test                      Decode and validate connection token
agent debug redis                      Ping Redis
agent debug streams                    Inspect stream lengths and recent messages
agent debug decode-token [TOKEN]       Decode connection token (redacted by default)
  --show-secrets                       Show unredacted values
```
Diagnosis
Use these commands to inspect the state of the agent:
```shell
# One-shot health check (includes source stream in output)
agent health --format json

# Live health check every 5s
agent health --watch

# Inspect stream lengths + last messages from tasks/results/telemetry
agent debug streams --show-messages 20
```
Expected signals after successful startup:
- `instance_register`, `config_request`, `backlog_report` on `agent:{key}:results`
- `config_applied`, `validation_report`, `heartbeat` on `agent:{key}:telemetry`
Running the tests
```shell
# Create a virtualenv and install
python3 -m venv venv
source venv/bin/activate
pip install -e ".[all]"
pip install pytest

# Unit tests (no external services required)
pytest tests/ --ignore=tests/integration/

# Integration tests (require a running Redis instance)
pytest tests/integration/
```
Unit tests cover:
- All 20+ database credential builders (`tests/test_builders.py`)
- Runtime config update, llm_passthrough, and token_limit validation (`tests/test_connectors.py`)
- Queue consume, publish, wait, trim, and consumer group init (`tests/test_queue.py`)
- Desensitization pipeline: hashing, chunking, classification, masking, end-to-end (`tests/test_desensitize.py`)
- Task router: old task rejection, passthrough behavior, pipeline validation (`tests/test_desensitize.py`)
- Log buffering and file output (`tests/test_logging.py`)
- Component health tracking, metrics snapshots, and validation reports (`tests/test_observability.py`)
Configuration
Runtime config is delivered by the SaaS over Redis (schema: schemas/config_message.schema.json).
Example config payload:
```json
{
  "type": "config",
  "secret_store": { "provider": "env" },
  "database": {
    "enabled": true,
    "type": "postgresql",
    "auth_type": "password",
    "secret_mapping": {
      "username": "DB_USER",
      "password": "DB_PASS",
      "host": "DB_HOST",
      "database": "DB_NAME"
    },
    "port": 5432,
    "schema": "public"
  },
  "llm": {
    "enabled": true,
    "endpoint": "https://api.openai.com/v1",
    "auth_type": "api_key",
    "model": "gpt-4o-mini",
    "secret_mapping": {
      "endpoint": "LLM_ENDPOINT",
      "api_key": "OPENAI_API_KEY",
      "model": "LLM_MODEL"
    },
    "llm_passthrough": true,
    "token_limit": 128000
  }
}
```
Supported secret providers: `env`, `vault`, `file`, `k8s`.
Supported LLM auth types: `api_key`, `basic_auth`, `bearer`, `azure_key`, `no_auth`.
Supported database auth types: `password`, `ssl`, `ssl_cert`, `ssl_verify`, `scram`, `service_account`, `iam_credentials`, `iam_role`, `token`, `oauth_m2m`, `jwt`, `certificate`, `wallet`, `ldap`, `local_file`, `motherduck`, `windows_auth`, `key_pair`, `none`.
Endpoint patterns and model allowlists can be enforced via environment variables (`LLM_ENDPOINT_PATTERN`, `LLM_MODEL_ALLOWLIST`).
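A hedged sketch of how such gating might look, assuming `LLM_ENDPOINT_PATTERN` is a regex matched against the endpoint and `LLM_MODEL_ALLOWLIST` is a comma-separated list; the agent's actual matching semantics are not documented here:

```python
import os
import re

# Hypothetical enforcement sketch: both checks are skipped when the
# corresponding environment variable is unset.

def llm_target_allowed(endpoint: str, model: str) -> bool:
    pattern = os.environ.get("LLM_ENDPOINT_PATTERN")
    allowlist = os.environ.get("LLM_MODEL_ALLOWLIST")  # e.g. "gpt-4o-mini,gpt-4o"
    if pattern and not re.match(pattern, endpoint):
        return False
    if allowlist and model not in [m.strip() for m in allowlist.split(",")]:
        return False
    return True
```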
Further documentation
- `docs/overview.md` -- Architecture and data flow
- `docs/installation.md` -- Deployment guide
- `docs/operations.md` -- Operator runbook
- `docs/horizontal_scaling.md` -- Multi-instance setup
- `docs/identity.md` -- Instance identity and persistence
- `docs/data_api.md` -- Complete task API reference
License
Proprietary, source-available. See LICENSE.
File details
Details for the file causum-1.1.6.tar.gz.
File metadata
- Download URL: causum-1.1.6.tar.gz
- Upload date:
- Size: 199.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `9e11556e11c3d68e64bebab09f41c00e4d86ef9baf1792844ef99401b248d83f` |
| MD5 | `bbdd3b3b80a805a1e6cd7ebcc3cd6091` |
| BLAKE2b-256 | `7501a5f1157ac633aa393071b5fe07334009ad6da09ac421ff23869421e287ae` |
File details
Details for the file causum-1.1.6-py3-none-any.whl.
File metadata
- Download URL: causum-1.1.6-py3-none-any.whl
- Upload date:
- Size: 72.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `52f6532796357a35d1e456022c9ceb7e1a0599c8eeef428c9cd9ef6bf58bbc90` |
| MD5 | `ae5e62369a7b53493bbbaed68e033853` |
| BLAKE2b-256 | `9fed6ccd55feee34e5acbfd6f994b26eedbf85d86c25590fc0cb72498c09de08` |