[code review] N-BEATS: NEURAL BASIS EXPANSION ANALYSIS FOR INTERPRETABLE TIME SERIES FORECASTING


figure2


1. linear

def linear(in_dim, out_dim, bias=True, dropout: int = None):
    layer = nn.Linear(in_dim, out_dim, bias=bias)
    if dropout is not None:
        return nn.Sequential(nn.Dropout(dropout), layer)
    else:
        return layer


2. linspace

len_back : Length of backcast

len_fore : Length of forecast

def linspace(len_back: int, len_fore : int, centered: bool = False) -> Tuple[np.ndarray, np.ndarray]:
    if centered:
        norm = max(len_back, len_fore )
        start = -len_back
        stop = len_fore  - 1
    else:
        norm = len_back + len_fore 
        start = 0
        stop = len_back + len_fore  - 1
    lin_space = np.linspace(start/norm, stop/norm, 
                            len_back+len_fore,dtype=np.float32)
    b_ls = lin_space[:len_back]
    f_ls = lin_space[len_back:]
    return b_ls, f_ls
b_ls,f_ls=linspace(2,3,centered=True)

print(b_ls)
print(f_ls)
#-------------------------------------#
[-0.6666667  -0.33333334]
[0.         0.33333334 0.6666667 ]
b_ls,f_ls=linspace(2,3,centered=False)

print(b_ls)
print(f_ls)
#-------------------------------------#
[0.  0.2]
[0.4 0.6 0.8]


[ NBEATS ]

3-1. NBEATSBlock

class NBEATSBlock(nn.Module):
    def __init__(
        self,hidden_dim,thetas_dim,
        num_block_layers=4,
        len_back=10,len_fore =5,
        share_thetas=False,dropout=0.1):
        
        #------------ 1) 기본 Setting ---------------------------------#
        super().__init__()
        self.hidden_dim = hidden_dim
        self.thetas_dim = thetas_dim
        self.len_back = len_back
        self.len_fore  = len_fore 
        self.share_thetas = share_thetas
        #--------------------------------------------------------------#
        
		#------------ 2) Layer들 구성하기 --------------------------------#
        fc_stack = [ nn.Linear(len_back, hidden_dim),nn.ReLU() ]
        for _ in range(num_block_layers - 1):
            fc_stack.extend([linear(hidden_dim, hidden_dim, dropout=dropout), nn.ReLU()])
        self.fc = nn.Sequential(*fc_stack)
        #--------------------------------------------------------------#
        
        #------------ 3) Forward & Backward의 theta 공유 여부 -----------#
        if share_thetas:
            self.theta_f_fc = self.theta_b_fc = nn.Linear(hidden_dim, thetas_dim, bias=False)
        else:
            self.theta_b_fc = nn.Linear(hidden_dim, thetas_dim, bias=False)
            self.theta_f_fc = nn.Linear(hidden_dim, thetas_dim, bias=False)
        #--------------------------------------------------------------#
        
    def forward(self, x):
        return self.fc(x)


3-2. NBEATSSeasonalBlock

  • s1_Wf : forward season 1
  • s2_Wf : forward season 2
  • b1_Wf : backward season 1
  • b2_Wf : backward season 2
def season_param(freq,line):
    param = [np.cos(2 * np.pi * i * line) for i in freq]
    param = torch.tensor(param,dtype=torch.float32)
    return  
def trend_param(thetas_dim,line):
    param = [line ** i for i in range(thetas_dim)]
    param = torch.tensor(param, dtype=torch.float32)
    return  
class NBEATSSeasonalBlock(NBEATSBlock):
    def __init__(
        self,hidden_dim,thetas_dim=None,
        num_block_layers=4,
        len_back=10,len_fore =5,
        nb_harmonics=None,
        min_period=1,dropout=0.1):
        if nb_harmonics:
            thetas_dim = nb_harmonics
        else:
            thetas_dim = len_fore 
        self.min_period = min_period
        super().__init__(
            hidden_dim=hidden_dim,thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            len_back=len_back,len_fore =len_fore ,
            share_thetas=True,dropout=dropout)
        
		#------------------- Seasonal Parameter ( 학습 대상은 X ) ---------------#
        p1, p2 = (thetas_dim//2,thetas_dim//2) if thetas_dim%2 == 0 else (thetas_dim//2, thetas_dim//2 + 1)
        line_back, line_fore = linspace(len_back, len_fore , centered=True)
        
        s1_Wb = season_param(self.get_freq(p1),line_back) # H/2-1
        s2_Wb = season_param(self.get_freq(p2),line_back)
        s1_Wf = season_param(self.get_freq(p1),line_fore) # H/2-1
        s2_Wf = season_param(self.get_freq(p2),line_fore)
        self.register_buffer("S_backcast", torch.cat([s1_Wb, s2_Wb]))
        self.register_buffer("S_forecast", torch.cat([s1_Wf, s2_Wf]))

        
    def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]:
        #------------------ 1) NBEATS block 통과 ---------------------------#
        ### (아직 Forward / Backward 안 나뉨)
        x = super().forward(x)
        #------------------------------------------------------------------#
        
        #--------------- 2) Forecast & Backcast 생성 (with seasonal) -------------------#
        amplitudes_backward = self.theta_b_fc(x)
        amplitudes_forward = self.theta_f_fc(x)
        backcast = amplitudes_backward.mm(self.S_backcast)
        forecast = amplitudes_forward.mm(self.S_forecast)
        #-------------------------------------------------------------------------------#
        return backcast, forecast

    def get_freq(self, n):
        return np.linspace(0, (self.len_back + self.len_fore ) / self.min_period, n)


3-3. NBEATSTrendBlock

class NBEATSTrendBlock(NBEATSBlock):
    def __init__(
        self,hidden_dim,thetas_dim,
        num_block_layers=4,
        len_back=10,len_fore =5,dropout=0.1):
        super().__init__(
            hidden_dim=hidden_dim,thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            len_back=len_back,len_fore =len_fore ,
            share_thetas=True,dropout=dropout)

        #------------------- Trend Parameter ( 학습 대상은 X ) ---------------#
        line_back, line_fore = linspace(len_back, len_fore , centered=True)
        norm = np.sqrt(len_fore  / thetas_dim)  # ensure range of predictions is comparable to input
        
        trend_Wf = trend_param(thetas_dim,line_fore)
        trend_Wb = trend_param(thetas_dim,line_back)
        self.register_buffer("T_forecast", trend_Wf * norm)
        self.register_buffer("T_backcast", trend_Wb * norm)        

        
    def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]:
        #------------------ 1) NBEATS block 통과 ---------------------------#
        ### (아직 Forward / Backward 안 나뉨)
        x = super().forward(x)
        #------------------------------------------------------------------#
        
        #--------------- 2) Forecast & Backcast 생성 (with trned) -------------------#
        backcast = self.theta_b_fc(x).mm(self.T_backcast)
        forecast = self.theta_f_fc(x).mm(self.T_forecast)
        #----------------------------------------------------------------------------#
        return backcast, forecast


3-4. NBEATSGenericBlock

class NBEATSGenericBlock(NBEATSBlock):
    def __init__(
        self,hidden_dim,thetas_dim,
        num_block_layers=4,
        len_back=10,len_fore =5,dropout=0.1):
        super().__init__(
            hidden_dim=hidden_dim,
            thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            len_back=len_back,
            len_fore =len_fore ,
            dropout=dropout,
        )
        
		#------------ 일반적인 Backward & Forward Parameter ( 학습 대상 O ) -----------#
        self.backcast_fc = nn.Linear(thetas_dim, len_back)
        self.forecast_fc = nn.Linear(thetas_dim, len_fore )

    def forward(self, x):
        #------------------ 1) NBEATS block 통과 ---------------------------#
        ### (아직 Forward / Backward 안 나뉨)
        x = super().forward(x)
        #------------------------------------------------------------------#

        #--------------- 2) Forecast & Backcast 생성 -------------------#
        theta_b = F.relu(self.theta_b_fc(x))
        theta_f = F.relu(self.theta_f_fc(x))
		#------------------------------------------------------------------#
        return self.backcast_fc(theta_b), self.forecast_fc(theta_f)


