Skip to main content

A small, backend-neutral Modbus connection abstraction (pymodbus / tmodbus).

Project description

modbus-connection

A small, backend-neutral Modbus connection abstraction.

The top-level modbus_connection package is a pure interface — the ModbusConnection / ModbusUnit Protocols, the shared WordOrder type, and a tiny exception hierarchy. It imports no Modbus library, so consumers can type against it without committing to a backend.

Two interchangeable backends implement that interface:

Backend Module Extra
pymodbus modbus_connection.pymodbus [pymodbus]
tmodbus modbus_connection.tmodbus [tmodbus]

The bare install pulls neither backend.

Why

One physical Modbus link addresses many units (1–247). Sharing a single, internally-serialized connection across many consumers is strictly better than each opening a competing socket. This package is the connection abstraction that makes that sharing possible while keeping the backend swappable: the Protocol never changes when the backend does.

Design

  • A connection is transient and owner-held. A backend connect function returns a live, already-connected instance — there is no connect() on the object.
  • Requests are serialized per connection — but by the backend library, not by this wrapper: pymodbus's transaction manager and tmodbus's smart transport each hold a lock for the full request/response cycle, so concurrent unit calls on one connection can't interleave.
  • A connection can enforce a minimum gap between messages for devices that need a pause between frames. Pass message_spacing (seconds) to a connect function and each request — from any unit sharing the link — waits until that gap has elapsed since the previous one finished. tmodbus enforces it through its native wait_between_requests; pymodbus has no such knob, so the package applies the same gap itself. It is the spacing between requests only; to delay the first request, the owner sleeps before issuing it. Default 0 disables it.
  • The connection does not self-reconnect. On a drop it fires on_connection_lost (best-effort) and stops; recreating it is the owner's job.
  • Consumers receive a ModbusUnit (via connection.for_unit(unit_id)), a stateless per-unit handle with no lifecycle methods. Every method raises on failure — it never returns None.
  • The full 19-function-code Modbus surface is exposed — the register/coil reads and writes plus the diagnostic and identification codes (exception status, diagnostics, comm-event counter/log, report-server-id, FIFO queue, file records, device identification). A backend that cannot implement a code raises NotImplementedError (tmodbus does, for diagnostics and the comm-event codes). Datatype and word/byte-order decoding lives one layer up, in modbus_connection.decode / .encode and the modbus_connection.model device framework.

Install

pip install "modbus-connection[pymodbus]"   # pymodbus backend
pip install "modbus-connection[tmodbus]"    # tmodbus backend

Use

import asyncio
from modbus_connection.pymodbus import connect_tcp


async def main() -> None:
    conn = await connect_tcp("192.168.1.50", port=502)
    try:
        unit = conn.for_unit(1)
        from modbus_connection.decode import decode_int16, decode_float32

        outside_temp = decode_int16(await unit.read_holding_registers(9, 1))
        flow_setpoint = decode_float32(await unit.read_holding_registers(40, 2))
        pump_on = (await unit.read_coils(56, 1))[0]
        print(outside_temp, flow_setpoint, pump_on)
    finally:
        await conn.close()


asyncio.run(main())

Swapping to tmodbus is a one-line import change:

from modbus_connection.tmodbus import connect_tcp

Transports

Each backend ships a set of connect functions, one per wire transport:

Function Transport framer options
connect_tcp(host, *, port=502, framer="socket") Modbus TCP, or RTU-/ASCII-over-TCP (transparent serial-to-Ethernet gateways) socket / rtu / ascii
connect_udp(host, *, port=502, framer="socket") Modbus UDP (MBAP, RTU, or ASCII framing over UDP) socket / rtu / ascii
connect_serial(port, *, framer="rtu", baudrate=…, bytesize=…, parity=…, stopbits=…) Modbus serial — binary RTU or ASCII transmission mode rtu / ascii
connect_tls(host, *, port=802, verify=True, check_hostname=True, client_cert=None, client_key=None, client_key_password=None, sslctx=None) Modbus/TLS (Modbus Security) — (always TLS framing)

framer names the wire framing across every transport (its value set differs by transport: socket/rtu/ascii for TCP/UDP, rtu/ascii for serial; TLS is fixed).

from modbus_connection.pymodbus import connect_udp, connect_serial, connect_tls

