( 참고 : Fastcampus 강의 )

[ 34. (paper 1) DQN (Deep Q-Network) code review 1 ]


1. Import Packages

import gym
import numpy as np
import torch
import torch.nn as nn
from IPython.display import YouTubeVideo

from src.part3.MLP import MultiLayerPerceptron as MLP


2. New Environment, “CartPole-v1”

(1) Concept & Goal

cart는 마찰이 없는 평면의 트랙 위에서 움직이고, 중력 또한 작용하기 때문에 가만히 있으면 바닥으로 떨어진다. 이를 떨어지지 않게끔 잘 움직여줘야하는 것이 목표이다. ( = 잘 세워둔다 )

  • 세워둔다 = 막대가 수직으로 부터 15° 이내에 있게 유지하는 것


(2) State, Action, Reward, Discount

상태 \(s\), 행동 \(a\), 보상 \(r\), 감소율 \(\gamma\)

a) state ( 4-dim )

  1. x : 카트의 위치
  2. θ : 막대의 수직 각도
  3. dx/dt : 카트의 속도
  4. dθ/dt : 막대의 각속도

b) action ( 2-dim )

  • +1 / -1 ( 좌/우로 움직이기 )

c) reward

  • 매 1tick step() 마다 1.0 만큼의 보상 \(r\) 을 받음

d) discount

  • \(\gamma = 1.0\).


(3) Code

env = gym.make('CartPole-v1')
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.n

print("state space dimension: {}".format(s_dim))
print("action space dimension: {}".format(a_dim))
state space dimension: 4
action space dimension: 2


3. DQN (Deep Q-Network)

(1) _init_

  • state_dim : state의 차원 ( 여기서는 4 )

  • action_dim : action의 차원 ( 여기서는 2 )

  • qnet : 사용할 Q-network ( NN 사용 )

  • lr : learning rate

  • gamma : discount rate

  • opt : optimizer ( 여기서는 Adam을 사용 )

  • epsilon : \(\epsilon\)-greedy 시, 사용할 \(\epsilon\)값

    ( 학습할 때 update하지만, back prop할때 update하는 것은 X )

  • loss function으로는 MSE 사용

def __init__(self,state_dim: int,action_dim: int,
             qnet: nn.Module,lr: float,
             gamma: float,epsilon: float):
    super(NaiveDQN, self).__init__()
    self.state_dim = state_dim
    self.action_dim = action_dim
    self.qnet = qnet
    self.lr = lr
    self.gamma = gamma
    self.opt = torch.optim.Adam(params=self.qnet.parameters(), lr=lr)
    self.register_buffer('epsilon', torch.ones(1) * epsilon)

    self.criteria = nn.MSELoss()


(2) get_action

figure2

def get_action(self, state):
    qs = self.qnet(state)  # 2d ( batch x action )

    ############# 행동 정책 mu : epsilon-greedy policy ##############
    if self.train:  
        prob = np.random.uniform(0.0, 1.0, 1)
        # (1) Random 행동
        if torch.from_numpy(prob).float() <= self.epsilon:  
            action = np.random.choice(range(self.action_dim))
		# (2) Greedy 행동
		else: 
            action = qs.argmax(dim=-1)

    ############# 평가 정책 pi : greedy policy ##############
	else: 
        action = qs.argmax(dim=-1)
        return int(action)


(3) update_sample

def update_sample(self, s, a, r, s_, done):
    # (1) TARGET
    q_max, q_max_idx = self.qnet(s_).max(dim=-1)
    q_target = r + self.gamma * q_max * (1 - done)
    q_target = q_target.detach() # detach 대신 torch.no_grad()도 OK
    
    # (2) PREDICTION
    q_pred = self.qnet(s)[0, a]
	
    # (3) Loss 계산
    loss = self.criteria(q_pred, q_target)
    self.opt.zero_grad()
    loss.backward()
    self.opt.step()


4. Agent & NN 생성

(1) NN

qnet = MLP(input_dim=s_dim,
           output_dim=a_dim,
           num_neurons=[128],
           hidden_act='ReLU',
           out_act='Identity')


(2) Agent

agent = NaiveDQN(state_dim=s_dim,
                 action_dim=a_dim,
                 qnet=qnet,
                 lr=1e-4,
                 gamma=1.0,
                 epsilon=1.0)


5. 평가 Metric 생성

시간적 비용 절약 위해, 모델의 성능을 평가하기 위해서, 지금까지 얻었던 return들을 weighted sum 하여 측정한다.

  • ex) 지수 이동평균법 (Exponential Moving Average)
class EMAMeter:
    def __init__(self,alpha:float = 0.5):
        self.s = None
        self.alpha = alpha
    
    def update(self, y):        
        if self.s is None:
            self.s = y
        else:
            self.s = self.alpha * y + (1-self.alpha) * self.s


6. Train

(1) Settings

n_episode = 10000
print_log = 500
ema_factor = 0.5
ema = EMAMeter(ema_factor)


(2) Run 10000 epsiodes

for ep in range(n_episode):
    env.reset()  
    # (1) (10,000번의 epsiode 동안의) Cumulative Reward 
    cum_r = 0
    
    # (2) epsiode 끝날때까지 반복
    while True:
    	## (a) state/action -> (b) next state/reward 받기 -> (c) UPDATE
        s = env.state
        s = torch.tensor(s).float().view(1, 4)
        
        a = agent.get_action(s)
        
        s_, r, done, info = env.step(a)
        s_ = torch.tensor(s_).float()
        
        agent.update_sample(s, a, r, s_, done)
        cum_r += r
        if done:
        	############ REWARD 누적하여 저장 ############
            ema.update(cum_r)
            if ep % print_log == 0:
                print("Episode {} || EMA: {} || EPS : {}".format(ep, ema.s, agent.epsilon))
            if ep >= 150:
                agent.epsilon *= 0.999
            break
env.close()