Skip to main content

OpenBox SDK - Governance and observability for Temporal workflows

Project description

OpenBox SDK for Temporal Workflows

OpenBox SDK provides governance and observability for Temporal workflows by capturing workflow/activity lifecycle events, HTTP telemetry, database queries, and file operations, then sending them to OpenBox Core for policy evaluation.

Key Features:

  • 6 event types (WorkflowStarted, WorkflowCompleted, WorkflowFailed, SignalReceived, ActivityStarted, ActivityCompleted)
  • 5-tier verdict system (ALLOW, CONSTRAIN, REQUIRE_APPROVAL, BLOCK, HALT)
  • HTTP/Database/File I/O instrumentation via OpenTelemetry
  • Guardrails: Input/output validation and redaction
  • Human-in-the-loop approval with expiration handling
  • Zero-code setup via create_openbox_worker() factory

Installation

pip install openbox-temporal-sdk-python

Requirements:

  • Python 3.9+
  • Temporal SDK 1.8+
  • OpenTelemetry API/SDK 1.38.0+

Quick Start

Use the create_openbox_worker() factory for simple integration:

import os
from openbox import create_openbox_worker

worker = create_openbox_worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    # OpenBox config
    openbox_url=os.getenv("OPENBOX_URL"),
    openbox_api_key=os.getenv("OPENBOX_API_KEY"),
)

await worker.run()

The factory automatically:

  1. Validates the API key
  2. Creates span processor
  3. Sets up OpenTelemetry instrumentation
  4. Creates governance interceptors
  5. Adds send_governance_event activity
  6. Returns fully configured Worker

Configuration

Environment Variables

OPENBOX_URL=http://localhost:8086
OPENBOX_API_KEY=obx_test_key_1
OPENBOX_GOVERNANCE_TIMEOUT=30.0
OPENBOX_GOVERNANCE_POLICY=fail_open  # or fail_closed

Factory Function Parameters

worker = create_openbox_worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],

    # OpenBox config
    openbox_url="http://localhost:8086",
    openbox_api_key="obx_test_key_1",
    governance_timeout=30.0,
    governance_policy="fail_open",

    # Event filtering
    send_start_event=True,
    send_activity_start_event=True,
    skip_workflow_types={"InternalWorkflow"},
    skip_activity_types={"send_governance_event"},
    skip_signals={"heartbeat"},

    # Database instrumentation
    instrument_databases=True,
    db_libraries={"psycopg2", "sqlalchemy"},  # None = all available
    sqlalchemy_engine=engine,  # pass pre-existing engine for query capture

    # File I/O instrumentation
    instrument_file_io=False,  # disabled by default

    # Standard Worker options (all supported)
    activity_executor=my_executor,
    max_concurrent_activities=10,
)

Governance Verdicts

OpenBox Core returns a verdict indicating what action the SDK should take.

Verdict Behavior
ALLOW Continue execution normally
CONSTRAIN Log constraints, continue
REQUIRE_APPROVAL Pause, poll for human approval
BLOCK Raise error, stop activity
HALT Raise error, terminate workflow

v1.0 Backward Compatibility:

  • "continue"ALLOW
  • "stop"HALT
  • "require-approval"REQUIRE_APPROVAL

Event Types

Event Trigger Captured Fields
WorkflowStarted Workflow begins workflow_id, run_id, workflow_type, task_queue
WorkflowCompleted Workflow succeeds workflow_id, run_id, workflow_type
WorkflowFailed Workflow fails workflow_id, run_id, workflow_type, error
SignalReceived Signal received workflow_id, signal_name, signal_args
ActivityStarted Activity begins activity_id, activity_type, activity_input
ActivityCompleted Activity ends activity_id, activity_type, activity_input, activity_output, spans, status, duration

Guardrails (Input/Output Redaction)

OpenBox Core can validate and redact sensitive data before/after activity execution:

# Request
{
  "verdict": "allow",
  "guardrails_result": {
    "input_type": "activity_input",
    "redacted_input": {"prompt": "[REDACTED]", "user_id": "123"},
    "validation_passed": true,
    "reasons": []
  }
}

# If validation fails:
{
  "validation_passed": false,
  "reasons": [
    {"type": "pii", "field": "email", "reason": "Contains PII"}
  ]
}

Error Handling

Configure error policy via on_api_error:

Policy Behavior
fail_open (default) If governance API fails, allow workflow to continue
fail_closed If governance API fails, terminate workflow

Supported Instrumentation

