Source code for gluonts.distribution.piecewise_linear

# Copyright 2018, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
# or in the "license" file accompanying this file. This file is distributed
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

# Standard library imports
from typing import Dict, Optional, Tuple, cast, List

# Third-party imports
import numpy as np

# First-party imports
from gluonts.core.component import validated
from gluonts.distribution.bijection import AffineTransformation, Bijection
from gluonts.distribution.distribution import Distribution, getF
from gluonts.distribution.distribution_output import DistributionOutput
from gluonts.distribution.transformed_distribution import (
from gluonts.model.common import Tensor
from import util

[docs]class PiecewiseLinear(Distribution): r""" Piecewise linear distribution. This class represents the *quantile function* (i.e., the inverse CDF) associated with the a distribution, as a continuous, non-decreasing, piecewise linear function defined in the [0, 1] interval: .. math:: q(x; \gamma, b, d) = \gamma + \sum_{l=0}^L b_l (x_l - d_l)_+ where the input :math:`x \in [0,1]` and the parameters are - :math:`\gamma`: intercept at 0 - :math:`b`: differences of the slopes in consecutive pieces - :math:`d`: knot positions Parameters ---------- gamma Tensor containing the intercepts at zero slopes Tensor containing the slopes of each linear piece. All coefficients must be positive. Shape: ``(*gamma.shape, num_pieces)`` knot_spacings Tensor containing the spacings between knots in the splines. All coefficients must be positive and sum to one on the last axis. Shape: ``(*gamma.shape, num_pieces)`` F """ is_reparameterizable = False @validated() def __init__( self, gamma: Tensor, slopes: Tensor, knot_spacings: Tensor, F=None ) -> None: self.F = F if F else getF(gamma) self.gamma = gamma self.slopes = slopes self.knot_spacings = knot_spacings # Since most of the calculations are easily expressed in the original parameters, we transform the # learned parameters back self.b, self.knot_positions = PiecewiseLinear._to_orig_params( self.F, slopes, knot_spacings ) @property def args(self) -> List: return [self.gamma, self.slopes, self.knot_spacings] @staticmethod def _to_orig_params( F, slopes: Tensor, knot_spacings: Tensor ) -> Tuple[Tensor, Tensor]: r""" Convert the trainable parameters to the original parameters of the splines, i.e., convert the slopes of each piece to the difference between slopes of consecutive pieces and knot spacings to knot positions. Parameters ---------- F slopes Tensor of shape (*gamma.shape, num_pieces) knot_spacings Tensor of shape (*gamma.shape, num_pieces) Returns ------- Tensor Tensor of shape (*gamma.shape, num_pieces) Tensor Tensor of shape (*gamma.shape, num_pieces) """ # b: the difference between slopes of consecutive pieces # shape (..., num_pieces - 1) b = F.slice_axis(slopes, axis=-1, begin=1, end=None) - F.slice_axis( slopes, axis=-1, begin=0, end=-1 ) # Add slope of first piece to b: b_0 = m_0 m_0 = F.slice_axis(slopes, axis=-1, begin=0, end=1) b = F.concat(m_0, b, dim=-1) # The actual position of the knots is obtained by cumulative sum of # the knot spacings. The first knot position is always 0 for quantile # functions; cumsum will take care of that. knot_positions = util.cumsum(F, knot_spacings, exclusive=True) return b, knot_positions
[docs] def sample( self, num_samples: Optional[int] = None, dtype=np.float32 ) -> Tensor: F = self.F # if num_samples=None then u should have the same shape as gamma, i.e., (dim,) # else u should be (num_samples, dim) # Note: there is no need to extend the parameters to (num_samples, dim, ...) # Thankfully samples returned by `uniform_like` have the expected datatype. u = F.random.uniform_like( data=( self.gamma if num_samples is None else self.gamma.expand_dims(axis=0).repeat( axis=0, repeats=num_samples ) ) ) sample = self.quantile(u) if num_samples is None: sample = F.squeeze(sample, axis=0) return sample
# overwrites the loss method of the Distribution class
[docs] def loss(self, x: Tensor) -> Tensor: return self.crps(x)
[docs] def crps(self, x: Tensor) -> Tensor: r""" Compute CRPS in analytical form. Parameters ---------- x Observation to evaluate. Shape equals to gamma.shape. Returns ------- Tensor Tensor containing the CRPS. """ F = self.F gamma, b, knot_positions = self.gamma, self.b, self.knot_positions a_tilde = self.cdf(x) max_a_tilde_knots = F.broadcast_maximum( a_tilde.expand_dims(axis=-1), knot_positions ) knots_cubed = F.broadcast_power(self.knot_positions, F.ones(1) * 3.0) coeff = ( (1.0 - knots_cubed) / 3.0 - knot_positions - F.square(max_a_tilde_knots) + 2 * max_a_tilde_knots * knot_positions ) crps = ( (2 * a_tilde - 1) * x + (1 - 2 * a_tilde) * gamma + F.sum(b * coeff, axis=-1, keepdims=False) ) return crps
[docs] def cdf(self, x: Tensor) -> Tensor: r""" Computes the quantile level :math:`\alpha` such that :math:`q(\alpha) = x`. Parameters ---------- x Tensor of shape gamma.shape Returns ------- Tensor Tensor of shape gamma.shape """ F = self.F gamma, b, knot_positions = self.gamma, self.b, self.knot_positions quantiles_at_knots = self.quantile_internal(knot_positions, axis=-2) # Mask to nullify the terms corresponding to knots larger than l_0, which is the largest knot # (quantile level) such that the quantile at l_0, s(l_0) < x. # (..., num_pieces) mask = F.broadcast_lesser(quantiles_at_knots, x.expand_dims(axis=-1)) slope_l0 = F.sum(b * mask, axis=-1, keepdims=False) # slope_l0 can be zero in which case a_tilde = 0. # The following is to circumvent mxnet issue with "where" operator which returns nan even if the statement # you are interested in does not result in nan (but the "else" statement evaluates to nan). slope_l0_nz = F.where( slope_l0 == F.zeros_like(slope_l0), F.ones_like(x), slope_l0 ) a_tilde = F.where( slope_l0 == F.zeros_like(slope_l0), F.zeros_like(x), ( x - gamma + F.sum(b * knot_positions * mask, axis=-1, keepdims=False) ) / slope_l0_nz, ) return F.broadcast_minimum(F.ones_like(a_tilde), a_tilde)
[docs] def quantile(self, level: Tensor) -> Tensor: return self.quantile_internal(level, axis=0)
[docs] def quantile_internal( self, x: Tensor, axis: Optional[int] = None ) -> Tensor: r""" Evaluates the quantile function at the quantile levels contained in `x`. Parameters ---------- x Tensor of shape ``*gamma.shape`` if axis=None, or containing an additional axis on the specified position, otherwise. axis Index of the axis containing the different quantile levels which are to be computed. Returns ------- Tensor Quantiles tensor, of the same shape as x. """ F = self.F # shapes of self # self.gamma: (*batch_shape) # self.knot_positions, self.b: (*batch_shape, num_pieces) # axis=None - passed at inference when num_samples is None # The shape of x is (*batch_shape). # The shapes of the parameters should be: # gamma: (*batch_shape), knot_positions, b: (*batch_shape, num_pieces) # They match the self. counterparts so no reshaping is needed # axis=0 - passed at inference when num_samples is not None # The shape of x is (num_samples, *batch_shape). # The shapes of the parameters should be: # gamma: (num_samples, *batch_shape), knot_positions, b: (num_samples, *batch_shape, num_pieces), # They do not match the self. counterparts and we need to expand the axis=0 to all of them. # axis=-2 - passed at training when we evaluate quantiles at knot_positions in order to compute a_tilde # The shape of x is shape(x) = shape(knot_positions) = (*batch_shape, num_pieces). # The shape of the parameters shopuld be: # gamma: (*batch_shape, 1), knot_positions: (*batch_shape, 1, num_pieces), b: (*batch_shape, 1, num_pieces) # They do not match the self. counterparts and we need to expand axis=-1 for gamma and axis=-2 for the rest. if axis is not None: gamma = self.gamma.expand_dims(axis=axis if axis == 0 else -1) knot_positions = self.knot_positions.expand_dims(axis=axis) b = self.b.expand_dims(axis=axis) else: gamma, knot_positions, b = self.gamma, self.knot_positions, self.b x_minus_knots = F.broadcast_minus( x.expand_dims(axis=-1), knot_positions ) quantile = F.broadcast_add( gamma, F.sum(F.broadcast_mul(b, F.relu(x_minus_knots)), axis=-1) ) return quantile
@property def batch_shape(self) -> Tuple: return self.gamma.shape @property def event_shape(self) -> Tuple: return () @property def event_dim(self) -> int: return 0
[docs]class PiecewiseLinearOutput(DistributionOutput): distr_cls: type = PiecewiseLinear @validated() def __init__(self, num_pieces: int) -> None: super().__init__(self) assert ( isinstance(num_pieces, int) and num_pieces > 1 ), "num_pieces should be an integer larger than 1" self.num_pieces = num_pieces self.args_dim = cast( Dict[str, int], {"gamma": 1, "slopes": num_pieces, "knot_spacings": num_pieces}, )
[docs] @classmethod def domain_map(cls, F, gamma, slopes, knot_spacings): # slopes of the pieces are non-negative slopes_proj = F.Activation(data=slopes, act_type="softrelu") + 1e-4 # the spacing between the knots should be in [0, 1] and sum to 1 knot_spacings_proj = F.softmax(knot_spacings) return gamma.squeeze(axis=-1), slopes_proj, knot_spacings_proj
[docs] def distribution( self, distr_args, loc: Optional[Tensor] = None, scale: Optional[Tensor] = None, ) -> PiecewiseLinear: if scale is None: return self.distr_cls(*distr_args) else: distr = self.distr_cls(*distr_args) return TransformedPiecewiseLinear( distr, [AffineTransformation(loc=loc, scale=scale)] )
@property def event_shape(self) -> Tuple: return ()
# Need to inherit from PiecewiseLinear to get the overwritten loss method.
[docs]class TransformedPiecewiseLinear(TransformedDistribution, PiecewiseLinear): @validated() def __init__( self, base_distribution: PiecewiseLinear, transforms: List[Bijection] ) -> None: super().__init__(base_distribution, transforms)
[docs] def crps(self, y: Tensor) -> Tensor: # TODO: use event_shape F = getF(y) x = y scale = 1.0 for t in self.transforms[::-1]: assert isinstance( t, AffineTransformation ), "Not an AffineTransformation" x = t.f_inv(x) scale *= t.scale p = self.base_distribution.crps(x) return F.broadcast_mul(p, scale)