Implementing Attention

import random
import tensorflow as tf
from konlpy.tag import Okt

1) Hyperparameters

EPOCHS = 50
NUM_WORDS = 2000  # vocabulary size


2) Modeling


1. Encoder

  • embedding : vocabulary of NUM_WORDS (2,000) words, 64-dimensional embeddings
  • lstm : 512 nodes ( return_state=True to get the final hidden & cell states, return_sequences=True to get the hidden state at every step )
  • no need for the predicted y inside the LSTM: teacher forcing feeds the real y as the input of the next step
class Encoder(tf.keras.Model):
    def __init__(self):
        super(Encoder,self).__init__()
        self.emb = tf.keras.layers.Embedding(NUM_WORDS, 64)  # vocabulary of 2,000 words, 64-dim embeddings
        # return_sequences=True : attention needs the hidden state at every step, not just the last one
        self.lstm = tf.keras.layers.LSTM(512, return_sequences=True, return_state=True)
    
    def call(self,x,training=False,mask=None):
        x = self.emb(x)
        H, h, c = self.lstm(x)  # H : the output at every hidden state (needed as attention keys/values)
        return H, h, c
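
A quick shape check of the Encoder (a minimal sketch; the dummy batch of zeros is arbitrary):

enc = Encoder()
dummy = tf.zeros((2, 64), dtype=tf.int32)  # (batch, time) : 64 padded word ids
H, h, c = enc(dummy)
print(H.shape)  # (2, 64, 512) : hidden state at every time step
print(h.shape)  # (2, 512)     : final hidden state
print(c.shape)  # (2, 512)     : final cell state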


2. Decoder

  • embedding & LSTM & Attention & Dense
  • Dense : gives a probability distribution over the 2,000 words
  • at test time, the predicted word (the lstm output) becomes the input of the next step
class Decoder(tf.keras.Model):
    def __init__(self):
        super(Decoder,self).__init__()
        self.emb = tf.keras.layers.Embedding(NUM_WORDS,64)
        self.lstm = tf.keras.layers.LSTM(512, return_sequences=True, return_state=True)
        self.att = tf.keras.layers.Attention()
        self.dense = tf.keras.layers.Dense(NUM_WORDS,activation='softmax') 
    
    def call(self,inputs,training=False, mask=None):
        x, s0, c0, H = inputs
        # x  : shifted decoder input (teacher forcing)
        # s0 : decoder hidden state (initially the encoder context)
        # c0 : decoder cell state (initially the encoder context)
        # H  : EVERY encoder hidden state (attention keys/values)
        x = self.emb(x)
        S,h,c = self.lstm(x, initial_state=[s0,c0])
        
        # Query: the encoder context s0 for the first step, then the previous decoder hidden state
        S_ = tf.concat([s0[:, tf.newaxis, :], S[:, :-1, :]], axis=1)
        A = self.att([S_, H])           # dot-product attention over the encoder states
        y = tf.concat([S, A], axis=-1)  # decoder state + attention context, fed to the Dense layer
        
        return self.dense(y), h,c
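
For reference, tf.keras.layers.Attention with inputs [query, value] computes (unscaled) dot-product attention. A minimal sketch of what self.att([S_, H]) does, with random tensors standing in for S_ and H:

S_ = tf.random.normal((1, 64, 512))          # queries : shifted decoder hidden states
H = tf.random.normal((1, 64, 512))           # keys/values : encoder hidden states
scores = tf.matmul(S_, H, transpose_b=True)  # (1, 64, 64) alignment scores
weights = tf.nn.softmax(scores, axis=-1)     # attention distribution over the encoder steps
A = tf.matmul(weights, H)                    # (1, 64, 512) context vectors (the layer's output)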


3. Attention_s2s

  • wires the Encoder & Decoder together
  • training : teacher forcing (the shifted real answer is the decoder input)
  • testing : greedy decoding, one word at a time, until eos or 64 steps
class Attention_s2s(tf.keras.Model):
    def __init__(self,sos,eos):
        super(Attention_s2s, self).__init__()
        self.enc= Encoder()
        self.dec = Decoder()
        self.sos = sos
        self.eos = eos
    
    def call(self, inputs, training=False, mask=None):
        # (1) Training
        if training is True:
            x,y = inputs
            H,h,c = self.enc(x) # encode : x -> every hidden state H, final hidden h & cell c
            y,_,_ = self.dec((y,h,c,H)) # decode with teacher forcing (y is the shifted real answer)
            return y # per-step probability distribution over the vocabulary
        
        # (2) Testing
        else :
            x = inputs # no label data
            H,h,c = self.enc(x) # encode 
            y = tf.convert_to_tensor(self.sos) # first decoder input : the sos token
            y = tf.reshape(y,(1,1))            
            seq = tf.TensorArray(tf.int32, 64) # collects up to 64 predicted word ids
            
            for idx in tf.range(64):
                y,h,c = self.dec([y,h,c,H]) # input : previous prediction + states / output : softmax over the vocabulary
                y = tf.cast(tf.argmax(y, axis=-1), dtype=tf.int32)  # greedy : take the most probable word
                y = tf.reshape(y,(1,1))
                seq = seq.write(idx,y)  # record the predicted word id at this step
                if y==self.eos:
                    break
            return tf.reshape(seq.stack(), (1,64))
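
A quick smoke test of the training path (a sketch; the ids 1 and 2 are hypothetical stand-ins for the real sos/eos ids from the tokenizer):

model_check = Attention_s2s(sos=1, eos=2)
x = tf.zeros((2, 64), dtype=tf.int32)  # dummy questions
y = tf.zeros((2, 64), dtype=tf.int32)  # dummy shifted answers
print(model_check([x, y], training=True).shape)  # (2, 64, 2000) : per-step distribution over NUM_WORDS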


3) Train & Test functions

@tf.function
def train_step(model, inputs, labels, loss_object, optimizer, train_loss,train_accuracy):
    output_labels = labels[:,1:] # training targets : words 2..65
    shifted_labels = labels[:,:-1] # decoder inputs : words 1..64 (teacher forcing)
    
    with tf.GradientTape() as tape:
        # forward pass : encoder inputs + shifted labels as decoder input (teacher forcing)
        predictions = model([inputs, shifted_labels], training=True) 
        loss = loss_object(output_labels, predictions) # (real next word) VS (predicted next word)
    gradients = tape.gradient(loss, model.trainable_variables)
    
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)
    train_accuracy(output_labels, predictions)
    
@tf.function
def test_step(model,inputs):
    return model(inputs, training=False)
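
A worked example of the shift in train_step (hypothetical ids : 1 = sos, 2 = eos, 0 = padding):

labels = tf.constant([[1, 7, 9, 2, 0]])
print(labels[:, :-1])  # decoder input  : [[1 7 9 2]]
print(labels[:, 1:])   # training target: [[7 9 2 0]]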


4) Final Dataset

dataset_file = 'chatbot_data.csv'
okt = Okt()

# 1. preparing the Q&A dataset
with open(dataset_file,'r',encoding="utf-8") as file:
    lines = file.readlines()
    seq = [' '.join(okt.morphs(line)) for line in lines] # morphological analysis

questions = seq[::2] # even-indexed lines are questions
answers = ['\t' + line for line in seq[1::2]] # odd-indexed lines are answers, prefixed with the sos marker

num_samples = len(questions)
perm = list(range(num_samples)) # shuffled indices for the train/test split
random.seed(0)
random.shuffle(perm)

train_q = list()
train_a = list()
test_q = list()
test_a = list()

for idx, (q, a) in enumerate(zip(questions, answers)):
    # train_test split : (8 : 2)
    if perm[idx] > num_samples//5:
        train_q.append(q)
        train_a.append(a)
    else:
        test_q.append(q)
        test_a.append(a)

# 2. Tokenize ( word -> number ) & filtering
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=NUM_WORDS,
                                                 filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~')
# note : '\t' and '\n' are left out of the filters on purpose, so the sos/eos markers survive
tokenizer.fit_on_texts(train_q+train_a) # fit tokenizer
train_q_seq = tokenizer.texts_to_sequences(train_q)
train_a_seq = tokenizer.texts_to_sequences(train_a)
test_q_seq = tokenizer.texts_to_sequences(test_q)
test_a_seq = tokenizer.texts_to_sequences(test_a)
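
Since num_words=2000, texts_to_sequences keeps only the 2,000 most frequent words and silently drops the rest, so a converted sequence can be shorter than its sentence. The sos/eos ids can be inspected directly (the values depend on the dataset):

print(tokenizer.word_index['\t'], tokenizer.word_index['\n'])  # sos id, eos id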

# 3. Padding ( questions : pre-pad, answers : post-pad )
x_train = tf.keras.preprocessing.sequence.pad_sequences(train_q_seq,
                                                       value=0,padding='pre',maxlen=64)
y_train = tf.keras.preprocessing.sequence.pad_sequences(train_a_seq,
                                                       value=0,padding='post',maxlen=65)
x_test = tf.keras.preprocessing.sequence.pad_sequences(test_q_seq,
                                                       value=0,padding='pre',maxlen=64)
y_test = tf.keras.preprocessing.sequence.pad_sequences(test_a_seq,
                                                       value=0,padding='post',maxlen=65)
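
The answers are padded to 65, one step longer than the questions, so that after the shift in train_step both the decoder input (labels[:,:-1]) and the target (labels[:,1:]) are exactly 64 steps long:

print(x_train.shape, y_train.shape)  # (N, 64) and (N, 65); N depends on the split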

# 4. Build the tf.data pipelines
train_ds = tf.data.Dataset.from_tensor_slices((x_train,y_train)).shuffle(10000).batch(32).prefetch(1024)
test_ds = tf.data.Dataset.from_tensor_slices((x_test,y_test)).batch(1).prefetch(1024)
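
The pipeline can be sanity-checked by pulling one batch (a minimal sketch):

for seqs, labels in train_ds.take(1):
    print(seqs.shape, labels.shape)  # e.g. (32, 64) questions and (32, 65) answers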


5) Model, Losses, Optimizer

model = Attention_s2s(sos=tokenizer.word_index['\t'],
                      eos=tokenizer.word_index['\n'])
loss_object = tf.keras.losses.SparseCategoricalCrossentropy() # expects probabilities (the Dense layer already applies softmax)
optimizer = tf.keras.optimizers.Adam()

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy') # padding positions are counted too, which inflates the number


6) Train

for epoch in range(EPOCHS):
    # train in one epoch
    for seqs, labels in train_ds:
        train_step(model,seqs,labels,loss_object,optimizer, train_loss, train_accuracy)
        
    # result
    template = 'Epoch {}, Loss : {}, Accuracy : {}'
    if epoch%10==0:
        print(template.format(epoch,
                              train_loss.result(),
                              train_accuracy.result()*100))
    
    train_loss.reset_states()
    train_accuracy.reset_states()
Epoch 0, Loss : 2.722676992416382, Accuracy : 84.54730224609375
Epoch 10, Loss : 0.4059935212135315, Accuracy : 92.79056549072266
Epoch 20, Loss : 0.32857412099838257, Accuracy : 93.7108383178711
Epoch 30, Loss : 0.27873319387435913, Accuracy : 94.25516510009766
Epoch 40, Loss : 0.23898127675056458, Accuracy : 94.72118377685547


7) Test

for test_seq, test_labels in test_ds:
    pred = test_step(model, test_seq) # forward pass in test mode (greedy decoding)
    question_text = tokenizer.sequences_to_texts(test_seq.numpy())
    real_text = tokenizer.sequences_to_texts(test_labels.numpy())
    pred_text = tokenizer.sequences_to_texts(pred.numpy())
    print('_________________________________')
    print('question: ', question_text)
    print('real answer: ', real_text)
    print('predicted answer: ', pred_text)