[code review] N-BEATS: NEURAL BASIS EXPANSION ANALYSIS FOR INTERPRETABLE TIME SERIES FORECASTING
1. linear
def linear(in_dim, out_dim, bias=True, dropout: float = None):
    """Create a linear layer, optionally preceded by dropout.

    Args:
        in_dim: input feature dimension.
        out_dim: output feature dimension.
        bias: whether the linear layer has a bias term.
        dropout: dropout probability applied *before* the linear layer;
            ``None`` disables dropout entirely.
            (BUGFIX: annotated ``int`` before — it is a probability, so float.)

    Returns:
        ``nn.Linear`` when ``dropout is None``, otherwise
        ``nn.Sequential(nn.Dropout(dropout), nn.Linear(...))``.
    """
    layer = nn.Linear(in_dim, out_dim, bias=bias)
    if dropout is not None:
        return nn.Sequential(nn.Dropout(dropout), layer)
    return layer
2. linspace
- len_back : length of the backcast (lookback) window
- len_fore : length of the forecast (prediction) window
def linspace(len_back: int, len_fore: int, centered: bool = False) -> Tuple[np.ndarray, np.ndarray]:
    """Build a normalized time grid and split it into backcast/forecast halves.

    centered=True : grid covers [-len_back, len_fore - 1] scaled by
                    max(len_back, len_fore), so 0 sits at the first forecast step.
    centered=False: grid covers [0, 1) over the combined window.

    Returns:
        (backcast grid of length len_back, forecast grid of length len_fore),
        both float32 numpy arrays.
    """
    total = len_back + len_fore
    if centered:
        scale = max(len_back, len_fore)
        first, last = -len_back, len_fore - 1
    else:
        scale = total
        first, last = 0, total - 1
    grid = np.linspace(first / scale, last / scale, total, dtype=np.float32)
    return grid[:len_back], grid[len_back:]
# Example: centered grid — backcast part is negative, forecast starts at 0.
b_ls,f_ls=linspace(2,3,centered=True)
print(b_ls)
print(f_ls)
#-------------------------------------#
[-0.6666667 -0.33333334]
[0. 0.33333334 0.6666667 ]
# Example: uncentered grid — both parts lie in [0, 1).
b_ls,f_ls=linspace(2,3,centered=False)
print(b_ls)
print(f_ls)
#-------------------------------------#
[0. 0.2]
[0.4 0.6 0.8]
[ NBEATS
]
3-1. NBEATSBlock
class NBEATSBlock(nn.Module):
def __init__(
self,hidden_dim,thetas_dim,
num_block_layers=4,
len_back=10,len_fore =5,
share_thetas=False,dropout=0.1):
#------------ 1) 기본 Setting ---------------------------------#
super().__init__()
self.hidden_dim = hidden_dim
self.thetas_dim = thetas_dim
self.len_back = len_back
self.len_fore = len_fore
self.share_thetas = share_thetas
#--------------------------------------------------------------#
#------------ 2) Layer들 구성하기 --------------------------------#
fc_stack = [ nn.Linear(len_back, hidden_dim),nn.ReLU() ]
for _ in range(num_block_layers - 1):
fc_stack.extend([linear(hidden_dim, hidden_dim, dropout=dropout), nn.ReLU()])
self.fc = nn.Sequential(*fc_stack)
#--------------------------------------------------------------#
#------------ 3) Forward & Backward의 theta 공유 여부 -----------#
if share_thetas:
self.theta_f_fc = self.theta_b_fc = nn.Linear(hidden_dim, thetas_dim, bias=False)
else:
self.theta_b_fc = nn.Linear(hidden_dim, thetas_dim, bias=False)
self.theta_f_fc = nn.Linear(hidden_dim, thetas_dim, bias=False)
#--------------------------------------------------------------#
def forward(self, x):
return self.fc(x)
3-2. NBEATSSeasonalBlock
- s1_Wb, s2_Wb : the two halves of the backcast seasonal basis
- s1_Wf, s2_Wf : the two halves of the forecast seasonal basis
def season_param(freq, line, fn=np.cos):
    """Build one half of the seasonal (Fourier) basis matrix.

    Args:
        freq: iterable of frequencies.
        line: 1-D time grid (numpy array), e.g. from ``linspace``.
        fn: harmonic family — ``np.cos`` (default, backward compatible)
            or ``np.sin``; N-BEATS uses both halves.

    Returns:
        float32 tensor of shape ``(len(freq), len(line))`` where row ``i``
        is ``fn(2 * pi * freq[i] * line)``.
    """
    # BUGFIX: the original built `param` but fell through and returned None.
    rows = [fn(2 * np.pi * i * line) for i in freq]
    return torch.tensor(np.asarray(rows), dtype=torch.float32)
def trend_param(thetas_dim, line):
    """Build the polynomial trend basis matrix.

    Args:
        thetas_dim: number of polynomial degrees (0 .. thetas_dim - 1).
        line: 1-D time grid (numpy array), e.g. from ``linspace``.

    Returns:
        float32 tensor of shape ``(thetas_dim, len(line))`` where row ``i``
        is ``line ** i`` (row 0 is all ones).
    """
    # BUGFIX: the original built `param` but fell through and returned None.
    rows = [line ** i for i in range(thetas_dim)]
    return torch.tensor(np.asarray(rows), dtype=torch.float32)
class NBEATSSeasonalBlock(NBEATSBlock):
    """Seasonality block: projects thetas onto a fixed Fourier basis.

    The basis is registered as buffers (not trainable); backcast and
    forecast share one theta head (``share_thetas=True``).
    """

    def __init__(
        self, hidden_dim, thetas_dim=None,
        num_block_layers=4,
        len_back=10, len_fore=5,
        nb_harmonics=None,
        min_period=1, dropout=0.1):
        # Number of basis rows: nb_harmonics if given, else one per forecast step.
        if nb_harmonics:
            thetas_dim = nb_harmonics
        else:
            thetas_dim = len_fore
        self.min_period = min_period
        super().__init__(
            hidden_dim=hidden_dim, thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            len_back=len_back, len_fore=len_fore,
            share_thetas=True, dropout=dropout)

        # ---------- fixed (non-trainable) seasonal basis ----------------
        # Split the rows: p1 cosine harmonics + p2 sine harmonics.
        # BUGFIX: the original used cosines for *both* halves (via
        # season_param, which also returned None); N-BEATS' seasonal basis
        # is cos for the first half and sin for the second.
        p1, p2 = (thetas_dim // 2, thetas_dim // 2) if thetas_dim % 2 == 0 else (thetas_dim // 2, thetas_dim // 2 + 1)
        line_back, line_fore = linspace(len_back, len_fore, centered=True)

        def build_basis(line):
            cos_rows = [np.cos(2 * np.pi * f * line) for f in self.get_freq(p1)]
            sin_rows = [np.sin(2 * np.pi * f * line) for f in self.get_freq(p2)]
            return torch.tensor(np.asarray(cos_rows + sin_rows), dtype=torch.float32)

        self.register_buffer("S_backcast", build_basis(line_back))
        self.register_buffer("S_forecast", build_basis(line_fore))

    def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]:
        # 1) shared trunk (not yet split into backcast / forecast)
        x = super().forward(x)
        # 2) thetas are harmonic amplitudes; multiply by the fixed basis
        backcast = self.theta_b_fc(x).mm(self.S_backcast)
        forecast = self.theta_f_fc(x).mm(self.S_forecast)
        return backcast, forecast

    def get_freq(self, n):
        """n frequencies from 0 up to (total window length) / min_period."""
        return np.linspace(0, (self.len_back + self.len_fore) / self.min_period, n)
