Getting started: MHE#

Open an interactive online Jupyter Notebook with this content on Binder:

In this Jupyter Notebook we illustrate application of the do-mpc moving horizon estimation module. Please follow first the general Getting Started guide, as we cover the sample example and skip over some previously explained details.

[1]:

import numpy as np

# Add do_mpc to path. This is not necessary if it was installed via pip.
import sys
import os
rel_do_mpc_path = os.path.join('..','..','..')
sys.path.append(rel_do_mpc_path)

# Import do_mpc package:
import do_mpc


Creating the model#

First, we need to decide on the model type. For the given example, we are working with a continuous model.

[2]:

model_type = 'continuous' # either 'discrete' or 'continuous'
model = do_mpc.model.Model(model_type)


The model is based on the assumption that we have additive process and/or measurement noise:

\begin{align} \dot{x}(t) &= f(x(t),u(t),z(t),p(t),p_{\text{tv}}(t))+w(t), \\ y(t) &= h(x(t),u(t),z(t),p(t),p_{\text{tv}}(t))+v(t), \end{align}

we are free to chose, which states and which measurements experience additive noise.

Model variables#

The next step is to define the model variables. It is important to define the variable type, name and optionally shape (default is scalar variable).

In contrast to the previous example, we now use vectors for all variables.

[3]:

phi = model.set_variable(var_type='_x', var_name='phi', shape=(3,1))
dphi = model.set_variable(var_type='_x', var_name='dphi', shape=(3,1))

# Two states for the desired (set) motor position:
phi_m_set = model.set_variable(var_type='_u', var_name='phi_m_set', shape=(2,1))

# Two additional states for the true motor position:
phi_m = model.set_variable(var_type='_x', var_name='phi_m', shape=(2,1))


Model measurements#

This step is essential for the state estimation task: We must define a measurable output. Typically, this is a subset of states (or a transformation thereof) as well as the inputs.

Note that some MHE implementations consider inputs separately.

As mentionned above, we need to define for each measurement if additive noise is present. In our case we assume noisy state measurements ($$\phi$$) but perfect input measurements.

[4]:

# State measurements
phi_meas = model.set_meas('phi_1_meas', phi, meas_noise=True)

# Input measurements
phi_m_set_meas = model.set_meas('phi_m_set_meas', phi_m_set, meas_noise=False)


Model parameters#

Next we define parameters. The MHE allows to estimate parameters as well as states. Note that not all parameters must be estimated (as shown in the MHE setup below). We can also hardcode parameters (such as the spring constants c).

[5]:

Theta_1 = model.set_variable('parameter', 'Theta_1')
Theta_2 = model.set_variable('parameter', 'Theta_2')
Theta_3 = model.set_variable('parameter', 'Theta_3')

c = np.array([2.697,  2.66,  3.05, 2.86])*1e-3
d = np.array([6.78,  8.01,  8.82])*1e-5


Right-hand-side equation#

Finally, we set the right-hand-side of the model by calling model.set_rhs(var_name, expr) with the var_name from the state variables defined above and an expression in terms of $$x, u, z, p$$.

Note that we can decide whether the inidividual states experience process noise. In this example we choose that the system model is perfect. This is the default setting, so we don’t need to pass this parameter explictly.

[6]:

model.set_rhs('phi', dphi)

dphi_next = vertcat(
-c[0]/Theta_1*(phi[0]-phi_m[0])-c[1]/Theta_1*(phi[0]-phi[1])-d[0]/Theta_1*dphi[0],
-c[1]/Theta_2*(phi[1]-phi[0])-c[2]/Theta_2*(phi[1]-phi[2])-d[1]/Theta_2*dphi[1],
-c[2]/Theta_3*(phi[2]-phi[1])-c[3]/Theta_3*(phi[2]-phi_m[1])-d[2]/Theta_3*dphi[2],
)

model.set_rhs('dphi', dphi_next, process_noise = False)

tau = 1e-2
model.set_rhs('phi_m', 1/tau*(phi_m_set - phi_m))


The model setup is completed by calling model.setup():

[7]:

model.setup()


After calling model.setup() we cannot define further variables etc.

Configuring the moving horizon estimator#

The first step of configuring the moving horizon estimator is to call the class with a list of all parameters to be estimated. An empty list (default value) means that no parameters are estimated. The list of estimated parameters must be a subset (or all) of the previously defined parameters.

Note

So why did we define Theta_2 and Theta_3 if we do not estimate them?

