Measurement Noise Estimation

This tutorial demonstrates how to estimate the measurement noise standard deviation for each sensor using a Gaussian Process model.

[1]:
import os

import gpflow as gpf
import numpy as np
import tensorflow as tf

from tsipy.correction import SignalGenerator
from tsipy.fusion import SVGPModel, LocalGPModel
from tsipy.fusion.kernels import MultiWhiteKernel
from tsipy.fusion.utils import (
    build_and_concat_label_mask,
    build_and_concat_label_mask_output,
)
from tsipy.utils import (
    make_dir,
    plot_signals,
    plot_signals_and_confidence,
    pprint,
    pprint_block,
    sort_inputs,
)

Parameters

[2]:
random_seed = 0
normalization = False # If this is True, then estimated stds need re-scaling
clipping = True
num_inducing_pts = 500
max_iter = 5000

noise_std_a_rate = 0.040
noise_std_b_rate = 0.015

Generate Dataset

[3]:
np.random.seed(random_seed)
tf.random.set_seed(random_seed)

# Generate Brownian motion signal
signal_generator = SignalGenerator(
    y_center=0.0,
    add_degradation=False,
    noise_stds=(noise_std_a_rate, noise_std_b_rate),
)
# Noise added is proportional to the value range of the signal
noise_std_a = noise_std_a_rate * (np.max(signal_generator.y) - np.min(signal_generator.y))
noise_std_b = noise_std_b_rate * (np.max(signal_generator.y) - np.min(signal_generator.y))

t_a, a = signal_generator["a"]
t_b, b = signal_generator["b"]

pprint("Signal", level=0)
pprint("- t_a", t_a.shape, level=1)
pprint("- a", a.shape, level=1)

pprint("Signal", level=0)
pprint("- t_b", t_b.shape, level=1)
pprint("- b", b.shape, level=1)
Signal
    - t_a                                         (90002,)
    - a                                           (90002,)
Signal
    - t_b                                         (20127,)
    - b                                           (20127,)
[4]:
_ = plot_signals(
    [
        (t_a, a, "$a$", {}),
        (t_b, b, "$b$", {}),
        (signal_generator.x, signal_generator.y, "$s$", {}),
    ],
    legend="upper right",
)
../_images/notebooks_demo_noise_6_0.png

Sensor Fusion and Noise Estimation

[5]:
gpf.config.set_default_float(np.float64)
np.random.seed(random_seed)
tf.random.set_seed(random_seed)

t_a = build_and_concat_label_mask(t_a, label=1)
t_b = build_and_concat_label_mask(t_b, label=2)
t_out = build_and_concat_label_mask_output(signal_generator.x)

# Concatenate signals and sort by x[:, 0]
t = np.vstack((t_a, t_b))
s = np.reshape(np.hstack((a, b)), newshape=(-1, 1))
t, s = sort_inputs(t, s, sort_axis=0)

pprint("Signals", level=0)
pprint("- t", t.shape, level=1)
pprint("- s", s.shape, level=1)
Signals
    - t                                           (110129, 2)
    - s                                           (110129, 1)

Fusion Model and Training

Sensor noise standard deviation is a parameter of MultiWhiteKernel. This parameter is learned during training.

[6]:
# Kernels
matern_kernel = gpf.kernels.Matern12(active_dims=[0])
white_kernel = MultiWhiteKernel(labels=(1, 2), active_dims=[1])
kernel = matern_kernel + white_kernel

# Fusion model
model = SVGPModel(
    kernel=kernel,
    num_inducing_pts=num_inducing_pts,
    normalization=normalization,
    clipping=clipping,
)

# Train
model.fit(t, s, max_iter=max_iter, x_val=t_out, n_evals=5, verbose=True)

# Predict
s_out_mean, s_out_std = model(t_out)

pprint("\nOutput Signals", level=0)
pprint("- t_out", t_out.shape, level=1)
pprint("- s_out_mean", s_out_mean.shape, level=1)
pprint("- s_out_std", s_out_std.shape, level=1)
    - Step      1/5000:                           -527151.479
    - Step   1000/5000:                           -72688.766
    - Step   2000/5000:                           -35821.624
    - Step   3000/5000:                           -36876.176
    - Step   4000/5000:                           -31580.178
    - Step   5000/5000:                           -29128.483

Output Signals
    - t_out                                       (100000, 2)
    - s_out_mean                                  (100000,)
    - s_out_std                                   (100000,)

Results

[7]:
_ = plot_signals_and_confidence(
    [(t_out[:, 0], s_out_mean, s_out_std, "SVGP")],
)

fig, ax = plot_signals_and_confidence(
    [(t_out[:, 0], s_out_mean, s_out_std, "SVGP")],
)
ax.scatter(
    t_a[:, 0],
    a,
    label="$a$",
    s=3,
)
_ = ax.scatter(
    t_b[:, 0],
    b,
    label="$b$",
    s=3,
)
../_images/notebooks_demo_noise_12_0.png
../_images/notebooks_demo_noise_12_1.png

Noise standard deviation comparison

The measurement noise is underestimated. When training the GP, the model is not guaranteed to recover the exact measurement noise level. Given the selected kernel, the GP can treat part of the signal as noise. Similarly, the measurement noise is estimated to be greater for signal B, which is indeed not the case. Thus, one needs to be careful when making conclusion.

[8]:
# Kernel at index 0 is the matern kernel
# Kernel at index 1 is the multi-white kernel
est_noise_stds = model._model.kernel.kernels[1].variances.numpy()

est_noise_std_a = est_noise_stds[0]
est_noise_std_b = est_noise_stds[1]

pprint("Sensor a", level=0)
pprint("- Ground-truth:", "{:.5f}".format(noise_std_a), level=1)
pprint("- Estimation:", "{:.5f}".format(est_noise_std_a), level=1)

pprint("Sensor b", level=0)
pprint("- Ground-truth:", "{:.5f}".format(noise_std_b), level=1)
pprint("- Estimation:", "{:.5f}".format(est_noise_std_b), level=1)
Sensor a
    - Ground-truth:                               0.31556
    - Estimation:                                 0.00026
Sensor b
    - Ground-truth:                               0.11834
    - Estimation:                                 0.00033
[9]:
_, ax_a = plot_signals_and_confidence(
    [
        (
            signal_generator.x,
            signal_generator.y,
            noise_std_a * np.ones_like(signal_generator.y),
            r"$\sigma_a$"
        ),
        (t_out[:, 0], s_out_mean, s_out_std, "SVGP"),
    ],
)
../_images/notebooks_demo_noise_15_0.png
[10]:
_, ax_b = plot_signals_and_confidence(
    [
        (
            signal_generator.x,
            signal_generator.y,
            noise_std_b * np.ones_like(signal_generator.y),
            r"$\sigma_b$"
        ),
        (t_out[:, 0], s_out_mean, s_out_std, "SVGP"),
    ],
)
ax_b.set_ylim(*ax_a.get_ylim())
[10]:
(-6.04577481468815, 3.9928228639266123)
../_images/notebooks_demo_noise_16_1.png
[ ]: