Skip to main content

OpenBox SDK - Governance and observability for Temporal workflows

Project description

OpenBox SDK for Temporal Workflows

OpenBox SDK provides governance and observability for Temporal workflows by capturing workflow/activity lifecycle events, HTTP telemetry, database queries, and file operations, then sending them to OpenBox Core for policy evaluation.

Key Features:

  • 6 event types (WorkflowStarted, WorkflowCompleted, WorkflowFailed, SignalReceived, ActivityStarted, ActivityCompleted)
  • 5-tier verdict system (ALLOW, CONSTRAIN, REQUIRE_APPROVAL, BLOCK, HALT)
  • HTTP/Database/File I/O instrumentation via OpenTelemetry
  • Guardrails: Input/output validation and redaction
  • Human-in-the-loop approval with expiration handling
  • Zero-code setup via create_openbox_worker() factory

Installation

pip install openbox-temporal-sdk-python

Requirements:

  • Python 3.9+
  • Temporal SDK 1.8+
  • OpenTelemetry API/SDK 1.38.0+

Quick Start

Use the create_openbox_worker() factory for simple integration:

import os
from openbox import create_openbox_worker

worker = create_openbox_worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    # OpenBox config
    openbox_url=os.getenv("OPENBOX_URL"),
    openbox_api_key=os.getenv("OPENBOX_API_KEY"),
)

await worker.run()

The factory automatically:

  1. Validates the API key
  2. Creates span processor
  3. Sets up OpenTelemetry instrumentation
  4. Creates governance interceptors
  5. Adds send_governance_event activity
  6. Returns fully configured Worker

Configuration

Environment Variables

OPENBOX_URL=http://localhost:8086
OPENBOX_API_KEY=obx_test_key_1
OPENBOX_GOVERNANCE_TIMEOUT=30.0
OPENBOX_GOVERNANCE_POLICY=fail_open  # or fail_closed

Factory Function Parameters

worker = create_openbox_worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],

    # OpenBox config
    openbox_url="http://localhost:8086",
    openbox_api_key="obx_test_key_1",
    governance_timeout=30.0,
    governance_policy="fail_open",

    # Event filtering
    send_start_event=True,
    send_activity_start_event=True,
    skip_workflow_types={"InternalWorkflow"},
    skip_activity_types={"send_governance_event"},
    skip_signals={"heartbeat"},

    # Database instrumentation
    instrument_databases=True,
    db_libraries={"psycopg2", "redis"},  # None = all available

    # File I/O instrumentation
    instrument_file_io=False,  # disabled by default

    # Standard Worker options (all supported)
    activity_executor=my_executor,
    max_concurrent_activities=10,
)

Governance Verdicts

OpenBox Core returns a verdict indicating what action the SDK should take.

Verdict Behavior
ALLOW Continue execution normally
CONSTRAIN Log constraints, continue
REQUIRE_APPROVAL Pause, poll for human approval
BLOCK Raise error, stop activity
HALT Raise error, terminate workflow

v1.0 Backward Compatibility:

  • "continue"ALLOW
  • "stop"HALT
  • "require-approval"REQUIRE_APPROVAL

Event Types

Event Trigger Captured Fields
WorkflowStarted Workflow begins workflow_id, run_id, workflow_type, task_queue
WorkflowCompleted Workflow succeeds workflow_id, run_id, workflow_type
WorkflowFailed Workflow fails workflow_id, run_id, workflow_type, error
SignalReceived Signal received workflow_id, signal_name, signal_args
ActivityStarted Activity begins activity_id, activity_type, activity_input
ActivityCompleted Activity ends activity_id, activity_type, activity_input, activity_output, spans, status, duration

Guardrails (Input/Output Redaction)

OpenBox Core can validate and redact sensitive data before/after activity execution:

# Request
{
  "verdict": "allow",
  "guardrails_result": {
    "input_type": "activity_input",
    "redacted_input": {"prompt": "[REDACTED]", "user_id": "123"},
    "validation_passed": true,
    "reasons": []
  }
}

# If validation fails:
{
  "validation_passed": false,
  "reasons": [
    {"type": "pii", "field": "email", "reason": "Contains PII"}
  ]
}

