PLC Simulation and Remote Control

SDPLC

SDPLC is a Python library that simulates a Modbus server, an OPC UA server and an HTTPS server at the same time.

The Modbus and OPC UA servers are compatible with standard clients and most SCADA systems. You can use SDPLC as a simulated PLC for development and testing.

The HTTPS server exposes a RESTful API for retrieving the available variables/registers and for performing read and write operations.

Additionally, you can use the pre-bundled "Akatosh" library to create logic that mimics a programmed PLC.

Thanks to "asyncua" and "pymodbus" for their implementations of the OPC UA and Modbus protocols.

Update

1.0.0

Initial release.

1.0.1

  1. Implemented configuration file validation
  2. Rewrote value sync logic

1.0.2

This version adds the ability to connect to downstream Modbus/OPC UA servers, so that logic written with Akatosh can control real PLCs.

1.0.3

  1. Bug fixes

Examples

The following example runs a simulated tank system and then implements a controller that regulates the tank level.

Simulated PLC:

from math import inf

import uvicorn
from Akatosh.event import event
from Akatosh.universe import Mundus
from FasterAPI.app import app
from asyncua import ua
from sdplc import logger
from sdplc.sdplc import simPLC
from sdplc.router import sim_plc_router
from FasterAPI.cert import generate_key_and_csr, generate_root_ca, sign_certificate

app.include_router(sim_plc_router)

# Simulation time of the previous tank level update.
time = 0


@event(at=0, step=1, till=inf, label="Tank Level Sensor", priority=2)
async def tank_level_sensor():
    # Log the current tank level at every step (step=1).
    current_tank_level = await simPLC.read_node("Tank Level")
    logger.info(f"Tank Level Sensor: {current_tank_level}")


@event(at=0, till=inf, label="Tank Level Simulation", priority=2)
async def sim_tank_level():
    global time
    # Time elapsed since the previous update.
    elapsed_time = Mundus.time - time
    inlet = await simPLC.read_node("Inlet Valve")
    outlet = await simPLC.read_node("Outlet Valve")
    current_tank_level = await simPLC.read_node("Tank Level")
    # An open inlet fills the tank at 10 units per time unit.
    if inlet is True:
        current_tank_level += 10 * elapsed_time
    # An open outlet drains the tank at 5 units per time unit.
    if outlet is True:
        current_tank_level -= 5 * elapsed_time
    await simPLC.write_node("Tank Level", current_tank_level)
    time = Mundus.time


if __name__ == "__main__":
    simPLC.init(
        config_file="./example_plc.yaml",
    )
    uvicorn.run("FasterAPI.app:app", host="0.0.0.0", port=8080)

Configuration file (example_plc.yaml):

server: "ModBus"
  
modbus_server_config:
  type: "udp"
  address: 0.0.0.0
  port: 1502
  byte_order: "big"
  word_order: "big"

nodes:
  - qualified_name: "Inlet Valve"
    value: false
    modbus:
      slave: 0
      address: 0
      type: "c"
    opcua:
      namespace: "0"
      node_qualified_name: "0"

  - qualified_name: "Outlet Valve"
    value: false
    modbus:
      slave: 0
      address: 1
      type: "c"
    opcua:
      namespace: "0"
      node_qualified_name: "0"

  - qualified_name: "Tank Level"
    value: 0.0
    modbus:
      slave: 0
      address: 0
      type: "i"
      register_size: 64
    opcua:
      namespace: "0"
      node_qualified_name: "0"

  - qualified_name: "Blender"
    value: false
    modbus:
      slave: 0
      address: 2
      type: "c"
    opcua:
      namespace: "0"
      node_qualified_name: "0"
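
The tank model used by the sim_tank_level event above can be checked in isolation. The following is a minimal sketch in plain Python with no SDPLC or Akatosh dependency; the function name step_tank_level and its parameters are hypothetical, and only the fill rate (10) and drain rate (5) come from the example.

```python
def step_tank_level(level, inlet_open, outlet_open, elapsed,
                    fill_rate=10.0, drain_rate=5.0):
    """Return the tank level after `elapsed` time units.

    Mirrors the example's arithmetic: an open inlet adds fill_rate per
    time unit and an open outlet removes drain_rate per time unit.
    Like the example, the level is not clamped at zero.
    """
    if inlet_open:
        level += fill_rate * elapsed
    if outlet_open:
        level -= drain_rate * elapsed
    return level


# With both valves open, the net change is fill_rate - drain_rate per time unit.
level = 0.0
for _ in range(4):
    level = step_tank_level(level, True, True, elapsed=1.0)
print(level)
```

Keeping the arithmetic in a pure function like this makes it possible to unit test the process model before wiring it to simPLC.read_node and simPLC.write_node.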

Controller:

import socket
from math import inf

import uvicorn
from Akatosh.event import event
from Akatosh.universe import Mundus
from FasterAPI.app import app
from asyncua import ua
from sdplc import logger
from sdplc.sdplc import simPLC
from sdplc.router import sim_plc_router
from FasterAPI.cert import generate_key_and_csr, generate_root_ca, sign_certificate

app.include_router(sim_plc_router)

time = 0


ca, ca_crt = generate_root_ca(
    common_name=socket.gethostname(),
    subject_alternative_names=[socket.gethostname()],
    directory="./",
)
server_key, server_csr = generate_key_and_csr(
    common_name=socket.gethostname(),
    san_dns_names=[socket.gethostname()],
    san_uris=["uri:ulfaric:SDPLC"],
    directory="./",
)
sign_certificate(csr=server_csr, issuer_key=ca, issuer_cert=ca_crt, directory="./")


@event(at=0, till=inf, label="Valve Control", priority=2)
async def inlet_control():
    current_tank_level = await simPLC.read_node("Tank Level")
    if current_tank_level <= 0:
        inlet_state = await simPLC.read_node("Inlet Valve")
        if inlet_state == False:
            await simPLC.write_node("Inlet Valve", True)
            await simPLC.write_node("Outlet Valve", False)
            logger.info(
                "Tank level reached lower threshold, closing outlet valve and opening inlet valve"
            )

    if 50 <= current_tank_level < 100:
        inlet_state = await simPLC.read_node("Inlet Valve")
        outlet_state = await simPLC.read_node("Outlet Valve")
        if outlet_state == False:
            await simPLC.write_node("Outlet Valve", True)
            logger.info("Tank level reached high threshold, opening both valves")
        if inlet_state == False:
            await simPLC.write_node("Inlet Valve", True)
            logger.info("Tank level reached high threshold, opening both valves")

    if current_tank_level >= 150:
        inlet_state = await simPLC.read_node("Inlet Valve")
        if inlet_state == True:
            await simPLC.write_node("Inlet Valve", False)
            await simPLC.write_node("Outlet Valve", True)
            logger.info(
                "Tank level reached critical threshold, closing inlet valve and opening outlet valve"
            )


@event(at=0, step=1, till=inf, label="Tank Level Sensor", priority=2)
async def tank_level_sensor():
    # Log the current tank level at every step (step=1).
    current_tank_level = await simPLC.read_node("Tank Level")
    logger.info(f"Tank Level Sensor: {current_tank_level}")


@event(at=0, till=inf, label="Blender", priority=2)
async def blender():
    current_tank_level = await simPLC.read_node("Tank Level")
    if current_tank_level >= 100:
        await simPLC.write_node("Blender", True)
    else:
        await simPLC.write_node("Blender", False)


if __name__ == "__main__":
    simPLC.init(
        config_file="./example_controller.yaml",
    )
    uvicorn.run("FasterAPI.app:app", host="0.0.0.0", port=8088)

Configuration file (example_controller.yaml):

client: "ModBus"
  
modbus_client_config:
  type: "udp"
  address: 127.0.0.1
  port: 1502
  byte_order: "big"
  word_order: "big"


nodes:
  - qualified_name: "Inlet Valve"
    value: false
    modbus:
      slave: 0
      address: 0
      type: "c"
    opcua:
      namespace: "0"
      node_qualified_name: "0"

  - qualified_name: "Outlet Valve"
    value: true
    modbus:
      slave: 0
      address: 1
      type: "c"
    opcua:
      namespace: "0"
      node_qualified_name: "0"

  - qualified_name: "Tank Level"
    value: 0.0
    modbus:
      slave: 0
      address: 0
      type: "i"
      register_size: 64
    opcua:
      namespace: "0"
      node_qualified_name: "0"

  - qualified_name: "Blender"
    value: false
    modbus:
      slave: 0
      address: 2
      type: "c"
    opcua:
      namespace: "0"
      node_qualified_name: "0"
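
The threshold rules in the controller above can also be expressed as a pure function and exercised without a running PLC. This is a rough sketch only: control_step is a hypothetical name, the loop uses a fixed time step of 1, and the fill and drain rates are taken from the simulated PLC example. The real events run asynchronously, so actual timing may differ.

```python
def control_step(level, inlet, outlet):
    """Apply the controller's threshold rules once.

    Returns the new (inlet, outlet, blender) states.
    """
    # Lower threshold: start filling and stop draining.
    if level <= 0 and not inlet:
        inlet, outlet = True, False
    # High threshold: keep both valves open.
    if 50 <= level < 100:
        inlet, outlet = True, True
    # Critical threshold: stop filling and keep draining.
    if level >= 150 and inlet:
        inlet, outlet = False, True
    # The blender runs whenever the level is at least 100.
    blender = level >= 100
    return inlet, outlet, blender


# Closed-loop run starting from the controller configuration's initial
# values (inlet closed, outlet open, empty tank).
level, inlet, outlet = 0.0, False, True
levels = [level]
for _ in range(60):
    inlet, outlet, blender = control_step(level, inlet, outlet)
    level += (10.0 if inlet else 0.0) - (5.0 if outlet else 0.0)
    levels.append(level)
print(max(levels))
```

Under these assumptions, the run shows how the critical threshold caps the level while the high-threshold rule reopens the inlet once the tank drains below 100.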

Modbus

You can also simulate only a Modbus server. In this mode, you must add the registers manually.

from sdplc.modbus.server import modbusServer

modbusServer.create_slave(0)

for i in range(0, 10):
    modbusServer.create_coil(0, i, False)

for i in range(0, 10):
    modbusServer.create_discrete_input(0, i, False)

modbusServer.create_holding_register(0, 0, 0, 64)
modbusServer.create_input_register(0, 0, 1000, 64)

modbusServer.start()
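
A Modbus register holds 16 bits, so a 64-bit value such as the ones created above spans four consecutive registers, and the byte_order and word_order settings decide how its bytes are laid out. The sketch below illustrates the general idea with the standard struct module. It assumes the value is an IEEE 754 double; the source does not state how SDPLC encodes 64-bit values, and to_registers and from_registers are hypothetical helpers, not part of the library.

```python
import struct


def to_registers(value, byte_order="big", word_order="big"):
    """Split a float into four 16-bit register values."""
    raw = struct.pack(">d", value)  # 8 bytes, most significant byte first
    words = [raw[i:i + 2] for i in range(0, 8, 2)]
    if word_order == "little":
        words.reverse()  # least significant word first
    if byte_order == "little":
        words = [w[::-1] for w in words]  # swap the bytes inside each word
    return [int.from_bytes(w, "big") for w in words]


def from_registers(registers, byte_order="big", word_order="big"):
    """Rebuild the float from four 16-bit register values."""
    words = [r.to_bytes(2, "big") for r in registers]
    if byte_order == "little":
        words = [w[::-1] for w in words]
    if word_order == "little":
        words.reverse()
    return struct.unpack(">d", b"".join(words))[0]


registers = to_registers(1000.0)
print([hex(r) for r in registers])
print(from_registers(registers))
```

A client must use the same byte and word order as the server; otherwise it decodes the four registers into a different number.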

OPC UA

You can also simulate only an OPC UA server. In this mode, you must create the namespace, nodes and variables manually.

from sdplc.opcua.server import opcuaServer

opcuaServer.init()

namespace = "http://example.org"
opcuaServer.register_namespace(namespace)
node = opcuaServer.register_node("Tank", namespace)
opcuaServer.register_variable("Tank Level", node=node, writeable=True, value=0)
opcuaServer.register_variable(
    "Tank Temperature", node_qualified_name="Tank", writeable=True, value=0
)

opcuaServer.start()