udp = await connect_udp("192.168.1.50", port=502)
ascii_serial = await connect_serial("/dev/ttyUSB0", framer="ascii", baudrate=9600)
tls = await connect_tls("device.local")  # verifies the server certificate by default

connect_tls verifies the server certificate against the system trust store by default (verify=True) and checks the hostname (check_hostname=True). Pass verify=False for a device with a self-signed certificate, verify="/path/to/ca" for a private CA (file or directory), or check_hostname=False to verify the certificate but not the hostname. client_cert / client_key / client_key_password present a client certificate for mutual TLS; pass a ready-made ssl.SSLContext as sslctx for full control.

tmodbus exposes the same functions, except connect_udp and connect_tcp(framer="ascii") — tmodbus has no UDP or ASCII-over-TCP transport, so those raise NotImplementedError; use the pymodbus backend for them.

Exceptions

Both backends map their errors onto the same neutral hierarchy:

  • ModbusError — base class.
  • ModbusConnectionError — link down / not connected / transport failure.
  • ModbusTimeoutError — an operation timed out: a request got no valid response in time, or a connect attempt did not complete in time. Also a builtin TimeoutError, so except TimeoutError catches it too.
  • ModbusExceptionError — device returned a Modbus exception response (.exception_code carries the raw code).
  • ModbusProtocolError — a reply arrived but was not a valid frame (bad CRC/LRC, framing, or a mismatched header). Only backends that can tell a garbled reply from a missing one raise it (tmodbus does; pymodbus cannot, and surfaces both as ModbusTimeoutError).

Device modelling (modbus_connection.model)

An optional, backend-neutral framework for mapping a device's registers and coils to typed Python attributes and reading the whole device — or one sub-system — in as few Modbus calls as possible. It talks only to a ModbusUnit, so it runs over any backend (or the mock).

from modbus_connection.model import Component, gauge, uint32, coil

class Meter(Component):
    voltage = gauge(0, 0.1, unit="V")        # scaled 16-bit
    current = gauge(1, 0.1, unit="A")
    energy = uint32(2, unit="Wh")            # 32-bit over two registers
    relay = coil(0, writable=True)

meter = Meter(unit)
await meter.async_update()                   # one block read
meter.voltage                                # float | None
await meter.write("relay", True)

Generic field types ship here — integer, gauge, raw_register, uint32 / int32 / uint64 / int64, float32 / float64, string, enum / flags (map to an IntEnum / IntFlag), and the bit fields coil (FC01, writable) / discrete_input (FC02, read-only) — plus an optional nan sentinel and word_order.

Numeric fields decode affinely as raw * scale + offset. Pass offset for a device that reports a shifted value (e.g. gauge(0, 0.1, offset=-100) for a temperature stored as raw * 0.1 - 100); writable fields invert it as (value - offset) / scale. Anything more exotic is a RegisterField subclass.

writable=True lets write() send a field. Pass a validator callable instead to both mark the field writable and vet the value before each write — it is called with the requested value and returns the value to actually write (vetted or coerced), or raises to reject it, before anything reaches the device:

def in_range(value: int) -> int:
    if not 0 <= value <= 100:
        raise ValueError(f"{value} out of range")
    return value

class Boiler(Component):
    setpoint = integer(0, writable=in_range)

We don't ship validators of our own; for ready-made ones, reach for probatio.

The SunSpec module modbus_connection.model.sunspec adds the same types pre-wired with their "unimplemented" sentinels, plus the address types (ipaddr / ipv6addr / eui48).

word_order selects the order of the 16-bit registers in a multi-register value; it defaults to "big" (the Modbus convention), covering the ABCD and CDAB arrangements (word_order="little" for CDAB).

Shaping that neither covers — composing or transforming a value, packed dates/times — is left to the consumer via a private field + a @property, so static typing stays exact. For example, prefixing a version register with a hard-coded model name:

from modbus_connection.model import Component, string

class Controller(Component):
    _firmware = string(10, 4)  # 4 registers of ASCII, e.g. "1.23"

    @property
    def model(self) -> str | None:
        firmware = self._firmware
        return f"TROVIS 5576 ({firmware})" if firmware is not None else None

Writing registers

Component.write(field, value) writes a writable register or coil by attribute name. For registers it picks the function code by payload width — FC06 (write-single-register) for a one-word value, FC16 (write-multiple-registers) otherwise. Some devices honour only FC16, even for a single register; pass force_fc16=True on the field to always use FC16:

