Source code for pydygp.linlatentforcemodels.mlfm

""" Base class definition for the Multiplicative Latent Force Model
"""
import numpy as np
from collections import namedtuple
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process import kernels as sklearn_kernels

def _make_tt_dense(tt, dt_max):
    # utility function for BaseMLFM.sim(...)
    res, inds = ([tt[0]], [0])
    for ta, tb in zip(tt[:-1], tt[1:]):
        N = int(np.ceil((tb - ta) / dt_max + 1))
        _tt = np.linspace(ta, tb, N)
        res = np.concatenate((res, _tt[1:]))
        inds.append(inds[-1] + N - 1)
    return res, inds

def _sim_gp(tt, gp):
    """Simulates values from a sklearn GPR object
    """
    K = gp.kernel(tt[:, None])
    K[np.diag_indices_from(K)] += gp.alpha
    L = np.linalg.cholesky(K)
    return L.dot(np.random.normal(size=tt.size))

# keep track of the dimensions in the MLFM
Dimensions = namedtuple('Dimensions', 'N K R D')

[docs]class BaseMLFM: """ Base class for the Multiplicative Latent Force Model. Parameters ---------- basis_mats : tuple A tuple of square matrices lf_kernels : list, optional Kernels of the latent force Gaussian process objects """
[docs] def __init__(self, basis_mats, R=None, lf_kernels=None): self.basis_mats = basis_mats if R is None: if lf_kernels is None: raise ValueError("If R is not supplied then lf_kernels must be supplied.") else: R = len(lf_kernels) # store the model dimensions K = basis_mats[0].shape[0] D = len(basis_mats) self.dim = Dimensions(None, K, R, D) # setup the latent forces self.setup_latentforces(lf_kernels)
[docs] def setup_latentforces(self, kernels=None): """Initalises the latent force GPs Parameters ---------- kernels : list, optional Kernels of the latent force Gaussian process objects """ if kernels is None: # Default is for kernels 1 * exp(-0.5 * (s-t)**2 ) kernels = [sklearn_kernels.ConstantKernel(1.) * sklearn_kernels.RBF(1.) for r in range(self.dim.R)] if len(kernels) != self.dim.R or \ not all(isinstance(k, sklearn_kernels.Kernel) for k in kernels): _msg = "kernels should be a list of {} kernel objects".format(self.dim.R) raise ValueError(_msg) self.latentforces = [GaussianProcessRegressor(kern) for kern in kernels]
[docs] def sim(self, x0, tt, beta=None, dt_max=0.1, latent_forces=None, size=1): """Simulate the process along a set of points Parameters ---------- x0 : array_like, shape (K, ) initial condition for the ode tt : array_like Ordered sequence of time points for the model to be simulated at beta : array_like, shape (R+1, D) dt_max : float, defualt = 0.1 Maximum spacing of dense time points used for simulating the model. latent_forces : Tuple of callables, optional, default = None Returns ------- Y : array, shape(len(tt), K) Values of the MLFM simulated at tt latent_forces : tuple of callables Tuple of functions (g_1(t),...,g_R(t)) used to simulate the model. Examples -------- >>> import numpy as np >>> from pydygp.linlatentforcemodels import BaseMLFM >>> struct_mats = [np.zeros((2, 2))] + [*pydygp.liealgebras.so2()] >>> tt = np.linspace(0., 5., 15) >>> mlfm = BaseMLFM(struct_mats) >>> Y, g = mlfm.sim([1., 0], tt) """ from scipy.interpolate import interp1d from scipy.integrate import odeint if beta is None: try: # some of the child classes # will fix beta beta = self.beta except: raise ValueError("Must supply beta.") if beta.shape[0] != self.dim.R + 1 \ or beta.shape[1] != self.dim.D: msg = "Beta must be of shape {} x {}.".format(self.dim.R+1, self.dim.D) raise ValueError(msg) # create a dense set of time points # so that max(np.diff(ttdense)) <= dt_max ttdense, inds = _make_tt_dense(tt, dt_max) # allows the passing of pre-defined functions if latent_forces is None: ginterp = [interp1d(ttdense, _sim_gp(ttdense, lf), kind='cubic', fill_value='extrapolate') for lf in self.latentforces] else: assert(len(latent_forces) == self.dim.R) ginterp = latent_forces # form the coeff. matrices of the evol. equation from # the structural parameters struct_mats = [sum(brd*Ld for brd, Ld in zip(br, self.basis_mats)) for br in beta] # the evolution equation def dXdt(X, t): At = struct_mats[0] + \ sum(Ar*ur(t) for Ar, ur in zip(struct_mats[1:], ginterp)) return At.dot(X) if size == 1: sol = odeint(dXdt, x0, ttdense) return sol[inds, :], ginterp else: return [odeint(dXdt, x0i, ttdense)[inds, :] for x0i in x0], ginterp
def _odeint(self, x0, tt, beta, glist): if len(glist) != self.dim.R: raise ValueError("glist must be a list of {} callable functions.".format(self.dim.R)) # import odeint for solving from scipy.integrate import odeint struct_mats = [sum(brd*Ld for brd, Ld in zip(br, self.basis_mats)) for br in beta] def A(t): return struct_mats[0] + sum(Ar*gr(t) for Ar, gr in zip(struct_mats[1:], glist)) return odeint(lambda x, t: A(t).dot(x), x0, tt) def _component_functions(self, g, beta, N=None): if N is None: N = self.dim.N g = g.reshape(self.dim.R, N) g = np.row_stack((np.ones(N), g)) struct_mats = np.array([ sum(brd*Ld for brd, Ld in zip(br, self.basis_mats)) for br in beta]) # match struct mats and G on r=0,...,R comp_funcs = struct_mats[..., None] * g[:, None, None, :] # sum over r return comp_funcs.sum(0) def __mul__(self, other): return MLFMCartesianProduct(self, other) def flatten(self): return self,
class MLFMCartesianProduct: """ Cartesian product of two MLFM like objects """ def __init__(self, mlfm1, mlfm2): # Check compatability N1, K1, R1, D1 = mlfm1.dim N2, K2, R2, D2 = mlfm2.dim self.mlfm1 = mlfm1 self.mlfm2 = mlfm2 if R1 != R2: raise ValueError("The number of latent forces in both ", "both models must be the same") if N1 != N2: raise ValueError("Dimension of the input vector must ", "be the same for both models") # This causes a bit of rewrite self.dim = Dimensions(N1, None, R1, None) def __mul__(self, mlfm): return MLFMCartesianProduct(self, mlfm) def flatten(self): """ Returns a flat tuple of the constituents of the MMLF objects """ mlfms = self.mlfm1.flatten() + self.mlfm2.flatten() return mlfms def sim(self, x0, tt, beta, dt_max=0.1, size=1, latent_forces=None): mlfms = self.flatten() # simulate from the first model Y0, lf = mlfms[0].sim(x0[0], tt, beta[0], dt_max=dt_max, size=size) Data = [Y0, ] for i in range(len(mlfms)-1): yi, _ = mlfms[i+1].sim(x0[i+1], tt, latent_forces=lf, beta=beta[i+1], dt_max=dt_max, size=size) Data.append(yi) return Data, lf