4. NBEATs 모델 class

class NBeats(BaseModel)

  • __init__forward 모듈만 소개

여러 블록을 stack할 때, 사용할 수 있는 3가지 선택지

  • 1) generic : NBEATSGenericBlock
  • 2) seasonality : NBEATSSeasonalBlock
  • 3) trend : NBEATSTrendBlock
class NBeats(BaseModel):
    def __init__(
        self,
        stack_types: List[str] = ["trend", "seasonality"],
        num_blocks=[3, 3],
        num_block_layers=[3, 3],
        widths=[32, 512],
        sharing: List[int] = [True, True],
        expansion_coefficient_lengths: List[int] = [3, 7],
        prediction_length: int = 1,
        context_length: int = 1,
        dropout: float = 0.1,
        learning_rate: float = 1e-2,
        log_interval: int = -1,
        log_gradient_flow: bool = False,
        log_val_interval: int = None,
        weight_decay: float = 1e-3,
        loss: MultiHorizonMetric = None,
        reduce_on_plateau_patience: int = 1000,
        backcast_loss_ratio: float = 0.0,
        logging_metrics: nn.ModuleList = None,
        **kwargs,
    ):

        if logging_metrics is None:
            logging_metrics = nn.ModuleList([SMAPE(), MAE(), RMSE(), MAPE(), MASE()])
        if loss is None:
            loss = MASE()
        self.save_hyperparameters()
        super().__init__(loss=loss, logging_metrics=logging_metrics, **kwargs)

        self.net_blocks = nn.ModuleList()
        for stack_id, stack_type in enumerate(stack_types):
            for _ in range(num_blocks[stack_id]):
                #--------------------------------------------------#
                if stack_type == "generic":
                    net_block = NBEATSGenericBlock(
                        units=self.hparams.widths[stack_id],
                        thetas_dim=self.hparams.expansion_coefficient_lengths[stack_id],
                        num_block_layers=self.hparams.num_block_layers[stack_id],
                        backcast_length=context_length,
                        forecast_length=prediction_length,
                        dropout=self.hparams.dropout,
                    )
                #--------------------------------------------------#
                elif stack_type == "seasonality":
                    net_block = NBEATSSeasonalBlock(
                        units=self.hparams.widths[stack_id],
                        num_block_layers=self.hparams.num_block_layers[stack_id],
                        backcast_length=context_length,
                        forecast_length=prediction_length,
                        min_period=self.hparams.expansion_coefficient_lengths[stack_id],
                        dropout=self.hparams.dropout,
                    )
                #--------------------------------------------------#
                elif stack_type == "trend":
                    net_block = NBEATSTrendBlock(
                        units=self.hparams.widths[stack_id],
                        thetas_dim=self.hparams.expansion_coefficient_lengths[stack_id],
                        num_block_layers=self.hparams.num_block_layers[stack_id],
                        backcast_length=context_length,
                        forecast_length=prediction_length,
                        dropout=self.hparams.dropout,
                    )
                else:
                    raise ValueError(f"Unknown stack type {stack_type}")

                self.net_blocks.append(net_block)

                
    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        target = x["encoder_cont"][..., 0]
        timesteps = self.hparams.context_length + self.hparams.prediction_length
        
        #-------- 예측값들이 들어갈 empty list (of tensor) ------------#
        generic_forecast = [torch.zeros((target.size(0), timesteps), 
                                        dtype=torch.float32, device=self.device)]
        trend_forecast = [torch.zeros((target.size(0), timesteps), 
                                      dtype=torch.float32, device=self.device)]
        seasonal_forecast = [torch.zeros((target.size(0), timesteps),
                                         dtype=torch.float32, device=self.device)]
        forecast = torch.zeros((target.size(0), self.hparams.prediction_length),
                               dtype=torch.float32, device=self.device)
		#--------------------------------------------------------------#
        
        #--------------------------------------------------------------#
        backcast = target  # initialize backcast
        
        ## Feed Forward
        for i, block in enumerate(self.net_blocks):
            # 1) 공통 Block 통과 한 뒤, forecast & backcast 나뉨
            backcast_block, forecast_block = block(backcast)

            # 2) 위의 결과 저장 ( for interpretation )
            full = torch.cat([backcast_block.detach(), forecast_block.detach()], dim=1)
            if isinstance(block, NBEATSTrendBlock):
                trend_forecast.append(full)
            elif isinstance(block, NBEATSSeasonalBlock):
                seasonal_forecast.append(full)
            else:
                generic_forecast.append(full)

            # 3) backcast & forecast 업데이트하기
            backcast = (backcast - backcast_block)  
            forecast = forecast + forecast_block
            
        #--------------------------------------------------------------#
        ## 최종 예측값    
		final_outputs= self.to_network_output(
            # (1) Prediction
            prediction=self.transform_output(forecast, target_scale=x["target_scale"]),
            
            # (2) Backcast
            backcast=self.transform_output(prediction=target - backcast, 
                                           target_scale=x["target_scale"]),

            # (3) Trend
            trend=self.transform_output(torch.stack(trend_forecast, dim=0).sum(0), 
                                        target_scale=x["target_scale"]),
            # (4) Seasonality
            seasonality=self.transform_output(
                torch.stack(seasonal_forecast, dim=0).sum(0), 
                target_scale=x["target_scale"]),

            # (5) Generic
            generic=self.transform_output(torch.stack(generic_forecast, dim=0).sum(0),
                                          target_scale=x["target_scale"])
        )
        return final_outputs