from modbus_connection.model import Component, integer

class Inverter(Component):
    # A device that honours only FC16, even for a single register.
    limit = integer(0, writable=True, force_fc16=True)

Override write() in a subclass for any device-specific write sequencing.

Each component can refresh independently and has its own update listeners. To refresh several components that share a unit in one consolidated set of reads, group them in a ComponentGroup and call async_update() on it:

group = ComponentGroup(unit, [water_heater, circuit_1, circuit_2, circuit_3])
await group.async_update()  # one pooled set of reads; each component notified

The ComponentGroup builds its pooled read plan from the components' static layout on the first update and reuses it on every later poll. The component list, their fields, and the ranges are read once and cached — mutating them after the first update is not supported; build a new ComponentGroup (or Component) instead.

Readable address ranges

Reads are pooled into block reads — addresses close together are fetched in one call. By default the planner merges anything within a small gap, which assumes every address in between is readable. Many devices only answer reads inside specific ranges, and a read that crosses a gap is rejected.

Declare the device's readable ranges and the planner merges only within a range, never across a boundary, and still clips each read to the addresses actually used. Set them as a class attribute (shared by every instance) or per instance. A ComponentGroup reads the ranges off its components, so every component in a group must declare the same ranges (it raises otherwise):

class Thermostat(Component):
    # (low, high) inclusive. The device answers 0–6 and 9–40 but nothing in
    # between, so 7–8 are never read and a 0..40 block is split at the gap.
    register_ranges = ((0, 6), (9, 40))
    coil_ranges = ((0, 15),)

    model = integer(0)
    outside = gauge(9, 0.1, unit="°C")

group = ComponentGroup(unit, [thermostat])  # ranges come from the components
await group.async_update()

Leave them as the default None for devices with a contiguous map (plain gap-based planning).

Two planning limits are tunable as Component class attributes (and validated to agree across a ComponentGroup):

  • max_gap (default 16) — only used in gap-based planning (no ranges): fields within this many addresses share one read. Higher means fewer requests but more over-reading; lower is safer for devices that reject reads of unmapped registers. (With register_ranges declared, max_gap is ignored.)
  • max_span (default 125, the Modbus per-request ceiling) — the widest a single block read may be. Lower it for a gateway that caps reads shorter.

Repeated sub-units (stride / index)

Devices that expose several identical sub-units — heating circuits, channels, phases — repeat the same registers at a fixed step.

Prefer repeating_group for these. It models the sub-unit once as a Component and hands back a typed list of instances with pooled reads — and the count can be fixed or read from the device at poll time. The index / stride and base_offset knobs below are what it is built on; reach for them directly only for a layout it can't express — chiefly a sub-unit whose registers are interleaved by type across the map (a different stride per field).

To use them directly: model the sub-unit once and instantiate it per index — pass index (1-based) to Component(...), and give each field a stride (the address step between sub-units for that register). The absolute address read is field.address + field.stride * (index - 1).

Each field carries its own stride because devices usually group registers by type, not by sub-unit — so one logical sub-unit's fields are interleaved across the map at different steps:

class Circuit(Component):
    flow_temp = gauge(12, 0.1, stride=1)          # circuits 1–3 at 12, 13, 14
    control_signal = integer(106, stride=2)       # ...        at 106, 108, 110
    flow_setpoint = gauge(999, 0.1, stride=200)   # ...        at 999, 1199, 1399

circuits = [Circuit(unit, index=n) for n in (1, 2, 3)]

A field with the default stride=0 is at a fixed address shared by every index.

When instead every field of a sub-unit shares one step — the common case for a self-contained, contiguous repeating block (e.g. a SunSpec multiple-MPPT module) — pass base_offset rather than repeating the same stride on each field. It shifts every field and bit address by a fixed amount, so you model the block once at instance 0's addresses and read instance i with base_offset = i * block_len:

class MPPTModule(Component):
    dc_w = integer(11, scale_register=2)   # one module; addresses are instance 0's
    dc_v = integer(10, scale_register=1)

modules = [MPPTModule(unit, base_offset=i * 20) for i in range(n)]

base_offset composes additively with index / stride and applies to reads and writes alike. Scale-factor registers (scale_register) are not shifted — a SunSpec repeating block's scale factors live in the shared fixed block, so they keep their absolute address (a per-instance scale register stays governed by scale_register_stride).