Error Handling

Configure error policy via on_api_error:

Policy Behavior
fail_open (default) If governance API fails, allow workflow to continue
fail_closed If governance API fails, terminate workflow

Supported Instrumentation

HTTP Libraries

  • httpx (sync + async) - full body capture
  • requests - full body capture
  • urllib3 - full body capture
  • urllib - request body only

Databases

  • PostgreSQL: psycopg2, asyncpg
  • MySQL: mysql-connector-python, pymysql
  • MongoDB: pymongo
  • Redis: redis
  • ORM: sqlalchemy

File I/O

  • open(), read(), write(), readline(), readlines()
  • Skips system paths (/dev/, /proc/, /sys/, __pycache__)

Architecture

See System Architecture for detailed component design.

High-Level Flow:

Workflow/Activity → Interceptors → Span Processor → OpenBox Core API
                                                    ↓
                                            Returns Verdict
                                                    ↓
                                    (ALLOW, BLOCK, HALT, etc.)

Advanced Usage

For manual control, import individual components:

from openbox import (
    initialize,
    WorkflowSpanProcessor,
    GovernanceInterceptor,
    GovernanceConfig,
)
from openbox.otel_setup import setup_opentelemetry_for_governance
from openbox.activity_interceptor import ActivityGovernanceInterceptor
from openbox.activities import send_governance_event

# 1. Initialize SDK
initialize(api_url="http://localhost:8086", api_key="obx_test_key_1")

# 2. Create span processor
span_processor = WorkflowSpanProcessor(
    ignored_url_prefixes=["http://localhost:8086"]
)

# 3. Setup OTel instrumentation
setup_opentelemetry_for_governance(span_processor)

# 4. Create governance config
config = GovernanceConfig(
    on_api_error="fail_closed",
    api_timeout=30.0,
)

# 5. Create interceptors
workflow_interceptor = GovernanceInterceptor(
    api_url="http://localhost:8086",
    api_key="obx_test_key_1",
    span_processor=span_processor,
    config=config,
)

activity_interceptor = ActivityGovernanceInterceptor(
    api_url="http://localhost:8086",
    api_key="obx_test_key_1",
    span_processor=span_processor,
    config=config,
)

# 6. Create worker
from temporalio.worker import Worker
worker = Worker(
    client=client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity, send_governance_event],
    interceptors=[workflow_interceptor, activity_interceptor],
)

Documentation


Testing

The SDK includes comprehensive test coverage with 10 test files:

pytest tests/

Test files: test_activities.py, test_activity_interceptor.py, test_config.py, test_otel_setup.py, test_span_processor.py, test_tracing.py, test_types.py, test_worker.py, test_workflow_interceptor.py


License

MIT License - See LICENSE file for details


Support

  • Issues: GitHub Issues
  • Documentation: See ./docs/

Version: 1.0.0 | Last Updated: 2026-02-04

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

openbox_temporal_sdk_python-1.0.1.tar.gz (460.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

openbox_temporal_sdk_python-1.0.1-py3-none-any.whl (39.8 kB view details)

Uploaded Python 3

File details

Details for the file openbox_temporal_sdk_python-1.0.1.tar.gz.

File metadata

File hashes

Hashes for openbox_temporal_sdk_python-1.0.1.tar.gz
Algorithm Hash digest
SHA256 509150cb891cc7fc976142b0c2119793ff1d7484a0e592cbab5388379c1cb2cc
MD5 a9534596744f4894eaea1e3d0ac7165e
BLAKE2b-256 7bd631ab974947a0b9457f055daf4d9edce7d54b459ba3c1cf09f9749d8ea30c

See more details on using hashes here.

File details

Details for the file openbox_temporal_sdk_python-1.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for openbox_temporal_sdk_python-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 063d8d8782484aa662461a0bff20a33649d89425ef8684d4500f3e6466e223c0
MD5 65a62eda3cbd8329cf702a632e40ed02
BLAKE2b-256 36795516253e710940720268a81277895d7cc25f0b2883846374f28db46abd4d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page