[ 5. Policy Iteration Practice 1 ]
( This code was written with the help of https://sumniya.tistory.com/10?category=781573 )
import numpy as np
1. get_state
def get_state(s, a, grid_size):
    # the 4 actions (A) the agent can take: up, down, left, right
    A = [(-1,0), (1,0), (0,-1), (0,1)]
    s[0] += A[a][0]
    s[1] += A[a][1]
    # the agent cannot move outside the grid
    if s[0] < 0:
        s[0] = 0
    elif s[0] > grid_size-1:
        s[0] = grid_size-1
    if s[1] < 0:
        s[1] = 0
    elif s[1] > grid_size-1:
        s[1] = grid_size-1
    return s[0], s[1]
# ex) a 4x4 grid world
# starting states : (0,0)~(2,2)
# after taking action 3 (i.e. (0,1), a step to the right),
# which state does the agent end up in?
for i in range(3):
    for j in range(3):
        x, y = get_state(s=[i,j], a=3, grid_size=4)
        print(x, y)
0 1
0 2
0 3
1 1
1 2
1 3
2 1
2 2
2 3
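Boundary clipping can be checked directly as well; for example, trying to move up or left from the corner (0,0) leaves the agent in place (the calls below are just illustrative spot checks):
print(get_state([0,0], a=0, grid_size=4))   # UP from (0,0)   -> (0, 0)
print(get_state([0,0], a=2, grid_size=4))   # LEFT from (0,0) -> (0, 0)
print(get_state([3,3], a=1, grid_size=4))   # DOWN from (3,3) -> (3, 3)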
2. Policy Evaluation
getting the “value function”
Computing \(v_{\pi}(s) = E_{\pi}[R_{t+1}+\gamma R_{t+2} + \gamma^{2}R_{t+3} + \cdots \mid S_t = s]\)
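The function below computes this with iterative policy evaluation: starting from an all-zero table, each sweep applies the Bellman expectation backup over all states (the transitions in this grid are deterministic, so the successor state \(s'\) is determined by \(s\) and \(a\); here \(R=-1\) and \(\gamma\) = dis):
\[v_{k+1}(s) = \sum_{a\in A}\pi(a\mid s)\,\bigl(R + \gamma\, v_{k}(s')\bigr)\]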
def policy_eval(grid_size, action, policy, iter_num, reward=-1, dis=0.9):
    post_value_table = np.zeros([grid_size, grid_size])
    for i in range(iter_num):
        episode_table = np.zeros([grid_size, grid_size])
        for m in range(grid_size):
            for n in range(grid_size):
                if (m==n==0) or (m==n==grid_size-1):
                    # terminal states keep a value of 0
                    value_t = 0
                else:
                    value_t = 0
                    for a in action:
                        m_, n_ = get_state([m,n], a, grid_size)  # get s(t+1)
                        value = policy[m][n][a]*(reward + dis*post_value_table[m_][n_])
                        value_t += value  # accumulate the expected value v(t+1)
                episode_table[m][n] = round(value_t, 2)
        if (i+1) % 20 == 0:
            print('Iteration : {} \n {} \n'.format(i+1, episode_table))
        post_value_table = episode_table
    return episode_table
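Instead of a fixed iter_num, one could also sweep until the values stop changing. A minimal sketch assuming a small tolerance theta (the function name, theta, and the while-loop are illustrative additions, not part of the original code):
def policy_eval_until_converged(grid_size, action, policy, reward=-1, dis=0.9, theta=1e-4):
    # hypothetical variant: sweep until the largest value change falls below theta
    post_value_table = np.zeros([grid_size, grid_size])
    while True:
        episode_table = np.zeros([grid_size, grid_size])
        for m in range(grid_size):
            for n in range(grid_size):
                if (m == n == 0) or (m == n == grid_size-1):
                    continue  # terminal states stay at 0
                episode_table[m][n] = sum(
                    policy[m][n][a]*(reward + dis*post_value_table[get_state([m, n], a, grid_size)])
                    for a in action)
        if np.max(np.abs(episode_table - post_value_table)) < theta:
            return episode_table
        post_value_table = episode_table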
Let's assume the following setup. ( grid size : 4x4 )
grid_size = 4
action = [0,1,2,3]
policy = np.empty([grid_size, grid_size, len(action)])
for i in range(grid_size):
    for j in range(grid_size):
        for k in range(len(action)):
            if i==j and ((i==0) or (i==grid_size-1)):
                policy[i][j] = 0.00
            else:
                policy[i][j] = 0.25
Interpreting the policy (array)
- each of the 4 outer arrays inside policy corresponds to the "row position in the grid"
- each row inside an outer array corresponds to the "column position in the grid"
- each column inside an outer array is the "probability of taking each kind of action (up, down, left, right)"
(a quick sanity check of this layout follows the printed array below)
policy
array([[[0.  , 0.  , 0.  , 0.  ],
        [0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25]],

       [[0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25]],

       [[0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25]],

       [[0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25],
        [0.25, 0.25, 0.25, 0.25],
        [0.  , 0.  , 0.  , 0.  ]]])
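A quick sanity check of the layout described above (the state (1,2) below is an arbitrary non-terminal state picked for illustration):
print(policy.shape)        # (4, 4, 4): grid row, grid column, action
print(policy[1][2])        # action probabilities at state (1,2): [0.25 0.25 0.25 0.25]
print(policy.sum(axis=2))  # 1.0 for every non-terminal state, 0.0 at the two terminal corners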
The final value table obtained ( after running 200 iterations )
final_value = policy_eval(grid_size,action,policy,200)
Iteration : 20
[[ 0. -5.08 -6.83 -7.32]
[-5.08 -6.35 -6.89 -6.83]
[-6.83 -6.89 -6.35 -5.08]
[-7.32 -6.83 -5.08 0. ]]
Iteration : 40
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
Iteration : 60
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
Iteration : 80
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
Iteration : 100
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
Iteration : 120
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
Iteration : 140
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
Iteration : 160
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
Iteration : 180
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
Iteration : 200
[[ 0. -5.27 -7.11 -7.63]
[-5.27 -6.59 -7.16 -7.11]
[-7.11 -7.16 -6.59 -5.27]
[-7.63 -7.11 -5.27 0. ]]
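As a hand check (state (0,1) is just one arbitrarily chosen non-terminal state), a single Bellman expectation backup on the converged table reproduces its own entry:
# neighbours of (0,1) under UP, DOWN, LEFT, RIGHT: (0,1) itself (clipped), (1,1), (0,0), (0,2)
v = final_value
check = 0.25*sum(-1 + 0.9*v[m_][n_] for m_, n_ in [(0,1), (1,1), (0,0), (0,2)])
print(round(check, 2))   # -5.27, which matches final_value[0][1]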
3. Policy Improvement
getting the “optimal policy”
Based on the value function obtained in 2. Policy Evaluation above, we now perform Policy Improvement.
\[\pi'(s) = greedy(v_{\pi}) = \underset{a\in A}{argmax}\; \gamma \sum_{s'\in S} P_{sa}(s')\, v_{\pi}(s')\]
def policy_imp(value, action, policy, grid_size, reward=-1):
    act_class = ['UP','DOWN','LEFT','RIGHT']
    act_table = []
    for m in range(grid_size):
        for n in range(grid_size):
            Q_list = []
            # terminal states: nothing left to do
            if (m==n==0) or (m==n==grid_size-1):
                act_table.append('DONE')
            else:
                # find the best action(s) by looking one step ahead
                for a in range(len(action)):
                    m_, n_ = get_state([m,n], a, grid_size)
                    Q_list.append(value[m_][n_])
                max_act = [action_index for action_index, val
                           in enumerate(Q_list) if val == max(Q_list)]
                # update the policy: split the probability evenly over the best actions
                policy[m][n] = np.zeros(len(action))
                for k in max_act:
                    policy[m][n][k] = (1/len(max_act))
                # record one representative best action for the summary table
                best_idx = np.argmax(policy[m][n])
                act_table.append(act_class[best_idx])
    act_table = np.asarray(act_table).reshape((grid_size, grid_size))
    print('Optimal policy : ', policy)
    print('Best Action to do at each state is ', '\n', act_table)
    return policy
new_policy = policy_imp(final_value,action,policy,4)
Optimal policy : [[[0. 0. 0. 0. ]
[0. 0. 1. 0. ]
[0. 0. 1. 0. ]
[0. 0.5 0.5 0. ]]
[[1. 0. 0. 0. ]
[0.5 0. 0.5 0. ]
[0. 0.5 0.5 0. ]
[0. 1. 0. 0. ]]
[[1. 0. 0. 0. ]
[0.5 0. 0. 0.5 ]
[0. 0.5 0. 0.5 ]
[0. 1. 0. 0. ]]
[[0.5 0. 0. 0.5 ]
[0. 0. 0. 1. ]
[0. 0. 0. 1. ]
[0.  0.  0.  0.  ]]]
Best Action to do at each state is
[['DONE' 'LEFT' 'LEFT' 'DOWN']
['UP' 'UP' 'DOWN' 'DOWN']
['UP' 'UP' 'DOWN' 'DOWN']
['UP' 'RIGHT' 'RIGHT' 'DONE']]
( In the code above, we did not repeat (Policy Evaluation & Policy Improvement) iteratively; each was run exactly once (one P.E & one P.I). Because this is a very simple 4x4 grid problem, a single pass already gives a good result, but real-world problems are more complex and require more iterations. )
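A minimal sketch of what that full loop could look like with the functions above (the wrapper name, the stopping rule that compares consecutive policies, and the iteration counts are illustrative choices, not part of the original code):
def policy_iteration(grid_size, action, policy, eval_iter=50, max_rounds=20):
    # hypothetical wrapper: alternate evaluation and improvement
    # until the greedy policy stops changing
    for _ in range(max_rounds):
        value = policy_eval(grid_size, action, policy, eval_iter)
        old_policy = policy.copy()
        policy = policy_imp(value, action, policy, grid_size)
        if np.array_equal(old_policy, policy):
            break  # policy is stable, so it is optimal for this MDP
    return policy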