Building that instance list by hand, as above, is the manual form; repeating_group (below) is the same thing as a managed field — and the only way to size the list from a count the device reports at poll time. Prefer it unless you specifically need the loose list.

Runtime-counted repeats (repeating_group)

stride / base_offset cover repeats whose count is known when you write the code. Some devices instead advertise the count in a register, read at poll time — a SunSpec multiple-MPPT model (160) carries an N point saying how many modules follow. repeating_group is a field for that: model one instance as a Component, and the parent reads the count each poll and exposes a list of that many instances, each fully typed:

from modbus_connection.model import Component, integer, repeating_group
from modbus_connection.model.sunspec import uint16

class MPPTModule(Component):                 # one module, at instance 0's addresses
    dc_w = integer(11, scale_register=2)
    dc_v = integer(10, scale_register=1)

class Inverter(Component):
    modules = repeating_group(uint16(8), MPPTModule, stride=20)  # N at register 8

inv = Inverter(unit)
await inv.async_update()
inv.modules                # list[MPPTModule]
inv.modules[0].dc_w        # typed per-instance access
await inv.modules[2].write("dc_w", ...)   # writes go through the instance

count is a RegisterField (read each poll) or a fixed int; instance i is read at base_offset = i * stride, so stride is the block length. A fixed int count is static, so its instances fold into the component's normal read. A RegisterField count needs a second pass — the count is read first, then the sized-out instances (pooled among themselves) — since the count must be known before the instances it sizes can be planned. An unimplemented or unreadable count yields no instances. A component with a repeating_group can refresh on its own or be pooled in a ComponentGroup — the group reads the counts in its pooled read, then refreshes each member's groups.

Register spaces (holding vs input)

A component's register fields default to the holding space (FC03). For a read-only sub-system whose data lives in input registers (FC04), set register_space = "input" on the component — fields and factories are unchanged:

class Sensors(Component):
    register_space = "input"
    flow_temp = gauge(5, 0.1, unit="°C")   # read with FC04

Input and holding are separate address spaces (input 507 ≠ holding 507), so the planner never merges them into one read, and register_ranges applies within the component's own space. A ComponentGroup may mix input and holding components: it reads each space with its own block reads, and components only need matching register_ranges with others in the same space. Input registers are physically read-only, so writing a field on an "input" component raises.

Bit spaces (coils vs discrete inputs)

Bits work the same way over their own pair of spaces: coil fields are read and written via the coils space (FC01), and discrete_input fields are read from the discrete inputs space (FC02, read-only). The space is carried by the field type, so a single component may declare both — they are planned and read separately (coil 12 ≠ discrete input 12), exactly like input vs holding registers:

class IO(Component):
    relay = coil(0, writable=True)       # FC01, read/write
    fault = discrete_input(0)            # FC02, read-only — distinct from coil 0

Discrete inputs are physically read-only, so writing a discrete_input field raises. The two bit spaces have their own readable maps, so coil_ranges constrains coils and discrete_ranges constrains discrete inputs.

Runtime-built groups (ManualComponent)

When the field layout comes from config (e.g. YAML) rather than a typed class — there's no Component subclass to declare — use a ManualComponent. It's the imperative twin: add targets by key at runtime and it pools them into as few reads as possible, mixing all four tables (holding, input, coils, discrete inputs) in one update.

mc = ManualComponent(unit, max_gap=16)
mc.add("flow_temp", gauge(40, 0.1))                 # holding (default)
mc.add("energy",    uint32(2),  space="input")      # input registers
mc.add("relay",     coil(5, writable=True))         # coils (FC01)
mc.add("alarm",     discrete_input(9))              # discrete inputs (FC02)

data = await mc.async_update()    # {"flow_temp": 21.5, "energy": 100000, ...}
mc.get("flow_temp")               # 21.5
await mc.write("relay", True)     # per-key write (holding / coils only)

A register target takes its space ("holding" / "input") on add(); a bit target's space is fixed by the factory (coil / discrete_input). The field address is absolute (no index / stride), values come out via get(key) and the dict async_update() returns (no typed attribute access — there's no class), and add() / remove() invalidate the cached plan so it re-plans on the next update. A repeating_group can be add()ed like any other target; its instances come out via get(key) as a list of sub-components (sized at poll time for a register count). It reuses the same planning, write (validator / force_fc16), bit and repeating-group machinery as Component; it does not pool into a ComponentGroup. Readable ranges are per-table kwargs — holding_ranges / input_ranges / coil_ranges / discrete_ranges (any left unset falls back to gap-based planning):

