Transformer Implementation

( Reference: “딥러닝을 이용한 자연어 처리 입문” (https://wikidocs.net/book/2155) )

Implementation order

  1. Positional Encoding
  2. Attention Function
  3. Multi-head Attention
  4. Masking
    1. Masking 1) the padded positions of sentences shorter than the maximum length
    2. Masking 2) to prevent cheating (look-ahead)
  5. Encoder
    1. encoder layer
    2. encoder
  6. Decoder
    1. decoder layer
    2. decoder
  7. Transformer

1. Positional Encoding


\[\begin{aligned} PE_{(pos,\, 2i)} &= \sin\left(pos / 10000^{2i / d_{\text{model}}}\right) \\ PE_{(pos,\, 2i+1)} &= \cos\left(pos / 10000^{2i / d_{\text{model}}}\right) \end{aligned}\]

import numpy as np
import tensorflow as tf


class PositionalEncoding(tf.keras.layers.Layer):
  def __init__(self, pos, d_model):
    super(PositionalEncoding, self).__init__()
    self.pos_encoding = self.positional_encoding(pos, d_model)

  def angles(self, pos, i, d_model):
    # angle for each (position, dimension) pair: pos / 10000^(2i / d_model)
    angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
    return pos * angles

  def positional_encoding(self, pos, d_model):
    angle_rads = self.angles(
        pos=tf.range(pos, dtype=tf.float32)[:, tf.newaxis],
        i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
        d_model=d_model)

    # sin on the even indices, cos on the odd indices
    pos_encoding = np.zeros(angle_rads.shape)
    pos_encoding[:, 0::2] = tf.math.sin(angle_rads[:, 0::2])
    pos_encoding[:, 1::2] = tf.math.cos(angle_rads[:, 1::2])
    pos_encoding = tf.constant(pos_encoding)[tf.newaxis, ...]

    print('Finished Positional Encoding! shape : ', pos_encoding.shape)
    return tf.cast(pos_encoding, tf.float32)

  def call(self, x):
    return x + self.pos_encoding[:, :tf.shape(x)[1], :]
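
A quick sanity check of the layer (a minimal sketch; the lengths and dimensions below are arbitrary choices, not from the reference):

# hypothetical usage: add positional encodings to a random "embedding" batch
sample_pos_enc = PositionalEncoding(pos=50, d_model=128)
dummy_embeddings = tf.random.uniform((2, 30, 128))    # (batch, seq_len, d_model)
print(sample_pos_enc(dummy_embeddings).shape)         # (2, 30, 128); encoding truncated to seq_len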


2. Attention Function


\[\text { Attention }(Q, K, V)=\operatorname{softmax}\left(\frac{Q K^{T}}{\sqrt{d_{k}}}\right) V\]

def attention_func(Q, K, V, mask):
  # scaled dot-product attention
  attention_score = tf.matmul(Q, K, transpose_b=True)
  d_k = tf.cast(tf.shape(K)[-1], tf.float32)
  attention_logits = attention_score / tf.math.sqrt(d_k)

  # push masked positions to a large negative value so softmax gives them ~0 weight
  if mask is not None:
    attention_logits += (mask * -1e9)

  attention_weights = tf.nn.softmax(attention_logits, axis=-1)
  output = tf.matmul(attention_weights, V)

  return output, attention_weights
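
A minimal sanity check of attention_func (the shapes are arbitrary; it only illustrates that the output keeps Q's shape and that each row of attention weights sums to 1):

# hypothetical shapes: (batch, num_heads, seq_len, depth)
Q = tf.random.normal((1, 2, 4, 8))
K = tf.random.normal((1, 2, 4, 8))
V = tf.random.normal((1, 2, 4, 8))
out, weights = attention_func(Q, K, V, mask=None)
print(out.shape)                               # (1, 2, 4, 8)
print(tf.reduce_sum(weights, axis=-1)[0, 0])   # ~[1. 1. 1. 1.]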


3. Multi-head Attention


step 1) pass the inputs through Dense layers of size $d_{model}$ corresponding to $W_Q$, $W_K$, $W_V$

step 2) split into the specified number of heads ( $\text{num}_{heads}$ )

step 3) (scaled dot-product) Attention

step 4) concatenate the heads that were split

step 5) pass through the Dense layer corresponding to $W_O$!


class MultiHeadAttention(tf.keras.layers.Layer):
  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super(MultiHeadAttention, self).__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0

    self.depth = d_model // self.num_heads
    self.Q_layer = tf.keras.layers.Dense(units=d_model)
    self.K_layer = tf.keras.layers.Dense(units=d_model)
    self.V_layer = tf.keras.layers.Dense(units=d_model)
    self.dense = tf.keras.layers.Dense(units=d_model)

  def split_heads(self, inputs, batch_size):
    inputs = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(inputs, perm=[0, 2, 1, 3])

  def call(self, inputs):
    Q, K, V, mask = inputs['Q'], inputs['K'], inputs['V'], inputs['mask']
    batch_size = tf.shape(Q)[0]
    
    # step 1) 
    Q = self.Q_layer(Q)
    K = self.K_layer(K)
    V = self.V_layer(V)

    # step 2)
    Q = self.split_heads(Q, batch_size)
    K = self.split_heads(K, batch_size)
    V = self.split_heads(V, batch_size)

    # step 3)
    scaled_attention, _ = attention_func(Q, K, V, mask)
    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

    # step 4)
    concat_attention = tf.reshape(scaled_attention,
                                  (batch_size, -1, self.d_model))

    # step 5)
    outputs = self.dense(concat_attention)

    return outputs
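
Self-attention usage of the layer above (a sketch with arbitrary sizes; the same tensor is passed as Q, K and V):

# hypothetical usage: self-attention over a random sequence
mha = MultiHeadAttention(d_model=128, num_heads=8)
x = tf.random.uniform((2, 10, 128))  # (batch, seq_len, d_model)
out = mha({'Q': x, 'K': x, 'V': x, 'mask': None})
print(out.shape)  # (2, 10, 128)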


4. Masking

Masking

  1. Masking 1) the padded positions of sentences shorter than the maximum length
  2. Masking 2) to prevent cheating (look-ahead)


4-1. Masking 1



# mark the positions whose token id is 0 (padding) with 1
def masking1(x):
  mask = tf.cast(tf.math.equal(x, 0), tf.float32)
  return mask[:, tf.newaxis, tf.newaxis, :]
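
For example, on a toy batch where 0 is the padding id (hypothetical token ids), the padded positions come out as 1:

x = tf.constant([[5, 21, 7, 0, 0]])
print(masking1(x))  # shape (1, 1, 1, 5), values [[[[0. 0. 0. 1. 1.]]]]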


4-2. Masking 2



# look-ahead mask: keep position i from attending to later positions, combined with the padding mask
def masking2(x):
  mask1 = masking1(x)
  seq_len = tf.shape(x)[1]
  mask2 = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0) 
  return tf.maximum(mask2, mask1)
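
On the same toy batch (hypothetical token ids), masking2 combines both effects: the upper-triangular look-ahead part and the padded positions are masked:

x = tf.constant([[5, 21, 7, 0, 0]])
print(masking2(x)[0, 0])
# [[0. 1. 1. 1. 1.]
#  [0. 0. 1. 1. 1.]
#  [0. 0. 0. 1. 1.]
#  [0. 0. 0. 1. 1.]
#  [0. 0. 0. 1. 1.]]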


5. Encoder

Encoder

  1. encoder layer
  2. encoder



5-1. Encoder Layer

def encoder_layer(d_ff, d_model, num_heads, dropout, name="encoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")

  # Layer (1) : multi-head self-attention + residual connection & layer normalization
  mask1 = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
  attention = MultiHeadAttention(
      d_model, num_heads, name="attention")({
          'Q': inputs, 'K': inputs, 'V': inputs,'mask': mask1})
  attention = tf.keras.layers.Dropout(rate=dropout)(attention)
  attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(inputs + attention)

  # Layer (2) : position-wise FFNN + residual connection & layer normalization
  outputs = tf.keras.layers.Dense(units=d_ff, activation='relu')(attention)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention + outputs)

  return tf.keras.Model(inputs=[inputs, mask1], outputs=outputs, name=name)
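
A one-off check of a single encoder layer (a sketch; the sizes are arbitrary and the all-zero mask means "no padding"):

layer = encoder_layer(d_ff=512, d_model=128, num_heads=8, dropout=0.1)
x = tf.random.uniform((2, 10, 128))   # (batch, seq_len, d_model)
mask = tf.zeros((2, 1, 1, 10))        # no positions masked
print(layer([x, mask]).shape)         # (2, 10, 128)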


5-2. Encoder

def encoder(vocab_size, num_layers, d_ff,
            d_model, num_heads, dropout,
            name="encoder"):
  inputs = tf.keras.Input(shape=(None,), name="inputs")

  # Layer (0) (before the encoder layers) : embedding + positional encoding
  mask1 = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # ( Layer (1) ~ Layer (2) ) x num_layers
  for i in range(num_layers):
    outputs = encoder_layer(d_ff=d_ff, d_model=d_model, num_heads=num_heads,
        dropout=dropout, name="encoder_layer_{}".format(i),
    )([outputs, mask1])

  return tf.keras.Model(inputs=[inputs, mask1], outputs=outputs, name=name)
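
A small sanity check of the full encoder stack (a sketch; the hyperparameters and random token ids are arbitrary, not from the reference):

small_encoder = encoder(vocab_size=100, num_layers=2, d_ff=512,
                        d_model=128, num_heads=8, dropout=0.1)
ids = tf.random.uniform((2, 10), maxval=100, dtype=tf.int32)  # random token ids
print(small_encoder([ids, masking1(ids)]).shape)              # (2, 10, 128)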


6. Decoder

Decoder

  1. decoder layer
  2. decoder


6-1. Decoder Layer

def decoder_layer(d_ff, d_model, num_heads, dropout, name="decoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")  
  mask1 = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
  mask2 = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")

  
  # Multi-head self-attention (with look-ahead mask)
  attention1 = MultiHeadAttention(d_model, num_heads, name="attention_1")(inputs={
          'Q': inputs, 'K': inputs, 'V': inputs, 'mask': mask2})
  attention1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention1 + inputs)

  # Decoder-Encoder Attention
  attention2 = MultiHeadAttention(d_model, num_heads, name="attention_2")(inputs={
          'Q': attention1, 'K': enc_outputs, 'V': enc_outputs,'mask': mask1 })
  attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
  attention2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention2 + attention1)

  # Position-wise FFNN
  outputs = tf.keras.layers.Dense(units=d_ff, activation='relu')(attention2)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(outputs + attention2)

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, mask2, mask1],
      outputs=outputs,
      name=name)


6-2. Decoder

def decoder(vocab_size, num_layers, d_ff, d_model, num_heads, dropout, name='decoder'):
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')

  mask1 = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
  mask2 = tf.keras.Input(shape=(1, None, None), name='look_ahead_mask')
  

  # Layer (0) (before the decoder layers) : embedding + positional encoding
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # ( Layer (1) ~ Layer (2) ) x num_layers
  for i in range(num_layers):
    outputs = decoder_layer(d_ff=d_ff, d_model=d_model, num_heads=num_heads,
        dropout=dropout, name='decoder_layer_{}'.format(i),
    )(inputs=[outputs, enc_outputs, mask2, mask1])

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, mask2, mask1],
      outputs=outputs,
      name=name)
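
A sketch of calling the decoder stack on its own (the encoder output here is just a random tensor standing in for real encoder activations; all sizes are arbitrary):

small_decoder = decoder(vocab_size=100, num_layers=2, d_ff=512,
                        d_model=128, num_heads=8, dropout=0.1)
dec_ids = tf.random.uniform((2, 7), maxval=100, dtype=tf.int32)
enc_out = tf.random.uniform((2, 10, 128))  # stand-in for encoder outputs
look_ahead = masking2(dec_ids)             # (2, 1, 7, 7)
padding = tf.zeros((2, 1, 1, 10))          # no padding on the encoder side
print(small_decoder([dec_ids, enc_out, look_ahead, padding]).shape)  # (2, 7, 128)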


7. Transformer

def transformer(vocab_size, num_layers, d_ff, d_model, num_heads, dropout, name="transformer"):

  # Encoder input
  inputs = tf.keras.Input(shape=(None,), name="inputs")

  # Decoder input
  dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

  # Encoder mask (padding mask)
  enc_mask1 = tf.keras.layers.Lambda(
      masking1, output_shape=(1, 1, None),
      name='enc_padding_mask')(inputs)

  # Decoder mask 1 ( padding mask; built from the encoder inputs since it masks the encoder outputs in the encoder-decoder attention )
  dec_mask1 = tf.keras.layers.Lambda(
      masking1, output_shape=(1, 1, None),
      name='dec_padding_mask')(inputs)

  # Decoder mask 2 ( look-ahead mask to prevent cheating )
  dec_mask2 = tf.keras.layers.Lambda(
      masking2, output_shape=(1, None, None),
      name='look_ahead_mask')(dec_inputs)

  # Encoder output
  enc_outputs = encoder(vocab_size=vocab_size, num_layers=num_layers, d_ff=d_ff,
      d_model=d_model, num_heads=num_heads, dropout=dropout,
  )(inputs=[inputs, enc_mask1]) # the encoder takes the input sentence and the padding mask

  # Decoder output
  dec_outputs = decoder(vocab_size=vocab_size, num_layers=num_layers, d_ff=d_ff,
      d_model=d_model, num_heads=num_heads, dropout=dropout,
  )(inputs=[dec_inputs, enc_outputs, dec_mask2, dec_mask1])

  # Final output
  outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)

  return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
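
Finally, an end-to-end shape check of the assembled model (a sketch; the tiny hyperparameters and random ids are arbitrary):

small_transformer = transformer(vocab_size=100, num_layers=2, d_ff=512,
                                d_model=128, num_heads=8, dropout=0.1)
enc_ids = tf.random.uniform((2, 10), maxval=100, dtype=tf.int32)
dec_ids = tf.random.uniform((2, 7), maxval=100, dtype=tf.int32)
print(small_transformer([enc_ids, dec_ids]).shape)  # (2, 7, 100): vocab logits per decoder position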