In many cases we will use the same model for (robust) control and MHE estimation. In that case it is possible to have some external parameters (e.g. weather prediction) that are uncertain but cannot be estimated.

[8]:

mhe = do_mpc.estimator.MHE(model, ['Theta_1'])


MHE parameters:#

Next, we pass MHE parameters. Most importantly, we need to set the time step and the horizon. We also choose to obtain the measurement from the MHE data object. Alternatively, we are able to set a user defined measurement function that is called at each timestep and returns the N previous measurements for the estimation step.

[9]:

setup_mhe = {
't_step': 0.1,
'n_horizon': 10,
'store_full_solution': True,
'meas_from_data': True
}
mhe.set_param(**setup_mhe)


Objective function#

The most important step of the configuration is to define the objective function for the MHE problem:

\begin{split}\begin{aligned} \underset{ \begin{array}{c} \mathbf{x}_{0:N+1}, \mathbf{u}_{0:N}, p,\\ \mathbf{w}_{0:N}, \mathbf{v}_{0:N} \end{array} }{\mathrm{min}} &\frac{1}{2}\|x_0-\tilde{x}_0\|_{P_x}^2+\frac{1}{2}\|p-\tilde{p}\|_{P_p}^2 +\sum_{k=0}^{N-1} \left(\frac{1}{2}\|v_k\|_{P_{v,k}}^2 + \frac{1}{2}\|w_k\|_{P_{w,k}}^2\right),\\ &\left.\begin{aligned} \mathrm{s.t.}\quad x_{k+1} &= f(x_k,u_k,z_k,p,p_{\text{tv},k})+ w_k,\\ y_k &= h(x_k,u_k,z_k,p,p_{\text{tv},k}) + v_k, \\ &g(x_k,u_k,z_k,p_k,p_{\text{tv},k}) \leq 0 \end{aligned}\right \rbrace k=0,\dots, N \end{aligned}\end{split}

We typically consider the formulation shown above, where the user has to pass the weighting matrices P_x, P_v, P_p and P_w. In our concrete example, we assume a perfect model without process noise and thus P_w is not required.

We set the objective function with the weighting matrices shown below:

[10]:

P_v = np.diag(np.array([1,1,1]))
P_x = np.eye(8)
P_p = 10*np.eye(1)

mhe.set_default_objective(P_x, P_v, P_p)


Fixed parameters#

If the model contains parameters and if we estimate only a subset of these parameters, it is required to pass a function that returns the value of the remaining parameters at each time step.

Furthermore, this function must return a specific structure, which is first obtained by calling:

[11]:

p_template_mhe = mhe.get_p_template()


Using this structure, we then formulate the following function for the remaining (not estimated) parameters:

[12]:

def p_fun_mhe(t_now):
p_template_mhe['Theta_2'] = 2.25e-4
p_template_mhe['Theta_3'] = 2.25e-4
return p_template_mhe


This function is finally passed to the mhe instance:

[13]:

mhe.set_p_fun(p_fun_mhe)


Bounds#

The MHE implementation also supports bounds for states, inputs, parameters which can be set as shown below. For the given example, it is especially important to set realistic bounds on the estimated parameter. Otherwise the MHE solution is a poor fit.

[14]:

mhe.bounds['lower','_u', 'phi_m_set'] = -2*np.pi
mhe.bounds['upper','_u', 'phi_m_set'] = 2*np.pi

mhe.bounds['lower','_p_est', 'Theta_1'] = 1e-5
mhe.bounds['upper','_p_est', 'Theta_1'] = 1e-3


Setup#

Similar to the controller, simulator and model, we finalize the MHE configuration by calling:

[15]:

mhe.setup()


Configuring the Simulator#

In many cases, a developed control approach is first tested on a simulated system. do-mpc responds to this need with the do_mpc.simulator class. The simulator uses state-of-the-art DAE solvers, e.g. Sundials CVODE to solve the DAE equations defined in the supplied do_mpc.model. This will often be the same model as defined for the optimizer but it is also possible to use a more complex model of the same system.

In this section we demonstrate how to setup the simulator class for the given example. We initialize the class with the previously defined model:

[16]:

simulator = do_mpc.simulator.Simulator(model)


Simulator parameters#

Next, we need to parametrize the simulator. Please see the API documentation for simulator.set_param() for a full description of available parameters and their meaning. Many parameters already have suggested default values. Most importantly, we need to set t_step. We choose the same value as for the optimizer.

[17]:

# Instead of supplying a dict with the splat operator (**), as with the optimizer.set_param(),
# we can also use keywords (and call the method multiple times, if necessary):
simulator.set_param(t_step = 0.1)


Parameters#

In the model we have defined the inertia of the masses as parameters. The simulator is now parametrized to simulate using the “true” values at each timestep. In the most general case, these values can change, which is why we need to supply a function that can be evaluted at each time to obtain the current values. do-mpc requires this function to have a specific return structure which we obtain first by calling:

[18]:

p_template_sim = simulator.get_p_template()


We need to define a function which returns this structure with the desired numerical values. For our simple case:

[19]:

def p_fun_sim(t_now):
p_template_sim['Theta_1'] = 2.25e-4
p_template_sim['Theta_2'] = 2.25e-4
p_template_sim['Theta_3'] = 2.25e-4
return p_template_sim


This function is now supplied to the simulator in the following way:

[20]:

simulator.set_p_fun(p_fun_sim)


Setup#

Finally, we call:

[21]:

simulator.setup()


Creating the loop#

While the full loop should also include a controller, we are currently only interested in showcasing the estimator. We therefore estimate the states for an arbitrary initial condition and some random control inputs (shown below).

[22]:

x0 = np.pi*np.array([1, 1, -1.5, 1, -5, 5, 0, 0]).reshape(-1,1)


To make things more interesting we pass the estimator a perturbed initial state:

[23]:

x0_mhe = x0*(1+0.5*np.random.randn(8,1))


and use the x0 property of the simulator and estimator to set the initial state:

[24]:

simulator.x0 = x0
mhe.x0 = x0_mhe
mhe.p_est0 = 1e-4


It is also adviced to create an initial guess for the MHE optimization problem. The simplest way is to base that guess on the initial state, which is done automatically when calling:

[25]:

mhe.set_initial_guess()


Setting up the Graphic#

We are again using the do-mpc graphics module. This versatile tool allows us to conveniently configure a user-defined plot based on Matplotlib and visualize the results stored in the mhe.data, simulator.data objects.

We start by importing matplotlib:

[26]:

import matplotlib.pyplot as plt
import matplotlib as mpl
# Customizing Matplotlib:
mpl.rcParams['font.size'] = 18
mpl.rcParams['lines.linewidth'] = 3
mpl.rcParams['axes.grid'] = True


And initializing the graphics module with the data object of interest. In this particular example, we want to visualize both the mpc.data as well as the simulator.data.

[27]:

mhe_graphics = do_mpc.graphics.Graphics(mhe.data)
sim_graphics = do_mpc.graphics.Graphics(simulator.data)


Next, we create a figure and obtain its axis object. Matplotlib offers multiple alternative ways to obtain an axis object, e.g. subplots, subplot2grid, or simply gca. We use subplots:

[28]:

%%capture
# We just want to create the plot and not show it right now. This "inline magic" suppresses the output.
fig, ax = plt.subplots(3, sharex=True, figsize=(16,9))
fig.align_ylabels()

# We create another figure to plot the parameters:
fig_p, ax_p = plt.subplots(1, figsize=(16,4))


Most important API element for setting up the graphics module is graphics.add_line, which mimics the API of model.add_variable, except that we also need to pass an axis.

We want to show both the simulator and MHE results on the same axis, which is why we configure both of them identically:

[29]:

%%capture
for g in [sim_graphics, mhe_graphics]:
# Plot the angle positions (phi_1, phi_2, phi_2) on the first axis:
ax[0].set_prop_cycle(None)
ax[1].set_prop_cycle(None)

# Plot the set motor positions (phi_m_1_set, phi_m_2_set) on the second axis:
ax[2].set_prop_cycle(None)

ax[2].set_xlabel('time [s]')


Before we show any results we configure we further configure the graphic, by changing the appearance of the simulated lines. We can obtain line objects from any graphics instance with the result_lines property:

[30]:

sim_graphics.result_lines

[30]:

<do_mpc.tools.structure.Structure at 0x7fa7ba0e84f0>


We obtain a structure that can be queried conveniently as follows:

[31]:

# First element for state phi:
sim_graphics.result_lines['_x', 'phi', 0]

[31]:

[<matplotlib.lines.Line2D at 0x7fa7bab22340>]


In this particular case we want to change all result_lines with:

[32]:

for line_i in sim_graphics.result_lines.full:
line_i.set_alpha(0.4)
line_i.set_linewidth(6)


We furthermore use this property to create a legend:

[33]:

ax[0].legend(sim_graphics.result_lines['_x', 'phi'], '123', title='Sim.', loc='center right')
ax[1].legend(mhe_graphics.result_lines['_x', 'phi'], '123', title='MHE', loc='center right')

[33]:

<matplotlib.legend.Legend at 0x7fa7bab34f70>


and another legend for the parameter plot:

[34]:

ax_p.legend(sim_graphics.result_lines['_p', 'Theta_1']+mhe_graphics.result_lines['_p', 'Theta_1'], ['True','Estim.'])

[34]:

<matplotlib.legend.Legend at 0x7fa7bab34d30>


Running the loop#

We investigate the closed-loop MHE performance by alternating a simulation step (y0=simulator.make_step(u0)) and an estimation step (x0=mhe.make_step(y0)). Since we are lacking the controller which would close the loop (u0=mpc.make_step(x0)), we define a random control input function:

[35]:

def random_u(u0):
# Hold the current value with 80% chance or switch to new random value.
u_next = (0.5-np.random.rand(2,1))*np.pi # New candidate value.
switch = np.random.rand() >= 0.8 # switching? 0 or 1.
u0 = (1-switch)*u0 + switch*u_next # Old or new value.
return u0


The function holds the current input value with 80% chance or switches to a new random input value.

We can now run the loop. At each iteration, we perturb our measurements, for a more realistic scenario. This can be done by calling the simulator with a value for the measurement noise, which we defined in the model above.

[36]:

%%capture
np.random.seed(999) #make it repeatable

u0 = np.zeros((2,1))
for i in range(50):
u0 = random_u(u0) # Control input
v0 = 0.1*np.random.randn(model.n_v,1) # measurement noise
y0 = simulator.make_step(u0, v0=v0)
x0 = mhe.make_step(y0) # MHE estimation step


We can visualize the resulting trajectory with the pre-defined graphic:

[37]:

sim_graphics.plot_results()
mhe_graphics.plot_results()
# Reset the limits on all axes in graphic to show the data.
mhe_graphics.reset_axes()

# Mark the time after a full horizon is available to the MHE.
ax[0].axvline(1)
ax[1].axvline(1)
ax[2].axvline(1)

# Show the figure:
fig

[37]:


Parameter estimation:

[38]:

ax_p.set_ylim(1e-4, 4e-4)
ax_p.set_ylabel('mass inertia')
ax_p.set_xlabel('time [s]')
fig_p

[38]:


One of the main advantages of moving horizon estimation is the possibility to set bounds for states, inputs and estimated parameters. As mentioned above, this is crucial in the presented example. Let’s see how the MHE behaves without realistic bounds for the estimated mass inertia of disc one.

We simply reconfigure the bounds:

[39]:

mhe.bounds['lower','_p_est', 'Theta_1'] = -np.inf
mhe.bounds['upper','_p_est', 'Theta_1'] = np.inf


And setup the MHE again. The backend is now recreating the optimization problem, taking into consideration the currently saved bounds.

[40]:

mhe.setup()


We reset the history of the estimator and simulator (to clear their data objects and start “fresh”).

[41]:

mhe.reset_history()
simulator.reset_history()


Finally, we run the exact same loop again obtaining new results.

[42]:

%%capture
np.random.seed(999) #make it repeatable

u0 = np.zeros((2,1))
for i in range(50):
u0 = random_u(u0) # Control input
v0 = 0.1*np.random.randn(model.n_v,1) # measurement noise
y0 = simulator.make_step(u0, v0=v0)
x0 = mhe.make_step(y0) # MHE estimation step


These results now look quite terrible:

[43]:

sim_graphics.plot_results()
mhe_graphics.plot_results()
# Reset the limits on all axes in graphic to show the data.
mhe_graphics.reset_axes()

# Mark the time after a full horizon is available to the MHE.
ax[0].axvline(1)
ax[1].axvline(1)
ax[2].axvline(1)

# Show the figure:
fig

[43]:


Clearly, the main problem is a faulty parameter estimation, which is off by orders of magnitude:

[44]:

ax_p.set_ylabel('mass inertia')
ax_p.set_xlabel('time [s]')
fig_p

[44]:


Thank you, for following through this short example on how to use do-mpc. We hope you find the tool and this documentation useful.

We also want to emphasize that we skipped over many details, further functions etc. in this introduction. Please have a look at our more complex examples to get a better impression of the possibilities with do-mpc.