MTAD-GAT Code Review

(Paper review: https://seunghan96.github.io/ts/gnn/ts46/)

1. mtad_gat.py

import torch
import torch.nn as nn

from modules import (
    ConvLayer,
    FeatureAttentionLayer,
    TemporalAttentionLayer,
    GRULayer,
    Forecasting_Model,
    ReconstructionModel,
)
class MTAD_GAT(nn.Module):

    def __init__(
        self,
        n_features, # Number of input features ( = number of time series )
        window_size, # Length of the input sequence ( = backcast length )
        out_dim, # Number of features to output
        #------------------------------------------------------------------#
        kernel_size=7, # kernel size of the 1-D conv
        #------------------------------------------------------------------#
        feat_gat_embed_dim=None, # embedding dim of the FEATURE-oriented GAT layer
        time_gat_embed_dim=None, # embedding dim of the TIME-oriented GAT layer
        use_gatv2=True, # use GAT-v2 instead of GAT
        #------------------------------------------------------------------#
        gru_n_layers=1, # number of GRU layers
        gru_hid_dim=150, # GRU hidden dimension
        #------------------------------------------------------------------#
        forecast_n_layers=1, # number of layers in the forecasting MLP
        forecast_hid_dim=150, # hidden dim of the forecasting MLP
        recon_n_layers=1, # number of GRU layers in the reconstruction decoder
        recon_hid_dim=150, # hidden dim of the reconstruction decoder
        #------------------------------------------------------------------#
        dropout=0.2,
        alpha=0.2 # negative slope of the LeakyReLU used in the GAT layers
    ):
        super(MTAD_GAT, self).__init__()

        self.conv = ConvLayer(n_features, kernel_size)
        self.feature_gat = FeatureAttentionLayer(n_features, window_size, dropout, alpha, feat_gat_embed_dim, use_gatv2)
        self.temporal_gat = TemporalAttentionLayer(n_features, window_size, dropout, alpha, time_gat_embed_dim, use_gatv2)
        self.gru = GRULayer(3 * n_features, gru_hid_dim, gru_n_layers, dropout)
        self.forecasting_model = Forecasting_Model(gru_hid_dim, forecast_hid_dim, out_dim, forecast_n_layers, dropout)
        self.recon_model = ReconstructionModel(window_size, gru_hid_dim, recon_hid_dim, out_dim, recon_n_layers, dropout)

    def forward(self, x):
        # Shape of x : (b, n, k)
        # --- b : batch size
        # --- n : window size ( = backcast length )
        # --- k : number of features ( = number of time series )

        # (1) embed the input with a 1-D conv (kernel_size = 7 by default)
        x = self.conv(x)
        
        # (2) pass through the two GAT layers ( + concatenate )
        h_feat = self.feature_gat(x)
        h_temp = self.temporal_gat(x)
        h_cat = torch.cat([x, h_feat, h_temp], dim=2)  # (b, n, 3k)

        # (3) feed the concatenated result into the GRU
        _, h_end = self.gru(h_cat)
        h_end = h_end.view(x.shape[0], -1) # flatten the final hidden state

        # (4) make predictions from the GRU output
        ## (4-1) Forecast model
        predictions = self.forecasting_model(h_end)
        ## (4-2) Reconstruction model
        recons = self.recon_model(h_end)

        return predictions, recons
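
As a minimal end-to-end shape sketch (the sizes below are arbitrary, and it assumes modules.py from the reviewed repository is importable):

import torch

model = MTAD_GAT(n_features=25, window_size=100, out_dim=25)
x = torch.randn(8, 100, 25)   # (b, n, k)
predictions, recons = model(x)
print(predictions.shape)      # torch.Size([8, 25])      -- one forecast value per feature
print(recons.shape)           # torch.Size([8, 100, 25]) -- reconstruction of the whole window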


2. modules.py

(1) ConvLayer

  • Embeds the initial input \(x\) with a 1-D convolution
class ConvLayer(nn.Module):
    def __init__(self, n_features, kernel_size=7):
        super(ConvLayer, self).__init__()
        self.padding = nn.ConstantPad1d((kernel_size - 1) // 2, 0.0) # pad both sides so the window length is preserved
        self.conv = nn.Conv1d(in_channels=n_features, out_channels=n_features, kernel_size=kernel_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = x.permute(0, 2, 1) # (b,n,k) -> (b,k,n)
        x = self.padding(x)
        x = self.relu(self.conv(x))
        return x.permute(0, 2, 1)  #  (b,k,n) -> (b,n,k)
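
A quick sanity check (arbitrary sizes): with the default kernel_size=7, the (kernel_size - 1) // 2 padding on both sides keeps the window length unchanged:

conv = ConvLayer(n_features=25, kernel_size=7)
x = torch.randn(8, 100, 25)   # (b, n, k)
print(conv(x).shape)          # torch.Size([8, 100, 25]) -- same shape as the input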


GAT vs GAT-v2

\(e_{i j}=\operatorname{LeakyReLU}\left(w^{\top} \cdot\left(v_{i} \oplus v_{j}\right)\right)\).

GAT

  • step 1) pass the node features through the linear layer
  • step 2) \(v\) = build the matrix of all pairwise concatenations for attention
  • step 3) \(e\) = LeakyReLU ( \(w\) x \(v\) )


GAT-v2 ( = Dynamic GAT )

  • step 1) build the matrix of all pairwise concatenations from the raw input
  • step 2) \(v\) = LeakyReLU ( linear layer applied to it )
  • step 3) \(e\) = \(w\) x \(v\)
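
In the notation of the code below (lin playing the role of \(W\), a playing the role of \(w\)), the two scoring functions can be sketched as

\(e_{i j}^{\mathrm{GAT}}=\operatorname{LeakyReLU}\left(w^{\top}\left(W v_{i} \oplus W v_{j}\right)\right), \qquad e_{i j}^{\mathrm{GATv2}}=w^{\top} \operatorname{LeakyReLU}\left(W\left(v_{i} \oplus v_{j}\right)\right)\).

In GAT the linear map is applied per node before the concatenation and the LeakyReLU wraps the dot product with \(w\); in GAT-v2 the linear map acts on the concatenated pair and \(w\) is applied after the LeakyReLU, which is what makes the attention "dynamic".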


(2) FeatureAttentionLayer

  • Feature-oriented GAT
class FeatureAttentionLayer(nn.Module):

    def __init__(self, n_features, window_size, dropout, 
                 alpha, embed_dim=None, use_gatv2=True, use_bias=True):
        super(FeatureAttentionLayer, self).__init__()
        self.n_features = n_features # number of features ( = number of time series )
        self.num_nodes = n_features # nodes of this GAT are the k features
        self.window_size = window_size # input length ( = backcast length )
        self.dropout = dropout
        self.embed_dim = embed_dim if embed_dim is not None else window_size
        self.use_gatv2 = use_gatv2 
        self.use_bias = use_bias

        #-------------------------------------------------------#
        # [ GAT vs GAT-v2 ( = Dynamic GAT ) ]
        # ----- GAT-v2 : the linear layer is applied to the concatenated pair, before the LeakyReLU
        #-------------------------------------------------------#
        if self.use_gatv2:
            self.embed_dim *= 2
            lin_input_dim = 2 * window_size
            a_input_dim = self.embed_dim
        else:
            lin_input_dim = window_size
            a_input_dim = 2 * self.embed_dim

        self.lin = nn.Linear(lin_input_dim, self.embed_dim)
        self.a = nn.Parameter(torch.empty((a_input_dim, 1)))
        nn.init.xavier_uniform_(self.a.data, gain=1.414)

        if self.use_bias:
            self.bias = nn.Parameter(torch.empty(n_features, n_features))

        self.leakyrelu = nn.LeakyReLU(alpha)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Shape of x : (b, n, k)
        # --- b : batch size
        # --- n : window size ( = backcast length )
        # --- k : number of features ( = number of time series )
        
        #---------------------------------------------------------------------------------#
        # (1) compute e ( GAT / GAT-v2 )
        x = x.permute(0, 2, 1) # (b,n,k) -> (b,k,n)
        if self.use_gatv2:
            a_input = self._make_attention_input(x)                 # (b, k, k, 2*window_size)
            a_input = self.leakyrelu(self.lin(a_input))             # (b, k, k, embed_dim)
            e = torch.matmul(a_input, self.a).squeeze(3)            # (b, k, k)
        else:
            Wx = self.lin(x)                                                  # (b, k, embed_dim)
            a_input = self._make_attention_input(Wx)                          # (b, k, k, 2*embed_dim)
            e = self.leakyrelu(torch.matmul(a_input, self.a)).squeeze(3)      # (b, k, k)
        if self.use_bias:
            e += self.bias
				
        #---------------------------------------------------------------------------------#
        # (2) compute attention weights ( = dropout(softmax(e)) )
        attention = torch.softmax(e, dim=2)
        attention = torch.dropout(attention, self.dropout, train=self.training)

        #---------------------------------------------------------------------------------#
        # (3) node update
        h = self.sigmoid(torch.matmul(attention, x))
        return h.permute(0, 2, 1)

    def _make_attention_input(self, v):
        """Preparing the feature attention mechanism.
        Creating matrix with all possible combinations of concatenations of node.
        Each node consists of all values of that node within the window
            v1 || v1,
            ...
            v1 || vK,
            v2 || v1,
            ...
            v2 || vK,
            ...
            ...
            vK || v1,
            ...
            vK || vK,
        """
        K = self.num_nodes
        blocks_repeating = v.repeat_interleave(K, dim=1)  # Left-side of the matrix
        blocks_alternating = v.repeat(1, K, 1)  # Right-side of the matrix
        combined = torch.cat((blocks_repeating, blocks_alternating), dim=2)  # (b, K*K, 2*window_size)

        if self.use_gatv2:
            return combined.view(v.size(0), K, K, 2 * self.window_size)
        else:
            return combined.view(v.size(0), K, K, 2 * self.embed_dim)
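
An illustrative shape check (arbitrary sizes): attention here is over the k feature nodes, and the output keeps the input shape:

feat_gat = FeatureAttentionLayer(n_features=25, window_size=100, dropout=0.2, alpha=0.2)
x = torch.randn(8, 100, 25)   # (b, n, k)
print(feat_gat(x).shape)      # torch.Size([8, 100, 25])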


(3) TemporalAttentionLayer

  • Time-oriented GAT

  • The implementation is almost identical to (2) FeatureAttentionLayer above

class TemporalAttentionLayer(nn.Module):


    def __init__(self, n_features, window_size, dropout, alpha, embed_dim=None, use_gatv2=True, use_bias=True):
        super(TemporalAttentionLayer, self).__init__()
        self.n_features = n_features # number of features ( = number of time series )
        self.num_nodes = window_size # nodes of this GAT are the n timestamps
        self.window_size = window_size # input length ( = backcast length )
        self.dropout = dropout
        self.use_gatv2 = use_gatv2
        self.embed_dim = embed_dim if embed_dim is not None else n_features
        self.use_bias = use_bias
        
        if self.use_gatv2:
            self.embed_dim *= 2
            lin_input_dim = 2 * n_features # difference 1 ( 2*n_features instead of 2*window_size )
            a_input_dim = self.embed_dim
        else:
            lin_input_dim = n_features # difference 2 ( n_features instead of window_size )
            a_input_dim = 2 * self.embed_dim

        self.lin = nn.Linear(lin_input_dim, self.embed_dim)
        self.a = nn.Parameter(torch.empty((a_input_dim, 1)))
        nn.init.xavier_uniform_(self.a.data, gain=1.414)

        if self.use_bias:
            self.bias = nn.Parameter(torch.empty(window_size, window_size))  # difference 3 ( bias over the n x n timestamp pairs )

        self.leakyrelu = nn.LeakyReLU(alpha)
        self.sigmoid = nn.Sigmoid()

        
    def forward(self, x):
        # Shape of x : (b, n, k)
        # --- b : batch size
        # --- n : window size ( = backcast length )
        # --- k : number of features ( = number of time series )
        #-------------------------------------------------------------#
        # difference 4 : no permutation ( the nodes are already the n timestamps )
        if self.use_gatv2:
            a_input = self._make_attention_input(x)              # (b, n, n, 2*n_features)
            a_input = self.leakyrelu(self.lin(a_input))          # (b, n, n, embed_dim)
            e = torch.matmul(a_input, self.a).squeeze(3)         # (b, n, n)
        else:
            Wx = self.lin(x)                                                  # (b, n, embed_dim)
            a_input = self._make_attention_input(Wx)                          # (b, n, n, 2*embed_dim)
            e = self.leakyrelu(torch.matmul(a_input, self.a)).squeeze(3)      # (b, n, n)
        if self.use_bias:
            e += self.bias  # (b, n, n)
            
        #---------------------------------------------------------------------------------#
        # (2) compute attention weights ( = dropout(softmax(e)) )
        attention = torch.softmax(e, dim=2)
        attention = torch.dropout(attention, self.dropout, train=self.training)

        #---------------------------------------------------------------------------------#
        # (3) node update
        h = self.sigmoid(torch.matmul(attention, x))    # (b, n, k)
        return h

      
    def _make_attention_input(self, v): # difference 5 : nodes are timestamps, so K = window_size
        """Preparing the temporal attention mechanism.
        Creating matrix with all possible combinations of concatenations of node values:
            (v1, v2..)_t1 || (v1, v2..)_t1
            (v1, v2..)_t1 || (v1, v2..)_t2
            ...
            ...
            (v1, v2..)_tn || (v1, v2..)_t1
            (v1, v2..)_tn || (v1, v2..)_t2
        """

        K = self.num_nodes
        blocks_repeating = v.repeat_interleave(K, dim=1)  # Left-side of the matrix
        blocks_alternating = v.repeat(1, K, 1)  # Right-side of the matrix
        combined = torch.cat((blocks_repeating, blocks_alternating), dim=2)

        if self.use_gatv2:
            return combined.view(v.size(0), K, K, 2 * self.n_features)
        else:
            return combined.view(v.size(0), K, K, 2 * self.embed_dim)
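
The same kind of shape check for the temporal layer, where attention is over the n timestamp nodes (arbitrary sizes):

temp_gat = TemporalAttentionLayer(n_features=25, window_size=100, dropout=0.2, alpha=0.2)
x = torch.randn(8, 100, 25)   # (b, n, k)
print(temp_gat(x).shape)      # torch.Size([8, 100, 25])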


(4) GRULayer

  • \(x \rightarrow z\).
class GRULayer(nn.Module):
    def __init__(self, in_dim, hid_dim, n_layers, dropout):
        super(GRULayer, self).__init__()
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.dropout = 0.0 if n_layers == 1 else dropout
        self.gru = nn.GRU(in_dim, hid_dim, num_layers=n_layers, 
                          batch_first=True, dropout=self.dropout)

    def forward(self, x):
        out, h = self.gru(x)
        # h : (n_layers, b, hid_dim) -> h[-1] is the final hidden state of the LAST layer.
        # Note : with batch_first=True, out is (b, n, hid_dim), so out[-1] indexes the last
        # batch element; MTAD_GAT only uses the returned h downstream.
        out, h = out[-1, :, :], h[-1, :, :]
        return out, h
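
An illustrative call (arbitrary sizes; in_dim=75 assumes k = 25 features, i.e. 3 * n_features after the concatenation step in MTAD_GAT):

gru = GRULayer(in_dim=75, hid_dim=150, n_layers=1, dropout=0.2)
h_cat = torch.randn(8, 100, 75)   # (b, n, 3k)
out, h = gru(h_cat)
print(h.shape)                    # torch.Size([8, 150]) -- final hidden state of the last layer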


(5) RNNDecoder

  • Decoder used inside the Reconstruction model
  • Uses a GRU
  • \(z \rightarrow x\).
class RNNDecoder(nn.Module):
    def __init__(self, in_dim, hid_dim, n_layers, dropout):
        super(RNNDecoder, self).__init__()
        self.in_dim = in_dim
        self.dropout = 0.0 if n_layers == 1 else dropout
        self.rnn = nn.GRU(in_dim, hid_dim, n_layers,
                          batch_first=True, dropout=self.dropout)

    def forward(self, x):
        decoder_out, _ = self.rnn(x)
        return decoder_out


(6) ReconstructionModel

  • Model used : the GRU-based decoder implemented above

  • input & output

    • input = the "final hidden state" of the GRU layer

    • output = FC ( DECODER ( input repeated backcast-length times ) )

class ReconstructionModel(nn.Module):
    def __init__(self, window_size, in_dim, hid_dim, out_dim, n_layers, dropout):
        super(ReconstructionModel, self).__init__()
        self.window_size = window_size
        self.decoder = RNNDecoder(in_dim, hid_dim, n_layers, dropout)
        self.fc = nn.Linear(hid_dim, out_dim)

    def forward(self, x):
        h_end = x
        h_end_rep = h_end.repeat_interleave(self.window_size, dim=1).view(x.size(0), self.window_size, -1)
        decoder_out = self.decoder(h_end_rep)
        out = self.fc(decoder_out)
        return out
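
A rough sketch of the call (arbitrary sizes, with in_dim equal to gru_hid_dim): the final GRU hidden state is repeated across the window and decoded back to (b, n, out_dim):

recon = ReconstructionModel(window_size=100, in_dim=150, hid_dim=150, out_dim=25, n_layers=1, dropout=0.2)
h_end = torch.randn(8, 150)   # final GRU hidden state
print(recon(h_end).shape)     # torch.Size([8, 100, 25])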


(7) Forecasting_Model

  • Model used : FC ( a stack of linear layers )

  • input & output

    • input = the "final hidden state" of the GRU layer

    • output = FC ( x )

class Forecasting_Model(nn.Module):
    def __init__(self, in_dim, hid_dim, out_dim, n_layers, dropout):
        super(Forecasting_Model, self).__init__()
        layers = [nn.Linear(in_dim, hid_dim)]
        for _ in range(n_layers - 1):
            layers.append(nn.Linear(hid_dim, hid_dim))
        layers.append(nn.Linear(hid_dim, out_dim))
        self.layers = nn.ModuleList(layers)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        for i in range(len(self.layers) - 1):
            x = self.relu(self.layers[i](x))
            x = self.dropout(x)
        return self.layers[-1](x)
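
An illustrative call for the forecasting head (arbitrary sizes): a small MLP maps the final hidden state to one forecast value per output feature:

fc_model = Forecasting_Model(in_dim=150, hid_dim=150, out_dim=25, n_layers=1, dropout=0.2)
h_end = torch.randn(8, 150)   # final GRU hidden state
print(fc_model(h_end).shape)  # torch.Size([8, 25])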