HTTP Libraries

  • httpx (sync + async) - full body capture
  • requests - full body capture
  • urllib3 - full body capture
  • urllib - request body only

Databases

  • PostgreSQL: psycopg2, asyncpg
  • MySQL: mysql-connector-python, pymysql
  • MongoDB: pymongo
  • Redis: redis
  • ORM: sqlalchemy

SQLAlchemy Note: If your SQLAlchemy engine is created before create_openbox_worker() runs (e.g., at module import time), you must pass it via the sqlalchemy_engine parameter. Without this, SQLAlchemyInstrumentor only patches future create_engine() calls and won't capture queries on pre-existing engines.

from db.engine import engine

worker = create_openbox_worker(
    ...,
    db_libraries={"psycopg2", "sqlalchemy"},
    sqlalchemy_engine=engine,
)

File I/O

  • open(), read(), write(), readline(), readlines()
  • Skips system paths (/dev/, /proc/, /sys/, __pycache__)

Architecture

See System Architecture for detailed component design.

High-Level Flow:

Workflow/Activity → Interceptors → Span Processor → OpenBox Core API
                                                    ↓
                                            Returns Verdict
                                                    ↓
                                    (ALLOW, BLOCK, HALT, etc.)

Advanced Usage

For manual control, import individual components:

from openbox import (
    initialize,
    WorkflowSpanProcessor,
    GovernanceInterceptor,
    GovernanceConfig,
)
from openbox.otel_setup import setup_opentelemetry_for_governance
from openbox.activity_interceptor import ActivityGovernanceInterceptor
from openbox.activities import send_governance_event

# 1. Initialize SDK
initialize(api_url="http://localhost:8086", api_key="obx_test_key_1")

# 2. Create span processor
span_processor = WorkflowSpanProcessor(
    ignored_url_prefixes=["http://localhost:8086"]
)

# 3. Setup OTel instrumentation
setup_opentelemetry_for_governance(
    span_processor,
    sqlalchemy_engine=engine,  # optional: instrument pre-existing engine
)

# 4. Create governance config
config = GovernanceConfig(
    on_api_error="fail_closed",
    api_timeout=30.0,
)

# 5. Create interceptors
workflow_interceptor = GovernanceInterceptor(
    api_url="http://localhost:8086",
    api_key="obx_test_key_1",
    span_processor=span_processor,
    config=config,
)

activity_interceptor = ActivityGovernanceInterceptor(
    api_url="http://localhost:8086",
    api_key="obx_test_key_1",
    span_processor=span_processor,
    config=config,
)

# 6. Create worker
from temporalio.worker import Worker
worker = Worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity, send_governance_event],
    interceptors=[workflow_interceptor, activity_interceptor],
)

Documentation


Testing

The SDK includes comprehensive test coverage with 10 test files:

pytest tests/

Test files: test_activities.py, test_activity_interceptor.py, test_config.py, test_otel_setup.py, test_span_processor.py, test_tracing.py, test_types.py, test_worker.py, test_workflow_interceptor.py


License

MIT License - See LICENSE file for details


Support

  • Issues: GitHub Issues
  • Documentation: See ./docs/

Version: 1.0.2 | Last Updated: 2026-02-12

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

openbox_temporal_sdk_python-1.0.21.tar.gz (461.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

openbox_temporal_sdk_python-1.0.21-py3-none-any.whl (40.7 kB view details)

Uploaded Python 3

File details

Details for the file openbox_temporal_sdk_python-1.0.21.tar.gz.

File metadata

File hashes

Hashes for openbox_temporal_sdk_python-1.0.21.tar.gz
Algorithm Hash digest
SHA256 4745cecb4c0d01f15de3c1daeedd34e7cdd6f4136ba3318b7eba8c94737dac8e
MD5 06bddbf45e4e898e181fe5f924d78824
BLAKE2b-256 c0c841902d340dc715ae5b84f25e99e868c2d33e02e94fbded6f8f9f6472f3ba

See more details on using hashes here.

File details

Details for the file openbox_temporal_sdk_python-1.0.21-py3-none-any.whl.

File metadata

File hashes

Hashes for openbox_temporal_sdk_python-1.0.21-py3-none-any.whl
Algorithm Hash digest
SHA256 44d401d9f03047727fb852b58d8be19007c9f3b5db0ea16ebeb69f1016d07b09
MD5 508caa38c69c2beafe21bab229f891d2
BLAKE2b-256 36fd0c9db15843b1404f5f776503aaff357f8929c21ed1eaaf775bef091c8004

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page