ManualComponent(unit, holding_ranges=((0, 40),), input_ranges=((500, 520),))

Testing

An in-memory mock backend ships as a pytest plugin (auto-registered via an entry point — no conftest wiring). It implements the same Protocols, so code typed against ModbusUnit runs against it unchanged.

async def test_reads_setpoint(mock_modbus_unit):
    mock_modbus_unit.holding[40] = 1234            # single value
    mock_modbus_unit.holding[2] = [0x0001, 0x86A0]  # list -> consecutive registers
    mock_modbus_unit.holding[9] = lambda: 7         # callable -> evaluated per read

    assert await mock_modbus_unit.read_holding_registers(40, 1) == [1234]
    assert await mock_modbus_unit.read_holding_registers(2, 2) == [0x0001, 0x86A0]

Reads resolve against the per-space stores (holding, input, coils, discrete_inputs); writes mutate them and fire on_write callbacks, so a test can react to a write by mocking other registers:

def test_command_sets_ready(mock_modbus_unit):
    def respond(event):
        if event.address == 0:          # a command was written
            mock_modbus_unit.holding[100] = 1   # device flips its "ready" flag
    mock_modbus_unit.on_write(respond)

To simulate a device rejecting a write, arm fail_write. The next write covering that address raises the given error before the store is touched, so the value is left unchanged and on_write callbacks don't fire. register_type defaults to "holding" (use "coil" for coil writes — the two tables are independent); pass None to clear.

async def test_write_rejected(mock_modbus_unit):
    mock_modbus_unit.holding[40] = 7
    mock_modbus_unit.fail_write(40, ModbusExceptionError(3))  # illegal data value
    with pytest.raises(ModbusExceptionError):
        await mock_modbus_unit.write_register(40, 99)
    assert await mock_modbus_unit.read_holding_registers(40, 1) == [7]  # unchanged

    mock_modbus_unit.fail_write(40, None)                     # clear it
    await mock_modbus_unit.write_register(40, 99)             # now succeeds

To simulate a read failing, give the register a callable that raises — it's evaluated on every read:

def boom():
    raise ModbusExceptionError(2)       # illegal data address
mock_modbus_unit.holding[9] = boom

Fixtures: mock_modbus_connection (a MockModbusConnection) and mock_modbus_unit (its unit 1). MockModbusConnection / MockModbusUnit are also importable from modbus_connection.mock for direct construction.

Develop

uv sync --extra pymodbus
uv run pytest

Formatting/linting is ruff, enforced in CI. Install the commit hook with prek so code is formatted on commit:

uvx prek install          # set up the git hook
uvx prek run --all-files  # format + lint everything now

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

modbus_connection-3.3.0.tar.gz (75.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

modbus_connection-3.3.0-py3-none-any.whl (63.6 kB view details)

Uploaded Python 3

File details

Details for the file modbus_connection-3.3.0.tar.gz.

File metadata

  • Download URL: modbus_connection-3.3.0.tar.gz
  • Upload date:
  • Size: 75.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for modbus_connection-3.3.0.tar.gz
Algorithm Hash digest
SHA256 b0836b832531606407d5b965715644a7526a9d9537d4b90ac16867dfc79a8604
MD5 93eb0552f6e807ee94300b81ed4b5985
BLAKE2b-256 0a5ea4f1bf099c480bd030815bdbc61a9677f573df49828aa62f4f711d26d30e

See more details on using hashes here.

Provenance

The following attestation bundles were made for modbus_connection-3.3.0.tar.gz:

Publisher: publish.yml on home-assistant-libs/modbus-connection

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file modbus_connection-3.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for modbus_connection-3.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 134f4a8599f2dd1ff6c4bc77913ebcdec1eeb3d463ca4845e55f2a6440018ee5
MD5 09208a07274492e3655e29c979f797e6
BLAKE2b-256 68f48e1a03c9ba90fea52a8ccd5b1fdf0219cc020636d8fff48fb6e971aeb23d

See more details on using hashes here.

Provenance

The following attestation bundles were made for modbus_connection-3.3.0-py3-none-any.whl:

Publisher: publish.yml on home-assistant-libs/modbus-connection

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page