3-3. NBEATSTrendBlock
class NBEATSTrendBlock(NBEATSBlock):
    """Trend block: projects thetas onto a fixed polynomial basis.

    The basis (rows ``t**0 .. t**(thetas_dim-1)``) is registered as
    buffers (not trainable); backcast and forecast share one theta head.
    """

    def __init__(
        self, hidden_dim, thetas_dim,
        num_block_layers=4,
        len_back=10, len_fore=5, dropout=0.1):
        super().__init__(
            hidden_dim=hidden_dim, thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            len_back=len_back, len_fore=len_fore,
            share_thetas=True, dropout=dropout)

        # ---------- fixed (non-trainable) polynomial basis --------------
        # BUGFIX: the original called trend_param, which returned None, so
        # `None * norm` raised at construction; build the basis inline.
        line_back, line_fore = linspace(len_back, len_fore, centered=True)
        # scale so the prediction range stays comparable to the input
        norm = np.sqrt(len_fore / thetas_dim)

        def build_basis(line):
            rows = [line ** i for i in range(thetas_dim)]
            return torch.tensor(np.asarray(rows), dtype=torch.float32)

        self.register_buffer("T_forecast", build_basis(line_fore) * norm)
        self.register_buffer("T_backcast", build_basis(line_back) * norm)

    def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]:
        # 1) shared trunk (not yet split into backcast / forecast)
        x = super().forward(x)
        # 2) thetas are polynomial coefficients; multiply by the fixed basis
        backcast = self.theta_b_fc(x).mm(self.T_backcast)
        forecast = self.theta_f_fc(x).mm(self.T_forecast)
        return backcast, forecast
3-4. NBEATSGenericBlock
class NBEATSGenericBlock(NBEATSBlock):
    """Generic N-BEATS block: backcast/forecast bases are *learned*.

    Unlike the seasonal/trend blocks (fixed buffers), this block maps the
    expansion coefficients through trainable linear layers.
    """

    def __init__(
        self, hidden_dim, thetas_dim,
        num_block_layers=4,
        len_back=10, len_fore=5, dropout=0.1):
        super().__init__(
            hidden_dim=hidden_dim,
            thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            len_back=len_back,
            len_fore=len_fore,
            dropout=dropout,
        )
        # Trainable projections: thetas -> backcast window / forecast window.
        self.backcast_fc = nn.Linear(thetas_dim, len_back)
        self.forecast_fc = nn.Linear(thetas_dim, len_fore)

    def forward(self, x):
        # 1) shared trunk (not yet split into backcast / forecast)
        hidden = super().forward(x)
        # 2) non-negative thetas, then learned projections to each window
        theta_b = F.relu(self.theta_b_fc(hidden))
        theta_f = F.relu(self.theta_f_fc(hidden))
        return self.backcast_fc(theta_b), self.forecast_fc(theta_f)
4. NBEATs 모델 class
class NBeats(BaseModel)
__init__ 와 forward 모듈만 소개
여러 블록을 stack할 때, 사용할 수 있는 3가지 선택지
- 1) generic :
NBEATSGenericBlock
- 2) seasonality :
NBEATSSeasonalBlock
- 3) trend :
NBEATSTrendBlock
class NBeats(BaseModel):
    """N-BEATS forecaster: a chain of doubly residual blocks.

    Stack types map to the block classes defined above:
      - "generic"     -> NBEATSGenericBlock  (learned basis)
      - "seasonality" -> NBEATSSeasonalBlock (fixed Fourier basis)
      - "trend"       -> NBEATSTrendBlock    (fixed polynomial basis)
    """

    def __init__(
        self,
        stack_types: List[str] = ["trend", "seasonality"],
        num_blocks=[3, 3],
        num_block_layers=[3, 3],
        widths=[32, 512],
        sharing: List[int] = [True, True],
        expansion_coefficient_lengths: List[int] = [3, 7],
        prediction_length: int = 1,
        context_length: int = 1,
        dropout: float = 0.1,
        learning_rate: float = 1e-2,
        log_interval: int = -1,
        log_gradient_flow: bool = False,
        log_val_interval: int = None,
        weight_decay: float = 1e-3,
        loss: MultiHorizonMetric = None,
        reduce_on_plateau_patience: int = 1000,
        backcast_loss_ratio: float = 0.0,
        logging_metrics: nn.ModuleList = None,
        **kwargs,
    ):
        # Default metrics / loss (MASE as in the reference implementation).
        if logging_metrics is None:
            logging_metrics = nn.ModuleList([SMAPE(), MAE(), RMSE(), MAPE(), MASE()])
        if loss is None:
            loss = MASE()
        self.save_hyperparameters()
        super().__init__(loss=loss, logging_metrics=logging_metrics, **kwargs)

        # Build the flat list of blocks, stack by stack.
        # BUGFIX: keyword names must match the block classes defined above
        # (hidden_dim / len_back / len_fore) — the original used the
        # pytorch-forecasting names (units / backcast_length /
        # forecast_length), which raise TypeError here.
        self.net_blocks = nn.ModuleList()
        for stack_id, stack_type in enumerate(stack_types):
            for _ in range(num_blocks[stack_id]):
                if stack_type == "generic":
                    net_block = NBEATSGenericBlock(
                        hidden_dim=self.hparams.widths[stack_id],
                        thetas_dim=self.hparams.expansion_coefficient_lengths[stack_id],
                        num_block_layers=self.hparams.num_block_layers[stack_id],
                        len_back=context_length,
                        len_fore=prediction_length,
                        dropout=self.hparams.dropout,
                    )
                elif stack_type == "seasonality":
                    net_block = NBEATSSeasonalBlock(
                        hidden_dim=self.hparams.widths[stack_id],
                        num_block_layers=self.hparams.num_block_layers[stack_id],
                        len_back=context_length,
                        len_fore=prediction_length,
                        min_period=self.hparams.expansion_coefficient_lengths[stack_id],
                        dropout=self.hparams.dropout,
                    )
                elif stack_type == "trend":
                    net_block = NBEATSTrendBlock(
                        hidden_dim=self.hparams.widths[stack_id],
                        thetas_dim=self.hparams.expansion_coefficient_lengths[stack_id],
                        num_block_layers=self.hparams.num_block_layers[stack_id],
                        len_back=context_length,
                        len_fore=prediction_length,
                        dropout=self.hparams.dropout,
                    )
                else:
                    raise ValueError(f"Unknown stack type {stack_type}")
                self.net_blocks.append(net_block)

    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        """Doubly residual pass over all blocks.

        Each block subtracts its backcast from the running residual and adds
        its forecast to the running prediction; per-type partial outputs are
        kept (detached) for interpretation.
        """
        # assumes the target is the first continuous encoder variable
        target = x["encoder_cont"][..., 0]
        timesteps = self.hparams.context_length + self.hparams.prediction_length

        # ---- per-type accumulators (seeded with zeros so stacking works) ----
        generic_forecast = [torch.zeros((target.size(0), timesteps),
                                        dtype=torch.float32, device=self.device)]
        trend_forecast = [torch.zeros((target.size(0), timesteps),
                                      dtype=torch.float32, device=self.device)]
        seasonal_forecast = [torch.zeros((target.size(0), timesteps),
                                         dtype=torch.float32, device=self.device)]
        forecast = torch.zeros((target.size(0), self.hparams.prediction_length),
                               dtype=torch.float32, device=self.device)

        backcast = target  # initialize residual with the raw input window
        for i, block in enumerate(self.net_blocks):
            # 1) one block -> (backcast contribution, forecast contribution)
            backcast_block, forecast_block = block(backcast)
            # 2) record detached contributions per block type (interpretation)
            full = torch.cat([backcast_block.detach(), forecast_block.detach()], dim=1)
            if isinstance(block, NBEATSTrendBlock):
                trend_forecast.append(full)
            elif isinstance(block, NBEATSSeasonalBlock):
                seasonal_forecast.append(full)
            else:
                generic_forecast.append(full)
            # 3) doubly residual update
            backcast = (backcast - backcast_block)
            forecast = forecast + forecast_block

        # ---- final outputs, rescaled back to the original target scale ----
        final_outputs = self.to_network_output(
            # (1) prediction over the forecast horizon
            prediction=self.transform_output(forecast, target_scale=x["target_scale"]),
            # (2) reconstruction of the input window (explained part)
            backcast=self.transform_output(prediction=target - backcast,
                                           target_scale=x["target_scale"]),
            # (3) trend component
            trend=self.transform_output(torch.stack(trend_forecast, dim=0).sum(0),
                                        target_scale=x["target_scale"]),
            # (4) seasonality component
            seasonality=self.transform_output(
                torch.stack(seasonal_forecast, dim=0).sum(0),
                target_scale=x["target_scale"]),
            # (5) generic component
            generic=self.transform_output(torch.stack(generic_forecast, dim=0).sum(0),
                                          target_scale=x["target_scale"])
        )
        return final_outputs
