OpenBox SDK - Governance and observability for Temporal workflows
Project description
OpenBox SDK for Temporal Workflows
OpenBox SDK provides governance and observability for Temporal workflows by capturing workflow/activity lifecycle events, HTTP telemetry, database queries, and file operations, then sending them to OpenBox Core for policy evaluation.
Key Features:
- 6 event types (WorkflowStarted, WorkflowCompleted, WorkflowFailed, SignalReceived, ActivityStarted, ActivityCompleted)
- 5-tier verdict system (ALLOW, CONSTRAIN, REQUIRE_APPROVAL, BLOCK, HALT)
- Hook-level governance — per-operation evaluation (HTTP requests, file I/O, database queries, function tracing) with started/completed stages
- HTTP/Database/File I/O instrumentation via OpenTelemetry
- Guardrails: Input/output validation and redaction
- Human-in-the-loop approval with expiration handling
- Zero-code setup via
create_openbox_worker()factory
Installation
pip install openbox-temporal-sdk-python
Requirements:
- Python 3.11+
- Temporal SDK 1.23+ (1.8+ for factory-only usage)
- OpenTelemetry API/SDK 1.38.0+
Plugin Integration (Recommended)
Use OpenBoxPlugin for drop-in integration with Temporal Workers:
import os
from temporalio.worker import Worker
from openbox.plugin import OpenBoxPlugin
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
plugins=[
OpenBoxPlugin(
openbox_url=os.getenv("OPENBOX_URL"),
openbox_api_key=os.getenv("OPENBOX_API_KEY"),
)
],
)
await worker.run()
The plugin automatically configures governance interceptors, OTel instrumentation,
sandbox passthrough, and the send_governance_event activity.
Composing with Other Plugins
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
plugins=[
OpenTelemetryPlugin(),
OpenBoxPlugin(openbox_url=..., openbox_api_key=...),
],
)
Requires
temporalio >= 1.23.0. For older versions, usecreate_openbox_worker()below.
Quick Start (Factory)
Use the create_openbox_worker() factory for simple integration:
import os
from openbox import create_openbox_worker
worker = create_openbox_worker(
client=client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
# OpenBox config
openbox_url=os.getenv("OPENBOX_URL"),
openbox_api_key=os.getenv("OPENBOX_API_KEY"),
)
await worker.run()
The factory automatically:
- Validates the API key
- Creates span processor
- Sets up OpenTelemetry instrumentation
- Creates governance interceptors
- Adds
send_governance_eventactivity - Returns fully configured Worker
Configuration
Environment Variables
OPENBOX_URL=http://localhost:8086
OPENBOX_API_KEY=obx_test_key_1
OPENBOX_GOVERNANCE_TIMEOUT=30.0
OPENBOX_GOVERNANCE_POLICY=fail_open # or fail_closed
Factory Function Parameters
worker = create_openbox_worker(
client=client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
# OpenBox config
openbox_url="http://localhost:8086",
openbox_api_key="obx_test_key_1",
governance_timeout=30.0,
governance_policy="fail_open",
# Event filtering
send_start_event=True,
send_activity_start_event=True,
skip_workflow_types={"InternalWorkflow"},
skip_activity_types={"send_governance_event"},
skip_signals={"heartbeat"},
# Database instrumentation
instrument_databases=True,
db_libraries={"psycopg2", "sqlalchemy"}, # None = all available
sqlalchemy_engine=engine, # pass pre-existing engine for query capture
# File I/O instrumentation
instrument_file_io=False, # disabled by default
# Standard Worker options (all supported)
activity_executor=my_executor,
max_concurrent_activities=10,
)
Governance Verdicts
OpenBox Core returns a verdict indicating what action the SDK should take.
| Verdict | Behavior |
|---|---|
ALLOW |
Continue execution normally |
CONSTRAIN |
Log constraints, continue |
REQUIRE_APPROVAL |
Pause, poll for human approval |
BLOCK |
Raise error, stop activity |
HALT |
Raise error, terminate workflow |
v1.0 Backward Compatibility:
"continue"→ALLOW"stop"→HALT"require-approval"→REQUIRE_APPROVAL
Event Types
| Event | Trigger | Captured Fields |
|---|---|---|
| WorkflowStarted | Workflow begins | workflow_id, run_id, workflow_type, task_queue |
| WorkflowCompleted | Workflow succeeds | workflow_id, run_id, workflow_type |
| WorkflowFailed | Workflow fails | workflow_id, run_id, workflow_type, error |
| SignalReceived | Signal received | workflow_id, signal_name, signal_args |
| ActivityStarted | Activity begins | activity_id, activity_type, activity_input |
| ActivityCompleted | Activity ends | activity_id, activity_type, activity_input, activity_output, spans, status, duration |
Guardrails (Input/Output Redaction)
OpenBox Core can validate and redact sensitive data before/after activity execution:
# Request
{
"verdict": "allow",
"guardrails_result": {
"input_type": "activity_input",
"redacted_input": {"prompt": "[REDACTED]", "user_id": "123"},
"validation_passed": true,
"reasons": []
}
}
# If validation fails:
{
"validation_passed": false,
"reasons": [
{"type": "pii", "field": "email", "reason": "Contains PII"}
]
}
Error Handling
Configure error policy via on_api_error (constants available as hook_governance.FAIL_OPEN / FAIL_CLOSED):
| Policy | Behavior |
|---|---|
fail_open (default) |
If governance API fails, allow workflow to continue |
fail_closed |
If governance API fails, terminate workflow |
Supported Instrumentation
HTTP Libraries
httpx(sync + async) - full body capturerequests- full body captureurllib3- full body captureurllib- request body only
Databases
Fully supported — any dbapi-compatible library using OTel's CursorTracer.traced_execution():
psycopg2,pymysql,mysql-connector-python, and other dbapi-compliant drivers
Custom hooks (best-effort) — these libraries use non-dbapi instrumentation paths; governance hooks may not work correctly in all scenarios:
asyncpg— wrapt wrapper on Connection methods (governance runs outside OTel span context)pymongo— CommandListener monitoring + wrapt Collection wrappers (dedup via thread-local flag; some internal commands likeendSessionsonly producecompletedstage)redis— native OTelrequest_hook/response_hooksqlalchemy—before/after_cursor_executeevent listeners
SQLAlchemy Note: If your SQLAlchemy engine is created before create_openbox_worker() runs (e.g., at module import time), you must pass it via the sqlalchemy_engine parameter. Without this, SQLAlchemyInstrumentor only patches future create_engine() calls and won't capture queries on pre-existing engines.
from db.engine import engine
worker = create_openbox_worker(
...,
db_libraries={"psycopg2", "sqlalchemy"},
sqlalchemy_engine=engine,
)
File I/O
open(),read(),write(),readline(),readlines()- Skips system paths (
/dev/,/proc/,/sys/,__pycache__)
Hook-Level Governance
Every HTTP request, file operation, and database query made during an activity is evaluated by OpenBox Core in real-time at two stages:
HTTP Requests
| Stage | Trigger | Data Available |
|---|---|---|
started |
Before request is sent | Method, URL, request headers, request body |
completed |
After response received | All of above + response headers, response body, status code |
File Operations
Per-operation governance evaluates every read()/write()/readline()/readlines()/writelines() call, not just open/close:
| Operation | Stage | Trigger | Data Available |
|---|---|---|---|
open |
started |
Before file is opened | File path, open mode |
read |
started |
Before read executes | File path, mode |
read |
completed |
After read returns | data (content read), bytes_read |
readline |
started |
Before readline executes | File path, mode |
readline |
completed |
After readline returns | data (line read), bytes_read |
readlines |
started |
Before readlines executes | File path, mode |
readlines |
completed |
After readlines returns | data (lines read), bytes_read, lines_count |
write |
started |
Before write executes | File path, mode |
write |
completed |
After write returns | data (content written), bytes_written |
writelines |
started |
Before writelines executes | File path, mode |
writelines |
completed |
After writelines returns | data (lines written), bytes_written, lines_count |
close |
completed |
After file is closed | bytes_read, bytes_written, operations list |
How it works (HTTP):
- OTel httpx instrumentation fires a request hook → SDK sends
startedgovernance evaluation with request data - If verdict is BLOCK/HALT → request is aborted before it leaves the process
- After response arrives → SDK sends
completedgovernance evaluation with full request+response data - If verdict is BLOCK/HALT →
GovernanceBlockedErroris raised, activity fails withGovernanceStop
How it works (File I/O):
- Activity calls
open()→ SDK sendsstartedgovernance evaluation with file path and mode - If verdict is BLOCK/HALT → file is never opened,
GovernanceBlockedErroris raised - Each
read()/write()/readline()/readlines()/writelines()call sendsstarted(before) andcompleted(after) governance evaluations — enabling content-based policy enforcement - After file is closed → SDK sends
completedgovernance with lifecycle summary (total bytes, operations list) - File governance requires
instrument_file_io=True(disabled by default)
A simple open-read-close produces 4 governance evaluations: open(started) → read(started) → read(completed) → close(completed).
Database Queries
Every database operation is evaluated at started (pre-query, can block) and completed (post-query, reports outcome):
| Field | Started | Completed |
|---|---|---|
type |
"db_query" |
"db_query" |
stage |
"started" |
"completed" |
db_system |
postgresql, mysql, mongodb, redis, sqlite | same |
db_name |
Database name | same |
db_operation |
SQL verb or command (SELECT, INSERT, GET, etc.) | same |
db_statement |
Query string or command | same |
server_address |
Host | same |
server_port |
Port | same |
duration_ms |
— | Query duration in ms |
error |
— | Error message or None |
How it works:
- Activity executes a DB query (via any supported library)
- SDK governance hook intercepts before the query → sends
startedevaluation - If verdict is BLOCK/HALT → query is aborted,
GovernanceBlockedErrorraised - Query executes normally → SDK sends
completedevaluation with duration and error status - DB governance is automatic when
instrument_databases=True(default)
Per-library strategy:
| Library | Hook Method | Can Block? | Reliability |
|---|---|---|---|
| psycopg2, pymysql, mysql-connector-python | CursorTracer.traced_execution patch |
Yes | Fully supported |
| asyncpg | wrapt wrapper on Connection methods |
Yes | Best-effort |
| pymongo | CommandListener + wrapt Collection wrappers |
Yes (wrapt only) | Best-effort |
| redis | Native OTel request_hook/response_hook |
Yes | Best-effort |
| sqlalchemy | before/after_cursor_execute events |
Yes | Best-effort |
Note: Libraries marked "Fully supported" use OTel's
CursorTracer, which guarantees governance hooks run inside the OTel span context. Best-effort libraries use custom hooks that may produce inconsistencies (e.g., missing stages for internal commands). Some C extension types (e.g.,psycopg2.extensions.cursor) cannot be patched withwrapt— in those cases governance hooks are silently skipped, but OTel span capture still works normally.
Function Tracing
Functions decorated with @traced are automatically governed when hook_governance is configured:
| Stage | Trigger | Data Available |
|---|---|---|
started |
Before function executes | Function name, module, arguments (if capture_args=True) |
completed |
After function returns/raises | All of above + result (if capture_result=True) or error info |
How it works (Function Tracing):
@traceddecorator wrapper starts OTel span- → SDK sends
startedgovernance evaluation with function name and module - If verdict is BLOCK/HALT →
GovernanceBlockedErrorraised, function never executes - Function executes normally
- → SDK sends
completedgovernance evaluation with result or error info - If verdict is BLOCK/HALT →
GovernanceBlockedErrorraised after execution - When
hook_governanceis not configured → zero overhead, no governance calls
Governed span tracking: When hook-level governance is active, the SDK marks HTTP spans as "governed" so the OTel on_end processor skips buffering them — preventing duplicate spans.
Architecture
See System Architecture for detailed component design.
High-Level Flow:
Workflow/Activity → Interceptors → Span Processor → OpenBox Core API
↓
Returns Verdict
↓
(ALLOW, BLOCK, HALT, etc.)
Hook-Level (per HTTP request):
Activity HTTP Call → OTel Hook → hook_governance → API (started) → Allow/Block
→ Response → hook_governance → API (completed) → Allow/Block
Hook-Level (per file operation):
Activity open() → hook_governance → API (started) → Allow/Block
Activity read/write() → hook_governance → API (started) → Allow/Block
→ hook_governance → API (completed) → Allow/Block
Activity close() → hook_governance → API (completed) → Allow/Block
Hook-Level (per DB query):
Activity DB Call → db_governance_hooks → hook_governance → API (started) → Allow/Block
→ Query executes → hook_governance → API (completed)
Hook-Level (per @traced function):
@traced call → hook_governance → API (started) → Allow/Block
→ Function runs → hook_governance → API (completed)
Module responsibilities:
otel_setup.py— OTel instrumentation, hooks, TracedFile wrapperhook_governance.py— Shared governance evaluator (payload building, API calls, verdict handling)db_governance_hooks.py— Per-library DB governance wrappers (wrapt, OTel hooks, SQLAlchemy events)span_processor.py— Span buffering, activity context trackingactivity_interceptor.py— Activity lifecycle governance events
Advanced Usage
For manual control, import individual components:
from openbox import (
initialize,
WorkflowSpanProcessor,
GovernanceInterceptor,
GovernanceConfig,
)
from openbox.otel_setup import setup_opentelemetry_for_governance
from openbox.activity_interceptor import ActivityGovernanceInterceptor
from openbox.activities import send_governance_event
from openbox.hook_governance import FAIL_OPEN, FAIL_CLOSED # error policy constants
# 1. Initialize SDK
initialize(api_url="http://localhost:8086", api_key="obx_test_key_1")
# 2. Create span processor
span_processor = WorkflowSpanProcessor(
ignored_url_prefixes=["http://localhost:8086"]
)
# 3. Setup OTel instrumentation (governance always enabled)
setup_opentelemetry_for_governance(
span_processor,
api_url="http://localhost:8086",
api_key="obx_test_key_1",
sqlalchemy_engine=engine, # optional: instrument pre-existing engine
)
# 4. Create governance config
config = GovernanceConfig(
on_api_error="fail_closed",
api_timeout=30.0,
)
# 5. Create interceptors
workflow_interceptor = GovernanceInterceptor(
api_url="http://localhost:8086",
api_key="obx_test_key_1",
span_processor=span_processor,
config=config,
)
activity_interceptor = ActivityGovernanceInterceptor(
api_url="http://localhost:8086",
api_key="obx_test_key_1",
span_processor=span_processor,
config=config,
)
# 6. Create worker
from temporalio.worker import Worker
worker = Worker(
client=client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity, send_governance_event],
interceptors=[workflow_interceptor, activity_interceptor],
)
Documentation
- Project Overview & PDR - Requirements, features, constraints
- System Architecture - Component design, data flows, security
- Codebase Summary - Code structure and component details
- Code Standards - Coding conventions and best practices
- Project Roadmap - Future enhancements and timeline
Testing
The SDK includes comprehensive test coverage with 15 test files:
pytest tests/
Test files: test_activities.py, test_activity_interceptor.py, test_config.py, test_db_governance_hooks.py, test_file_governance_hooks.py, test_otel_hook_pause.py, test_otel_hook_pause_db.py, test_otel_setup.py, test_plugin.py, test_plugin_integration.py, test_span_processor.py, test_tracing.py, test_types.py, test_worker.py, test_workflow_interceptor.py
License
MIT License - See LICENSE file for details
Support
- Issues: GitHub Issues
- Documentation: See
./docs/
Version: 1.2.0 | Last Updated: 2026-04-05
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file openbox_temporal_sdk_python-1.1.1.tar.gz.
File metadata
- Download URL: openbox_temporal_sdk_python-1.1.1.tar.gz
- Upload date:
- Size: 483.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
767c6132849ada9bf9990fdc48b002e568316497be7b3594e4359cd2982f1e1c
|
|
| MD5 |
8aca7a2cebe7177a0fcf192e30ea8fc9
|
|
| BLAKE2b-256 |
970a9c40ce9902db01197ca0a751cbd06b34a6affc62d55541deb0ab2dcc13af
|
File details
Details for the file openbox_temporal_sdk_python-1.1.1-py3-none-any.whl.
File metadata
- Download URL: openbox_temporal_sdk_python-1.1.1-py3-none-any.whl
- Upload date:
- Size: 69.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
95beaf1b40f7c18bdd87161737955886ee8fcd6250a1ed80aa8365a30eca6a3d
|
|
| MD5 |
d4b737e51eb35c58eb201133bada170d
|
|
| BLAKE2b-256 |
1c78c12fb6f647ef9207bfcee9aaa828ba001904be0dda54f9fe879fee613736
|