Source code for do_mpc.optimizer

#
#   This file is part of do-mpc
#
#   do-mpc: An environment for the easy, modular and efficient implementation of
#        robust nonlinear model predictive control
#
#   Copyright (c) 2014-2019 Sergio Lucia, Alexandru Tatulea-Codrean
#                        TU Dortmund. All rights reserved
#
#   do-mpc is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Lesser General Public License as
#   published by the Free Software Foundation, either version 3
#   of the License, or (at your option) any later version.
#
#   do-mpc is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU Lesser General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with do-mpc.  If not, see <http://www.gnu.org/licenses/>.

"""
Shared tools for optimization-based estimation (MHE) and control (MPC).
"""
import numpy as np
import casadi.tools as castools
import pdb
import do_mpc
from typing import Union,Callable
import os
import subprocess

[docs] class Optimizer: """The base clase for the optimization based state estimation (MHE) and predictive controller (MPC). This class establishes the jointly used attributes, methods and properties. Warnings: The ``Optimizer`` base class can not be used independently. The methods and properties are inherited to :py:class:`do_mpc.estimator.MHE` and :py:class:`do_mpc.controller.MPC`. """ def __init__(self): assert 'model' in self.__dict__.keys(), 'Cannot initialize the optimizer before assigning the model to the current class instance.' # Initialize structures for bounds, scaling, initial values by calling the symbolic structures defined in the model # with the default numerical value. # This returns an identical numerical structure with all values set to the passed value. self.flags = { 'prepare_nlp': False, 'setup': False, 'initial_run': False, } self._x_lb = self.model._x(-np.inf) self._x_ub = self.model._x(np.inf) self._u_lb = self.model._u(-np.inf) self._u_ub = self.model._u(np.inf) self._z_lb = self.model._z(-np.inf) self._z_ub = self.model._z(np.inf) self._x_scaling = self.model._x(1.0) self._u_scaling = self.model._u(1.0) self._z_scaling = self.model._z(1.0) self._p_scaling = self.model._p(1.0) # only meaningful for MHE. # Dummy variables for bounds of all optimization variables self._lb_opt_x = None self._ub_opt_x = None # Lists for further non-linear constraints (optional). Constraints are formulated as cons < ub self.nl_cons_list = [ {'expr_name': 'default', 'expr': castools.DM([]), 'ub': castools.DM([])} ] self.slack_vars_list = [ {'slack_name': 'default', 'shape':0, 'ub': castools.DM([]), 'penalty': 0} ] self.slack_cost = 0 @property def nlp_obj(self): """Query and modify (symbolically) the NLP objective function. Use the variables in :py:attr:`opt_x` and :py:attr:`opt_p`. It is advised to add to the current objective, e.g.: :: mpc.prepare_nlp() # Modify the objective mpc.nlp_obj += sum1(vertcat(*mpc.opt_x['_x', -1, 0])**2) # Finish creating the NLP mpc.create_nlp() See the documentation of :py:attr:`opt_x` and :py:attr:`opt_p` on how to query these attributes. Warnings: This is a VERY low level feature and should be used with extreme caution. It is easy to break the code. Be especially careful NOT to accidentially overwrite the default objective. Note: Modifications must be done after calling :py:meth:`prepare_nlp` and before calling :py:meth:`create_nlp` """ assert self.flags['prepare_nlp'], 'Cannot query attribute prior to calling MPC.prepare_nlp or MPC.setup' return self._nlp_obj @nlp_obj.setter def nlp_obj(self, val): assert self.flags['prepare_nlp'], 'Cannot query attribute prior to calling MPC.prepare_nlp or MPC.setup' assert not self.flags['setup'], 'Cannot change attribute after calling MPC.create_nlp or MPC.setup' self._nlp_obj = val @property def nlp_cons(self): """Query and modify (symbolically) the NLP constraints. Use the variables in :py:attr:`opt_x` and :py:attr:`opt_p`. Prior to calling :py:meth:`create_nlp` this attribute returns a list of symbolic constraints. After calling :py:meth:`create_nlp` this attribute returns the concatenation of this list and the attribute cannot be altered anymore. It is advised to append to the current list of :py:attr:`nlp_cons`: :: mpc.prepare_nlp() # Create new constraint: Input at timestep 0 and 1 must be identical. extra_cons = mpc.opt_x['_u', 0, 0]-mpc.opt_x['_u',1, 0] mpc.nlp_cons.append( extra_cons ) # Create appropriate upper and lower bound (here they are both 0 to create an equality constraint) mpc.nlp_cons_lb.append(np.zeros(extra_cons.shape)) mpc.nlp_cons_ub.append(np.zeros(extra_cons.shape)) mpc.create_nlp() See the documentation of :py:attr:`opt_x` and :py:attr:`opt_p` on how to query these attributes. Warnings: This is a VERY low level feature and should be used with extreme caution. It is easy to break the code. Be especially careful NOT to accidentially overwrite the default objective. Note: Modifications must be done after calling :py:meth:`prepare_nlp` and before calling :py:meth:`create_nlp` """ assert self.flags['prepare_nlp'], 'Cannot query attribute prior to calling MPC.prepare_nlp or MPC.setup' return self._nlp_cons @nlp_cons.setter def nlp_cons(self, val): assert self.flags['prepare_nlp'], 'Cannot query attribute prior to calling prepare_nlp or setup' assert not self.flags['setup'], 'Cannot change attribute after calling create_nlp or setup' self._nlp_cons = val @property def nlp_cons_lb(self): """Query and modify the lower bounds of the :py:attr:`nlp_cons`. Prior to calling :py:meth:`create_nlp` this attribute returns a list of lower bounds matching the list of constraints obtained with :py:attr:`nlp_cons`. After calling :py:meth:`create_nlp` this attribute returns the concatenation of this list. Values for lower (and upper) bounds MUST be added when adding new constraints to :py:attr:`nlp_cons`. Warnings: This is a VERY low level feature and should be used with extreme caution. It is easy to break the code. Note: Modifications must be done after calling :py:meth:`prepare_nlp` """ assert self.flags['prepare_nlp'], 'Cannot query attribute prior to calling MPC.prepare_nlp or MPC.setup' return self._nlp_cons_lb @nlp_cons_lb.setter def nlp_cons_lb(self, val): assert self.flags['prepare_nlp'], 'Cannot query attribute prior to calling prepare_nlp or setup' self._nlp_cons_lb = val @property def nlp_cons_ub(self): """Query and modify the upper bounds of the :py:attr:`nlp_cons`. Prior to calling :py:meth:`create_nlp` this attribute returns a list of upper bounds matching the list of constraints obtained with :py:attr:`nlp_cons`. After calling :py:meth:`create_nlp` this attribute returns the concatenation of this list. Values for upper (and lower) bounds MUST be added when adding new constraints to :py:attr:`nlp_cons`. Warnings: This is a VERY low level feature and should be used with extreme caution. It is easy to break the code. Note: Modifications must be done after calling :py:meth:`prepare_nlp` """ assert self.flags['prepare_nlp'], 'Cannot query attribute prior to calling MPC.prepare_nlp or MPC.setup' return self._nlp_cons_ub @nlp_cons_ub.setter def nlp_cons_ub(self, val): assert self.flags['prepare_nlp'], 'Cannot query attribute prior to calling prepare_nlp or setup' self._nlp_cons_ub = val @do_mpc.tools.IndexedProperty def lb_opt_x(self, ind): """Query and modify the lower bounds of all optimization variables :py:attr:`opt_x`. This is a more advanced method of setting bounds on optimization variables of the MPC/MHE problem. Users with less experience are advised to use :py:attr:`bounds` instead. The attribute returns a nested structure that can be indexed using powerindexing. Please refer to :py:attr:`opt_x` for more details. Note: The attribute automatically considers the scaling variables when setting the bounds. See :py:attr:`scaling` for more details. Note: Modifications must be done after calling :py:meth:`prepare_nlp` or :py:meth:`setup` respectively. """ return self._lb_opt_x[ind] @lb_opt_x.setter def lb_opt_x(self, ind, val): self._lb_opt_x[ind] = val # Get canonical index cind = self._lb_opt_x.f[ind] # Modify the newly set values by considering the scaling variables. This requires the canonical index. self._lb_opt_x.master[cind] = self._lb_opt_x.master[cind]/self.opt_x_scaling.master[cind] @do_mpc.tools.IndexedProperty def ub_opt_x(self, ind): """Query and modify the lower bounds of all optimization variables :py:attr:`opt_x`. This is a more advanced method of setting bounds on optimization variables of the MPC/MHE problem. Users with less experience are advised to use :py:attr:`bounds` instead. The attribute returns a nested structure that can be indexed using powerindexing. Please refer to :py:attr:`opt_x` for more details. Note: The attribute automatically considers the scaling variables when setting the bounds. See :py:attr:`scaling` for more details. Note: Modifications must be done after calling :py:meth:`prepare_nlp` or :py:meth:`setup` respectively. """ return self._ub_opt_x[ind] @ub_opt_x.setter def ub_opt_x(self, ind, val): self._ub_opt_x[ind] = val # Get canonical index cind = self._ub_opt_x.f[ind] # Modify the newly set values by considering the scaling variables. This requires the canonical index. self._ub_opt_x.master[cind] = self._ub_opt_x.master[cind]/self.opt_x_scaling.master[cind] @do_mpc.tools.IndexedProperty def bounds(self, ind): """Query and set bounds of the optimization variables. The :py:func:`bounds` method is an indexed property, meaning getting and setting this property requires an index and calls this function. The power index (elements are separated by commas) must contain atleast the following elements: ====== ================= ========================================================== order index name valid options ====== ================= ========================================================== 1 bound type ``lower`` and ``upper`` 2 variable type ``_x``, ``_u`` and ``_z`` (and ``_p_est`` for MHE) 3 variable name Names defined in :py:class:`do_mpc.model.Model`. ====== ================= ========================================================== Further indices are possible (but not neccessary) when the referenced variable is a vector or matrix. **Example**: :: # Set with: optimizer.bounds['lower','_x', 'phi_1'] = -2*np.pi optimizer.bounds['upper','_x', 'phi_1'] = 2*np.pi # Query with: optimizer.bounds['lower','_x', 'phi_1'] """ assert isinstance(ind, tuple), 'Power index must include bound_type, var_type, var_name (as a tuple).' assert len(ind)>=3, 'Power index must include bound_type, var_type, var_name (as a tuple).' bound_type = ind[0] var_type = ind[1] var_name = ind[2:] err_msg = 'Invalid power index {} for bound_type. Must be from (lower, upper).' assert bound_type in ('lower', 'upper'), err_msg.format(bound_type) err_msg = 'Invalid power index {} for var_type. Must be from (_x, states, _u, inputs, _z, algebraic).' assert var_type in ('_x', '_u', '_z', '_p_est'), err_msg.format(var_type) if bound_type == 'lower': query = '{var_type}_{bound_type}'.format(var_type=var_type, bound_type='lb') elif bound_type == 'upper': query = '{var_type}_{bound_type}'.format(var_type=var_type, bound_type='ub') # query results string e.g. _x_lb, _x_ub, _u_lb, u_ub .... # Get the desired struct: var_struct = getattr(self, query) err_msg = 'Calling .bounds with {} is not valid. Possible keys are {}.' assert (var_name[0] if isinstance(var_name, tuple) else var_name) in var_struct.keys(), err_msg.format(ind, var_struct.keys()) return var_struct[var_name] @bounds.setter def bounds(self, ind, val): """See Docstring for bounds getter method""" assert isinstance(ind, tuple), 'Power index must include bound_type, var_type, var_name (as a tuple).' assert len(ind)>=3, 'Power index must include bound_type, var_type, var_name (as a tuple).' bound_type = ind[0] var_type = ind[1] var_name = ind[2:] err_msg = 'Invalid power index {} for bound_type. Must be from (lower, upper).' assert bound_type in ('lower', 'upper'), err_msg.format(bound_type) err_msg = 'Invalid power index {} for var_type. Must be from (_x, _u, _z, _p_est).' assert var_type in ('_x', '_u', '_z', '_p_est'), err_msg.format(var_type) if bound_type == 'lower': query = '{var_type}_{bound_type}'.format(var_type=var_type, bound_type='lb') elif bound_type == 'upper': query = '{var_type}_{bound_type}'.format(var_type=var_type, bound_type='ub') # query results string e.g. _x_lb, _x_ub, _u_lb, u_ub .... # Get the desired struct: var_struct = getattr(self, query) err_msg = 'Calling .bounds with {} is not valid. Possible keys are {}.' assert (var_name[0] if isinstance(var_name, tuple) else var_name) in var_struct.keys(), err_msg.format(ind, var_struct.keys()) # Set value on struct: var_struct[var_name] = val # Update bounds of optimization variables, if the problem is already created: if self.flags['prepare_nlp']: self._update_bounds() @do_mpc.tools.IndexedProperty def scaling(self, ind): """Query and set scaling of the optimization variables. The :py:func:`Optimizer.scaling` method is an indexed property, meaning getting and setting this property requires an index and calls this function. The power index (elements are seperated by comas) must contain atleast the following elements: ====== ================= ========================================================== order index name valid options ====== ================= ========================================================== 1 variable type ``_x``, ``_u`` and ``_z`` (and ``_p_est`` for MHE) 2 variable name Names defined in :py:class:`do_mpc.model.Model`. ====== ================= ========================================================== Further indices are possible (but not neccessary) when the referenced variable is a vector or matrix. **Example**: :: # Set with: optimizer.scaling['_x', 'phi_1'] = 2 optimizer.scaling['_x', 'phi_2'] = 2 # Query with: optimizer.scaling['_x', 'phi_1'] Scaling factors :math:`a` affect the MHE / MPC optimization problem. The optimization variables are scaled variables: .. math:: \\bar\\phi = \\frac{\\phi}{a_{\\phi}} \\quad \\forall \\phi \\in [x, u, z, p_{\\text{est}}] Scaled variables are used to formulate the bounds :math:`\\bar\\phi_{lb} \\leq \\bar\\phi_{ub}` and for the evaluation of the ODE. For the objective function and the nonlinear constraints the unscaled variables are used. The algebraic equations are also not scaled. Note: Scaling the optimization problem is suggested when states and / or inputs take on values which differ by orders of magnitude. """ assert isinstance(ind, tuple), 'Power index must include bound_type, var_type, var_name (as a tuple).' assert len(ind)>=2, 'Power index must include bound_type, var_type, var_name (as a tuple).' var_type = ind[0] var_name = ind[1:] err_msg = 'Invalid power index {} for var_type. Must be from (_x, states, _u, inputs, _z, algebraic).' assert var_type in ('_x', '_u', '_z', '_p_est'), err_msg.format(var_type) query = '{var_type}_scaling'.format(var_type=var_type) # query results string e.g. _x_scaling, _u_scaling # Get the desired struct: var_struct = getattr(self, query) err_msg = 'Calling .scaling with {} is not valid. Possible keys are {}.' assert (var_name[0] if isinstance(var_name, tuple) else var_name) in var_struct.keys(), err_msg.format(ind, var_struct.keys()) return var_struct[var_name] @scaling.setter def scaling(self, ind, val): """See Docstring for scaling getter method""" assert not self.flags['setup'], 'Scaling can only be set before the optimization problem is created.' assert isinstance(ind, tuple), 'Power index must include bound_type, var_type, var_name (as a tuple).' assert len(ind)>=2, 'Power index must include bound_type, var_type, var_name (as a tuple).' var_type = ind[0] var_name = ind[1:] err_msg = 'Invalid power index {} for var_type. Must be from (_x, states, _u, inputs, _z, algebraic).' assert var_type in ('_x', '_u', '_z', '_p_est'), err_msg.format(var_type) query = '{var_type}_scaling'.format(var_type=var_type) # query results string e.g. _x_scaling, _u_scaling # Get the desired struct: var_struct = getattr(self, query) err_msg = 'Calling .scaling with {} is not valid. Possible keys are {}.' assert (var_name[0] if isinstance(var_name, tuple) else var_name) in var_struct.keys(), err_msg.format(ind, var_struct.keys()) var_struct[var_name] = val def reset_history(self)->None: """Reset the history of the optimizer. All data from the :py:class:`do_mpc.data.Data` instance is removed. """ self.data.init_storage() self._t0 = np.array([0]) def _prepare_data(self): """Write optimizer meta data to data object (all params set in self.data_fields). If selected, initialize the container for the full solution of the optimizer. """ self.data.data_fields.update({'_eps': self.n_eps}) self.data.data_fields.update({'opt_p_num': self.n_opt_p}) self.data.opt_p = self.opt_p if self.settings.store_full_solution == True: # Create data_field for the optimal solution. self.data.data_fields.update({'_opt_x_num': self.n_opt_x}) self.data.data_fields.update({'_opt_aux_num': self.n_opt_aux}) self.data.opt_x = self.opt_x # aux_struct is the struct_symSX variant of opt_aux (which is struct_SX). struct_SX cannot be unpickled (bug). # See: https://groups.google.com/forum/#!topic/casadi-users/dqAb4tnA2ik self.data.opt_aux = self.aux_struct if self.settings.store_lagr_multiplier == True: # Create data_field for the lagrange multipliers self.data.data_fields.update({'_lam_g_num': self.n_opt_lagr}) if len(self.settings.store_solver_stats) > 0: # These are valid arguments for solver stats: solver_stats = ['iter_count', 'iterations', 'n_call_S', 'n_call_callback_fun', 'n_call_nlp_f', 'n_call_nlp_g', 'n_call_nlp_grad', 'n_call_nlp_grad_f', 'n_call_nlp_hess_l', 'n_call_nlp_jac_g', 'return_status', 'success', 't_proc_S', 't_proc_callback_fun', 't_proc_nlp_f', 't_proc_nlp_g', 't_proc_nlp_grad', 't_proc_nlp_grad_f', 't_proc_nlp_hess_l', 't_proc_nlp_jac_g', 't_wall_total', 't_wall_callback_fun', 't_wall_nlp_f', 't_wall_nlp_g', 't_wall_nlp_grad', 't_wall_nlp_grad_f', 't_wall_nlp_hess_l', 't_wall_nlp_jac_g'] # Create data_field(s) for the recorded (valid) stats. for stat_i in self.settings.store_solver_stats: assert stat_i in solver_stats, 'The requested {} is not a valid solver stat and cannot be recorded. Please supply one of the following (or none): {}'.format(stat_i, solver_stats) self.data.data_fields.update({stat_i: 1}) self.data.init_storage() def set_nl_cons(self, expr_name:str, expr:Union[castools.SX,castools.MX], ub:float=np.inf, soft_constraint:bool=False, penalty_term_cons:int=1, maximum_violation:float=np.inf)->Union[castools.SX,castools.MX]: """Introduce new constraint to the class. Further constraints are optional. Expressions must be formulated with respect to ``_x``, ``_u``, ``_z``, ``_tvp``, ``_p``. They are implemented as: .. math:: m(x,u,z,p_{\\text{tv}}, p) \\leq m_{\\text{ub}} Setting the flag ``soft_constraint=True`` will introduce slack variables :math:`\\epsilon`, such that: .. math:: m(x,u,z,p_{\\text{tv}}, p)-\\epsilon &\\leq m_{\\text{ub}},\\\\ 0 &\\leq \\epsilon \\leq \\epsilon_{\\text{max}}, Slack variables are added to the cost function and multiplied with the supplied penalty term. This formulation makes constraints soft, meaning that a certain violation is tolerated and does not lead to infeasibility. Typically, high values for the penalty are suggested to avoid significant violation of the constraints. Args: expr_name: Arbitrary name for the given expression. Names are used for key word indexing. expr: CasADi SX or MX function depending on ``_x``, ``_u``, ``_z``, ``_tvp``, ``_p``. ub: Upper bound soft_constraint: Flag to enable soft constraint penalty_term_cons: Penalty term constant maximum_violation: Maximum violation Raises: assertion: expr_name must be str assertion: expr must be a casadi SX or MX type Returns: Returns the newly created expression. Expression can be used e.g. for the RHS. """ assert self.flags['setup'] == False, 'Cannot call .set_expression after .setup().' assert isinstance(expr_name, str), 'expr_name must be str, you have: {}'.format(type(expr_name)) assert isinstance(expr, (castools.SX, castools.MX)), 'expr must be a casadi SX or MX type, you have: {}'.format(type(expr)) assert isinstance(ub, (int, float, np.ndarray)), 'ub must be float, int or numpy.ndarray, you have: {}'.format(type(ub)) assert isinstance(soft_constraint, bool), 'soft_constraint must be boolean, you have: {}'.format(type(soft_constraint)) if soft_constraint==True: # Add slack variable to list of slack variables: self.slack_vars_list.extend([ {'slack_name': expr_name, 'shape': expr.shape, 'ub': maximum_violation, 'penalty': penalty_term_cons} ]) # All operations to make the soft constraints work are in _setup_nl_cons. self.nl_cons_list.extend([ {'expr_name': expr_name, 'expr': expr, 'ub' : ub}]) return expr def _setup_nl_cons(self, nl_cons_input:Union[castools.SX,castools.MX])->None: """Private method that is called from :py:func:`do_mpc.controller.MPC.setup` or :py:func:`do_mpc.estimator.MHE.setup`. Afterwards no further non-linear constraints can be added with the :py:func:`Optimizer.set_nl_cons` method. This is not part of the public API. Do not call this method. Args: nl_cons_input: list of symbolic variables used as input to the nl_cons function. """ # Create struct for soft constraints: self._eps = _eps = self.model.sv.sym_struct([ castools.entry(slack_i['slack_name'], shape=slack_i['shape']) for slack_i in self.slack_vars_list ]) # Create struct for _nl_cons: # Use the previously defined sym variables to declare shape and symbolic variable. self._nl_cons = self.model.sv.struct([ castools.entry(expr_i['expr_name'], expr=expr_i['expr']) for expr_i in self.nl_cons_list ]) self.n_eps = _eps.shape[0] # Create bounds: self._eps_lb = _eps(0.0) self._eps_ub = _eps(np.inf) # Set bounds, add slack variable to constraint and add cost. for slack_i in self.slack_vars_list: self._eps_ub[slack_i['slack_name']] = slack_i['ub'] self._nl_cons[slack_i['slack_name']] -= self._eps[slack_i['slack_name']] self.slack_cost += castools.sum1(slack_i['penalty']*self._eps[slack_i['slack_name']]) # Objective function epsilon contribution: self.epsterm_fun = castools.Function('epsterm', [_eps], [self.slack_cost]) # Make function from these expressions: nl_cons_input += [_eps] self._nl_cons_fun = castools.Function('nl_cons_fun', nl_cons_input, [self._nl_cons]) # Create bounds: self._nl_cons_ub = self._nl_cons(np.inf) self._nl_cons_lb = self._nl_cons(-np.inf) # Set bounds: for nl_cons_i in self.nl_cons_list: self._nl_cons_ub[nl_cons_i['expr_name']] = nl_cons_i['ub'] def get_tvp_template(self)->Union[castools.structure3.SXStruct,castools.structure3.MXStruct]: """Obtain output template for :py:func:`set_tvp_fun`. The method returns a structured object with ``n_horizon+1`` elements, and a set of time-varying parameters (as defined in :py:class:`do_mpc.model.Model`) for each of these instances. The structure is initialized with all zeros. Use this object to define values of the time-varying parameters. This structure (with numerical values) should be used as the output of the ``tvp_fun`` function which is set to the class with :py:func:`set_tvp_fun`. Use the combination of :py:func:`get_tvp_template` and :py:func:`set_tvp_fun`. **Example:** :: # in model definition: alpha = model.set_variable(var_type='_tvp', var_name='alpha') beta = model.set_variable(var_type='_tvp', var_name='beta') ... # in optimizer configuration: tvp_temp_1 = optimizer.get_tvp_template() tvp_temp_1['_tvp', :] = np.array([1,1]) tvp_temp_2 = optimizer.get_tvp_template() tvp_temp_2['_tvp', :] = np.array([0,0]) def tvp_fun(t_now): if t_now<10: return tvp_temp_1 else: tvp_temp_2 optimizer.set_tvp_fun(tvp_fun) Returns: Casadi SX or MX structure """ tvp_template = self.model.sv.sym_struct([ castools.entry('_tvp', repeat=self.settings.n_horizon+1, struct=self.model._tvp) ]) return tvp_template(0) def set_tvp_fun(self, tvp_fun:Callable[[float],Union[castools.structure3.SXStruct,castools.structure3.MXStruct]])->None: """ Set function which returns time-varying parameters. The ``tvp_fun`` is called at each optimization step to get the current prediction of the time-varying parameters. The supplied function must be callable with the current time as the only input. Furthermore, the function must return a CasADi structured object which is based on the horizon and on the model definition. The structure can be obtained with :py:func:`get_tvp_template`. **Example:** :: # in model definition: alpha = model.set_variable(var_type='_tvp', var_name='alpha') beta = model.set_variable(var_type='_tvp', var_name='beta') ... # in optimizer configuration: tvp_temp_1 = optimizer.get_tvp_template() tvp_temp_1['_tvp', :] = np.array([1,1]) tvp_temp_2 = optimizer.get_tvp_template() tvp_temp_2['_tvp', :] = np.array([0,0]) def tvp_fun(t_now): if t_now<10: return tvp_temp_1 else: tvp_temp_2 optimizer.set_tvp_fun(tvp_fun) Note: The method :py:func:`set_tvp_fun`. must be called prior to setup IF time-varying parameters are defined in the model. It is not required to call the method if no time-varying parameters are defined. Args: tvp_fun: Function that returns the predicted tvp values at each timestep. Must have single input (float) and return a ``structure3.DMStruct`` (obtained with :py:func:`get_tvp_template`). """ assert isinstance(tvp_fun(0), castools.structure3.DMStruct), 'Incorrect output of tvp_fun. Use get_tvp_template to obtain the required structure.' assert self.get_tvp_template().labels() == tvp_fun(0).labels(), 'Incorrect output of tvp_fun. Use get_tvp_template to obtain the required structure.' self.flags['set_tvp_fun'] = True self.tvp_fun = tvp_fun def compile_nlp(self, overwrite:bool = False, cname:str = 'nlp.c', libname:str='nlp.so', compiler_command:str=None)->None: """Compile the NLP. This may accelerate the optimization. As compilation is time consuming, the default option is to NOT overwrite (``overwrite=False``) an existing compilation. If an existing compilation with the name ``libname`` is found, it is used. **This can be dangerous, if the NLP has changed** (user tweaked the cost function, the model etc.). Warnings: This feature is experimental and currently only supported on Linux and MacOS. **What happens here?** 1. The NLP is written to a C-file (``cname``) 2. The C-File (``cname``) is compiled. The custom compiler uses: :: gcc -fPIC -shared -O1 {cname} -o {libname} 3. The compiled library is linked to the NLP. This overwrites the original NLP. Options from the previous NLP (e.g. linear solver) are kept. :: self.S = nlpsol('solver_compiled', 'ipopt', f'{libname}', self.nlpsol_opts) Args: overwrite: If True, the existing compiled NLP will be overwritten. cname: Name of the C file that will be exported. libname: Name of the shared library that will be created after compilation. compiler_command: Command to use for compiling. If None, the default compiler command will be used. Please make sure to use matching strings for ``libname`` when supplying your custom compiler command. """ if not self.flags['setup']: raise Exception('Optimizer not setup. Call setup first.') if castools.sys.platform not in ('darwin', 'linux', 'linux2'): raise Exception('Compilation not supported on this platform.') if compiler_command is None: compiler_command = "gcc -fPIC -shared -O1 {cname} -o {libname}".format(cname=cname, libname=libname) # Only compile if not already compiled: if overwrite or not os.path.isfile(libname): # Create c code from solver object print('Generating c-code of nlp.') self.S.generate_dependencies(cname) # Compile c code print('Compiling c-code of nlp.') subprocess.Popen(compiler_command, shell=True).wait() # Overwrite solver object with loaded nlp: self.S = nlpsol('solver_compiled', 'ipopt', libname, self.settings.nlpsol_opts) print('Using compiled NLP solver.') def solve(self)->None: """Solves the optmization problem. The current problem is defined by the parameters in the :py:attr:`opt_p_num` CasADi structured Data. Typically, :py:attr:`opt_p_num` is prepared for the current iteration in the :py:func:`make_step` method. It is, however, valid and possible to directly set paramters in :py:attr:`opt_p_num` before calling :py:func:`solve`. The method updates the :py:attr:`opt_p_num` and :py:attr:`opt_x_num` attributes of the class. By resetting :py:attr:`opt_x_num` to the current solution, the method implicitly enables **warmstarting the optimizer** for the next iteration, since this vector is always used as the initial guess. Warnings: The method is part of the public API but it is generally not advised to use it. Instead we recommend to call :py:func:`make_step` at each iterations, which acts as a wrapper for :py:func:`solve`. Raises: asssertion: Optimizer was not setup yet. """ assert self.flags['setup'] == True, 'optimizer was not setup yet. Please call optimizer.setup().' solver_call_kwargs = { 'x0': self.opt_x_num, 'lbx': self._lb_opt_x, 'ubx': self._ub_opt_x, 'lbg': self.nlp_cons_lb, 'ubg': self.nlp_cons_ub, 'p': self.opt_p_num, } # Warmstarting the optimizer after the initial run: if self.flags['initial_run']: solver_call_kwargs.update({ 'lam_x0': self.lam_x_num, 'lam_g0': self.lam_g_num, }) r = self.S(**solver_call_kwargs) # Note: .master accesses the underlying vector of the structure. self.opt_x_num.master = r['x'] self.opt_x_num_unscaled.master = r['x']*self.opt_x_scaling self.opt_g_num = r['g'] # Values of lagrange multipliers: self.lam_g_num = r['lam_g'] self.lam_x_num = r['lam_x'] self.solver_stats = self.S.stats() # Calculate values of auxiliary expressions (defined in model) self.opt_aux_num.master = self.opt_aux_expression_fun( self.opt_x_num, self.opt_p_num ) # For warmstarting purposes: Flag that initial run has been completed. self.flags['initial_run'] = True def _setup_discretization(self): """Private method that creates the discretization for the optimizer (MHE or MPC). Returns the integrator function (``ifcn``) and the total number of collocation points. The following discretization methods are available: * orthogonal collocation * discrete dynamics Discretization parameters can be set with the :py:func:`do_mpc.controller.MPC.set_param` and :py:func:`do_mpc.estimator.MHE.set_param` methods. There is no point in calling this method as part of the public API. """ # Scaled variables _x, _u, _z, _tvp, _p, _w = self.model['x', 'u', 'z', 'tvp', 'p', 'w'] # Unscale variables _x_unscaled = _x*self._x_scaling.cat _u_unscaled = _u*self._u_scaling.cat _z_unscaled = _z*self._z_scaling.cat _p_unscaled = _p*self._p_scaling.cat # Create _rhs and _alg _rhs = self.model._rhs_fun(_x_unscaled, _u_unscaled, _z_unscaled, _tvp, _p_unscaled, _w) _alg = self.model._alg_fun(_x_unscaled, _u_unscaled, _z_unscaled, _tvp, _p_unscaled, _w) # Scale (only _rhs) _rhs_scaled = _rhs/self._x_scaling.cat if self.model.model_type == 'discrete': _i = self.model.sv.sym('i', 0) # discrete integrator ifcs mimics the API the collocation ifcn. ifcn = castools.Function('ifcn', [_x, _i, _u, _z, _tvp, _p, _w], [_alg, _rhs_scaled]) n_total_coll_points = 0 elif self.settings.state_discretization == 'collocation': ffcn = castools.Function('ffcn', [_x, _u, _z, _tvp, _p, _w], [_rhs_scaled]) afcn = castools.Function('afcn', [_x, _u, _z, _tvp, _p, _w], [_alg]) # Get collocation information coll = self.settings.collocation_type deg = self.settings.collocation_deg ni = self.settings.collocation_ni nk = self.settings.n_horizon t_step = self.settings.t_step n_x = self.model.n_x n_u = self.model.n_u n_p = self.model.n_p n_z = self.model.n_z n_w = self.model.n_w n_tvp = self.model.n_tvp n_total_coll_points = (deg + 1) * ni # Choose collocation points if coll == 'legendre': # Legendre collocation points tau_root = [0] + castools.collocation_points(deg, 'legendre') elif coll == 'radau': # Radau collocation points tau_root = [0] + castools.collocation_points(deg, 'radau') else: raise Exception('Unknown collocation scheme') # Size of the finite elements h = t_step / ni # Coefficients of the collocation equation C = np.zeros((deg + 1, deg + 1)) # Coefficients of the continuity equation D = np.zeros(deg + 1) # Dimensionless time inside one control interval tau = self.model.sv.sym("tau") # All collocation time points T = np.zeros((nk, ni, deg + 1)) for k in range(nk): for i in range(ni): for j in range(deg + 1): T[k, i, j] = h * (k * ni + i + tau_root[j]) # For all collocation points for j in range(deg + 1): # Construct Lagrange polynomials to get the polynomial basis at the # collocation point L = 1 for r in range(deg + 1): if r != j: L *= (tau - tau_root[r]) / (tau_root[j] - tau_root[r]) lfcn = castools.Function('lfcn', [tau], [L]) D[j] = lfcn(1.0) # Evaluate the time derivative of the polynomial at all collocation # points to get the coefficients of the continuity equation tfcn = castools.Function('tfcn', [tau], [castools.tangent(L, tau)]) for r in range(deg + 1): C[j, r] = tfcn(tau_root[r]) # Define symbolic variables for collocation xk0 = self.model.sv.sym("xk0", n_x) #zk = self.model.sv.sym("zk", n_z) pk = self.model.sv.sym("pk", n_p) tv_pk = self.model.sv.sym("tv_pk", n_tvp) uk = self.model.sv.sym("uk", n_u) wk = self.model.sv.sym("wk", n_w) # State trajectory n_ik = ni * (deg + 1) * n_x ik = self.model.sv.sym("ik", n_ik) ik_split = np.resize(np.array([], dtype=self.model.sv.dtype), (ni, deg + 1)) offset = 0 # Algebraic trajectory n_zk = ni * (deg +1) * n_z zk = self.model.sv.sym("zk", n_zk) offset_z = 0 zk_split = np.resize(np.array([], dtype=self.model.sv.dtype), (ni, deg + 1)) # Store initial condition ik_split[0, 0] = xk0 zk_split[0, 0] = zk[offset_z:offset_z + n_z] offset_z += n_z first_j = 1 # Skip allocating x for the first collocation point for the first finite element # For each finite element for i in range(ni): # For each collocation point for j in range(first_j, deg + 1): # Get the expression for the state vector ik_split[i, j] = ik[offset:offset + n_x] zk_split[i, j] = zk[offset_z:offset_z + n_z] offset_z += n_z offset += n_x # All collocation points in subsequent finite elements first_j = 0 # Get the state at the end of the control interval xkf = ik[offset:offset + n_x] offset += n_x # Check offset for consistency assert(offset == n_ik) assert(offset_z == n_zk) # Constraints in the control interval gk = [] lbgk = [] ubgk = [] # For all finite elements for i in range(ni): # for the first point: a_i0 = afcn(ik_split[i, 0], uk, zk_split[i,0], tv_pk, pk, wk) gk.append(a_i0) lbgk.append(np.zeros(n_z)) ubgk.append(np.zeros(n_z)) # For all collocation points for j in range(1, deg + 1): # Get an expression for the state derivative at the coll point xp_ij = 0 for r in range(deg + 1): xp_ij += C[r, j] * ik_split[i, r] # Add collocation equations to the NLP f_ij = ffcn(ik_split[i, j], uk, zk_split[i,j], tv_pk, pk, wk) gk.append(h * f_ij - xp_ij) lbgk.append(np.zeros(n_x)) # equality constraints ubgk.append(np.zeros(n_x)) # equality constraints # algebraic constraints a_ij = afcn(ik_split[i, j], uk, zk_split[i,j], tv_pk, pk, wk) gk.append(a_ij) lbgk.append(np.zeros(n_z)) ubgk.append(np.zeros(n_z)) # Get an expression for the state at the end of the finite element xf_i = 0 for r in range(deg + 1): xf_i += D[r] * ik_split[i, r] # Add continuity equation to NLP x_next = ik_split[i + 1, 0] if i + 1 < ni else xkf gk.append(x_next - xf_i) lbgk.append(np.zeros(n_x)) ubgk.append(np.zeros(n_x)) # Concatenate constraints gk = castools.vertcat(*gk) lbgk = np.concatenate(lbgk) ubgk = np.concatenate(ubgk) assert(gk.shape[0] == ik.shape[0] + zk.shape[0]) # Create the integrator function ifcn = castools.Function("ifcn", [xk0, ik, uk, zk, tv_pk, pk, wk], [gk, xkf]) # Return the integration function and the number of collocation points return ifcn, n_total_coll_points def _setup_scenario_tree(self): """Private method that builds the scenario tree given the possible values of the uncertain parmeters. By default all possible combinations of uncertain parameters are evaluated. See the API in :py:class:`do_mpc.controller.MHE` for the high level / low level API. This method is currently only used for the MPC controller. There is no point in calling this method as part of the public API. """ n_p = self.model.n_p nk = self.settings.n_horizon n_robust = self.settings.n_robust # Build auxiliary variables that code the structure of the tree # Number of branches n_branches = [self.n_combinations if k < n_robust else 1 for k in range(nk)] # Calculate the number of scenarios (nodes at each stage) n_scenarios = [self.n_combinations**min(k, n_robust) for k in range(nk + 1)] # Scenaro tree structure child_scenario = -1 * np.ones((nk, n_scenarios[-1], n_branches[0])).astype(int) parent_scenario = -1 * np.ones((nk + 1, n_scenarios[-1])).astype(int) branch_offset = -1 * np.ones((nk, n_scenarios[-1])).astype(int) structure_scenario = np.zeros((nk + 1, n_scenarios[-1])).astype(int) # Fill in the auxiliary structures for k in range(nk): # Scenario counter scenario_counter = 0 # For all scenarios for s in range(n_scenarios[k]): # For all uncertainty realizations for b in range(n_branches[k]): child_scenario[k][s][b] = scenario_counter structure_scenario[k][scenario_counter] = s structure_scenario[k+1][scenario_counter] = s parent_scenario[k + 1][scenario_counter] = s scenario_counter += 1 # Store the range of branches if n_robust == 0: branch_offset[k][s] = 0 elif k < n_robust: branch_offset[k][s] = 0 else: branch_offset[k][s] = s % n_branches[0] self.scenario_tree = { 'structure_scenario': structure_scenario, 'n_branches': n_branches, 'n_scenarios': n_scenarios, 'parent_scenario': parent_scenario, 'branch_offset': branch_offset } return n_branches, n_scenarios, child_scenario, parent_scenario, branch_offset def prepare_nlp(self)->None: """Prepare the optimization problem. Typically, this method is called internally from :py:meth:`setup`. Users should only call this method if they intend to modify the objective with :py:attr:`nlp_obj`, the constraints with :py:attr:`nlp_cons`, :py:attr:`nlp_cons_lb` and :py:attr:`nlp_cons_ub`. To finish the setup process, users MUST call :py:meth:`create_nlp` afterwards. Note: Do NOT call :py:meth:`setup` if you intend to go the manual route with :py:meth:`prepare_nlp` and :py:meth:`create_nlp`. Note: Only AFTER calling :py:meth:`prepare_nlp` the previously mentionned attributes :py:attr:`nlp_obj`, :py:attr:`nlp_cons`, :py:attr:`nlp_cons_lb`, :py:attr:`nlp_cons_ub` become available. Returns: None """ # MPC and MHE have similar methods. The documentation is valid for both of them. self._prepare_nlp() def create_nlp(self)->None: """Create the optimization problem. Typically, this method is called internally from :py:meth:`setup`. Users should only call this method if they intend to modify the objective with :py:attr:`nlp_obj`, the constraints with :py:attr:`nlp_cons`, :py:attr:`nlp_cons_lb` and :py:attr:`nlp_cons_ub`. To finish the setup process, users MUST call :py:meth:`create_nlp` afterwards. Note: Do NOT call :py:meth:`setup` if you intend to go the manual route with :py:meth:`prepare_nlp` and :py:meth:`create_nlp`. Note: Only AFTER calling :py:meth:`prepare_nlp` the previously mentionned attributes :py:attr:`nlp_obj`, :py:attr:`nlp_cons`, :py:attr:`nlp_cons_lb`, :py:attr:`nlp_cons_ub` become available. Returns: None """ # MPC and MHE have similar methods. The documentation is valid for both of them. self._create_nlp()