( 참고 : Fastcampus 강의 )

[ 6. Policy Iteration 실습 2 ]


1. 환경 설정하기

1-1. Import Packages

import sys; sys.path.append('..') 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from src.part2.tensorized_dp import TensorDP
from src.common.gridworld import GridworldEnv
from src.common.grid_visualization import visualize_value_function, visualize_policy

np.random.seed(0)


1-2. Make Environment

nx = 5
ny = 5
env = GridworldEnv([ny, nx])


1-3. Environment 소개

print(env.nS) ## 25
print(env.nA) ## 4
print(env.P_tensor.shape) # 4x25x25
print(env.R_tensor.shape) # 25x4


2. Agent 초기화

dp_agent = TensorDP(gamma=1.0,error_tol=1e-5)

Agent의 input

  • \(\gamma\) : 할인율 ( default = 1.0 )
  • error_tol : 수렴의 기준 ( default : 1e-5 )


Agent에게 환경을 설정해주기

def set_env(self, env, policy=None):
        self.env = env
        # policy를 주어주지 않으면, default로 uniform하게!
        if policy is None:
            self.policy = np.ones([env.nS, env.nA]) / env.nA

		# state 개수(25), action 개수(4), transition matrix, reward 설정
        self.ns = env.nS
        self.na = env.nA
        self.P = env.P_tensor  # [num. actions x num. states x num. states]
        self.R = env.R_tensor  # [num. actions x num. states]

        print("Tensor DP agent initialized")
        print("Environment spec:  Num. state = {} | Num. actions = {} ".format(env.nS, env.nA))
dp_agent.set_env(env)


3. Policy Iteration (1) Policy Evaluation

\[T^{\pi}(V) \leftarrow R^{\pi} + \gamma P^{\pi} V\]
  • \(R^{\pi}\) 의 25개의 요소는 \(R^{\pi}_s\), where \(s=1,...,25\) ( $$R^{\pi} \in \mathbb{R}^{ \cal{S} }$$ )
    • $$R^{\pi}s = \sum{a \in \cal{A}} \pi(a s) R_s^a $$.
  • \(P^{\pi}\) 의 25x25개의 요소는 \(P^{\pi}_{ss'}\), where \(s,s'=1,...,25\) ( $$R^{\pi} \in \mathbb{R}^{ \cal{S} \times \cal{S} }$$ )
    • $$P^{\pi}{ss’} = \sum{a \in \cal{A}} \pi(a s) P_{ss’}^a $$.


3-1. \(R^{\pi}\) 계산하기

Agent의 (1) Policy, (2) Return, (3) weighted Return

policy = dp_agent.policy 
R = dp_agent.env.R_tensor
weigthed_R = policy * R 

print(policy.shape) # 25x4
print(R.shape) # 25x4
print(weigthed_R.shape) # 25x4


3-2. \(P^{\pi}\) 계산하기

df = pd.DataFrame(dp_agent.get_p_pi(dp_agent.policy))
print(df.shape) # 25x25


3-3. Policy Evaluation 코드

policy_evaluation 함수 소개

  • Goal : policy(정책) 평가하기!
  • INPUT
    • policy: 평가하고자 하는 policy
    • v_init: initial value function
  • OUTPUT
    • v_pi: 해당 policy에 맞는 value function
def policy_evaluation(self, policy=None, v_init=None):
    if policy is None:
        policy = self.policy
    r_pi = self.get_r_pi(policy)  # 25(상태) x 1(보상값)
    p_pi = self.get_p_pi(policy)  # 25(상태) x 25(상태)

    if v_init is None:
        v_old = np.zeros(self.ns)
    else:
        v_old = v_init

    while True:
        v_new = r_pi + self.gamma * np.matmul(p_pi, v_old) # value function 업데이트
        err = np.linalg.norm(v_new - v_old) # 수렴 여부 확인
        if err <= self.error_tol:
            break
        else:
            v_old = v_new
    return v_new


Policy evaluation 수행하기

v_pi = dp_agent.policy_evaluation()
print(v_pi.shape) # 25 ( 현재 policy하에서, 25개의 state에 대한 value값들 )

v_old = v_pi 


3-4. 결과 시각화

fig, ax = plt.subplots(1,2, figsize=(12,6))
visualize_value_function(ax[0], v_pi, nx, ny)
_ = ax[0].set_title("Value pi")

visualize_policy(ax[1], dp_agent.policy, nx, ny)
_ = ax[1].set_title("Policy")

figure2

  • 오른쪽 그림에서도 알 수 있듯, 아직 policy가 개선된 것이 아님!
  • 단지 현재 policy대로 했을 경우, 얻게 되는 value function을 계산한 것일 뿐! ( = policy EVALUATION )


4. Policy Iteration (2) Policy Improvement

[ Algorithm ]

Step 1) \(V^{\pi}(s)\) 와 \(P\), \(R\) 를 이용해 \(Q^{\pi}(s,a)\) 를 계산한다.

  • \[Q^\pi(s,a) = R_s^{a} + \gamma \Sigma_{s' \in \cal{S}}P_{ss'}^aV^{\pi}(s')\]
Step 2) 개선된 정책 $$\pi’(a s)\(을 가장 높은\)Q^{\pi}(s,a)\(값을 주는\)a$$ 에 대해서 1로, 나머지는 0으로


4-1. Policy Improvement 코드

def policy_improvement(self, policy=None, v_pi=None):
    if policy is None:
        policy = self.policy
	if v_pi is None:
        v_pi = self.policy_evaluation(policy)

	# (1) [Step 1] Compute Q_pi(s,a) from V_pi(s)
	r_pi = self.get_r_pi(policy)
	q_pi = r_pi + self.P.dot(v_pi)

	# (2) [Step 2] Greedy improvement
	policy_improved = np.zeros_like(policy)
	policy_improved[np.arange(q_pi.shape[1]), q_pi.argmax(axis=0)] = 1
	return policy_improved


Policy Improvement 수행하기

p_new = dp_agent.policy_improvement()
dp_agent.set_policy(p_new) # 정책 update하기


방금 개선한 policy에 따라 value function 계산하기

v_pi = dp_agent.policy_evaluation()
v_new = v_pi 


4-2. 정책 개선 정리

BETTER policy \(\rightarrow\) HIGHER value function

\(\pi' \geq \pi\).

\(\pi' \geq \pi \leftrightarrow V_{\pi'}(s) \geq V_{\pi}(s) \forall s \in S\).


앞서서 improvement이전과 이후의 value function을 각각 v_oldv_new에 저장하였다.

delta_v = v_new - v_old
sum(delta_v<0) # 0

모든 state에서의 value가 더 커진 것을 확인할 수 있다.


4-3. 결과 시각화

fig, ax = plt.subplots(1,2, figsize=(12,6))
visualize_value_function(ax[0], v_pi, nx, ny)
_ = ax[0].set_title("Value pi")
visualize_policy(ax[1], dp_agent.policy, nx, ny)
_ = ax[1].set_title("Policy")

figure2