Async-first IEC 61850 client for Python.

iec61850

Async-first, type-hinted IEC 61850 client for Python.

Features

  • TCP connect / disconnect with timeout
  • TLS connect (IEC 62351-3 cipher whitelist, mutual TLS, TLS 1.2 + 1.3, known-peer pinning, CRL, configurable validation knobs)
  • Per-connection tuning: request timeout, max outstanding invocations, local max PDU size
  • High-level Iec61850Client async context manager wrapping the lifecycle of an IedConnection plus an optional background ReportDispatcher
  • Typed scalar read / write: bool, int32, int64, uint32, float, float64, string, timestamp (decoded to datetime), quality (decoded to a Quality dataclass)
  • Generic read / write with array-element and sub-component selection
  • Directory queries: get_server_directory, get_logical_device_directory, get_logical_node_directory(AcsiClass), get_data_directory
  • Schema introspection: get_variable_specification(ref, fc) (recursive MMS type tree) and get_device_model() (per-LD named-variable index)
  • Dataset admin: create_data_set, delete_data_set, get_data_set_values, set_data_set_values
  • Connection control: disconnect (graceful) and abort (rude close — drop TCP without sending MMS Conclude)
  • URCB / BRCB reporting: get_rcb_values, set_rcb_values, install_report_handler, poll_reports, background ReportDispatcher
  • Log service: query_journal_by_time and query_journal_after_entry for paginating Log Control Block contents
  • SCL / ICD / CID document parser: load_scl(path) / parse_scl(xml) return an Scl handle exposing the IED inventory, the full document as a nested dict, and a canonical text summary per IED
  • Server hosting: IedServer.from_scl(path, ied_name=...) instantiates an MMS server from an SCL document. Bind, configure (vendor / model name / max connections), enter via async with, push value updates with typed update_* methods, intercept reads / writes with on_read / on_write callbacks, serve control commands (SBO / direct, normal / enhanced) with on_control (operate and wait callbacks may be either sync or async), expose URCB-driven reporting via add_dataset / register_urcb, and apply atomic multi-attribute updates with with server.batch():
  • Control: select, select_with_value, operate, cancel across the four IEC 61850 control models (direct-normal / direct-enhanced / sbo-normal / sbo-enhanced)
  • Typed exception hierarchy: IedError, IedConnectionError, IedTimeoutError, IedDataAccessError, IedServiceError, IedControlError, IedServerError

Install

pip install iec61850

Requires Python 3.11+. Wheels are published for Windows x86_64 and Linux x86_64 (manylinux 2014).

Conformance (PICS)

ACSI service support per IEC 61850-7-2 Edition 2.1, exposed through this binding. Each row marks a service as supported or not currently exposed, separately for the client and the server side; (n) refers to a numbered footnote below the table. Sections are listed in service-class order so that an unsupported row is visible rather than absent.

Application Association (§7)

Service Client Server
Associate (Two-Party, MMS/TCP)
Abort (graceful Conclude)
Abort (rude — TCP drop)
Release
TLS 1.2 / 1.3 (IEC 62351-3) ✓ (1)
Mutual TLS / pinned peer / CRL
Authentication (ACSE password)
  1. Includes 62351-3 cipher whitelist, verify_hostname knob, version pinning, and known-peer profile.

Server class (§8)

Service Client Server
GetServerDirectory
GetServerCapabilities (Ed.2)

Logical Device class (§9)

Service Client Server
GetLogicalDeviceDirectory

Logical Node class (§10)

Service Client Server
GetLogicalNodeDirectory
GetAllDataValues

Data class (§11)

Service Client Server
GetDataValues ✓ (2)
SetDataValues ✓ (2)
GetDataDirectory
GetDataDefinition / GetVariableSpec
  2. Server applies the configured WriteAccessPolicies (default SP | SV | SE) and any registered on_read / on_write callbacks.

Data Set class (§12)

Service Client Server
GetDataSetValues
SetDataSetValues
CreateDataSet (dynamic)
DeleteDataSet
GetDataSetDirectory
Static (SCL-defined) datasets ✓ (3)
  3. Server registers static datasets through add_dataset(); either bound to a URCB or exposed standalone for GetDataSetValues.

Substitution (§13)

Service Client Server
Set substituted value (FC=SV writes)
Dedicated Substitution service API

Setting Group Control Block — SGCB (§14)

Service Client Server
SelectActiveSG / SelectEditSG
GetSGCBValues / SetSGCBValues
ConfirmEditSGValues
Setting access (FC=SG / FC=SE writes)

Reporting — URCB / BRCB (§17)

Service Client Server
URCB — GetURCBValues / SetURCBValues
URCB — Report (TrgOps, OptFlds, BufTm, IntgPd)
URCB — General Interrogation
BRCB — GetBRCBValues / SetBRCBValues / Report
Background report dispatcher n/a

Logging — LCB / Log (§15)

Service Client Server
ReadJournal (by time / by entry)
LCB — GetLCBValues / SetLCBValues
QueryLogByTime / QueryLogAfter
Log purging

Generic Substation Event — GOOSE / GSE (§18)

Service Client Server
GoCB — Get / Set values
GOOSE publish
GOOSE subscribe

Transmission of Sampled Values — SVCB (§19)

Service Client Server
MSVCB / SVCB — Get / Set values
Sampled-value publish
Sampled-value subscribe

Control (§20)

Service Client Server
status-only
direct-normal
sbo-normal
direct-enhanced
sbo-enhanced
Select / SelectWithValue
Operate / Cancel
Test mode, ctlNum, origin
TimeActivatedOperate
AddCause feedback

Time and Time Synchronization (§21)

Service Client Server
UTC time read (Timestamp DA)
Time-quality flags on update n/a
SNTP / NTP responder
SNTP / NTP client

File Transfer (§23)

Service Client Server
GetFile / SetFile / DeleteFile
GetFileAttributeValues
GetServerDirectory(FILE)

Tooling (out-of-band)

Capability Status
SCL / ICD / CID parser
Two-stage SCL pipeline (XML + cross-element resolution)
Typed-spec introspection (TypeSpec)

Quick start

import asyncio
import iec61850

async def main():
    conn = await iec61850.IedConnection.connect("127.0.0.1:102", timeout_ms=5000)
    try:
        status = await conn.read_int32("simpleIOGenericIO/LLN0.Mod.stVal", iec61850.FC.ST)
        vendor = await conn.read_string("simpleIOGenericIO/LLN0.NamPlt.vendor", iec61850.FC.DC)
        quality = await conn.read_quality("simpleIOGenericIO/GGIO1.Ind1.q", iec61850.FC.ST)
        print(status, vendor, quality.validity)
    finally:
        await conn.disconnect()

asyncio.run(main())

TLS

ca_pem = open("ca.pem", "rb").read()
tls = iec61850.TlsConfig(ca_pem=ca_pem)

conn = await iec61850.IedConnection.connect_tls(
    "ied.example.com:3782",
    tls,
    server_name="ied.example.com",
    timeout_ms=5000,
)

Mutual TLS adds a client cert and key:

tls = iec61850.TlsConfig(
    ca_pem=open("ca.pem", "rb").read(),
    client_cert_pem=open("client.crt", "rb").read(),
    client_key_pem=open("client.key", "rb").read(),
)

Defaults: TLS 1.2–1.3, IEC 62351-3 cipher whitelist, chain and time validation on, session resumption on. Set verify_hostname=False on TlsConfig to skip SNI / SAN hostname matching for closed-network commissioning (other validation still applies).

Pinned peers, CRL, version pinning

tls = iec61850.TlsConfig(
    ca_pem=open("ca.pem", "rb").read(),
    # Restrict accepted server certificates to a fixed allow-list
    # (IEC 62351-3 known-peer profile).
    allow_only_known_peers=True,
    known_peer_pems=(open("ied1.crt", "rb").read(),),
    # Pin a single TLS version.
    min_version=iec61850.TlsVersion.TLS_1_3,
    max_version=iec61850.TlsVersion.TLS_1_3,
    # Revocation checks.
    crl_pems=(open("ca.crl.pem", "rb").read(),),
)

High-level client

Iec61850Client wraps IedConnection as an async context manager and optionally runs a background report dispatcher:

cfg = iec61850.Iec61850ClientConfig(
    address="ied.example.com",
    port=102,
    timeout_ms=5000,
    # Tuning that flows down into the underlying MMS client.
    request_timeout_ms=3000,
    max_outstanding=4,
    local_max_pdu_size=16384,
    # Background dispatcher; None to disable.
    report_dispatcher_interval_ms=100,
)

async with iec61850.Iec61850Client(cfg) as cli:
    val = await cli.connection.read_float(
        "simpleIOGenericIO/GGIO1.AnIn1.mag.f", iec61850.FC.MX
    )

For TLS, pass a TlsConfig on the config and (optionally) override the SNI:

cfg = iec61850.Iec61850ClientConfig(
    address="10.0.0.1",            # network address
    port=3782,
    tls=iec61850.TlsConfig(ca_pem=ca_pem),
    tls_server_name="ied.example.com",  # SNI; defaults to `address`
)

The same request_timeout_ms / max_outstanding / local_max_pdu_size keyword arguments are also accepted on IedConnection.connect and IedConnection.connect_tls for callers that prefer to manage the connection lifecycle directly.

Generic read / write

# Native Python types: scalars surface as bool / int / float / str;
# bytes-like kinds as bytes; arrays and structures as list.
value = await conn.read("simpleIOGenericIO/GGIO1.AnIn1.mag.f", iec61850.FC.MX)

await conn.write(
    "simpleIOGenericIO/LLN0.NamPlt.vendor", iec61850.FC.DC, "Acme"
)

Array elements and sub-components are addressed with keyword arguments:

# Reads the third element of an array DA.
elem = await conn.read("LD/LN.Arr", iec61850.FC.ST, array_index=2)

# Reads `stVal` inside the third element.
sub = await conn.read(
    "LD/LN.Arr", iec61850.FC.ST, array_index=2, component="stVal"
)

Schema introspection

# Per-variable MMS TypeSpecification, returned as a nested dict.
ts = await conn.get_variable_specification(
    "simpleIOGenericIO/LLN0.Mod", iec61850.FC.ST
)
# ts == {"kind": "structure", "components": [
#   {"name": "stVal", "type": {"kind": "integer", "width_bits": 32}},
#   {"name": "q",     "type": {"kind": "bit_string", "bits": 13}},
#   ...
# ]}

# Whole device-model index — list of logical devices with their MMS
# NamedVariable names. First call fetches; subsequent calls hit a cache.
model = await conn.get_device_model()
for ld in model["logical_devices"]:
    print(ld["name"], len(ld["variables"]))

# Force a re-fetch if the server model may have changed.
fresh = await conn.get_device_model(refresh=True)

Every type-spec node carries a "kind" discriminator. Scalar kinds add payload fields appropriate for the type (width_bits, format_width / exponent_width, max_chars, bits, ...). "array" adds element_count plus a recursive element_type. "structure" adds components — a list of {"name", "type"} entries. "unknown" surfaces the raw ASN.1 tag for forward compatibility.
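Because every node carries a "kind" discriminator, the tree can be walked with a short recursive function. The following is a minimal sketch that flattens a type-spec dict into leaf paths. It relies only on the keys named above ("kind", "components", "name", "type", "element_type"); the helper name iter_leaves, the "[*]" marker for array elements, and the sample data are illustrative assumptions, not part of the library.

```python
from typing import Any, Iterator


def iter_leaves(spec: dict[str, Any], prefix: str = "") -> Iterator[tuple[str, str]]:
    """Yield (dotted_path, kind) for every non-composite node in a type-spec tree."""
    kind = spec["kind"]
    if kind == "structure":
        for comp in spec["components"]:
            child = f"{prefix}.{comp['name']}" if prefix else comp["name"]
            yield from iter_leaves(comp["type"], child)
    elif kind == "array":
        # All elements share one type, so descend once with a [*] marker.
        yield from iter_leaves(spec["element_type"], f"{prefix}[*]")
    else:
        # Scalar kinds and "unknown" are treated as leaves.
        yield prefix, kind


# Hand-written sample in the documented shape (not captured from a real IED).
sample = {
    "kind": "structure",
    "components": [
        {"name": "stVal", "type": {"kind": "integer", "width_bits": 32}},
        {"name": "q", "type": {"kind": "bit_string", "bits": 13}},
        {"name": "hist", "type": {
            "kind": "array",
            "element_count": 4,
            "element_type": {"kind": "integer", "width_bits": 16},
        }},
    ],
}

for path, kind in iter_leaves(sample):
    print(path, kind)
```

In practice the input would be the dict returned by get_variable_specification; a flat listing like this can be handy when diffing the type layout of two devices.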

Datasets

await conn.create_data_set(
    "simpleIOGenericIO/LLN0.ds1",
    [
        iec61850.DataSetMember("simpleIOGenericIO/GGIO1.AnIn1.mag.f", iec61850.FC.MX),
        iec61850.DataSetMember("simpleIOGenericIO/GGIO1.Ind1.stVal", iec61850.FC.ST),
    ],
)

values = await conn.get_data_set_values("simpleIOGenericIO/LLN0.ds1")
# values is a list ordered to match the dataset members.

await conn.set_data_set_values("simpleIOGenericIO/LLN0.ds1", [3.14, True])

deleted = await conn.delete_data_set("simpleIOGenericIO/LLN0.ds1")

DataSetMember accepts optional array_index / component to target an array element or a sub-component (component requires array_index); the facade composes the alternate-access reference for you.

get_data_set_values and set_data_set_values raise IedDataAccessError when any single entry's access or write fails on the server, with the entry index in the error message.

Connection control

# Normal close — MMS Conclude exchange, then TCP shutdown.
await conn.disconnect()

# Rude close — drop the TCP socket without negotiation. Use when the peer
# stops responding or a normal disconnect would block.
await conn.abort()

Log service

LOG_REF = "IED1LD0/LLN0$LG$evlog"

# First page — by time range. ``more_follows`` signals that the server
# truncated the response and the caller should resume.
entries, more = await conn.query_journal_by_time(LOG_REF, start_ms, end_ms)
for e in entries:
    print(e.time_ms, e.entry_id.hex(), len(e.variables))

# Resume from the last seen entry. Both arguments — the entry's ``time_ms``
# and 8-byte ``entry_id`` — are applied as filters server-side.
cursor = entries[-1]
more_entries, _ = await conn.query_journal_after_entry(
    LOG_REF, cursor.time_ms, cursor.entry_id
)
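The two calls above can be combined into a loop that follows more_follows until the log window is exhausted. This is a sketch under stated assumptions: FakeConn and FakeEntry are hypothetical in-memory stand-ins so the example runs without a device, read_all is not a library function, and whether a real server includes the cursor entry in the resumed page is not stated here, so the loop drops it defensively. The resume call takes no end time, so the upper bound is re-applied on the client.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeEntry:
    """Stand-in for a journal entry; only the two cursor fields are modelled."""
    time_ms: int
    entry_id: bytes


class FakeConn:
    """Hypothetical in-memory connection that serves a fixed number of entries per page."""

    def __init__(self, entries, page_size=2):
        self._entries = list(entries)
        self._page = page_size

    async def query_journal_by_time(self, log_ref, start_ms, end_ms):
        hits = [e for e in self._entries if start_ms <= e.time_ms <= end_ms]
        return hits[: self._page], len(hits) > self._page

    async def query_journal_after_entry(self, log_ref, time_ms, entry_id):
        # Assumption of this fake: the cursor entry itself is excluded.
        ids = [e.entry_id for e in self._entries]
        rest = self._entries[ids.index(entry_id) + 1:]
        return rest[: self._page], len(rest) > self._page


async def read_all(conn, log_ref, start_ms, end_ms, max_pages=1000):
    """Collect every entry in the window, following more_follows page by page."""
    entries, more = await conn.query_journal_by_time(log_ref, start_ms, end_ms)
    collected = list(entries)
    pages = 1
    while more and entries and pages < max_pages:
        cursor = entries[-1]
        page, more = await conn.query_journal_after_entry(
            log_ref, cursor.time_ms, cursor.entry_id
        )
        # Drop the cursor if a server echoes it back, to avoid duplicates.
        page = [e for e in page if e.entry_id != cursor.entry_id]
        entries = [e for e in page if e.time_ms <= end_ms]
        collected.extend(entries)
        if len(entries) < len(page):
            break  # ran past the requested window
        pages += 1
    return collected


log = [FakeEntry(t, bytes([t] * 8)) for t in range(1, 6)]
print([e.time_ms for e in asyncio.run(read_all(FakeConn(log), "LOG", 1, 5))])
```

The max_pages cap is a safety net against a server that keeps reporting more_follows without making progress.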

JournalEntry.variables is a tuple of JournalEntryVariable(data_ref, value, reason_code). Values follow the same conversion rules as IedConnection.read — scalars surface natively; bytes-like kinds (BIT_STRING / OCTET_STRING / UTC_TIME / BINARY_TIME) as bytes; composites as list.
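Since UTC_TIME values arrive as bytes here, callers may want to turn them into datetime objects themselves. The sketch below assumes the bytes are the raw 8-octet MMS UtcTime encoding (4 octets of seconds since the Unix epoch, 3 octets of binary fraction, 1 time-quality octet); that assumption about what the library hands back should be checked against a real entry. decode_utc_time is an illustrative helper, not a library function.

```python
import struct
from datetime import datetime, timedelta, timezone


def decode_utc_time(raw: bytes) -> tuple[datetime, int]:
    """Decode an 8-octet MMS UtcTime into (aware UTC datetime, time-quality octet)."""
    if len(raw) != 8:
        raise ValueError(f"expected 8 octets, got {len(raw)}")
    (seconds,) = struct.unpack(">I", raw[:4])    # big-endian seconds since 1970
    fraction = int.from_bytes(raw[4:7], "big")   # units of 2**-24 second
    quality = raw[7]                             # returned undecoded
    micros = (fraction * 1_000_000) >> 24        # truncate to microseconds
    stamp = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return stamp + timedelta(microseconds=micros), quality


# 1_700_000_000 s plus half a second (fraction 0x800000), quality octet 0x0A.
raw = struct.pack(">I", 1_700_000_000) + bytes([0x80, 0x00, 0x00, 0x0A])
stamp, quality = decode_utc_time(raw)
print(stamp.isoformat(), hex(quality))
```

The time-quality octet is returned as-is; decoding its individual flags is left out of this sketch.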

Reporting

def on_report(report: iec61850.ClientReport) -> None:
    print(report.rcb_reference, len(report.entries))

rcb = await conn.get_rcb_values("simpleIOGenericIO/LLN0$RP$urcb01")
rcb.resv = True
rcb.rpt_ena = True
await conn.set_rcb_values(rcb, iec61850.RcbWriteMask.fields("resv", "rpt_ena"))
await conn.install_report_handler(rcb.object_reference, on_report)

dispatcher = conn.spawn_report_dispatcher(interval_ms=100)
try:
    await asyncio.sleep(10)
finally:
    await dispatcher.aclose()

Control

spc = conn.create_control_object(
    "IED1LD0/GGIO1.SPCSO1",
    iec61850.ControlModel.SBO_ENHANCED,
)
spc.set_origin(iec61850.OriginValue(or_cat=3, or_ident=b"py-client"))

if (await spc.select_with_value(True)).success:
    outcome = await spc.operate(True)
    if not outcome.success:
        print("operate failed:", outcome.add_cause)

SCL / ICD / CID parser

Load an IED configuration document and inspect it as plain Python data:

scl = iec61850.load_scl("MyDevice.icd")

scl.ieds()                          # ['IED1']

doc = scl.to_dict()
doc["ieds"][0]["name"]              # 'IED1'
doc["ieds"][0]["manufacturer"]      # 'ACME'

ld = doc["ieds"][0]["access_points"][0]["server"]["logical_devices"][0]
ld["inst"]                          # 'GenericIO'
[ln["ln_class"] for ln in ld["logical_nodes"]]   # ['LLN0', 'GGIO', ...]

# Resolve a logical-node's data objects via the DataTypeTemplates section:
ln_type_id = ld["logical_nodes"][0]["ln_type"]
ln_type = doc["data_type_templates"]["ln_node_types"][ln_type_id]
[do["name"] for do in ln_type["dos"]]            # ['Mod', 'Beh', ...]

Both load_scl and parse_scl run the full two-stage pipeline (XML syntax → cross-element type-reference resolution). The returned dict mirrors the SCL XML structure; type references stay as strings so callers can index into doc["data_type_templates"] (ln_node_types / do_types / da_types / enum_types) themselves.
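Because type references stay as strings, listing every data object means joining each logical node against data_type_templates by hand. A minimal sketch of that join follows, using only the dict keys shown in the example above. iter_data_objects is a hypothetical helper, the sample document is hand-written, and treating a missing or empty "server" entry as "skip this access point" is an assumption about how server-less access points appear in the dict.

```python
from typing import Any, Iterator


def iter_data_objects(doc: dict[str, Any]) -> Iterator[tuple[str, str, str, str]]:
    """Yield (ied, ld_inst, ln_class, do_name) for each DO reachable in the document."""
    ln_types = doc["data_type_templates"]["ln_node_types"]
    for ied in doc["ieds"]:
        for ap in ied["access_points"]:
            server = ap.get("server")
            if not server:
                continue  # assumed shape for an access point without a Server
            for ld in server["logical_devices"]:
                for ln in ld["logical_nodes"]:
                    # ln_type is a string key into the templates section.
                    for do in ln_types[ln["ln_type"]]["dos"]:
                        yield ied["name"], ld["inst"], ln["ln_class"], do["name"]


# Hand-written sample mirroring the documented layout (not a real ICD).
sample_doc = {
    "ieds": [{
        "name": "IED1",
        "access_points": [
            {"server": {"logical_devices": [{
                "inst": "GenericIO",
                "logical_nodes": [
                    {"ln_class": "LLN0", "ln_type": "LLN0_T"},
                    {"ln_class": "GGIO", "ln_type": "GGIO_T"},
                ],
            }]}},
            {"server": None},
        ],
    }],
    "data_type_templates": {"ln_node_types": {
        "LLN0_T": {"dos": [{"name": "Mod"}, {"name": "Beh"}]},
        "GGIO_T": {"dos": [{"name": "Ind1"}]},
    }},
}

for row in iter_data_objects(sample_doc):
    print(row)
```

With a real document the input would be scl.to_dict(); the same pattern extends to do_types and da_types once the key that links a DO to its type is known.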

For a stable text representation — useful as a regression / diff oracle — ask for the canonical summary of a single IED:

print(scl.summary("IED1"))
# IED name=IED1
#   lds count=1
#     LD inst=GenericIO ld_name=<None> lns=2
#       LN class=LLN0 inst= prefix= dos=2 ...
#       ...

Parse failures surface as SclError, a subclass of IedError, with line, column, element_path, attribute, kind, and message attributes set so the offending location is directly reachable:

try:
    iec61850.load_scl("broken.icd")
except iec61850.SclError as e:
    print(e.kind, "at", e.line, ":", e.column, "→", e.element_path, "@", e.attribute)
    # e.g. UnresolvedTypeReference at 42 : 7 → SCL/IED[name="IED1"]/.../LN[...] @ lnType

Server hosting

Host an IED defined by an SCL / ICD / CID document as an MMS server. IedServer.from_scl() builds the runtime model, bind() selects the TCP address (port 0 requests an OS-assigned port), and async with manages the lifecycle. While running, push value updates with the typed update_* methods, addressing data attributes by "<LD>/<LN>.<DO>.<DA>[.<sub>]*".

import asyncio
import iec61850

async def main():
    server = iec61850.IedServer.from_scl("plant.icd", ied_name="IED1")
    server.bind("0.0.0.0:0")
    server.vendor = "ACME"
    server.model_name = "Generic-IO"
    server.max_connections = 5

    async with server:
        print("listening on", server.bound_addr)
        while True:
            server.update_bool("GenericIO/GGIO1.Ind1.stVal", True)
            server.update_float32("GenericIO/MMXU1.TotW.mag.f", measure_power())
            await asyncio.sleep(0.1)

asyncio.run(main())

Configuration attributes (vendor, model_name, revision, max_connections) must be set before the server starts. Updating an unknown path raises KeyError; a type mismatch (e.g. pushing update_float32 to a BOOLEAN attribute) raises IedDataAccessError. Bind failures and other lifecycle errors surface as IedServerError.

The supported typed updates are update_bool, update_int32, update_int64, update_uint32, update_float32, update_float64, and update_string.

Read / write callbacks

Register per-attribute callbacks to override cached reads or intercept incoming writes. Both on_read and on_write may be called before start() (queued and installed at startup) or while the server is running (installed immediately). Re-registering the same path replaces the previous callback.

def measure_indication(path: str) -> bool:
    # Sampled from physical I/O on every client read.
    return read_io(path)

def validate_setpoint(path: str, value: int) -> bool:
    if value < 0 or value > 100:
        err = iec61850.IedDataAccessError("setpoint out of range")
        err.code = "ObjectValueInvalid"
        raise err
    apply_setpoint(value)
    return True   # also store value in the server-side cache

server.on_read("GenericIO/GGIO1.Ind1.stVal", measure_indication)
server.on_write("GenericIO/GGIO1.SetPt1.setVal", validate_setpoint)

on_read return values:

Return        Behaviour
any scalar    the value is returned to the client
None          fall through to the cached value
raises        read fails with IedDataAccessError

on_write return values:

Return          Behaviour
True            accept; cache is updated with the incoming value
False / None    accept; cache is not updated (you manage it)
raises          reject; client sees IedDataAccessError

Set a code attribute on the raised exception to control the reported DataAccessError variant — "HardwareFault", "TemporarilyUnavailable", "ObjectAccessDenied", "ObjectValueInvalid", etc. Without code the server reports ObjectAccessDenied.
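When several setpoints need the same kind of check, the on_write contract described above can be wrapped in a small factory. This is only a sketch: range_validator is a hypothetical helper, and RejectedWrite is a local stand-in exception so the example runs without the package installed. With the real library, iec61850.IedDataAccessError would be passed as exc_type.

```python
from typing import Callable


class RejectedWrite(Exception):
    """Stand-in for iec61850.IedDataAccessError so the sketch is self-contained."""


def range_validator(low, high, apply: Callable[[str, float], None],
                    exc_type: type[Exception] = RejectedWrite):
    """Build an on_write-style callback that accepts values within [low, high]."""

    def on_write(path: str, value) -> bool:
        if not (low <= value <= high):
            err = exc_type(f"{path}: {value!r} outside [{low}, {high}]")
            err.code = "ObjectValueInvalid"  # variant name taken from the list above
            raise err
        apply(path, value)
        return True  # True asks the server to cache the accepted value

    return on_write


applied = []
check = range_validator(0, 100, lambda path, value: applied.append((path, value)))
print(check("GenericIO/GGIO1.SetPt1.setVal", 42), applied)
```

The resulting callable has the (path, value) signature used by validate_setpoint above, so it could be registered with server.on_write in the same way.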

Control callbacks

on_control binds the server-side execution of a control object. Address it at the DO ("<LD>/<LN>.<DO>") and declare which IEC 61850 control model the DO uses. Up to three callbacks may be supplied:

  • check — sync. Static validation before the operate phase fires (interlocks, mode, permissions). Raise to reject; the return value is ignored.
  • operate — sync or async. The actual command execution. Raise on failure; the return value is ignored.
  • wait — sync or async. Dynamic check during the operate phase for sbo-enhanced controls (e.g. wait for synchro-check confirmation).

Each callback receives (path, ctl_val, action). action is a dict with ctl_num, test, synchro_check, interlock_check, is_select, ctl_time_ms, and origin (a sub-dict with or_cat and or_ident).

async def operate(path: str, value: bool, action: dict) -> None:
    if action["test"]:
        return                                  # test command — no I/O
    await drive_breaker(path, value)

def check(_path: str, _value: bool, _action: dict) -> None:
    if interlock_blocked():
        err = iec61850.IedControlError("interlocked")
        err.add_cause = "BlockedByInterlocking"
        raise err

server.on_control(
    "GenericIO/GGIO1.SPCSO1",
    ctl_model="direct-normal",
    check=check,
    operate=operate,
)
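For type checking and logging it can help to give the action dict a declared shape. The TypedDict below is a sketch built from the key names listed above; the value types are assumptions inferred from the names and from the OriginValue example (integer or_cat, bytes or_ident), and ControlAction, Origin and describe are illustrative names, not library exports.

```python
from typing import TypedDict


class Origin(TypedDict):
    or_cat: int       # assumed integer originator category
    or_ident: bytes   # assumed raw originator identification


class ControlAction(TypedDict):
    ctl_num: int
    test: bool
    synchro_check: bool
    interlock_check: bool
    is_select: bool
    ctl_time_ms: int
    origin: Origin


def describe(path: str, ctl_val: object, action: ControlAction) -> str:
    """Format one control request as a single audit-log line."""
    phase = "select" if action["is_select"] else "operate"
    flags = [n for n in ("test", "synchro_check", "interlock_check") if action[n]]
    origin = action["origin"]
    return (
        f"{phase} {path}={ctl_val!r} ctlNum={action['ctl_num']} "
        f"orCat={origin['or_cat']} orIdent={origin['or_ident']!r} "
        f"flags={','.join(flags) or '-'}"
    )


example: ControlAction = {
    "ctl_num": 7, "test": True, "synchro_check": False,
    "interlock_check": True, "is_select": False, "ctl_time_ms": 0,
    "origin": {"or_cat": 3, "or_ident": b"py-client"},
}
print(describe("GenericIO/GGIO1.SPCSO1", True, example))
```

A helper like describe could be called at the top of a check or operate callback to record who issued a command and with which flags.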

ctl_model is one of "status-only", "direct-normal", "sbo-normal", "direct-enhanced", "sbo-enhanced". For SBO models the optional sbo_timeout_ms (default 30000) and sbo_class ("operate-once" or "operate-many", default "operate-once") configure the select-phase behaviour.

Raise an exception from any callback to reject the command. Set add_cause on the exception to the variant name (e.g. "BlockedByInterlocking", "BlockedByProcess", "NotSupported") or its numeric MMS code; when add_cause is not set, the server reports "Unknown".

Datasets and unbuffered reporting

Declare a server-side dataset and bind it to an Unbuffered Report Control Block (URCB) before start(). The same client APIs (get_rcb_values, set_rcb_values, install_report_handler) consume reports from the URCB once the server is running.

server.add_dataset(
    "GGIO1$ds1",
    [
        "GenericIO/GGIO1.Ind1.stVal",
        "GenericIO/GGIO1.AnIn1.mag.f",
    ],
)

server.register_urcb(
    "GenericIO/LLN0.urcb01",
    dataset="GGIO1$ds1",
    trg_ops=["data_changed", "gi"],
    opt_flds=["seq_num", "time_stamp", "reason", "data_set"],
    buf_tm_ms=50,
)

Dataset names follow the IEC 61850 convention "<LN>$<dsName>". Every entry in a dataset must belong to the same logical device. register_urcb accepts rpt_id (defaults to "<domain>/<LN>$RP$<rcb_name>"), conf_rev, trg_ops, opt_flds, buf_tm_ms, and intg_pd_ms.

Trigger options: "data_changed", "quality_changed", "data_update", "integrity", "gi", plus the aliases "all" and "none".

Optional fields: "seq_num", "time_stamp", "reason", "data_set", "data_reference", "conf_rev", "buffer_overflow", "entry_id". Per IEC 61850-7-2 §15, buffer_overflow and entry_id are masked out on the wire for unbuffered reports.
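The masking rule for unbuffered reports can be modelled in a few lines. This is a plain-Python illustration of the rule as stated above, not the library's implementation; effective_opt_flds and the two constants are hypothetical names, and rejecting unknown field names with ValueError is a choice made for this sketch.

```python
OPT_FLDS = (
    "seq_num", "time_stamp", "reason", "data_set",
    "data_reference", "conf_rev", "buffer_overflow", "entry_id",
)
# Fields that are only meaningful for buffered reports.
BUFFERED_ONLY = frozenset({"buffer_overflow", "entry_id"})


def effective_opt_flds(requested, buffered: bool) -> list[str]:
    """Return the optional fields that would appear on the wire, in canonical order."""
    wanted = set(requested)
    unknown = wanted - set(OPT_FLDS)
    if unknown:
        raise ValueError(f"unknown optional fields: {sorted(unknown)}")
    return [
        name for name in OPT_FLDS
        if name in wanted and (buffered or name not in BUFFERED_ONLY)
    ]


print(effective_opt_flds(["entry_id", "seq_num", "reason"], buffered=False))
```

Requesting entry_id on a URCB is therefore harmless in this model; the field simply does not appear in the report.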

Datasets without a URCB are still reachable via get_data_set_values.

Atomic batch updates

server.batch() returns a synchronous context manager that holds the server's data-model lock for the duration of the with block. Concurrent batches raise RuntimeError rather than deadlocking, so callers can choose to retry or fail fast.

with server.batch():
    server.update_bool("GenericIO/GGIO1.Ind1.stVal", True)
    server.update_float32("GenericIO/GGIO1.AnIn1.mag.f", 12.5)
    server.update_int32("GenericIO/GGIO1.SetPt1.setVal", 7)
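Since a contended batch raises RuntimeError instead of blocking, a caller that prefers to retry can wrap the pattern above in a helper. The sketch below is one possible shape: apply_batch is a hypothetical function, FakeServer is an in-memory stand-in for IedServer, and the exact point at which the real library raises (on batch() or on entering the block) is an assumption that the try block covers either way. Errors raised by the updates themselves are not retried.

```python
import time
from contextlib import ExitStack, contextmanager


def apply_batch(server, updates, attempts=3, delay_s=0.01):
    """Apply (method_name, path, value) updates in one batch; return the attempt used."""
    for attempt in range(1, attempts + 1):
        with ExitStack() as stack:
            try:
                stack.enter_context(server.batch())
            except RuntimeError:
                pass  # another batch holds the lock; fall through and retry
            else:
                for method, path, value in updates:
                    getattr(server, method)(path, value)
                return attempt
        if attempt < attempts:
            time.sleep(delay_s)
    raise RuntimeError(f"batch still contended after {attempts} attempts")


class FakeServer:
    """Hypothetical stand-in: batch() is busy a set number of times, then succeeds."""

    def __init__(self, busy=0):
        self.busy = busy
        self.values = {}

    @contextmanager
    def batch(self):
        if self.busy > 0:
            self.busy -= 1
            raise RuntimeError("batch already active")
        yield

    def update_bool(self, path, value):
        self.values[path] = value

    def update_float32(self, path, value):
        self.values[path] = value


srv = FakeServer(busy=1)
used = apply_batch(srv, [("update_bool", "GenericIO/GGIO1.Ind1.stVal", True)], delay_s=0)
print(used, srv.values)
```

Note that time.sleep blocks the calling thread; inside a coroutine it would stall the event loop, so a short delay or a fail-fast policy (attempts=1) may be the better fit there.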

Error handling

try:
    conn = await iec61850.IedConnection.connect("10.0.0.1:102", timeout_ms=2000)
except iec61850.IedTimeoutError:
    ...   # connection timed out
except iec61850.IedConnectionError:
    ...   # TCP / OSI stack failure
except iec61850.IedError:
    ...   # catch-all base for any IEC 61850 error
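The same exception types can drive a reconnect policy. The following sketch retries a connect coroutine with exponential backoff; connect_with_retry and its parameters are hypothetical, and the helper takes the exception classes to retry on as an argument so it runs here without the package (the demo uses the built-in TimeoutError in place of IedTimeoutError).

```python
import asyncio


async def connect_with_retry(connect, *, retry_on, attempts=4,
                             base_delay_s=0.5, max_delay_s=8.0):
    """Await connect() until it succeeds, doubling the pause after each failure."""
    delay = base_delay_s
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error unchanged
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay_s)


# Demo with a fake connect that fails twice before succeeding.
calls = []


async def flaky_connect():
    calls.append(1)
    if len(calls) < 3:
        raise TimeoutError("no answer")
    return "connected"


print(asyncio.run(connect_with_retry(flaky_connect, retry_on=(TimeoutError,), base_delay_s=0)))

# With the library, the call might look like this (left commented out so the
# sketch has no dependency on the package):
#   conn = await connect_with_retry(
#       lambda: iec61850.IedConnection.connect("10.0.0.1:102", timeout_ms=2000),
#       retry_on=(iec61850.IedTimeoutError, iec61850.IedConnectionError),
#   )
```

Errors outside retry_on propagate immediately, so configuration mistakes are not masked by retries.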

License

Apache-2.0