5. Run Experiment

(1) Import Packages

import sys
import pandas as pd

import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping
from sklearn.preprocessing import scale

from pytorch_forecasting import NBeats, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.data.examples import generate_ar_data

sys.path.append("..")


(2) Data 생성 / 소개

  • 총 40000개의 데이터

    ( 100종류의 데이터(=MTS) & 각 데이터의 length는 400 )

data = generate_ar_data(seasonality=10.0, timesteps=400, n_series=100)
data["static"] = 2
data["date"] = pd.Timestamp("2020-01-01") + pd.to_timedelta(data.time_idx, "D")

figure2


  • validation 데이터
validation = data.series.sample(20)


(3) Dataset 생성

max_encoder_length = 150
max_prediction_length = 20

training_cutoff = data["time_idx"].max() - max_prediction_length


training : train & validation 데이터셋

validation : validation 데이터셋

context_length = max_encoder_length
prediction_length = max_prediction_length

training = TimeSeriesDataSet(
    data[lambda x: x.time_idx < training_cutoff],
    time_idx="time_idx",
    target="value",
    categorical_encoders={"series": NaNLabelEncoder().fit(data.series)},
    group_ids=["series"],
    min_encoder_length=context_length,
    max_encoder_length=context_length,
    max_prediction_length=prediction_length,
    min_prediction_length=prediction_length,
    time_varying_unknown_reals=["value"],
    randomize_length=None,
    add_relative_time_idx=False,
    add_target_scales=False)

validation = TimeSeriesDataSet.from_dataset(training, 
                                            data, 
                                            min_prediction_idx=training_cutoff)


(4) Data Loader

batch_size = 128

train_dataloader = training.to_dataloader(train=True, batch_size=batch_size,
                                          num_workers=2)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size, 
                                          num_workers=2)


(5) Modeling

early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, 
                                    patience=10, verbose=False, mode="min")
trainer = pl.Trainer(
    max_epochs=100,
    gpus=0,
    weights_summary="top",
    gradient_clip_val=0.1,
    callbacks=[early_stop_callback],
    limit_train_batches=15)


사용할 모델 : NBeats

  • net = NBeats.from_dataset( 전체(train+val) 데이터셋 , 기타 등등 )
net = NBeats.from_dataset(
    training, learning_rate=3e-2, log_interval=10, log_val_interval=1, 
    log_gradient_flow=False, weight_decay=1e-2
)


(6) Fit Model

  • trainer.fit(모델, train 데이터로더, validation 데이터로더)
trainer.fit(
    net,
    train_dataloader=train_dataloader,
    val_dataloaders=val_dataloader,
)

Tags:

Categories:

Updated: