#
# This file is part of do-mpc
#
# do-mpc: An environment for the easy, modular and efficient implementation of
# robust nonlinear model predictive control
#
# Copyright (c) 2014-2019 Sergio Lucia, Alexandru Tatulea-Codrean
# TU Dortmund. All rights reserved
#
# do-mpc is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# do-mpc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with do-mpc. If not, see <http://www.gnu.org/licenses/>.
"""
Storage and handling of data.
"""
import numpy as np
import casadi.tools as castools
import pdb
import pickle
import do_mpc
import os
from typing import Union,Tuple,Dict
[docs]
class Data:
"""**do-mpc** data container. An instance of this class is created for the active **do-mpc** classes,
e.g. :py:class:`do_mpc.simulator.Simulator`, :py:class:`do_mpc.estimator.MHE`.
The class is initialized with an instance of the :py:class:`do_mpc.model.Model` which contains all
information about variables (e.g. states, inputs etc.).
The :py:class:`Data` class has a public API but is mostly used by other **do-mpc** classes, e.g. updated in the ``.make_step`` calls.
Args:
model: model object from the :py:class:`do_mpc.model`
"""
def __init__(self, model:Union[do_mpc.model.Model,do_mpc.model.LinearModel]):
self.dtype = 'default'
assert model.flags['setup'] == True, 'Model was not setup. After the complete model creation call model.setup().'
# As discussed here: https://groups.google.com/forum/#!topic/casadi-users/dqAb4tnA2ik
# struct_SX cannot be unpickled (seems like a bug)
# TODO: Find better workaround.
self.model = model.__dict__.copy()
self.model.pop('_rhs')
self.model.pop('_aux_expression')
self.model.pop('_y_expression')
self.model.pop('_alg')
self.model.pop('sv')
# TODO: n_aux not existing
#self._aux = np.empty((0, model.n_aux))
# Dictionary with possible data_fields in the class and their respective dimension. All data is numpy ndarray.
self.data_fields = {
'_time': 1,
'_x': model.n_x,
'_y': model.n_y,
'_u': model.n_u,
'_z': model.n_z,
'_tvp': model.n_tvp,
'_p': model.n_p,
'_aux': model.n_aux,
}
self.init_storage()
self.meta_data = {}
# Accelerate __getitem__ calls (to retrieve results) by saving indices of previous queries.
self.result_queries = {'ind':[], 'f_ind':[]}
[docs]
def __getitem__(self, ind:Tuple)->np.ndarray:
"""Query data fields. This method can be used to obtain the stored results in the :py:class:`Data` instance.
The full list of available fields can be inspected with:
::
print(data.data_fields)
The dict also denotes the dimension of each field.
The method allows for power indexing the results for the fields
``_x``, ``_u``, ``_z``, ``_tvp``, ``_p``, ``_aux``, ``_y``
where further indices refer to the configured variables in the :py:class:`do_mpc.model.Model` and :py:class:`do_mpc.model.LinearModel` instance.
**Example:**
::
# Assume the following model was used (excerpt):
model = do_mpc.model.Model('continuous')
model.set_variable('_x', 'Temperature', shape=(5,1)) # Vector
model.set_variable('_p', 'disturbance', shape=(3,3)) # Matrix
model.set_variable('_u', 'heating') # scalar
...
# the model was used (among others) for the MPC controller
mpc = do_mpc.controller.MPC(model)
...
# Query the mpc.data instance:
mpc.data['_x'] # Return all states
mpc.data['_x', 'Temperature'] # Return the 5 temp states
mpc.data['_x', 'Temperature', :2] # Return the first 2 temp. states
mpc.data['_p', 'disturbance', 0, 2] # Matrix allows for further indices
# Other fields can also be queried, e.g.:
mpc.data['_time'] # current time
mpc.data['t_wall_total'] # optimizer runtime
# These do not allow further indices.
Args:
ind: Power index to query the prediction of a specific variable.
Returns:
Returns the queried data field (for all time instances)
"""
# ensure list:
if not isinstance(ind, tuple):
ind = [ind]
# First element is the data_field:
data_field = ind[0]
# Check validity:
keys = self.data_fields.keys()
assert data_field in keys, 'Your queried variable {} is not available. Please choose from {}'.format(data_field, keys)
if len(ind)>1:
# If further indices exist:
if ind in self.result_queries['ind']:
i = self.result_queries['ind'].index(ind)
f_ind = self.result_queries['f_ind'][i]
else:
powerind = ind[1:]
f_ind = self.model[data_field].f[powerind]
self.result_queries['ind'].append(ind)
self.result_queries['f_ind'].append(f_ind)
out = getattr(self, data_field)[:, f_ind]
else:
# If not just return the field:
out = getattr(self, data_field)
return out
def init_storage(self)->None:
"""Create new (empty) arrays for all variables.
The variables of interest are listed in the ``data_fields`` dictionary,
with their respective dimension. This dictionary may be updated.
The :py:class:`do_mpc.controller.MPC` class adds for example optimizer information.
"""
for field_i, dim_i in self.data_fields.items():
setattr(self, field_i, np.empty((0, dim_i)))
def set_meta(self, **kwargs)->None:
"""Set meta data for the current instance of the data object.
"""
for key, value in kwargs.items():
self.meta_data[key] = value
def update(self, **kwargs:Dict[np.ndarray,castools.DM])->None:
"""Update value(s) of the data structure with key word arguments.
These key word arguments must exist in the data fields of the data objective.
See self.data_fields for a complete list of data fields.
Example:
::
_x = np.ones((1, 3))
_u = np.ones((1, 2))
data.update('_x': _x, '_u': _u)
or:
data.update('_x': _x)
data.update('_u': _u)
Alternatively:
data_dict = {
'_x':np.ones((1, 3)),
'_u':np.ones((1, 2))
}
data.update(**data_dict)
Args:
kwargs : Arbitrary number of key word arguments for data fields that should be updated.
Raises:
assertion: Keyword must be in existing data_fields.
"""
for key, value in kwargs.items():
assert key in self.data_fields.keys(), 'Cannot update non existing key {} in data object.'.format(key)
if type(value) == castools.structure3.DMStruct:
value = value.cat
if type(value) == castools.DM:
# Convert to numpy
value = value.full()
elif type(value) in [float, int, bool]:
value = np.array(value)
# Get current results array for the given key:
arr = getattr(self, key)
# Append current value to results array:
updated = np.append(arr, value.reshape(1,-1), axis=0)
# Update results array:
setattr(self, key, updated)
def export(self)->dict:
"""The export method returns a dictionary of the stored data.
Returns:
Dictionary of the currently stored data.
"""
export_dict = {field_name: getattr(self, field_name) for field_name in self.data_fields}
return export_dict
[docs]
class MPCData(Data):
"""**do-mpc** data container for the :py:class:`do_mpc.controller.MPC` instance.
This method inherits from :py:class:`Data` and extends it to query the MPC predictions.
Warning:
For robust multi-stage MPC, the :py:class:`MPCData` class stores by default only the nominal values of the uncertain parameters.
Args:
model: model from :py:class:`do_mpc.model`
"""
def __init__(self, model:Union[do_mpc.model.Model,do_mpc.model.LinearModel]):
super().__init__(model)
# Accelerate prediction calls by saving indices of previous queries.
self.prediction_queries = {'ind':[], 'f_ind':[]}
def prediction(self, ind:tuple, t_ind:float=-1)->np.ndarray:
"""Query the MPC trajectories.
Use this method to obtain specific MPC trajectories from the data object.
Warnings:
This method requires that the optimal solution is stored in the :py:class:`do_mpc.data.MPCData` instance.
Storing the optimal solution must be activated with :py:func:`do_mpc.controller.MPC.set_param`.
Querying predicted trajectories requires the use of power indices, which is passed as tuple e.g.:
::
data.prediction((var_type, var_name, i), t_ind)
where
* ``var_type`` refers to ``_x``, ``_u``, ``_z``, ``_tvp``, ``_p``, ``_aux``
* ``var_name`` refers to the user-defined names in the :py:class:`do_mpc.model.Model`
* Use ``i`` to index vector valued variables.
The method returns a multidimensional numpy.ndarray. The dimensions refer to:
::
arr = data.prediction(('_x', 'x_1'))
arr.shape
>> (n_size, n_horizon, n_scenario)
with:
* ``n_size`` denoting the number of elements in ``x_1``, where ``n_size = 1`` is a scalar variable.
* ``n_horizon`` is the MPC horizon defined with :py:func:`do_mpc.controller.MPC.set_param`
* ``n_scenario`` refers to the number of uncertain scenarios (for robust MPC).
Additional to the power index tuple, a time index (``t_ind``) can be passed to access the prediction for a certain
time.
Args:
ind: Power index to query the prediction of a specific variable.
t_ind: Time index
Returns:
Predicted trajectories for the queries variable.
"""
assert self.meta_data['store_full_solution'], 'Optimal trajectory is not stored. Please update your MPC settings.'
assert isinstance(ind, tuple), 'Query index must be of type tuple.'
structure_scenario = self.meta_data['structure_scenario']
if self._opt_x_num.shape[0]==0:
_opt_x_num = np.zeros((1,self.opt_x.shape[0]))
_opt_p_num = np.zeros((1,self.opt_p.shape[0]))
_opt_aux_num = np.zeros((1,self.opt_aux.shape[0]))
else:
_opt_x_num = self._opt_x_num
_opt_p_num = self.opt_p_num
_opt_aux_num = self._opt_aux_num
if ind[0] in ['_x', '_z']:
if ind in self.prediction_queries['ind']:
i = self.prediction_queries['ind'].index(ind)
f_ind = self.prediction_queries['f_ind'][i]
else:
f_ind = self.opt_x.f[(ind[0], slice(None), lambda v: castools.horzcat(*v),slice(None), -1)+ind[1:]]
f_ind = np.array([f_ind_k.full() for f_ind_k in f_ind], dtype='int32')
# sort pred such that each column belongs to one scenario
# - By indexing structure_scenario until f_ind.shape[0] we cover the case of _x and _z at the same time
f_ind = f_ind[range(f_ind.shape[0]),:,structure_scenario[:f_ind.shape[0],:].T].T
# Store f_ind:
self.prediction_queries['ind'].append(ind)
self.prediction_queries['f_ind'].append(f_ind)
out = _opt_x_num[t_ind,f_ind]
elif ind[0] =='_u':
if ind in self.prediction_queries['ind']:
i = self.prediction_queries['ind'].index(ind)
f_ind = self.prediction_queries['f_ind'][i]
else:
f_ind = self.opt_x.f[(ind[0], slice(None), lambda v: castools.horzcat(*v),slice(None))+ind[1:]]
f_ind = np.array([f_ind_k.full() for f_ind_k in f_ind], dtype='int32')
# sort pred such that each column belongs to one scenario
if self.meta_data['open_loop']:
f_ind = f_ind[range(f_ind.shape[0]),:,structure_scenario[:-1,[0]].T].T
else:
f_ind = f_ind[range(f_ind.shape[0]),:,structure_scenario[:-1,:].T].T
# Store f_ind:
self.prediction_queries['ind'].append(ind)
self.prediction_queries['f_ind'].append(f_ind)
out = _opt_x_num[t_ind,f_ind]
elif ind[0]=='_tvp':
if ind in self.prediction_queries['ind']:
i = self.prediction_queries['ind'].index(ind)
f_ind = self.prediction_queries['f_ind'][i]
else:
f_ind = self.opt_p.f[(ind[0], slice(None))+ind[1:]]
f_ind = np.array(f_ind).reshape(1, -1, 1)
#f_ind = np.array([f_ind_k.full() for f_ind_k in f_ind], dtype='int32')
# Store f_ind:
self.prediction_queries['ind'].append(ind)
self.prediction_queries['f_ind'].append(f_ind)
out = _opt_p_num[t_ind,f_ind]
elif ind[0]=='_aux':
if ind in self.prediction_queries['ind']:
i = self.prediction_queries['ind'].index(ind)
f_ind = self.prediction_queries['f_ind'][i]
else:
f_ind = self.opt_aux.f[(ind[0], slice(None), lambda v: castools.horzcat(*v),slice(None))+ind[1:]]
f_ind = np.array([f_ind_k.full() for f_ind_k in f_ind], dtype='int32')
# sort pred such that each column belongs to one scenario
f_ind = f_ind[range(f_ind.shape[0]),:,structure_scenario[:-1,:].T].T
# Store f_ind:
self.prediction_queries['ind'].append(ind)
self.prediction_queries['f_ind'].append(f_ind)
out = _opt_aux_num[t_ind,f_ind]
else:
raise ValueError('Index {} not recognized.'.format(ind[0]))
return out
[docs]
def save_results(save_list:list,
result_name:str='results',
result_path:str='./results/',
overwrite:bool=False
)->None:
"""Exports the data objects from the **do-mpc** modules in ``save_list`` as a pickled file. Supply any, all or a selection of (as a list):
* :py:class:`do_mpc.controller.MPC`
* :py:class:`do_mpc.simulator.Simulator`
* :py:class:`do_mpc.estimator.Estimator`
These objects can be used in post-processing to create graphics with the :py:class:`do_mpc.graphics_backend`.
Args:
save_list: List of the objects to be stored.
result_name: Name of the result file, defaults to 'result'.
result_path: Result path, defaults to './results/'.
overwrite: Option to overwrite existing results, defaults to False. Index will be appended if file already exists.
Raises:
assertion: save_list must be a list.
assertion: result_name must be a string.
assertion: results_path must be a string.
assertion: overwrite must be boolean.
Exception: save_list contains object which is neither do_mpc simulator, optimizizer nor estimator.
"""
assert isinstance(save_list, list), 'save_list must be a list.'
assert isinstance(result_name, str), 'result_name must be a string.'
assert isinstance(result_path, str), 'results_path must be a string.'
assert isinstance(overwrite, bool), 'overwrite must be boolean.'
if not os.path.exists(result_path):
os.makedirs(result_path)
results = {}
for obj_i in save_list:
if isinstance(obj_i, do_mpc.controller.MPC):
results.update({'mpc': obj_i.data})
elif isinstance(obj_i, do_mpc.simulator.Simulator):
results.update({'simulator': obj_i.data})
elif isinstance(obj_i, (do_mpc.estimator.StateFeedback, do_mpc.estimator.EKF, do_mpc.estimator.MHE)):
results.update({'estimator': obj_i.data})
else:
raise Exception('save_list contains object which is neither do_mpc simulator, optimizizer nor estimator.')
# Dynamically generate new result name if name is already taken in result_path.
if overwrite==False:
ind = 1
ext_result_name = result_name
while os.path.isfile(result_path+ext_result_name+'.pkl'):
ext_result_name = '{ind:03d}_{name}'.format(ind=ind, name=result_name)
ind += 1
result_name = ext_result_name
with open(result_path+result_name+'.pkl', 'wb') as f:
pickle.dump(results, f)
[docs]
def load_results(file_name:str)->Dict:
""" Simple wrapper to open and unpickle a file.
If used for **do-mpc** results, this will return a dictionary with the stored **do-mpc** modules:
* :py:class:`do_mpc.controller.MPC`
* :py:class:`do_mpc.simulator.Simulator`
* :py:class:`do_mpc.estimator.Estimator`
Args:
file_name : File name (including path) for the file to be opened and unpickled.
Returns:
Returns the results stored in .pkl file.
"""
with open(file_name, 'rb') as f:
results = pickle.load(f)
return results