5. Run Experiment
(1) Import Packages
import sys
import pandas as pd
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping
from sklearn.preprocessing import scale
from pytorch_forecasting import NBeats, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.data.examples import generate_ar_data
# NOTE(review): appending to sys.path *after* the imports above cannot
# affect them; move this before the imports if it is needed at all.
sys.path.append("..")
(2) Data 생성 / 소개
-
총 40000개의 데이터
( 100종류의 데이터(=MTS) & 각 데이터의 length는 400 )
# Synthetic autoregressive data: 100 series x 400 timesteps (= 40,000 rows).
data = generate_ar_data(seasonality=10.0, timesteps=400, n_series=100)
data["static"] = 2  # constant static covariate
# daily dates starting 2020-01-01, derived from the integer time index
data["date"] = pd.Timestamp("2020-01-01") + pd.to_timedelta(data.time_idx, "D")
- validation 데이터
validation = data.series.sample(20)
(3) Dataset 생성
max_encoder_length = 150    # lookback (backcast) window length
max_prediction_length = 20  # forecast horizon
# everything before the cutoff is available for training
training_cutoff = data["time_idx"].max() - max_prediction_length
training
: train & validation 데이터셋
validation
: validation 데이터셋
context_length = max_encoder_length        # backcast length fed to the model
prediction_length = max_prediction_length  # forecast length
# Training dataset: fixed-length encoder (150) and prediction (20) windows,
# grouped by the `series` id; only rows before the cutoff are used.
training = TimeSeriesDataSet(
    data[lambda x: x.time_idx < training_cutoff],
    time_idx="time_idx",
    target="value",
    categorical_encoders={"series": NaNLabelEncoder().fit(data.series)},
    group_ids=["series"],
    # min == max: every sample uses exactly the same window lengths
    min_encoder_length=context_length,
    max_encoder_length=context_length,
    max_prediction_length=prediction_length,
    min_prediction_length=prediction_length,
    time_varying_unknown_reals=["value"],
    randomize_length=None,
    add_relative_time_idx=False,
    add_target_scales=False)
# Validation dataset: same configuration, but predictions start at the cutoff.
validation = TimeSeriesDataSet.from_dataset(training,
                                            data,
                                            min_prediction_idx=training_cutoff)
(4) Data Loader
batch_size = 128
# num_workers=2: load batches in two background worker processes
train_dataloader = training.to_dataloader(train=True, batch_size=batch_size,
                                          num_workers=2)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size,
                                          num_workers=2)
(5) Modeling
# Stop after 10 validation checks without an improvement of at least 1e-4.
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4,
                                    patience=10, verbose=False, mode="min")
# NOTE(review): `gpus` and `weights_summary` are arguments of older
# pytorch-lightning versions — confirm the installed version accepts them.
trainer = pl.Trainer(
    max_epochs=100,
    gpus=0,  # CPU only
    weights_summary="top",
    gradient_clip_val=0.1,  # clip gradient norm to stabilise training
    callbacks=[early_stop_callback],
    limit_train_batches=15)  # cap each epoch at 15 training batches
사용할 모델 : NBeats
- net = NBeats.from_dataset(
전체(train+val) 데이터셋
,기타 등등
)
# Build NBeats with dataset-derived hyperparameters (context/prediction
# lengths come from `training`); the rest are passed explicitly.
net = NBeats.from_dataset(
    training, learning_rate=3e-2, log_interval=10, log_val_interval=1,
    log_gradient_flow=False, weight_decay=1e-2
)
(6) Fit Model
- trainer.fit(
모델
,train 데이터로더
,validation 데이터로더
)
# NOTE(review): newer pytorch-lightning spells this `train_dataloaders`;
# `train_dataloader=` matches older versions — confirm against the one installed.
trainer.fit(
    net,
    train_dataloader=train_dataloader,
    val_dataloaders=val_dataloader,
)