OPC UA integration for the BTEng Behavior Tree engine
Project description
bteng_opcua
OPC UA integration for the BTEng Behavior Tree engine — 12 async leaf nodes covering connect, read/write, subscriptions, history, and method calls.
Requirements
Installation
pip install bteng asyncua pyyaml
pip install -e . # from this repo
For development and testing:
pip install -e ".[dev]"
Quick start
import bteng_opcua
from bteng import Blackboard, NodeConfig, SequenceNode, NodeStatus
from bteng_opcua import OpcUaConnect, OpcUaRead, OpcUaDisconnect
# Load node definitions once at startup
bteng_opcua.load_nodes("nodes.yaml")
bb = Blackboard.create("plant")
connect = OpcUaConnect("Connect", NodeConfig(
blackboard=bb,
params={"endpoint": "opc.tcp://localhost:4840", "session_id": "plant"},
))
read = OpcUaRead("ReadTemp", NodeConfig(
blackboard=bb,
params={"session_id": "plant", "node_alias": "temperature"},
))
disconnect = OpcUaDisconnect("Disconnect", NodeConfig(
blackboard=bb,
params={"session_id": "plant"},
))
tree = SequenceNode("Task", [connect, read, disconnect])
while True:
status = tree.execute_tick()
if status != NodeStatus.RUNNING:
break
print("temperature:", bb.get("temperature"))
Nodes
All nodes are StatefulActionNode leaf nodes. They return RUNNING while an async operation is in flight, then SUCCESS or FAILURE when it completes.
| Node | What it does | Key ports |
|---|---|---|
OpcUaConnect |
Establish a named OPC UA session | endpoint (required), session_id, timeout, username, password, certificate_path, private_key_path |
OpcUaDisconnect |
Close a named session (best-effort) | session_id |
OpcUaReconnect |
Re-establish a dropped session using its saved config | session_id |
OpcUaRead |
Read one variable node, write to blackboard | session_id, node_alias or node_id+node_index, data_type, output_key, output value |
OpcUaWrite |
Write one variable node from blackboard | session_id, node_alias or node_id+node_index, data_type, value |
OpcUaBatchRead |
Read multiple nodes in one round-trip | session_id, node_aliases or node_ids, output_keys, data_types |
OpcUaBatchWrite |
Write multiple nodes in parallel | session_id, node_ids, input_keys, data_types |
OpcUaCallMethod |
Invoke an OPC UA method with typed args | method_alias, session_id, output_prefix |
OpcUaSubscribe |
React to data-change notifications | session_id, node_alias or node_id, mode, expected_value, period_ms, timeout, persistent, output value |
OpcUaWaitForValue |
Poll a node until its value matches (no subscription required) | session_id, node_alias or node_id, expected_value, poll_interval, timeout, output_key |
OpcUaReadHistory |
Read a time-windowed history from a HistoricalAccess node | session_id, node_id, output_key, duration_seconds, end_time, max_values |
OpcUaCondition |
Read a node and check its value against an expected value | session_id, node_alias or node_id, data_type, expected_value |
Port details
OpcUaConnect
| Port | Default | Purpose |
|---|---|---|
endpoint |
— | OPC UA server URL, e.g. opc.tcp://localhost:4840 |
session_id |
"default" |
Unique name for the session |
timeout |
10.0 |
Connection timeout in seconds |
username |
"" |
Username for authenticated sessions |
password |
"" |
Password for authenticated sessions |
certificate_path |
"" |
Path to client certificate (.pem/.der) |
private_key_path |
"" |
Path to client private key |
OpcUaDisconnect
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
Session to close |
OpcUaReconnect
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
Session to reconnect (must have been connected before) |
OpcUaRead
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
OPC UA session identifier |
node_alias |
"" |
Registry alias (recommended) |
node_id |
"" |
Raw node ID override, e.g. ns=2;s=Temp |
node_index |
0 |
Namespace index (used with node_id) |
data_type |
"" |
Type override when not using alias |
output_key |
"" |
Blackboard key to store the value |
value (output) |
— | Value read from the server |
OpcUaWrite
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
OPC UA session identifier |
node_alias |
"" |
Registry alias (recommended) |
node_id |
"" |
Raw node ID override |
node_index |
0 |
Namespace index (used with node_id) |
data_type |
"" |
Required when using node_id directly |
value |
— | Value to write (from blackboard or params) |
OpcUaBatchRead
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
OPC UA session identifier |
node_aliases |
"" |
Comma-separated registry aliases |
node_ids |
"" |
Comma-separated raw node IDs (alternative) |
output_keys |
"" |
Comma-separated blackboard keys (defaults to alias names) |
data_types |
"" |
Comma-separated type names (optional) |
OpcUaBatchWrite
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
OPC UA session identifier |
node_ids |
"" |
Comma-separated raw node IDs |
input_keys |
"" |
Comma-separated blackboard keys to read from |
data_types |
"" |
Comma-separated type names (required) |
OpcUaCallMethod
| Port | Default | Purpose |
|---|---|---|
method_alias |
"" |
Registry alias of the method node |
session_id |
"default" |
OPC UA session identifier |
output_prefix |
"" |
Prefix prepended to output blackboard keys |
Input arguments are read from the blackboard by arg name (remappable via NodeConfig.input_ports). Output arguments are written as {output_prefix}{arg_name} (remappable via NodeConfig.output_ports). If condition_node_id is set in the node definition, that node is read and compared to condition_value before the call; a mismatch returns FAILURE.
OpcUaSubscribe
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
OPC UA session identifier |
node_alias |
"" |
Registry alias |
node_id |
"" |
Raw node ID override |
node_index |
0 |
Namespace index (used with node_id) |
data_type |
"" |
Type override |
mode |
"on_change" |
on_change — any notification; on_value — wait for expected_value |
expected_value |
None |
Required value when mode=on_value |
period_ms |
500 |
Subscription publish interval in ms |
timeout |
0.0 |
Max wait in seconds; 0 = infinite |
output_key |
"" |
Blackboard key to store received value |
persistent |
"false" |
Keep subscription alive across activations |
value (output) |
— | Value received from the subscription |
OpcUaWaitForValue
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
OPC UA session identifier |
node_alias |
"" |
Registry alias |
node_id |
"" |
Raw node ID override |
node_index |
0 |
Namespace index (used with node_id) |
data_type |
"" |
Type override |
expected_value |
"" |
Value to wait for |
output_key |
"" |
Blackboard key to store matched value |
poll_interval |
0.5 |
Seconds between reads |
timeout |
0.0 |
Max wait in seconds; 0 = infinite |
OpcUaReadHistory
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
OPC UA session identifier |
node_id |
"" |
Full node ID (required) |
output_key |
"history" |
Blackboard key for the result list of (datetime, value) tuples |
duration_seconds |
60.0 |
Length of the history window in seconds |
end_time |
"now" |
End of window: ISO-8601 UTC string or "now" |
max_values |
100 |
Maximum data points to return |
OpcUaCondition
| Port | Default | Purpose |
|---|---|---|
session_id |
"default" |
OPC UA session identifier |
node_alias |
"" |
Registry alias |
node_id |
"" |
Raw node ID override |
node_index |
0 |
Namespace index (used with node_id) |
data_type |
"" |
Type for value conversion |
expected_value |
"" |
Expected value (string coercion applies) |
Node registry
Node definitions are loaded from YAML (or JSON) files. Call bteng_opcua.load_nodes(path) once at startup to populate the global registry.
nodes:
# Data node
- alias: "temperature"
node_index: 2
node_id: "s=Temperature"
data_type: "float"
# Method node
- alias: "move_to_position"
node_index: 2
node_id: "" # unused for method nodes
data_type: "method"
object_id: "ns=2;i=100"
method_id: "ns=2;i=101"
condition_node_id: "ns=2;i=50" # optional: read before calling
condition_value: true
input_args:
- "target_x:double"
- "target_y:double"
- "speed:float"
output_args:
- "result_code:int32"
- "reached_x:double"
- "reached_y:double"
Required fields for every node: alias, node_index, node_id, data_type.
Method nodes additionally require object_id and method_id.
Args use the shorthand name:type string or a {name, type} dict.
Multiple files can be loaded — later definitions with the same alias overwrite earlier ones.
Data types
The OpcUaDataType enum defines all valid values for data_type fields.
| Value | Python type | Notes |
|---|---|---|
int32 |
int |
|
int64 |
int |
|
float |
float |
|
double |
float |
|
string |
str |
|
boolean |
bool |
|
uint16 |
int |
|
uint32 |
int |
|
uint64 |
int |
|
byte |
int |
|
method |
— | Reserved for method nodes; not valid in arg definitions |
int32[] |
list[int] |
|
int64[] |
list[int] |
|
float[] |
list[float] |
|
double[] |
list[float] |
|
string[] |
list[str] |
|
boolean[] |
list[bool] |
|
uint16[] |
list[int] |
|
uint32[] |
list[int] |
|
uint64[] |
list[int] |
|
byte[] |
list[int] |
Array types pass a Python list to ua.Variant; asyncua infers the array flag automatically.
Session management
Sessions are identified by a string session_id (default: "default"). Multiple sessions can be active simultaneously, pointing at different OPC UA servers.
OpcUaConnectstores the connection config after a successful handshake.OpcUaDisconnectremoves the live client but preserves the config.OpcUaReconnectuses the saved config to re-establish the connection without requiring theendpointport again.
All nodes share the singleton OpcUaSessionManager, which runs a background async event loop. To inject a custom backend (e.g. for tests):
from bteng_opcua.session import OpcUaSessionManager
from bteng_opcua.backend.base import OpcUaBackend
class MockBackend(OpcUaBackend):
async def connect(self, endpoint, **kw): return object()
async def disconnect(self, client): pass
async def read_node(self, client, node_id): return 42.0
async def write_node(self, client, node_id, value, vt): pass
async def read_nodes(self, client, node_ids): return [42.0] * len(node_ids)
async def call_method(self, client, obj, method, args): return [0]
async def read_history(self, client, node_id, start, end, max_v): return []
async def create_subscription(self, client, node_id, cb, period_ms): return object()
async def delete_subscription(self, handle): pass
OpcUaSessionManager.set_instance(OpcUaSessionManager(backend=MockBackend()))
Running tests
pip install -e ".[dev]"
pytest tests/
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file bteng_opcua-0.1.0.tar.gz.
File metadata
- Download URL: bteng_opcua-0.1.0.tar.gz
- Upload date:
- Size: 50.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
748e44ff962df7d24a36a95e7158d9469cae448cf9f5b1561fa7e5e5541d10b4
|
|
| MD5 |
52cf80e08640270cf6e25264498400b8
|
|
| BLAKE2b-256 |
a985c4ab6964724b3ca517e1205cf2398150d182b245d83c7e68dda4cca7c5ae
|
File details
Details for the file bteng_opcua-0.1.0-py3-none-any.whl.
File metadata
- Download URL: bteng_opcua-0.1.0-py3-none-any.whl
- Upload date:
- Size: 43.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9cb726b30bc982290769dfe3d84d7180e031dc89aca10b798e55211bfa3c27ef
|
|
| MD5 |
7252774287b73b28f3af06cdb16039b4
|
|
| BLAKE2b-256 |
bf76c7bb59c44f5a9dd63fc0fcbe1decfe59efe66cb13469ef9412bfa82dda01
|