#
# This file is part of do-mpc
#
# do-mpc: An environment for the easy, modular and efficient implementation of
# robust nonlinear model predictive control
#
# Copyright (c) 2014-2019 Sergio Lucia, Alexandru Tatulea-Codrean
# TU Dortmund. All rights reserved
#
# do-mpc is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# do-mpc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with do-mpc. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
from casadi import *
from casadi.tools import *
import pdb
import copy
import warnings
import time
import do_mpc.optimizer
import do_mpc.data
[docs]class Estimator(do_mpc.model.IteratedVariables):
"""The Estimator base class. Used for :py:class:`StateFeedback`, :py:class:`EKF` and :py:class:`MHE`.
This class cannot be used independently.
.. note::
The methods :py:func:`Estimator.set_initial_state` and :py:func:`Estimator.reset_history`
are overwritten when using the :py:class:`MHE` by the methods defined in :py:class:`do_mpc.optimizer.Optimizer`.
"""
def __init__(self, model):
self.model = model
do_mpc.model.IteratedVariables.__init__(self)
assert model.flags['setup'] == True, 'Model for estimator was not setup. After the complete model creation call model.setup_model().'
self.data = do_mpc.data.Data(model)
self.data.dtype = 'Estimator'
def reset_history(self):
"""Reset the history of the estimator
"""
self.data.init_storage()
[docs]class StateFeedback(Estimator):
"""Simple state-feedback "estimator".
The main method :py:func:`StateFeedback.make_step` simply returns the input.
Why do you even bother to use this class?
"""
def __init__(self, model):
super().__init__(model)
def make_step(self, y0):
"""Return the measurement ``y0``.
"""
return y0
[docs]class EKF(Estimator):
"""Extended Kalman Filter. Setup this class and use :py:func:`EKF.make_step`
during runtime to obtain the currently estimated states given the measurements ``y0``.
.. warning::
Not currently implemented.
"""
def __init__(self, model):
raise Exception('EKF is not currently supported. This is a placeholder.')
super().__init__(model)
# Flags are checked when calling .setup.
self.flags = {
'setup': False,
}
def make_step(self, y0):
"""Main method during runtime. Pass the most recent measurement and
retrieve the estimated state."""
assert self.flags['setup'] == True, 'EKF was not setup yet. Please call EKF.setup().'
None
[docs]class MHE(do_mpc.optimizer.Optimizer, Estimator):
"""Moving horizon estimator.
For general information on moving horizon estimation, please read our `background article`_.
.. _`background article`: ../theory_mhe.html
The MHE estimator extends the :py:class:`do_mpc.optimizer.Optimizer` base class
(which is also used for :py:class:`do_mpc.controller.MPC`), as well as the :py:class:`Estimator` base class.
Use this class to configure and run the MHE based on a previously configured :py:class:`do_mpc.model.Model` instance.
The class is initiated by passing a list of the **parameters that should be estimated**. This must be a subset (or all) of the parameters defined in
:py:class:`do_mpc.model.Model`. This allows to define parameters in the model that influence the model externally (e.g. weather predictions),
and those that are internal e.g. system parameters and can be estimated.
Passing an empty list (default) value, means that no parameters are estimated.
.. note::
Parameters are influencing the model equation at all timesteps but are constant over the entire horizon.
Parameters could also be introduced as states without dynamic but this would increase the total number of optimization variables.
**Configuration and setup:**
Configuring and setting up the MHE involves the following steps:
1. Use :py:func:`set_param` to configure the :py:class:`MHE`. See docstring for details.
2. Set the objective of the control problem with :py:func:`set_default_objective` or use the low-level interface :py:func:`set_objective`.
5. Set upper and lower bounds.
6. Optionally, set further (non-linear) constraints with :py:func:`set_nl_cons`.
7. Use :py:func:`get_p_template` and :py:func:`set_p_fun` to set the function for the parameters.
8. Finally, call :py:func:`setup`.
.. warning::
Before running the estimator, make sure to supply a valid initial guess for all estimated variables (states, algebraic states, inputs and parameters).
Simply set the intial values of :py:attr:`x0`, :py:attr:`z0`, :py:attr:`u0` and :py:attr:`p_est0` and then call :py:func:`set_initial_guess`.
To take full control over the initial guess, modify the values of :py:attr:`opt_x_num`.
During runtime use :py:func:`make_step` with the most recent measurement to obtain the estimated states.
:param model: A configured and setup :py:class:`do_mpc.model.Model`
:type model: :py:class:`do_mpc.model.Model`
:param p_est_list: List with names of parameters (``_p``) defined in ``model``
:type p_est_list: list
"""
def __init__(self, model, p_est_list=[]):
Estimator.__init__(self, model)
do_mpc.optimizer.Optimizer.__init__(self)
# Initialize structure to hold the optimial solution and initial guess:
self._opt_x_num = None
# Initialize structure to hold the parameters for the optimization problem:
self._opt_p_num = None
# Parameters that can be set for the MHE:
self.data_fields = [
'n_horizon',
't_step',
'meas_from_data',
'state_discretization',
'collocation_type',
'collocation_deg',
'collocation_ni',
'nl_cons_check_colloc_points',
'cons_check_colloc_points',
'store_full_solution',
'store_lagr_multiplier',
'store_solver_stats',
'nlpsol_opts'
]
# Default Parameters:
self.meas_from_data = False
self.state_discretization = 'collocation'
self.collocation_type = 'radau'
self.collocation_deg = 2
self.collocation_ni = 1
self.nl_cons_check_colloc_points = False
self.cons_check_colloc_points = True
self.store_full_solution = False
self.store_lagr_multiplier = True
self.store_solver_stats = [
'success',
't_wall_S',
't_wall_S',
]
self.nlpsol_opts = {} # Will update default options with this dict.
# Create seperate structs for the estimated and the set parameters (the union of both are all parameters of the model.)
_p = model._p
self._p_est = struct_symSX(
[entry('default', shape=(0,0))]+
[entry(p_i, sym=_p[p_i]) for p_i in _p.keys() if p_i in p_est_list]
)
self._p_set = struct_symSX(
[entry(p_i, sym=_p[p_i]) for p_i in _p.keys() if p_i not in p_est_list]
)
# Function to obtain full set of parameters from the seperate structs (while obeying the order):
self._p_cat_fun = Function('p_cat_fun', [self._p_est, self._p_set], [_p])
self.n_p_est = self._p_est.shape[0]
self.n_p_set = self._p_set.shape[0]
# Initialize additional structures by calling the symbolic structures defined above
# with the default numerical value.
# This returns an identical numerical structure with all values set to the passed value.
# TODO: p_scaling already exists. Maybe use it instead of these seperate structs?
self._p_est_scaling = self._p_est(1.0)
self._p_set_scaling = self._p_set(1.0) # This not meant to be adapted. We need it to concatenate p_scaling.
self._p_est_lb = self._p_est(-np.inf)
self._p_est_ub = self._p_est(np.inf)
self._p_est0 = self._p_est(0.0)
# Introduce aliases / new variables to smoothly and intuitively formulate
# the MHE objective function.
self._y_meas = self.model._y
self._x_prev = copy.copy(self.model._x)
self._x = self.model._x
self._p_est_prev = copy.copy(self._p_est)
self._p_est = self._p_est
self._w = self.model._w
self._v = self.model._v
# Flags are checked when calling .setup.
self.flags = {
'setup': False,
'set_tvp_fun': False,
'set_p_fun': False,
'set_y_fun': False,
'set_objective': False,
'set_initial_guess': False,
}
@property
def p_est0(self):
""" Initial value of estimated parameters and current iterate.
This is the numerical structure holding the information about the current
estimated parameters in the class.
The property can be indexed according to the model definition.
**Example:**
::
model = do_mpc.model.Model('continuous')
model.set_variable('_p','temperature', shape=(4,1))
# Initiate MHE with list of estimated parameters:
mhe = do_mpc.estimator.MHE(model, ['temperature'])
# Get or set current value of variable:
mhe.p_est0['temperature', 0] # 0th element of variable
mhe.p_est0['temperature'] # all elements of variable
mhe.p_est0['temperature', 0:2] # 0th and 1st element
Usefull CasADi symbolic structure methods:
* ``.shape``
* ``.keys()``
* ``.labels()``
"""
return self._p_est0
@p_est0.setter
def p_est0(self, val):
self._p_est0 = self._convert2struct(val, self._p_est)
@property
def opt_x_num(self):
"""Full MHE solution and initial guess.
This is the core attribute of the MHE class.
It is used as the initial guess when solving the optimization problem
and then overwritten with the current solution.
The attribute is a CasADi numeric structure with nested power indices.
It can be indexed as follows:
::
# dynamic states:
opt_x_num['_x', time_step, collocation_point, _x_name]
# algebraic states:
opt_x_num['_z', time_step, collocation_point, _z_name]
# inputs:
opt_x_num['_u', time_step, _u_name]
# estimated parameters:
opt_x_Num['_p_est', _p_names]
# slack variables for soft constraints:
opt_x_num['_eps', time_step, _nl_cons_name]
The names refer to those given in the :py:class:`do_mpc.model.Model` configuration.
Further indices are possible, if the variables are itself vectors or matrices.
The attribute can be used **to manually set a custom initial guess or for debugging purposes**.
.. note::
The attribute ``opt_x_num`` carries the scaled values of all variables. See ``opt_x_num_unscaled``
for the unscaled values (these are not used as the initial guess).
.. warning::
Do not tweak or overwrite this attribute unless you known what you are doing.
.. note::
The attribute is populated when calling :py:func:`setup`
"""
return self._opt_x_num
@opt_x_num.setter
def opt_x_num(self, val):
self._opt_x_num = val
@property
def opt_p_num(self):
"""Full MHE parameter vector.
This attribute is used when calling the solver to pass all required parameters,
including
* previously estimated state(s)
* previously estimated parameter(s)
* known parameters
* sequence of time-varying parameters
* sequence of measurements parameters
**do-mpc** handles setting these parameters automatically in the :py:func:`make_step`
method. However, you can set these values manually and directly call :py:func:`solve`.
The attribute is a CasADi numeric structure with nested power indices.
It can be indexed as follows:
::
# previously estimated state:
opt_p_num['_x_prev', _x_name]
# previously estimated parameters:
opt_p_num['_p_est_prev', _x_name]
# known parameters
opt_p_num['_p_set', _p_name]
# time-varying parameters:
opt_p_num['_tvp', time_step, _tvp_name]
# sequence of measurements:
opt_p_num['_y_meas', time_step, _y_name]
The names refer to those given in the :py:class:`do_mpc.model.Model` configuration.
Further indices are possible, if the variables are itself vectors or matrices.
.. warning::
Do not tweak or overwrite this attribute unless you known what you are doing.
.. note::
The attribute is populated when calling :py:func:`setup`
"""
return self._opt_p_num
@opt_p_num.setter
def opt_p_num(self, val):
self._opt_p_num = val
def set_param(self, **kwargs):
"""Method to set the parameters of the :py:class:`MHE` class. Parameters must be passed as pairs of valid keywords and respective argument.
For example:
::
mhe.set_param(n_horizon = 20)
It is also possible and convenient to pass a dictionary with multiple parameters simultaneously as shown in the following example:
::
setup_mhe = {
'n_horizon': 20,
't_step': 0.5,
}
mhe.set_param(**setup_mhe)
This makes use of thy python "unpack" operator. See `more details here`_.
.. _`more details here`: https://codeyarns.github.io/tech/2012-04-25-unpack-operator-in-python.html
.. note:: The only required parameters are ``n_horizon`` and ``t_step``. All other parameters are optional.
.. note:: :py:func:`set_param` can be called multiple times. Previously passed arguments are overwritten by successive calls.
The following parameters are available:
:param n_horizon: Prediction horizon of the optimal control problem. Parameter must be set by user.
:type n_horizon: int
:param t_step: Timestep of the mhe.
:type t_step: float
:param meas_from_data: Default option to retrieve past measurements for the MHE optimization problem. The :py:func:`set_y_fun` is called during setup.
:type meas_from_data: bool
:param state_discretization: Choose the state discretization for continuous models. Currently only ``'collocation'`` is available. Defaults to ``'collocation'``.
:type state_discretization: str
:param collocation_type: Choose the collocation type for continuous models with collocation as state discretization. Currently only ``'radau'`` is available. Defaults to ``'radau'``.
:type collocation_type: str
:param collocation_deg: Choose the collocation degree for continuous models with collocation as state discretization. Defaults to ``2``.
:type collocation_deg: int
:param collocation_ni: For orthogonal collocation, choose the number of finite elements for the states within a time-step (and during constant control input). Defaults to ``1``. Can be used to avoid high-order polynomials.
:type collocation_ni: int
:param nl_cons_check_colloc_points: For orthogonal collocation choose wether the bounds set with :py:func:`set_nl_cons` are evaluated once per finite Element or for each collocation point. Defaults to ``False`` (once per collocation point).
:type nl_cons_check_colloc_points: bool
:param cons_check_colloc_points: For orthogonal collocation choose whether the linear bounds set with :py:attr:`bounds` are evaluated once per finite Element or for each collocation point. Defaults to ``True`` (for all collocation points).
:type cons_check_colloc_points: bool
:param store_full_solution: Choose whether to store the full solution of the optimization problem. This is required for animating the predictions in post processing. However, it drastically increases the required storage. Defaults to False.
:type store_full_solution: bool
:param store_lagr_multiplier: Choose whether to store the lagrange multipliers of the optimization problem. Increases the required storage. Defaults to ``True``.
:type store_lagr_multiplier: bool
:param store_solver_stats: Choose which solver statistics to store. Must be a list of valid statistics. Defaults to ``['success','t_wall_S','t_wall_S']``.
:type store_solver_stats: list
:param nlpsol_opts: Dictionary with options for the CasADi solver call ``nlpsol`` with plugin ``ipopt``. All options are listed `here <http://casadi.sourceforge.net/api/internal/d4/d89/group__nlpsol.html>`_.
:type store_solver_stats: dict
.. note:: We highly suggest to change the linear solver for IPOPT from `mumps` to `MA27`. In many cases this will drastically boost the speed of **do-mpc**. Change the linear solver with:
::
optimizer.set_param(nlpsol_opts = {'ipopt.linear_solver': 'MA27'})
.. note:: To surpress the output of IPOPT, please use:
::
surpress_ipopt = {'ipopt.print_level':0, 'ipopt.sb': 'yes', 'print_time':0}
optimizer.set_param(nlpsol_opts = surpress_ipopt)
"""
assert self.flags['setup'] == False, 'Setting parameters after setup is prohibited.'
for key, value in kwargs.items():
if not (key in self.data_fields):
print('Warning: Key {} does not exist for optimizer.'.format(key))
else:
setattr(self, key, value)
def set_objective(self, stage_cost, arrival_cost):
"""Set the stage cost :math:`l(\cdot)` and arrival cost :math:`m(\cdot)` function for the MHE problem:
.. math::
\\underset{
\\begin{array}{c}
\\mathbf{x}_{0:N+1}, \\mathbf{u}_{0:N}, p,\\\\
\\mathbf{w}_{0:N}, \\mathbf{v}_{0:N}
\\end{array}
}{\\mathrm{min}}
&m(x_0,\\tilde{x}_0, p,\\tilde{p})
+\\sum_{k=0}^{N-1} l(v_k, w_k, p, p_{\\text{tv},k}),\\\\
&\\left.\\begin{aligned}
\\mathrm{s.t.}\\quad
x_{k+1} &= f(x_k,u_k,z_k,p,p_{\\text{tv},k})+ w_k,\\\\
y_k &= h(x_k,u_k,z_k,p,p_{\\text{tv},k}) + v_k, \\\\
&g(x_k,u_k,z_k,p_k,p_{\\text{tv},k}) \\leq 0
\\end{aligned}\\right\} k=0,\\dots, N
Use the class attributes:
* ``mhe._w`` as :math:`w_k`
* ``mhe._v`` as :math:`v_k`
* ``mhe._x_prev`` as :math:`\\tilde{x}_0`
* ``mhe._x`` as :math:`x_0`
* ``mhe._p_est_prev`` as :math:`\\tilde{p}_0`
* ``mhe._p_est`` as :math:`p_0`
To formulate the objective function and pass the stage cost and arrival cost independently.
.. note::
The retrieved attributes are symbolic structures, which can be queried with the given variable names,
e.g.:
::
x1 = mhe._x['state_1']
For a vector of all states, use the ``.cat`` method as shown in the example below.
**Example:**
::
# Get variables:
v = mhe._v.cat
stage_cost = v.T@np.diag(np.array([1,1,1,20,20]))@v
x_0 = mhe._x
x_prev = mhe._x_prev
p_0 = mhe._p_est
p_prev = mhe._p_est_prev
dx = x_0.cat - x_prev.cat
dp = p_0.cat - p_prev.cat
arrival_cost = 1e-4*dx.T@dx + 1e-4*dp.T@dp
mhe.set_objective(stage_cost, arrival_cost)
.. note::
Use :py:func:`set_default_objective` as a high-level wrapper for this method,
if you want to use the default MHE objective function.
:param stage_cost: Stage cost that is added to the MHE objective at each age.
:type stage_cost: CasADi expression
:param arrival_cost: Arrival cost that is added to the MHE objective at the initial state.
:type arrival_cost: CasADi expression
:return: None
:rtype: None
"""
assert stage_cost.shape == (1,1), 'stage_cost must have shape=(1,1). You have {}'.format(stage_cost.shape)
assert arrival_cost.shape == (1,1), 'arrival_cost must have shape=(1,1). You have {}'.format(arrival_cost.shape)
assert self.flags['setup'] == False, 'Cannot call .set_objective after .setup.'
stage_cost_input = self._w, self._v, self.model._tvp, self.model._p
err_msg = 'objective cost equation must be solely depending on w, v, p and tvp.'
assert set(symvar(stage_cost)).issubset(set(symvar(vertcat(*stage_cost_input)))), err_msg
self.stage_cost_fun = Function('stage_cost_fun', [*stage_cost_input], [stage_cost])
arrival_cost_input = self._x, self._x_prev, self._p_est, self._p_est_prev, self._p_set
err_msg = 'Arrival cost equation must be solely depending on x_0, x_prev, p_0, p_prev, p_set'
assert set(symvar(arrival_cost)).issubset(set(symvar(vertcat(*arrival_cost_input)))), err_msg
self.arrival_cost_fun = Function('arrival_cost_fun', arrival_cost_input, [arrival_cost])
self.flags['set_objective'] = True
def set_default_objective(self, P_x, P_v=None, P_p=None, P_w=None):
""" Configure the suggested default MHE formulation.
Use this method to pass tuning matrices for the MHE optimization problem:
.. math::
\\underset{
\\begin{array}{c}
\\mathbf{x}_{0:N+1}, \\mathbf{u}_{0:N}, p,\\\\
\\mathbf{w}_{0:N}, \\mathbf{v}_{0:N}
\\end{array}
}{\\mathrm{min}}
&m(x_0,\\tilde{x}_0, p,\\tilde{p})
+\\sum_{k=0}^{N-1} l(v_k, w_k, p, p_{\\text{tv},k}),\\\\
&\\left.\\begin{aligned}
\\mathrm{s.t.}\\quad
x_{k+1} &= f(x_k,u_k,z_k,p,p_{\\text{tv},k})+ w_k,\\\\
y_k &= h(x_k,u_k,z_k,p,p_{\\text{tv},k}) + v_k, \\\\
&g(x_k,u_k,z_k,p_k,p_{\\text{tv},k}) \\leq 0
\\end{aligned}\\right\} k=0,\\dots, N
where we introduce the bold letter notation,
e.g. :math:`\mathbf{x}_{0:N+1}=[x_0, x_1, \dots, x_{N+1}]^T` to represent sequences and where
:math:`\|x\|_P^2=x^T P x` denotes the :math:`P` weighted squared norm.
Pass the weighting matrices :math:`P_x`, :math:`P_p` and :math:`P_v` and :math:`P_w`.
The matrices must be of appropriate dimension and array-like.
.. note::
It is possible to pass parameters or time-varying parameters defined in the
:py:class:`do_mpc.model.Model` as weighting.
You'll probably choose time-varying parameters (``_tvp``) for ``P_v`` and ``P_w``
and parameters (``_p``) for ``P_x`` and ``P_p``.
Use :py:func:`set_p_fun` and :py:func:`set_tvp_fun` to configure how these values
are determined at each time step.
**General remarks:**
* In the case that no parameters are estimated, the weighting matrix :math:`P_p` is not required.
* In the case that the :py:class:`do_mpc.model.Model` is configured without process-noise (see :py:func:`do_mpc.model.Model.set_rhs`) the parameter ``P_w`` is not required.
* In the case that the :py:class:`do_mpc.model.Model` is configured without measurement-noise (see :py:func:`do_mpc.model.Model.set_meas`) the parameter ``P_v`` is not required.
The respective terms are not present in the MHE formulation in that case.
.. note::
Use :py:func:`set_objective` as a low-level alternative for this method,
if you want to use a custom objective function.
:param P_x: Tuning matrix :math:`P_x` of dimension :math:`n \\times n` :math:`(x \\in \\mathbb{R}^{n})`
:type P_x: numpy.ndarray, casadi.SX, casadi.DM
:param P_v: Tuning matrix :math:`P_v` of dimension :math:`m \\times m` :math:`(v \\in \\mathbb{R}^{m})`
:type P_v: numpy.ndarray, casadi.SX, casadi.DM
:param P_p: Tuning matrix :math:`P_p` of dimension :math:`l \\times l` :math:`(p_{\\text{est}} \\in \\mathbb{R}^{l})`)
:type P_p: numpy.ndarray, casadi.SX, casadi.DM
:param P_w: Tuning matrix :math:`P_w` of dimension :math:`k \\times k` :math:`(w \\in \\mathbb{R}^{k})`
:type P_w: numpy.ndarray, casadi.SX, casadi.DM
"""
input_types = (np.ndarray, casadi.SX, casadi.DM)
err_msg = '{name} must be of type {type_set}, you have {type_is}'
assert isinstance(P_x, input_types), err_msg.format(name='P_x', type_set = input_types, type_is = type(P_x))
input_types = (np.ndarray, casadi.SX, casadi.DM, type(None))
assert isinstance(P_v, input_types), err_msg.format(name='P_v', type_set = input_types, type_is = type(P_v))
assert isinstance(P_p, input_types), err_msg.format(name='P_p', type_set = input_types, type_is = type(P_p))
assert isinstance(P_w, input_types), err_msg.format(name='P_w', type_set = input_types, type_is = type(P_w))
n_x = self.model.n_x
n_y = self.model.n_y
n_w = self.model.n_w
n_v = self.model.n_v
n_p = self.n_p_est
assert P_x.shape == (n_x, n_x), 'P_x has wrong shape:{}, must be {}'.format(P_x.shape, (n_x,n_x))
# Calculate stage cost:
stage_cost = 0
if P_v is None:
assert n_v == 0, 'Must pass weighting factor P_v, since you have measurement noise on some measurements (configured in model).'
else:
assert P_v.shape == (n_v, n_v), 'P_v has wrong shape:{}, must be {}'.format(P_v.shape, (n_v,n_v))
v = self._v.cat
stage_cost += v.T@P_v@v
if P_w is None:
assert n_w == 0, 'Must pass weighting factor P_w, since you have process noise on some states (configured in model).'
else:
assert P_w.shape == (n_w, n_w), 'P_w has wrong shape:{}, must be {}'.format(P_w.shape, (n_w,n_w))
w = self._w.cat
stage_cost += w.T@P_w@w
# Calculate arrival cost:
x_0 = self._x
x_prev = self._x_prev
dx = x_0.cat - x_prev.cat
arrival_cost = dx.T@P_x@dx
# Add parameter term if there are parameters to be estimated:
if P_p is None:
assert n_p == 0, 'Must pass weighting factor P_p, since you are trying to estimate parameters.'
else:
assert P_p.shape == (n_p, n_p), 'P_p has wrong shape:{}, must be {}'.format(P_p.shape, (n_p,n_p))
p_0 = self._p_est
p_prev = self._p_est_prev
dp = p_0.cat - p_prev.cat
arrival_cost += dp.T@P_p@dp
# Set MHE objective:
self.set_objective(stage_cost, arrival_cost)
def get_p_template(self):
"""Obtain output template for :py:func:`set_p_fun`.
This is used to set the (not estimated) parameters.
Use this structure as the return of a user defined parameter function (``p_fun``)
that is called at each MHE step. Pass this function to the MHE by calling :py:func:`set_p_fun`.
.. note::
The combination of :py:func:`get_p_template` and :py:func:`set_p_fun` is
identical to the :py:class:`do_mpc.simulator.Simulator` methods, if the MHE
is not estimating any parameters.
:return: p_template
:rtype: struct_symSX
"""
return self._p_set(0)
def set_p_fun(self, p_fun):
"""Set function which returns parameters..
The ``p_fun`` is called at each MHE time step and returns the (fixed) parameters.
The function must return a numerical CasADi structure, which can be retrieved with :py:func:`get_p_template`.
:param p_fun: Parameter function.
:type p_fun: function
"""
assert self.get_p_template().labels() == p_fun(0).labels(), 'Incorrect output of p_fun. Use get_p_template to obtain the required structure.'
self.p_fun = p_fun
self.flags['set_p_fun'] = True
def get_y_template(self):
"""Obtain output template for :py:func:`set_y_fun`.
Use this structure as the return of a user defined parameter function (``y_fun``)
that is called at each MHE step. Pass this function to the MHE by calling :py:func:`set_y_fun`.
The structure carries a set of measurements **for each time step of the horizon** and can be accessed as follows:
::
y_template['y_meas', k, 'meas_name']
# Slicing is possible, e.g.:
y_template['y_meas', :, 'meas_name']
where ``k`` runs from ``0`` to ``N_horizon`` and ``meas_name`` refers to the user-defined names in :py:class:`do_mpc.model.Model`.
.. note::
The structure is ordered, sucht that ``k=0`` is the "oldest measurement" and ``k=N_horizon`` is the newest measurement.
By default, the following measurement function is choosen:
::
y_template = self.get_y_template()
def y_fun(t_now):
n_steps = min(self.data._y.shape[0], self.n_horizon)
for k in range(-n_steps,0):
y_template['y_meas',k] = self.data._y[k]
try:
for k in range(self.n_horizon-n_steps):
y_template['y_meas',k] = self.data._y[-n_steps]
except:
None
return y_template
Which simply reads the last results from the ``MHE.data`` object.
:return: y_template
:rtype: struct_symSX
"""
y_template = struct_symSX([
entry('y_meas', repeat=self.n_horizon, struct=self._y_meas)
])
return y_template(0)
def set_y_fun(self, y_fun):
"""Set the measurement function. The function must return a CasADi structure which can be obtained
from :py:func:`get_y_template`. See the respective doc string for details.
:param y_fun: measurement function.
:type y_fun: function
"""
assert self.get_y_template().labels() == y_fun(0).labels(), 'Incorrect output of y_fun. Use get_y_template to obtain the required structure.'
self.y_fun = y_fun
self.flags['set_y_fun'] = True
def _check_validity(self):
"""Private method to be called in :py:func:`setup`. Checks if the configuration is valid and
if the optimization problem can be constructed.
Furthermore, default values are set if they were not configured by the user (if possible).
Specifically, we set dummy values for the ``tvp_fun`` and ``p_fun`` if they are not present in the model
and the default measurement function.
"""
# Objective mus be defined.
if self.flags['set_objective'] == False:
raise Exception('Objective is undefined. Please call .set_objective() or .set_default_objective() prior to .setup().')
# tvp_fun must be set, if tvp are defined in model.
if self.flags['set_tvp_fun'] == False and self.model._tvp.size > 0:
raise Exception('You have not supplied a function to obtain the time-varying parameters defined in model. Use .set_tvp_fun() prior to setup.')
# p_fun must be set, if p are defined in model.
if self.flags['set_p_fun'] == False and self._p_set.size > 0:
raise Exception('You have not supplied a function to obtain the parameters defined in model. Use .set_p_fun() prior to setup.')
# Lower bounds should be lower than upper bounds:
for lb, ub in zip([self._x_lb, self._u_lb, self._z_lb], [self._x_ub, self._u_ub, self._z_ub]):
bound_check = lb.cat > ub.cat
bound_fail = [label_i for i,label_i in enumerate(lb.labels()) if bound_check[i]]
if np.any(bound_check):
raise Exception('Your bounds are inconsistent. For {} you have lower bound > upper bound.'.format(bound_fail))
# Set dummy functions for tvp and p in case these parameters are unused.
if 'tvp_fun' not in self.__dict__:
_tvp = self.get_tvp_template()
def tvp_fun(t): return _tvp
self.set_tvp_fun(tvp_fun)
if 'p_fun' not in self.__dict__:
_p = self.get_p_template()
def p_fun(t): return _p
self.set_p_fun(p_fun)
if self.flags['set_y_fun'] == False and self.meas_from_data:
# Case that measurement function is automatically created.
y_template = self.get_y_template()
def y_fun(t_now):
n_steps = min(self.data._y.shape[0], self.n_horizon)
for k in range(-n_steps,0):
y_template['y_meas',k] = self.data._y[k]
try:
for k in range(self.n_horizon-n_steps):
y_template['y_meas',k] = self.data._y[-n_steps]
except:
None
return y_template
self.set_y_fun(y_fun)
elif self.flags['set_y_fun'] == True:
# Case that the user supplied a measurement function.
pass
else:
# No measurement function.
raise Exception('You have not suppplied a measurement function. Use .set_y_fun or set parameter meas_from_data to True for default function.')
def set_initial_guess(self):
"""Initial guess for optimization variables.
Uses the current class attributes :py:obj:`x0`, :py:obj:`z0` and :py:obj:`u0`, :py:obj:`p_est0` to create an initial guess for the MHE.
The initial guess is simply the initial values for all :math:`k=0,\dots,N` instances of :math:`x_k`, :math:`u_k` and :math:`z_k`, :math:`p_{\\text{est,k}}`.
.. warning::
If no initial values for :py:attr:`x0`, :py:attr:`z0` and :py:attr:`u0` were supplied during setup, these default to zero.
.. note::
The initial guess is fully customizable by directly setting values on the class attribute:
:py:attr:`opt_x_num`.
"""
assert self.flags['setup'] == True, 'mhe was not setup yet. Please call mhe.setup().'
self.opt_x_num['_x'] = self._x0.cat/self._x_scaling
self.opt_x_num['_u'] = self._u0.cat/self._u_scaling
self.opt_x_num['_z'] = self._z0.cat/self._z_scaling
self.opt_x_num['_p_est'] = self._p_est0.cat/self._p_est_scaling
self.flags['set_initial_guess'] = True
def setup(self):
"""The setup method finalizes the MHE creation.
The optimization problem is created based on the configuration of the module.
.. note::
After this call, the :py:func:`solve` and :py:func:`make_step` method is applicable.
"""
self._setup_nl_cons()
# Concatenate _p_est_scaling und _p_set_scaling to p_scaling (and make it a struct again)
self._p_scaling = self.model._p(self._p_cat_fun(self._p_est_scaling, self._p_set_scaling))
# Gather meta information:
meta_data = {key: getattr(self, key) for key in self.data_fields}
self.data.set_meta(**meta_data)
self._check_validity()
self._setup_mhe_optim_problem()
self._prepare_data()
self.flags['setup'] = True
def make_step(self, y0):
"""Main method of the class during runtime. This method is called at each timestep
and returns the current state estimate for the current measurement ``y0``.
The method prepares the MHE by setting the current parameters, calls :py:func:`solve`
and updates the :py:class:`do_mpc.data.Data` object.
.. warning::
Moving horizon estimation will only work reliably once **a full sequence of measurements**
corresponding to the set horizon ist available.
:param y0: Current measurement.
:type y0: numpy.ndarray
:return: x0, estimated state of the system.
:rtype: numpy.ndarray
"""
# Check setup.
assert self.flags['setup'] == True, 'optimizer was not setup yet. Please call optimizer.setup().'
# Check input type.
if isinstance(y0, (np.ndarray, casadi.DM)):
pass
elif isinstance(y0, structure3.DMStruct):
y0 = y0.cat
else:
raise Exception('Invalid type {} for y0. Must be {}'.format(type(y0), (np.ndarray, casadi.DM, structre3.DMStruct)))
# Check input shape.
n_val = np.prod(y0.shape)
assert n_val == self.model.n_y, 'Wrong input with shape {}. Expected vector with {} elements'.format(n_val, self.model.n_y)
# Check (once) if the initial guess was supplied.
if not self.flags['set_initial_guess']:
warnings.warn('Intial guess for the optimizer was not set. The solver call is likely to fail.')
time.sleep(5)
# Since do-mpc is warmstarting, the initial guess will exist after the first call.
self.flags['set_initial_guess'] = True
self.data.update(_y = y0)
p_est0 = self._p_est0
x0 = self._x0
t0 = self._t0
tvp0 = self.tvp_fun(t0)
p_set0 = self.p_fun(t0)
y_traj = self.y_fun(t0)
self.opt_p_num['_x_prev'] = self.opt_x_num['_x', 1, -1]*self._x_scaling
self.opt_p_num['_p_est_prev'] = p_est0
self.opt_p_num['_p_set'] = p_set0
self.opt_p_num['_tvp'] = tvp0['_tvp']
self.opt_p_num['_y_meas'] = y_traj['y_meas']
self.solve()
# Extract solution:
x_next = self.opt_x_num['_x', -1, -1]*self._x_scaling
p_est_next = self.opt_x_num['_p_est']*self._p_est_scaling
u0 = self.opt_x_num['_u', -1]*self._u_scaling
# Which z must be extracted here?
z0 = self.opt_x_num['_z', -1, -1]*self._z_scaling
aux0 = self.opt_aux_num['_aux', -1]
p0 = self._p_cat_fun(p_est0, p_set0)
# Update data object:
self.data.update(_x = x0)
self.data.update(_u = u0)
self.data.update(_z = z0)
self.data.update(_p = p0)
self.data.update(_tvp = tvp0['_tvp', -1])
self.data.update(_time = t0)
self.data.update(_aux = aux0)
# Store additional information
self.data.update(opt_p_num = self.opt_p_num)
if self.store_full_solution == True:
opt_x_num_unscaled = self.opt_x_num_unscaled
opt_aux_num = self.opt_aux_num
self.data.update(_opt_x_num = opt_x_num_unscaled)
self.data.update(_opt_aux_num = opt_aux_num)
if self.store_lagr_multiplier == True:
lam_g_num = self.lam_g_num
self.data.update(_lam_g_num = lam_g_num)
if len(self.store_solver_stats) > 0:
solver_stats = self.solver_stats
store_solver_stats = self.store_solver_stats
self.data.update(**{stat_i: value for stat_i, value in solver_stats.items() if stat_i in store_solver_stats})
# Update initial
self._t0 = self._t0 + self.t_step
self._x0.master = x_next
self._p_est0.master = p_est_next
self._u0.master = u0
self._z0.master = z0
return x_next.full()
def _setup_mhe_optim_problem(self):
"""Private method of the MHE class to construct the MHE optimization problem.
The method depends on inherited methods from the :py:class:`do_mpc.optimizer.Optimizer`,
such as :py:func:`do_mpc.optimizer.Optimizer._setup_discretization` and
:py:func:`do_mpc.optimizer.Optimizer._setup_scenario_tree`.
The MPC has a similar method with similar structure.
"""
# Obtain an integrator (collocation, discrete-time) and the amount of intermediate (collocation) points
ifcn, n_total_coll_points = self._setup_discretization()
# Create struct for optimization variables:
self.opt_x = opt_x = struct_symSX([
entry('_x', repeat=[self.n_horizon+1, 1+n_total_coll_points], struct=self.model._x),
entry('_z', repeat=[self.n_horizon, max(n_total_coll_points,1)], struct=self.model._z),
entry('_u', repeat=[self.n_horizon], struct=self.model._u),
entry('_w', repeat=[self.n_horizon], struct=self.model._w),
entry('_v', repeat=[self.n_horizon], struct=self.model._v),
entry('_eps', repeat=[self.n_horizon], struct=self._eps),
entry('_p_est', struct=self._p_est),
])
self.n_opt_x = self.opt_x.shape[0]
# NOTE: The entry _x[k,:] starts with the collocation points from s to b at time k
# and the last point contains the child node
# NOTE: Currently there exist dummy collocation points for the initial state (for each branch)
# Create scaling struct as assign values for _x, _u, _z.
self.opt_x_scaling = opt_x_scaling = opt_x(1)
opt_x_scaling['_x'] = self._x_scaling
# TODO: Should _w be scaled?
opt_x_scaling['_z'] = self._z_scaling
opt_x_scaling['_u'] = self._u_scaling
opt_x_scaling['_p_est'] = self._p_est_scaling
# opt_x are unphysical (scaled) variables. opt_x_unscaled are physical (unscaled) variables.
self.opt_x_unscaled = opt_x_unscaled = opt_x(opt_x.cat * opt_x_scaling)
# Create struct for optimization parameters:
self.opt_p = opt_p = struct_symSX([
entry('_x_prev', struct=self.model._x),
entry('_p_est_prev', struct=self._p_est_prev),
entry('_p_set', struct=self._p_set),
entry('_tvp', repeat=self.n_horizon, struct=self.model._tvp),
entry('_y_meas', repeat=self.n_horizon, struct=self.model._y),
])
self.n_opt_p = opt_p.shape[0]
# Dummy struct with symbolic variables
self.aux_struct = struct_symSX([
entry('_aux', repeat=[self.n_horizon], struct=self.model._aux_expression)
])
# Create mutable symbolic expression from the struct defined above.
self.opt_aux = opt_aux = struct_SX(self.aux_struct)
self.n_opt_aux = self.opt_aux.shape[0]
self.lb_opt_x = opt_x(-np.inf)
self.ub_opt_x = opt_x(np.inf)
# Initialize objective function and constraints
obj = 0
cons = []
cons_lb = []
cons_ub = []
# Arrival cost:
arrival_cost = self.arrival_cost_fun(
opt_x_unscaled['_x', 0, -1],
opt_p['_x_prev'],#/self._x_scaling,
opt_x_unscaled['_p_est'],
opt_p['_p_est_prev'],#/self._p_est_scaling
opt_p['_p_set']
)
obj += arrival_cost
# Get concatenated parameters vector containing the estimated and fixed parameters (scaled)
_p = self._p_cat_fun(self.opt_x['_p_est'], self.opt_p['_p_set']/self._p_set_scaling)
# For all control intervals
for k in range(self.n_horizon):
# Compute constraints and predicted next state of the discretization scheme
col_xk = vertcat(*opt_x['_x', k+1, :-1])
col_zk = vertcat(*opt_x['_z', k])
[g_ksb, xf_ksb] = ifcn(opt_x['_x', k, -1], col_xk,
opt_x['_u', k], col_zk, opt_p['_tvp', k],
_p, opt_x['_w', k])
# Compute current measurement
yk_calc = self.model._meas_fun(opt_x_unscaled['_x', k+1, -1], opt_x_unscaled['_u', k], opt_x_unscaled['_z', k, 0],
opt_p['_tvp', k], _p, opt_x_unscaled['_v', k])
# Add the collocation equations
cons.append(g_ksb)
cons_lb.append(np.zeros(g_ksb.shape[0]))
cons_ub.append(np.zeros(g_ksb.shape[0]))
# Add continuity constraints
cons.append(xf_ksb - opt_x['_x', k+1, -1])
cons_lb.append(np.zeros((self.model.n_x, 1)))
cons_ub.append(np.zeros((self.model.n_x, 1)))
# Add measurement constraints
cons.append(yk_calc - opt_p['_y_meas', k])
cons_lb.append(np.zeros((self.model.n_y, 1)))
cons_ub.append(np.zeros((self.model.n_y, 1)))
if self.nl_cons_check_colloc_points:
# Ensure nonlinear constraints on all collocation points
for i in range(n_total_coll_points):
nl_cons_k = self._nl_cons_fun(
opt_x_unscaled['_x', k, i], opt_x_unscaled['_u', k], opt_x_unscaled['_z', k, i],
opt_p['_tvp', k], _p, opt_x_unscaled['_eps', k])
cons.append(nl_cons_k)
cons_lb.append(self._nl_cons_lb)
cons_ub.append(self._nl_cons_ub)
else:
# Ensure nonlinear constraints only on the beginning of the FE
nl_cons_k = self._nl_cons_fun(
opt_x_unscaled['_x', k, -1], opt_x_unscaled['_u', k], opt_x_unscaled['_z', k, 0],
opt_p['_tvp', k], _p, opt_x_unscaled['_eps', k])
cons.append(nl_cons_k)
cons_lb.append(self._nl_cons_lb)
cons_ub.append(self._nl_cons_ub)
cons.append(nl_cons_k)
cons_lb.append(self._nl_cons_lb)
cons_ub.append(self._nl_cons_ub)
obj += self.stage_cost_fun(
opt_x_unscaled['_w', k], opt_x_unscaled['_v', k], opt_p['_tvp', k], _p
)
# Add slack variables to the cost
obj += self.epsterm_fun(opt_x_unscaled['_eps', k])
# Calculate the auxiliary expressions for the current scenario:
opt_aux['_aux', k] = self.model._aux_expression_fun(
opt_x_unscaled['_x', k, -1], opt_x_unscaled['_u', k], opt_x_unscaled['_z', k, -1], opt_p['_tvp', k], _p)
if self.cons_check_colloc_points: # Constraints for all collocation points.
# Bounds for the states on all discretize values along the horizon
self.lb_opt_x['_x'] = self._x_lb.cat/self._x_scaling
self.ub_opt_x['_x'] = self._x_ub.cat/self._x_scaling
# Bounds for the algebraic states along the horizon
self.lb_opt_x['_z'] = self._z_lb.cat/self._z_scaling
self.ub_opt_x['_z'] = self._z_ub.cat/self._z_scaling
else: # Constraints only at the beginning of the finite Element
# Bounds for the states on all discretize values along the horizon
self.lb_opt_x['_x', 1:self.n_horizon, -1] = self._x_lb.cat/self._x_scaling
self.ub_opt_x['_x', 1:self.n_horizon, -1] = self._x_ub.cat/self._x_scaling
# Bounds for the algebraic states along the horizon
self.lb_opt_x['_z', :, 0] = self._z_lb.cat/self._z_scaling
self.ub_opt_x['_z', :, 0] = self._z_ub.cat/self._z_scaling
# Bounds for the inputs along the horizon
self.lb_opt_x['_u'] = self._u_lb.cat/self._u_scaling
self.ub_opt_x['_u'] = self._u_ub.cat/self._u_scaling
# Bounds for the slack variables along the horizon:
self.lb_opt_x['_eps'] = self._eps_lb.cat
self.ub_opt_x['_eps'] = self._eps_ub.cat
# Bounds for the inputs along the horizon
self.lb_opt_x['_p_est'] = self._p_est_lb.cat/self._p_est_scaling
self.ub_opt_x['_p_est'] = self._p_est_ub.cat/self._p_est_scaling
cons = vertcat(*cons)
self.cons_lb = vertcat(*cons_lb)
self.cons_ub = vertcat(*cons_ub)
self.n_opt_lagr = cons.shape[0]
# Create casadi optimization object:
nlpsol_opts = {
'expand': False,
'ipopt.linear_solver': 'mumps',
}.update(self.nlpsol_opts)
nlp = {'x': vertcat(opt_x), 'f': obj, 'g': cons, 'p': vertcat(opt_p)}
self.S = nlpsol('S', 'ipopt', nlp, self.nlpsol_opts)
# Create copies of these structures with numerical values (all zero):
self.opt_x_num = self.opt_x(0)
self.opt_x_num_unscaled = self.opt_x(0)
self.opt_p_num = self.opt_p(0)
self.opt_aux_num = self.opt_aux(0)
# Create function to caculate all auxiliary expressions:
self.opt_aux_expression_fun = Function('opt_aux_expression_fun', [opt_x, opt_p], [opt_aux])