Seq2Seq Implementation

import random
import tensorflow as tf
from konlpy.tag import Okt

1) Hyperparameter

EPOCHS = 50
NUM_WORDS = 2000  # vocabulary size for the tokenizer


2) Modeling

1. Encoder

  • embedding : 64 dimensions

  • lstm : 512 nodes ( return_state = True : to get the Hidden & Cell states back )

  • The LSTM's own prediction output is not needed

    ( the actual answer (Y) is fed as the decoder input at the next step instead, i.e. teacher forcing )

class Encoder(tf.keras.Model):
    def __init__(self):
        super(Encoder,self).__init__()
        self.emb = tf.keras.layers.Embedding(NUM_WORDS,64) # maximum of 2000 words
        self.lstm = tf.keras.layers.LSTM(512,return_state=True)
    
    def call(self,x,training=False,mask=None):
        x = self.emb(x)
        _,h,c = self.lstm(x) # output of LSTM : Y, H(hidden state), C(cell state)
        return h,c
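
A quick shape check makes the bullet points above concrete. This is a minimal sketch with a dummy batch (the batch size of 2 and the zero indices are made up; only the shapes matter):

enc = Encoder()
dummy_x = tf.zeros((2, 64), dtype=tf.int32)  # 2 sequences of 64 word indices
h, c = enc(dummy_x)
print(h.shape, c.shape)  # both (2, 512) : one hidden / cell state vector per sequence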


2. Decoder

  • embedding & LSTM & Dense
  • Dense : probability values over the 2000 words

class Decoder(tf.keras.Model):
    def __init__(self):
        super(Decoder,self).__init__()
        self.emb = tf.keras.layers.Embedding(NUM_WORDS,64)
        self.lstm = tf.keras.layers.LSTM(512, return_sequences=True, return_state=True)
        self.dense = tf.keras.layers.Dense(NUM_WORDS,activation='softmax') 
    
    def call(self,inputs,training=False, mask=None):
        x,h,c = inputs
        x = self.emb(x)
        x,h,c = self.lstm(x, initial_state=[h,c])
        return self.dense(x), h,c
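
The decoder can then be driven from those encoder states. A minimal sketch, reusing h and c from the encoder sketch above (dummy indices, shapes only):

dec = Decoder()
dummy_y = tf.zeros((2, 64), dtype=tf.int32)  # the shifted target sequence goes here during training
probs, h, c = dec((dummy_y, h, c))
print(probs.shape)  # (2, 64, 2000) : a softmax over the NUM_WORDS vocabulary at every time step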


3. Seq2Seq

class Seq2seq(tf.keras.Model):
    def __init__(self,sos,eos):
        super(Seq2seq,self).__init__()
        self.enc= Encoder()
        self.dec = Decoder()
        self.sos = sos
        self.eos = eos
    
    def call(self, inputs, training=False, mask=None):
        # (1) Training
        if training is True:
            x,y = inputs
            h,c = self.enc(x) # encode ( input : X / output : H(Hidden) & C(Cell) )
            y,_,_ = self.dec((y,h,c)) # decode ( input : previous actual Y, H, C / output : predicted Y )
            return y # final : return the predicted words!
        
        # (2) Testing
        else :
            x = inputs # no label data
            h,c = self.enc(x) # encode 
            y = tf.convert_to_tensor(self.sos) # initial y : the sos token
            y = tf.reshape(y,(1,1))            
            seq = tf.TensorArray(tf.int32, 64) # buffer for up to 64 decoded tokens
            
            for idx in tf.range(64):
                y,h,c = self.dec([y,h,c]) # input : previous prediction y and the current states h, c
                                          # output : softmax over the vocabulary, plus updated states
                y = tf.cast(tf.argmax(y, axis=-1), dtype=tf.int32)  # get the biggest prob
                y = tf.reshape(y,(1,1))
                seq = seq.write(idx,y)  # ex. (idx:5, result : Boy)
                if y==self.eos:
                    break
            return tf.reshape(seq.stack(), (1,64))
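
In training mode the model takes (x, y) and returns per-step word probabilities; in test mode it decodes greedily, one token at a time, until eos or 64 steps. A minimal sketch of the training path with dummy data (the sos / eos indices here are placeholders, not the real tokenizer indices):

sketch_model = Seq2seq(sos=1, eos=2)         # placeholder token indices, shape check only
dummy_x = tf.zeros((2, 64), dtype=tf.int32)  # questions
dummy_y = tf.zeros((2, 63), dtype=tf.int32)  # shifted answers (decoder input)
pred = sketch_model((dummy_x, dummy_y), training=True)
print(pred.shape)  # (2, 63, 2000)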


3) Train & Test function

@tf.function
def train_step(model, inputs, labels, loss_object, optimizer, train_loss,train_accuracy):
    output_labels = labels[:,1:]   # training target : the answer shifted one step left (tokens 2..N)
    shifted_labels = labels[:,:-1] # decoder input : the answer shifted one step right (tokens 1..N-1), teacher forcing
    
    with tf.GradientTape() as tape:
        # model inputs : the question sequences plus the shifted answers (teacher forcing)
        predictions = model([inputs, shifted_labels], training=True) 
        loss = loss_object(output_labels, predictions) # (real answer) VS (predicted answer)
    gradients = tape.gradient(loss, model.trainable_variables)
    
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)
    train_accuracy(output_labels, predictions)
    
@tf.function
def test_step(model,inputs):
    return model(inputs, training=False)
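
The two label slices above implement teacher forcing: the decoder is fed the answer shifted one step to the right and is scored against the answer shifted one step to the left. A toy example (the token indices are made up):

labels = tf.constant([[5, 17, 42, 3, 0]])  # e.g. [sos, "hi", "there", eos, pad]
print(labels[:, :-1].numpy())  # decoder input   : [[ 5 17 42  3]]
print(labels[:, 1:].numpy())   # training target : [[17 42  3  0]]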


4) Final Dataset

dataset_file = 'chatbot_data.csv'
okt = Okt()

## 1. preparing Q&A dataset
with open(dataset_file,'r',encoding="utf-8") as file:
    lines = file.readlines()
    seq = [' '.join(okt.morphs(line)) for line in lines] # morphological analysis with Okt

questions = seq[::2] # even indices : questions
answers = ['\t' + line for line in seq[1::2]] # odd indices : answers, prefixed with '\t' as the sos token

num_samples = len(questions)
perm = list(range(num_samples)) # index permutation used for the train/test split
random.seed(0)
random.shuffle(perm)

train_q = list()
train_a = list()
test_q = list()
test_a = list()

for idx, qna in enumerate(zip(questions, answers)):
    q,a= qna
    # train_test split : (8 : 2)
    if perm[idx] > num_samples//5:
        train_q.append(q)
        train_a.append(a)
    else:
        test_q.append(q)
        test_a.append(a)

# 2. Tokenize ( word -> number ) & filtering
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=NUM_WORDS,
                                                 filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~')
tokenizer.fit_on_texts(train_q+train_a) # fit tokenizer
train_q_seq = tokenizer.texts_to_sequences(train_q)
train_a_seq = tokenizer.texts_to_sequences(train_a)
test_q_seq = tokenizer.texts_to_sequences(test_q)
test_a_seq = tokenizer.texts_to_sequences(test_a)

# 3. Padding ( questions : pre-pad, answers : post-pad )
x_train = tf.keras.preprocessing.sequence.pad_sequences(train_q_seq,
                                                       value=0,padding='pre',maxlen=64)
y_train = tf.keras.preprocessing.sequence.pad_sequences(train_a_seq,
                                                       value=0,padding='post',maxlen=64)
x_test = tf.keras.preprocessing.sequence.pad_sequences(test_q_seq,
                                                       value=0,padding='pre',maxlen=64)
y_test = tf.keras.preprocessing.sequence.pad_sequences(test_a_seq,
                                                       value=0,padding='post',maxlen=64)
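
Questions are pre-padded so that the real tokens sit right before the encoder's last step, while answers are post-padded so that decoding starts at the sos token. A tiny illustration with a made-up sequence:

sample = [[7, 8, 9]]
print(tf.keras.preprocessing.sequence.pad_sequences(sample, value=0, padding='pre', maxlen=5))   # [[0 0 7 8 9]]
print(tf.keras.preprocessing.sequence.pad_sequences(sample, value=0, padding='post', maxlen=5))  # [[7 8 9 0 0]]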

# 4. Final Dataset
train_ds = tf.data.Dataset.from_tensor_slices((x_train,y_train)).shuffle(10000).batch(32).prefetch(1024)
test_ds = tf.data.Dataset.from_tensor_slices((x_test,y_test)).batch(1).prefetch(1024)


5) Model, Losses, Optimizer

model = Seq2seq(sos=tokenizer.word_index['\t'],
               eos=tokenizer.word_index['\n'])
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')


6) Train

for epoch in range(EPOCHS):
    # train in one epoch
    for seqs, labels in train_ds:
        train_step(model,seqs,labels,loss_object,optimizer, train_loss, train_accuracy)
        
    # result
    template = 'Epoch {}, Loss : {}, Accuracy : {}'
    if epoch%10==0:
        print(template.format(epoch+1,
                              train_loss.result(),
                              train_accuracy.result()*100))
    
    train_loss.reset_states()
    train_accuracy.reset_states()
Epoch 1, Loss : 0.4995741844177246, Accuracy : 89.34902954101562
Epoch 11, Loss : 0.41853445768356323, Accuracy : 92.2663803100586
Epoch 21, Loss : 0.3441966474056244, Accuracy : 93.5553207397461


7) Test

for test_seq, test_labels in test_ds:
    pred = test_step(model, test_seq) # only the question is given; the model decodes greedily
    question_text = tokenizer.sequences_to_texts(test_seq.numpy())
    real_text = tokenizer.sequences_to_texts(test_labels.numpy())
    pred_text = tokenizer.sequences_to_texts(pred.numpy())
    print('_________________________________')
    print('question: ', question_text)
    print('real answer: ', real_text)
    print('predicted answer: ', pred_text)