# Source code for do_mpc.opcua._server

#
#   This file is part of do-mpc
#
#   do-mpc: An environment for the easy, modular and efficient implementation of
#        robust nonlinear model predictive control
#
#   Copyright (c) 2014-2019 Sergio Lucia, Alexandru Tatulea-Codrean
#                        TU Dortmund. All rights reserved
#
#   do-mpc is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Lesser General Public License as
#   published by the Free Software Foundation, either version 3
#   of the License, or (at your option) any later version.
#
#   do-mpc is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU Lesser General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with do-mpc.  If not, see <http://www.gnu.org/licenses/>.

import time
from casadi import *
from ._base import RTBase
from ._helper import NamespaceEntry, ServerOpts
from ._client import RTClient


try:
    import asyncua.sync as opcua
except ImportError:
    raise ImportError("The asyncua library is not installed. Please install it and try again.")



class RTServer:
    '''Real Time Server.

    The RTServer class extends do-mpc with an easy to setup OPC UA server.

    **Configuration and setup:**

    Configuring and setting up the RTServer involves the following steps:

    1. Use :py:class:`do_mpc.opcua.ServerOpts` dataclass to specify server name as well as IP address and port for the server.

    2. Initiate the RTServer class with the ServerOpts dataclass.

    3. Use the :py:meth:`namespace_from_client` to automatically generate a namespace from a :py:class:`do_mpc.opcua.RTBase` instance (optional).

    4. Start the OPC UA server by calling :py:meth:`start`

    Note:
        Remember to properly stop the server afterwards using the :py:meth:`stop` method.

    Note:
        If the underlying OPC UA server object cannot be created, the
        attribute ``created`` is set to ``False`` and ``opcua_server`` is
        ``None``. On success ``created`` is ``True``.

    Args:
        opts : Server options.
    '''

    def __init__(self, opts:ServerOpts)->None:
        # The basic OPC UA server definition contains a name, address and a port number.
        # NOTE(review): address and port are only stored here; nothing in this
        # class passes them on to the server object - confirm this is intended.
        self.name = opts.name
        self.address = opts.address
        self.port = opts.port

        # Maps object node names to the node objects registered on the server.
        # Initialised here so that add_variable_to_node fails with a KeyError
        # (unknown node) rather than an AttributeError when it is called
        # before namespace_from_client.
        self.object_node_dict = {}

        # Try to create the server
        try:
            self.opcua_server = opcua.Server()
        except RuntimeError:
            # __init__ must not return a value (Python raises a TypeError
            # otherwise), so the failure is reported through the flag only.
            self.opcua_server = None
            self.created = False
            print("Server could not be created. Check your opcua module installation!")
        else:
            self.created = True
            print("A server named - {} - was created".format(self.name))

    def namespace_from_client(self, client:RTClient)->None:
        '''
        Takes an instance of :py:class:`do_mpc.opcua.RTBase` as input and registers an OPC UA namespace for the namespace stored in the RTBase class.

        The passed object must expose ``client.namespace`` (providing
        ``namespace_name`` and ``entry_list``) and ``client.add_namespace_url``.
        Every entry of the namespace is added as a variable to its object node,
        object nodes are created on first use. The ``variable`` attribute of
        each entry is rewritten by :py:meth:`add_variable_to_node`.

        Args:
            client : A client with a stored namespace.
        '''
        # NOTE(review): the annotation says RTClient, but the attribute access
        # below (client.client) matches the RTBase description in the
        # docstring - confirm which type is intended.

        # get namespace from client
        client_namespace = client.client.namespace

        # start with a fresh node lookup for this namespace
        self.object_node_dict = {}

        # register a new namespace on the OPC UA server
        idx = self.opcua_server.register_namespace(client_namespace.namespace_name)

        # iterate through client namespace
        for namespace_entry in client_namespace.entry_list:
            # check for every object node if it was already registered
            if namespace_entry.objectnode not in self.object_node_dict:
                # if not, register object node
                object_node = self.opcua_server.nodes.objects.add_object(idx, namespace_entry.objectnode)
                self.object_node_dict[namespace_entry.objectnode] = object_node

            # add variable to object node
            self.add_variable_to_node(namespace_entry, idx)

        # write namespace index to client namespace for clients read/write methods
        client.client.add_namespace_url(idx)

        # the first two entries of the namespace array are skipped in the report
        namespace_array = self.opcua_server.get_namespace_array()[2:]
        print(f"The following namespaces are registered: {namespace_array}")

    # Add variables to a registered node
    def add_variable_to_node(self, namespace_entry:NamespaceEntry, namespace_url:int)->None:
        '''
        Adds a variable to a registered node on the OPC UA server.

        The variable is created as writable with the initial value ``[0.0]``.
        Its name is the concatenation of the entry's object node name and
        variable name; this combined name is written back to
        ``namespace_entry.variable``, so calling this method twice with the
        same entry prefixes the name again.

        Args:
            namespace_entry : A OPCUA node ID. Contains the variable as well as the target node name.
            namespace_url : The namespace index identifying the namespace on the OPC UA server

        Raises:
            KeyError: If the object node of ``namespace_entry`` is not contained in ``object_node_dict``.
        '''
        # create unique and descriptive variable name
        variable_name = f'{namespace_entry.objectnode}{namespace_entry.variable}'

        # add variable to object node
        datavector = self.object_node_dict[namespace_entry.objectnode].add_variable(
            opcua.ua.NodeId(variable_name, namespace_url),
            variable_name,
            [0.0],
        )
        datavector.set_writable()

        # store the combined name in the entry
        namespace_entry.variable = variable_name

    # Start server
    def start(self)->None:
        '''
        Starts the OPC UA server.

        A ``RuntimeError`` raised while starting is caught and reported on stdout.
        '''
        try:
            self.opcua_server.start()
        except RuntimeError as err:
            print("The server "+ self.name +" could not be started, returned error message :\n", err)

    # Stop server
    def stop(self)->None:
        '''
        Stops the OPC UA server

        On success a message with the local time is printed. A ``RuntimeError``
        raised while stopping is caught and reported on stdout.
        '''
        try:
            self.opcua_server.stop()
            print("The server "+ self.name +" was stopped successfully @ ",time.strftime('%Y-%m-%d %H:%M %Z', time.localtime()))
        except RuntimeError as err:
            print("The server could not be stopped, returned error message :\n", err)