Source code for do_mpc.opcua._base
#
# This file is part of do-mpc
#
# do-mpc: An environment for the easy, modular and efficient implementation of
# robust nonlinear model predictive control
#
# Copyright (c) 2014-2019 Sergio Lucia, Alexandru Tatulea-Codrean
# TU Dortmund. All rights reserved
#
# do-mpc is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# do-mpc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with do-mpc. If not, see <http://www.gnu.org/licenses/>.
import time
import numpy as np
from typing import List
from threading import Timer, Thread
from enum import Enum, auto
from casadi import *
from ._client import RTClient
from ._helper import Namespace, NamespaceEntry, ClientOpts
import casadi.tools as ctools
from ..model import Model
[docs]
class RTBase:
''' Real Time Base.
The RTBase class extends do-mpc with an OPC UA interface.
Note:
The :py:class:`do_mpc.estimator.MHE` class is currently not supported.
Use this class to configure an OPC UA client for a previously initiated do-mpc class e.g.. :py:class:`do_mpc.controller.MPC` or :py:class:`do_mpc.simulator.Simulator`.
**Configuration and setup:**
Configuring and setting up the RTBase class involves the following steps:
1. Use :py:class:`do_mpc.opcua.ClientOpts` dataclass to specify client name as well as IP adress and port number of the target server.
2. Initiate the RTBase class with a do-mpc object and the dataclass :py:class:`do_mpc.opcua.ClientOpts`.
3. Use :py:meth:`set_write_tags` and :py:meth:`set_read_tags` to take over the namespace tags (node IDs) from another instance of RTBase (optional).
Note:
Use :py:meth:`set_write_tags` and :py:meth:`set_read_tags` only after registering all clients with the :py:meth:`do_mpc.opcua.RTServer.namespace_from_client` method.
4. Use :py:meth:`connect` to connect the client to the OPC UA server.
4. Use :py:meth:`write_to_tags` to write initial values to the OPC UA server.
5. Use :py:meth:`async_step_start` to run the do-mpc method :py:meth:`do_mpc.controller.MPC.make_step`.
Args:
do_mpc_object : An instance of a do-mpc class.
clientOpts : Client Options
namespace : Namespace containing OPC UA node IDs
'''
def __init__(self, do_mpc_object, clientOpts: ClientOpts, namespace:Namespace=None)->None:
self.do_mpc_object = do_mpc_object
if namespace == None:
self.get_default_namespace(clientOpts.name)
else:
self.namespace = namespace
self.cycle_time = do_mpc_object.settings.t_step*clientOpts.timeunit
self.client = RTClient(clientOpts, self.namespace)
self.tagout = []
self.tagin = []
self.is_running = False
self.new_init = True
self.async_fag = False
def namespace_from_model(self, model:Model, model_name:str)->Namespace:
"""
Create a OPC UA namespace from the provided model.
Args:
model : A do-mpc model.
model_name : Name given to the generated namespace.
Returns:
Namespace generated from the OPC UA model.
"""
node_list = []
variable_list = ['aux', 'p', 'tvp', 'u', 'v', 'w', 'x', 'y', 'z']
for var in variable_list:
for key in model[var].labels():
if 'default' in key:
continue
node_list.append(NamespaceEntry(var, key))
return Namespace(model_name, node_list)
def get_default_namespace(self, namespace_name:str)->None:
'''
Sets default namespace using :py:meth:`namespace_from_model`.
Args:
namespace_name : Name given to the generated namespace
'''
self.namespace = self.namespace_from_model(self.do_mpc_object.model, namespace_name)
def connect(self)->None:
'''
Connects client to the server.
'''
try:
self.client.connect()
except RuntimeError:
self.enabled = False
def disconnect(self)->None:
'''
Disconnects client from the server.
'''
try:
self.client.disconnect()
except RuntimeError:
print("The real-time controller could not be stopped due to server issues. Please stop the client manually and delete the object!")
def set_write_tags(self, tagout:List[str])->None:
'''
Set tags (node IDs) to write to. The provided tags must match the node IDs registered on the taget server.
Args:
tagout : A list of node IDs to which the client writes.
'''
self.tagout = tagout
def set_read_tags(self, tagin:List[str])->None:
'''
Set tags (node IDs) to read from. The provided tags must match the node IDs registered on the taget server.
Args:
tagin : A list of node IDs from which the client reads.
'''
self.tagin = tagin
def make_step(self)->None:
'''
Calls the do-mpc make_step method e.g.. :py:meth:`do_mpc.controller.MPC.make_step`.
The input for make_step is taken from node IDs specified in :py:meth:`read_from_tags`.
The output of make_step is written to the node IDs specified in :py:meth:`write_to_tags`.
'''
input = self.read_from_tags()
output = self.do_mpc_object.make_step(input)
self.write_to_tags(output)
def write_to_tags(self, data:np.ndarray)->None:
'''
Write to the node IDs specified in :py:meth:`write_to_tags`
Args:
data : data which is written to server.
'''
if isinstance(data, ctools.structure3.DMStruct):
data = data.cat.full().flatten()
elif isinstance(data, ctools.DM):
data = data.full().flatten()
elif isinstance(data, np.ndarray):
data = data.flatten()
else:
raise TypeError(f'Unsupported dtype:{type(data)}')
if data.size != len(self.tagout):
raise Exception(f'Trying to write {len(data)} elements to {len(self.tagout)}')
for tag, value in zip(self.tagout, data):
self.client.writeData(tag, [value])
def read_from_tags(self)->np.ndarray:
'''
Read from the node IDs specified in :py:meth:`read_from_tags`.
Returns:
Values stored on the OPC UA server.
'''
return np.array([self.client.readData(i) for i in self.tagin]).reshape(-1,1)
def async_run(self)->None:
'''
This method is called inside of :py:meth:`async_step_start`. It calls :py:meth:`make_step` and :py:meth:`async_step_start`.
'''
self.is_running = False
self.async_step_start()
self.make_step()
def async_step_start(self)->None:
'''
This method calls the :py:meth:`async_run` method in a frequency given by the do-mpc classes t_step value.
'''
if self.new_init == True:
self.new_thread = Thread(target=self.make_step)
self.new_thread.start()
self.new_init = False
if not self.is_running:
self.cycle = time.time() + self.cycle_time
self.thread = Timer(self.cycle - time.time(), self.async_run)
self.thread.start()
self.is_running = True
if self.async_fag == True:
self.async_fag = False
return print('Async operation was interrupted by the user')
def async_step_stop(self)->None:
'''
Stops :py:meth:`async_step_start` from running.
'''
self.thread.cancel()
self.is_running = False
self.new_init = True
self.async_fag = True