#
# This file is part of do-mpc
#
# do-mpc: An environment for the easy, modular and efficient implementation of
# robust nonlinear model predictive control
#
# Copyright (c) 2014-2019 Sergio Lucia, Alexandru Tatulea-Codrean
# TU Dortmund. All rights reserved
#
# do-mpc is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# do-mpc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with do-mpc. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
from casadi import *
from casadi.tools import *
import pdb
import itertools
import time
import warnings
from do_mpc.tools.indexedproperty import IndexedProperty
[docs]class Optimizer:
"""The base clase for the optimization based state estimation (MHE) and predictive controller (MPC).
This class establishes the jointly used attributes, methods and properties.
.. warning::
The ``Optimizer`` base class can not be used independently.
"""
def __init__(self):
assert 'model' in self.__dict__.keys(), 'Cannot initialize the optimizer before assigning the model to the current class instance.'
# Initialize structures for bounds, scaling, initial values by calling the symbolic structures defined in the model
# with the default numerical value.
# This returns an identical numerical structure with all values set to the passed value.
self._x_lb = self.model._x(-np.inf)
self._x_ub = self.model._x(np.inf)
self._u_lb = self.model._u(-np.inf)
self._u_ub = self.model._u(np.inf)
self._z_lb = self.model._z(-np.inf)
self._z_ub = self.model._z(np.inf)
self._x_scaling = self.model._x(1.0)
self._u_scaling = self.model._u(1.0)
self._z_scaling = self.model._z(1.0)
self._p_scaling = self.model._p(1.0) # only meaningful for MHE.
# Lists for further non-linear constraints (optional). Constraints are formulated as cons < ub
self.nl_cons_list = [
{'expr_name': 'default', 'expr': DM(), 'ub': DM()}
]
self.slack_vars_list = [
{'slack_name': 'default', 'var':SX.sym('default',(0,0)), 'ub': DM()}
]
self.slack_cost = 0
@IndexedProperty
def bounds(self, ind):
"""Query and set bounds of the optimization variables.
The :py:func:`bounds` method is an indexed property, meaning
getting and setting this property requires an index and calls this function.
The power index (elements are seperated by comas) must contain atleast the following elements:
====== ================= ==========================================================
order index name valid options
====== ================= ==========================================================
1 bound type ``lower`` and ``upper``
2 variable type ``_x``, ``_u`` and ``_z`` (and ``_p_est`` for MHE)
3 variable name Names defined in :py:class:`do_mpc.model.Model`.
====== ================= ==========================================================
Further indices are possible (but not neccessary) when the referenced variable is a vector or matrix.
**Example**:
::
# Set with:
optimizer.bounds['lower','_x', 'phi_1'] = -2*np.pi
optimizer.bounds['upper','_x', 'phi_1'] = 2*np.pi
# Query with:
optimizer.bounds['lower','_x', 'phi_1']
"""
assert isinstance(ind, tuple), 'Power index must include bound_type, var_type, var_name (as a tuple).'
assert len(ind)>=3, 'Power index must include bound_type, var_type, var_name (as a tuple).'
bound_type = ind[0]
var_type = ind[1]
var_name = ind[2:]
err_msg = 'Invalid power index {} for bound_type. Must be from (lower, upper).'
assert bound_type in ('lower', 'upper'), err_msg.format(bound_type)
err_msg = 'Invalid power index {} for var_type. Must be from (_x, states, _u, inputs, _z, algebraic).'
assert var_type in ('_x', '_u', '_z', '_p_est'), err_msg.format(var_type)
if bound_type == 'lower':
query = '{var_type}_{bound_type}'.format(var_type=var_type, bound_type='lb')
elif bound_type == 'upper':
query = '{var_type}_{bound_type}'.format(var_type=var_type, bound_type='ub')
# query results string e.g. _x_lb, _x_ub, _u_lb, u_ub ....
# Get the desired struct:
var_struct = getattr(self, query)
err_msg = 'Calling .bounds with {} is not valid. Possible keys are {}.'
assert (var_name[0] if isinstance(var_name, tuple) else var_name) in var_struct.keys(), msg.format(ind, var_struct.keys())
return var_struct[var_name]
@bounds.setter
def bounds(self, ind, val):
"""See Docstring for bounds getter method"""
assert isinstance(ind, tuple), 'Power index must include bound_type, var_type, var_name (as a tuple).'
assert len(ind)>=3, 'Power index must include bound_type, var_type, var_name (as a tuple).'
bound_type = ind[0]
var_type = ind[1]
var_name = ind[2:]
err_msg = 'Invalid power index {} for bound_type. Must be from (lower, upper).'
assert bound_type in ('lower', 'upper'), err_msg.format(bound_type)
err_msg = 'Invalid power index {} for var_type. Must be from (_x, _u, _z, _p_est).'
assert var_type in ('_x', '_u', '_z', '_p_est'), err_msg.format(var_type)
if bound_type == 'lower':
query = '{var_type}_{bound_type}'.format(var_type=var_type, bound_type='lb')
elif bound_type == 'upper':
query = '{var_type}_{bound_type}'.format(var_type=var_type, bound_type='ub')
# query results string e.g. _x_lb, _x_ub, _u_lb, u_ub ....
# Get the desired struct:
var_struct = getattr(self, query)
err_msg = 'Calling .bounds with {} is not valid. Possible keys are {}.'
assert (var_name[0] if isinstance(var_name, tuple) else var_name) in var_struct.keys(), msg.format(ind, var_struct.keys())
# Set value on struct:
var_struct[var_name] = val
@IndexedProperty
def scaling(self, ind):
"""Query and set scaling of the optimization variables.
The :py:func:`Optimizer.scaling` method is an indexed property, meaning
getting and setting this property requires an index and calls this function.
The power index (elements are seperated by comas) must contain atleast the following elements:
====== ================= ==========================================================
order index name valid options
====== ================= ==========================================================
1 variable type ``_x``, ``_u`` and ``_z`` (and ``_p_est`` for MHE)
2 variable name Names defined in :py:class:`do_mpc.model.Model`.
====== ================= ==========================================================
Further indices are possible (but not neccessary) when the referenced variable is a vector or matrix.
**Example**:
::
# Set with:
optimizer.scaling['_x', 'phi_1'] = 2
optimizer.scaling['_x', 'phi_2'] = 2
# Query with:
optimizer.scaling['_x', 'phi_1']
Scaling factors :math:`a` affect the MHE / MPC optimization problem. The optimization variables are scaled variables:
.. math::
\\bar\\phi = \\frac{\\phi}{a_{\\phi}} \\quad \\forall \\phi \\in [x, u, z, p_{\\text{est}}]
Scaled variables are used to formulate the bounds :math:`\\bar\\phi_{lb} \\leq \\bar\\phi_{ub}`
and for the evaluation of the ODE. For the objective function and the nonlinear constraints
the unscaled variables are used. The algebraic equations are also not scaled.
.. note::
Scaling the optimization problem is suggested when states and / or inputs take on values
which differ by orders of magnitude.
"""
assert isinstance(ind, tuple), 'Power index must include bound_type, var_type, var_name (as a tuple).'
assert len(ind)>=2, 'Power index must include bound_type, var_type, var_name (as a tuple).'
var_type = ind[0]
var_name = ind[1:]
err_msg = 'Invalid power index {} for var_type. Must be from (_x, states, _u, inputs, _z, algebraic).'
assert var_type in ('_x', '_u', '_z', '_p_est'), err_msg.format(var_type)
query = '{var_type}_scaling'.format(var_type=var_type)
# query results string e.g. _x_scaling, _u_scaling
# Get the desired struct:
var_struct = getattr(self, query)
err_msg = 'Calling .scaling with {} is not valid. Possible keys are {}.'
assert (var_name[0] if isinstance(var_name, tuple) else var_name) in var_struct.keys(), msg.format(ind, var_struct.keys())
return var_struct[var_name]
@scaling.setter
def scaling(self, ind, val):
"""See Docstring for scaling getter method"""
assert isinstance(ind, tuple), 'Power index must include bound_type, var_type, var_name (as a tuple).'
assert len(ind)>=2, 'Power index must include bound_type, var_type, var_name (as a tuple).'
var_type = ind[0]
var_name = ind[1:]
err_msg = 'Invalid power index {} for var_type. Must be from (_x, states, _u, inputs, _z, algebraic).'
assert var_type in ('_x', '_u', '_z', '_p_est'), err_msg.format(var_type)
query = '{var_type}_scaling'.format(var_type=var_type)
# query results string e.g. _x_scaling, _u_scaling
# Get the desired struct:
var_struct = getattr(self, query)
err_msg = 'Calling .scaling with {} is not valid. Possible keys are {}.'
assert (var_name[0] if isinstance(var_name, tuple) else var_name) in var_struct.keys(), msg.format(ind, var_struct.keys())
var_struct[var_name] = val
def reset_history(self):
"""Reset the history of the optimizer.
All data from the :py:class:`do_mpc.data.Data` instance is removed.
"""
self.data.init_storage()
self._t0 = np.array([0])
def _prepare_data(self):
"""Write optimizer meta data to data object (all params set in self.data_fields).
If selected, initialize the container for the full solution of the optimizer.
"""
self.data.data_fields.update({'_eps': self.n_eps})
self.data.data_fields.update({'opt_p_num': self.n_opt_p})
self.data.opt_p = self.opt_p
if self.store_full_solution == True:
# Create data_field for the optimal solution.
self.data.data_fields.update({'_opt_x_num': self.n_opt_x})
self.data.data_fields.update({'_opt_aux_num': self.n_opt_aux})
self.data.opt_x = self.opt_x
# aux_struct is the struct_symSX variant of opt_aux (which is struct_SX). struct_SX cannot be unpickled (bug).
# See: https://groups.google.com/forum/#!topic/casadi-users/dqAb4tnA2ik
self.data.opt_aux = self.aux_struct
if self.store_lagr_multiplier == True:
# Create data_field for the lagrange multipliers
self.data.data_fields.update({'_lam_g_num': self.n_opt_lagr})
if len(self.store_solver_stats) > 0:
# These are valid arguments for solver stats:
solver_stats = ['iter_count', 'iterations', 'n_call_S', 'n_call_callback_fun',
'n_call_nlp_f', 'n_call_nlp_g', 'n_call_nlp_grad', 'n_call_nlp_grad_f',
'n_call_nlp_hess_l', 'n_call_nlp_jac_g', 'return_status', 'success', 't_proc_S',
't_proc_callback_fun', 't_proc_nlp_f', 't_proc_nlp_g', 't_proc_nlp_grad',
't_proc_nlp_grad_f', 't_proc_nlp_hess_l', 't_proc_nlp_jac_g', 't_wall_S',
't_wall_callback_fun', 't_wall_nlp_f', 't_wall_nlp_g', 't_wall_nlp_grad', 't_wall_nlp_grad_f',
't_wall_nlp_hess_l', 't_wall_nlp_jac_g']
# Create data_field(s) for the recorded (valid) stats.
for stat_i in self.store_solver_stats:
assert stat_i in solver_stats, 'The requested {} is not a valid solver stat and cannot be recorded. Please supply one of the following (or none): {}'.format(stat_i, solver_stats)
self.data.data_fields.update({stat_i: 1})
self.data.init_storage()
def set_nl_cons(self, expr_name, expr, ub=np.inf, soft_constraint=False, penalty_term_cons=1, maximum_violation=np.inf):
"""Introduce new constraint to the class. Further constraints are optional.
Expressions must be formulated with respect to ``_x``, ``_u``, ``_z``, ``_tvp``, ``_p``.
They are implemented as:
.. math::
m(x,u,z,p_{\\text{tv}}, p) \\leq m_{\\text{ub}}
Setting the flag ``soft_constraint=True`` will introduce slack variables :math:`\\epsilon`, such that:
.. math::
m(x,u,z,p_{\\text{tv}}, p)-\\epsilon &\\leq m_{\\text{ub}},\\\\
0 &\\leq \\epsilon \\leq \\epsilon_{\\text{max}},
Slack variables are added to the cost function and multiplied with the supplied penalty term.
This formulation makes constraints soft, meaning that a certain violation is tolerated and does not lead to infeasibility.
Typically, high values for the penalty are suggested to avoid significant violation of the constraints.
:param expr_name: Arbitrary name for the given expression. Names are used for key word indexing.
:type expr_name: string
:param expr: CasADi SX or MX function depending on ``_x``, ``_u``, ``_z``, ``_tvp``, ``_p``.
:type expr: CasADi SX or MX
:raises assertion: expr_name must be str
:raises assertion: expr must be a casadi SX or MX type
:return: Returns the newly created expression. Expression can be used e.g. for the RHS.
:rtype: casadi.SX
"""
assert self.flags['setup'] == False, 'Cannot call .set_expression after .setup_model.'
assert isinstance(expr_name, str), 'expr_name must be str, you have: {}'.format(type(expr_name))
assert isinstance(expr, (casadi.SX, casadi.MX)), 'expr must be a casadi SX or MX type, you have: {}'.format(type(expr))
assert isinstance(ub, (int, float, np.ndarray)), 'ub must be float, int or numpy.ndarray, you have: {}'.format(type(ub))
assert isinstance(soft_constraint, bool), 'soft_constraint must be boolean, you have: {}'.format(type(soft_constraint))
if soft_constraint==True:
# Introduce new slack variable:
epsilon = SX.sym('eps_'+expr_name,*expr.shape)
# Change expression
expr = expr-epsilon
# Add slack variable to list of slack variables:
self.slack_vars_list.extend([
{'slack_name': expr_name, 'var': epsilon, 'ub': maximum_violation}
])
# Add cost contribution:
self.slack_cost += sum1(penalty_term_cons*epsilon)
self.nl_cons_list.extend([
{'expr_name': expr_name, 'expr': expr, 'ub' : ub}])
return expr
def _setup_nl_cons(self):
"""Private method that is called from :py:func:`do_mpc.controller.MPC.setup` or :py:func:`do_mpc.estimator.MHE.setup`.
Afterwards no further non-linear constraints can be added with the :py:func:`Optimizer.set_nl_cons` method.
This is not part of the public API. Do not call this method.
"""
# Create struct for soft constraints:
self._eps = _eps = struct_symSX([
entry(slack_i['slack_name'], sym=slack_i['var']) for slack_i in self.slack_vars_list
])
self.n_eps = _eps.shape[0]
# Create bounds:
self._eps_lb = _eps(0.0)
self._eps_ub = _eps(np.inf)
# Set bounds:
for slack_i in self.slack_vars_list:
self._eps_ub[slack_i['slack_name']] = slack_i['ub']
# Objective function epsilon contribution:
self.epsterm_fun = Function('epsterm', [_eps], [self.slack_cost])
# Create struct for _nl_cons:
# Use the previously defined SX.sym variables to declare shape and symbolic variable.
self._nl_cons = struct_SX([
entry(expr_i['expr_name'], expr=expr_i['expr']) for expr_i in self.nl_cons_list
])
# Make function from these expressions:
_x, _u, _z, _tvp, _p = self.model['x', 'u', 'z', 'tvp', 'p']
self._nl_cons_fun = Function('nl_cons_fun', [_x, _u, _z, _tvp, _p, _eps], [self._nl_cons])
# Create bounds:
self._nl_cons_ub = self._nl_cons(np.inf)
self._nl_cons_lb = self._nl_cons(-np.inf)
# Set bounds:
for nl_cons_i in self.nl_cons_list:
self._nl_cons_ub[nl_cons_i['expr_name']] = nl_cons_i['ub']
def get_tvp_template(self):
"""Obtain output template for :py:func:`set_tvp_fun`.
The method returns a structured object with ``n_horizon+1`` elements,
and a set of time-varying parameters (as defined in :py:class:`do_mpc.model.Model`)
for each of these instances. The structure is initialized with all zeros.
Use this object to define values of the time-varying parameters.
This structure (with numerical values) should be used as the output of the ``tvp_fun`` function which is set to the class with :py:func:`set_tvp_fun`.
Use the combination of :py:func:`get_tvp_template` and :py:func:`set_tvp_fun`.
**Example:**
::
# in model definition:
alpha = model.set_variable(var_type='_tvp', var_name='alpha')
beta = model.set_variable(var_type='_tvp', var_name='beta')
...
# in optimizer configuration:
tvp_temp_1 = optimizer.get_tvp_template()
tvp_temp_1['_tvp', :] = np.array([1,1])
tvp_temp_2 = optimizer.get_tvp_template()
tvp_temp_2['_tvp', :] = np.array([0,0])
def tvp_fun(t_now):
if t_now<10:
return tvp_temp_1
else:
tvp_temp_2
optimizer.set_tvp_fun(tvp_fun)
:return: None
:rtype: None
"""
tvp_template = struct_symSX([
entry('_tvp', repeat=self.n_horizon+1, struct=self.model._tvp)
])
return tvp_template(0)
def set_tvp_fun(self, tvp_fun):
""" Set function which returns time-varying parameters.
The ``tvp_fun`` is called at each optimization step to get the current prediction of the time-varying parameters.
The supplied function must be callable with the current time as the only input. Furthermore, the function must return
a CasADi structured object which is based on the horizon and on the model definition. The structure can be obtained with
:py:func:`get_tvp_template`.
**Example:**
::
# in model definition:
alpha = model.set_variable(var_type='_tvp', var_name='alpha')
beta = model.set_variable(var_type='_tvp', var_name='beta')
...
# in optimizer configuration:
tvp_temp_1 = optimizer.get_tvp_template()
tvp_temp_1['_tvp', :] = np.array([1,1])
tvp_temp_2 = optimizer.get_tvp_template()
tvp_temp_2['_tvp', :] = np.array([0,0])
def tvp_fun(t_now):
if t_now<10:
return tvp_temp_1
else:
tvp_temp_2
optimizer.set_tvp_fun(tvp_fun)
.. note::
The method :py:func:`set_tvp_fun`. must be called prior to setup IF time-varying parameters are defined in the model.
It is not required to call the method if no time-varying parameters are defined.
:param tvp_fun: Function that returns the predicted tvp values at each timestep. Must have single input (float) and return a ``structure3.DMStruct`` (obtained with :py:func:`get_tvp_template`).
:type tvp_fun: function
"""
assert isinstance(tvp_fun(0), structure3.DMStruct), 'Incorrect output of tvp_fun. Use get_tvp_template to obtain the required structure.'
assert self.get_tvp_template().labels() == tvp_fun(0).labels(), 'Incorrect output of tvp_fun. Use get_tvp_template to obtain the required structure.'
self.flags['set_tvp_fun'] = True
self.tvp_fun = tvp_fun
def solve(self):
"""Solves the optmization problem.
The current problem is defined by the parameters in the
:py:attr:`opt_p_num` CasADi structured Data.
Typically, :py:attr:`opt_p_num` is prepared for the current iteration in the :py:func:`make_step` method.
It is, however, valid and possible to directly set paramters in :py:attr:`opt_p_num` before calling :py:func:`solve`.
The method updates the :py:attr:`opt_p_num` and :py:attr:`opt_x_num` attributes of the class.
By resetting :py:attr:`opt_x_num` to the current solution, the method implicitly
enables **warmstarting the optimizer** for the next iteration, since this vector is always used as the initial guess.
.. warning::
The method is part of the public API but it is generally not advised to use it.
Instead we recommend to call :py:func:`make_step` at each iterations, which acts as a wrapper
for :py:func:`solve`.
:raises asssertion: Optimizer was not setup yet.
:return: None
:rtype: None
"""
assert self.flags['setup'] == True, 'optimizer was not setup yet. Please call optimizer.setup().'
r = self.S(x0=self.opt_x_num, lbx=self.lb_opt_x, ubx=self.ub_opt_x,
ubg=self.cons_ub, lbg=self.cons_lb, p=self.opt_p_num)
# Note: .master accesses the underlying vector of the structure.
self.opt_x_num.master = r['x']
self.opt_x_num_unscaled.master = r['x']*self.opt_x_scaling
self.opt_g_num = r['g']
# Values of lagrange multipliers:
self.lam_g_num = r['lam_g']
self.lam_x_num = r['lam_x']
self.solver_stats = self.S.stats()
# Calculate values of auxiliary expressions (defined in model)
self.opt_aux_num.master = self.opt_aux_expression_fun(
self.opt_x_num,
self.opt_p_num
)
def _setup_discretization(self):
"""Private method that creates the discretization for the optimizer (MHE or MPC).
Returns the integrator function (``ifcn``) and the total number of collocation points.
The following discretization methods are available:
* orthogonal collocation
* discrete dynamics
Discretization parameters can be set with the :py:func:`do_mpc.controller.MPC.set_param` and
:py:func:`do_mpc.estimator.MHE.set_param` methods.
There is no point in calling this method as part of the public API.
"""
_x, _u, _z, _tvp, _p, _w = self.model['x', 'u', 'z', 'tvp', 'p', 'w']
rhs = substitute(self.model._rhs, _x, _x*self._x_scaling.cat)
rhs = substitute(rhs, _u, _u*self._u_scaling.cat)
rhs = substitute(rhs, _z, _z*self._z_scaling.cat)
rhs = substitute(rhs, _p, _p*self._p_scaling.cat) # only meaningful for MHE.
alg = substitute(self.model._alg, _x, _x*self._x_scaling.cat)
alg = substitute(alg, _u, _u*self._u_scaling.cat)
alg = substitute(alg, _z, _z*self._z_scaling.cat)
alg = substitute(alg, _p, _p*self._p_scaling.cat) # only meaningful for MHE.
if self.state_discretization == 'discrete':
_i = SX.sym('i', 0)
# discrete integrator ifcs mimics the API the collocation ifcn.
ifcn = Function('ifcn', [_x, _i, _u, _z, _tvp, _p, _w], [alg, rhs/self._x_scaling.cat])
n_total_coll_points = 0
if self.state_discretization == 'collocation':
ffcn = Function('ffcn', [_x, _u, _z, _tvp, _p, _w], [rhs/self._x_scaling.cat])
afcn = Function('afcn', [_x, _u, _z, _tvp, _p, _w], [alg])
# Get collocation information
coll = self.collocation_type
deg = self.collocation_deg
ni = self.collocation_ni
nk = self.n_horizon
t_step = self.t_step
n_x = self.model.n_x
n_u = self.model.n_u
n_p = self.model.n_p
n_z = self.model.n_z
n_w = self.model.n_w
n_tvp = self.model.n_tvp
n_total_coll_points = (deg + 1) * ni
# Choose collocation points
if coll == 'legendre': # Legendre collocation points
tau_root = [0] + collocation_points(deg, 'legendre')
elif coll == 'radau': # Radau collocation points
tau_root = [0] + collocation_points(deg, 'radau')
else:
raise Exception('Unknown collocation scheme')
# Size of the finite elements
h = t_step / ni
# Coefficients of the collocation equation
C = np.zeros((deg + 1, deg + 1))
# Coefficients of the continuity equation
D = np.zeros(deg + 1)
# Dimensionless time inside one control interval
tau = SX.sym("tau")
# All collocation time points
T = np.zeros((nk, ni, deg + 1))
for k in range(nk):
for i in range(ni):
for j in range(deg + 1):
T[k, i, j] = h * (k * ni + i + tau_root[j])
# For all collocation points
for j in range(deg + 1):
# Construct Lagrange polynomials to get the polynomial basis at the
# collocation point
L = 1
for r in range(deg + 1):
if r != j:
L *= (tau - tau_root[r]) / (tau_root[j] - tau_root[r])
lfcn = Function('lfcn', [tau], [L])
D[j] = lfcn(1.0)
# Evaluate the time derivative of the polynomial at all collocation
# points to get the coefficients of the continuity equation
tfcn = Function('tfcn', [tau], [tangent(L, tau)])
for r in range(deg + 1):
C[j, r] = tfcn(tau_root[r])
# Define symbolic variables for collocation
xk0 = SX.sym("xk0", n_x)
#zk = SX.sym("zk", n_z)
pk = SX.sym("pk", n_p)
tv_pk = SX.sym("tv_pk", n_tvp)
uk = SX.sym("uk", n_u)
wk = SX.sym("wk", n_w)
# State trajectory
n_ik = ni * (deg + 1) * n_x
ik = SX.sym("ik", n_ik)
ik_split = np.resize(np.array([], dtype=SX), (ni, deg + 1))
offset = 0
# Algebraic trajectory
n_zk = ni * (deg +1) * n_z
zk = SX.sym("zk", n_zk)
offset_z = 0
zk_split = np.resize(np.array([], dtype=SX), (ni, deg + 1))
# Store initial condition
ik_split[0, 0] = xk0
zk_split[0, 0] = zk[offset_z:offset_z + n_z]
offset_z += n_z
first_j = 1 # Skip allocating x for the first collocation point for the first finite element
# For each finite element
for i in range(ni):
# For each collocation point
for j in range(first_j, deg + 1):
# Get the expression for the state vector
ik_split[i, j] = ik[offset:offset + n_x]
zk_split[i, j] = zk[offset_z:offset_z + n_z]
offset_z += n_z
offset += n_x
# All collocation points in subsequent finite elements
first_j = 0
# Get the state at the end of the control interval
xkf = ik[offset:offset + n_x]
offset += n_x
# Check offset for consistency
assert(offset == n_ik)
assert(offset_z == n_zk)
# Constraints in the control interval
gk = []
lbgk = []
ubgk = []
# For all finite elements
for i in range(ni):
# for the first point:
a_i0 = afcn(ik_split[i, 0], uk, zk_split[i,0], tv_pk, pk, wk)
gk.append(a_i0)
lbgk.append(np.zeros(n_z))
ubgk.append(np.zeros(n_z))
# For all collocation points
for j in range(1, deg + 1):
# Get an expression for the state derivative at the coll point
xp_ij = 0
for r in range(deg + 1):
xp_ij += C[r, j] * ik_split[i, r]
# Add collocation equations to the NLP
f_ij = ffcn(ik_split[i, j], uk, zk_split[i,j], tv_pk, pk, wk)
gk.append(h * f_ij - xp_ij)
lbgk.append(np.zeros(n_x)) # equality constraints
ubgk.append(np.zeros(n_x)) # equality constraints
# algebraic constraints
a_ij = afcn(ik_split[i, j], uk, zk_split[i,j], tv_pk, pk, wk)
gk.append(a_ij)
lbgk.append(np.zeros(n_z))
ubgk.append(np.zeros(n_z))
# Get an expression for the state at the end of the finite element
xf_i = 0
for r in range(deg + 1):
xf_i += D[r] * ik_split[i, r]
# Add continuity equation to NLP
x_next = ik_split[i + 1, 0] if i + 1 < ni else xkf
gk.append(x_next - xf_i)
lbgk.append(np.zeros(n_x))
ubgk.append(np.zeros(n_x))
# Concatenate constraints
gk = vertcat(*gk)
lbgk = np.concatenate(lbgk)
ubgk = np.concatenate(ubgk)
assert(gk.shape[0] == ik.shape[0] + zk.shape[0])
# Create the integrator function
ifcn = Function("ifcn", [xk0, ik, uk, zk, tv_pk, pk, wk], [gk, xkf])
# Return the integration function and the number of collocation points
return ifcn, n_total_coll_points
def _setup_scenario_tree(self):
"""Private method that builds the scenario tree given the possible values of the uncertain parmeters.
By default all possible combinations of uncertain parameters are evaluated.
See the API in :py:class:`do_mpc.controller.MHE` for the high level / low level API.
This method is currently only used for the MPC controller.
There is no point in calling this method as part of the public API.
"""
n_p = self.model.n_p
nk = self.n_horizon
n_robust = self.n_robust
# Build auxiliary variables that code the structure of the tree
# Number of branches
n_branches = [self.n_combinations if k < n_robust else 1 for k in range(nk)]
# Calculate the number of scenarios (nodes at each stage)
n_scenarios = [self.n_combinations**min(k, n_robust) for k in range(nk + 1)]
# Scenaro tree structure
child_scenario = -1 * np.ones((nk, n_scenarios[-1], n_branches[0])).astype(int)
parent_scenario = -1 * np.ones((nk + 1, n_scenarios[-1])).astype(int)
branch_offset = -1 * np.ones((nk, n_scenarios[-1])).astype(int)
structure_scenario = np.zeros((nk + 1, n_scenarios[-1])).astype(int)
# Fill in the auxiliary structures
for k in range(nk):
# Scenario counter
scenario_counter = 0
# For all scenarios
for s in range(n_scenarios[k]):
# For all uncertainty realizations
for b in range(n_branches[k]):
child_scenario[k][s][b] = scenario_counter
structure_scenario[k][scenario_counter] = s
structure_scenario[k+1][scenario_counter] = s
parent_scenario[k + 1][scenario_counter] = s
scenario_counter += 1
# Store the range of branches
if n_robust == 0:
branch_offset[k][s] = 0
elif k < n_robust:
branch_offset[k][s] = 0
else:
branch_offset[k][s] = s % n_branches[0]
self.scenario_tree = {
'structure_scenario': structure_scenario,
'n_branches': n_branches,
'n_scenarios': n_scenarios,
'parent_scenario': parent_scenario,
'branch_offset': branch_offset
}
return n_branches, n_scenarios, child_scenario, parent_scenario, branch_offset