( 참고 : Fastcampus 강의 )
[ 11. Asynchronous(비동기적) DP 실습 ]
-
동기적 DP : 모든 state에 대해 연산 \(\rightarrow\) 높은 계산량
-
비동기적 DP : 모든 state를 다 거치지 않음 \(\rightarrow\) 하지만, 수렴성은 보장됨!
1. 환경 설정하기
1-1. Import Packages
import sys; sys.path.append('..')
import matplotlib.pyplot as plt
from src.part2.async_dp import AsyncDP
from src.common.gridworld import GridworldEnv
from src.common.grid_visualization import visualize_value_function, visualize_policy
1-2. Make Environment
nx = 5
ny = 5
env = GridworldEnv([ny, nx])
1-3. Environment 소개
print(env.nS) ## 25
print(env.nA) ## 4
print(env.P_tensor.shape) # 4x25x25
print(env.R_tensor.shape) # 25x4
2. Agent 초기화
asyncDPagent = AsyncDP()
asyncDPagent.set_env(env)
Agent의 input
- \(\gamma\) : 할인율 ( default = 1.0 )
- error_tol : 수렴의 기준 ( default : 1e-5 )
3. Ex 1 ) Inplace Value Iteration
Key Point : “\(V(s)\)만을 유지하고, \(V(s')\) 계산할 때 \(V(s)\)을 참조한다.”
in_place_vi
의 input
- v_init: (optional) value function에 대한 guesstimation
def in_place_vi(self, v_init=None):
if v_init is not None:
value = v_init
else:
value = np.zeros(self.ns)
# info : 각종 값들을 저장할 dictionary
info = dict()
info['v'] = list()
info['pi'] = list()
info['converge'] = None
info['gap'] = list()
steps = 0
while True:
delta_v = 0 # 현재-이전의 value 차이
info['v'].append(value)
pi = self.construct_policy_from_v(value)
info['pi'].append(pi)
"""
임의의 순서로 Sweeping (FUll Sweeping)
순서에 따라 속도에 차이는 있지만, 수렴한다는 사실은 불변
여기서는 "state 순서 0~24"로 수행할 것
"""
for s in range(self.ns):
# (1) V_s를 통해, 현재 state에 대한 Q_s값 계산
qs = self.compute_q_from_v(value)[:, s]
# (2) Q_s값 최대화하는 V_s 계산
v = qs.max(axis=0)
# (3) 차이 계산
delta_v += np.linalg.norm(value[s] - v)
value[s] = v
info['gap'].append(delta_v)
if delta_v < self.error_tol:
info['converge'] = steps
break
else:
steps += 1
return info
Inplace Value Iteration 수행
info_ip_vi = asyncDPagent.in_place_vi()
4. Ex 2 ) Prioirtized sweeping value iteration
Prioirtized sweeping = (1) + (2)
- (1) 우선순위를 준 (Prioirtized)
- Bellman Error에따라 우선순위 결정
-
$$e(s) = \max_{a \in \cal{A}}(R_s^a+ \gamma \sum_{s’\in \cal{S}} P_{ss’}^a V(s’)) - V(s) $$.
- (2) 가치값 업데이트하기 (sweeping)
def prioritized_sweeping_vi(self, v_init=None):
if v_init is not None:
value = v_init
else:
value = np.zeros(self.ns)
info = dict()
info['v'] = list()
info['pi'] = list()
info['converge'] = None
info['gap'] = list()
steps = 0
while True:
# (1) Bellman Error 계산
bellman_errors = value - (self.R.T + self.P.dot(value)).max(axis=0)
state_indices = range(self.ns)
# (2) 우선순위 queue
# error앞에 (-) 붙이는 이유?
# error가 클 수록 높은 순위!
priority_queue = PriorityQueue()
for bellman_error, s_idx in zip(bellman_errors, state_indices):
priority_queue.put((-bellman_error, s_idx))
info['v'].append(value)
pi = self.construct_policy_from_v(value)
info['pi'].append(pi)
delta_v = 0
# (3) queue가 empty될 때 까지
while not priority_queue.empty():
be, s = priority_queue.get()
qs = self.compute_q_from_v(value)[:, s]
v = qs.max(axis=0)
delta_v += np.linalg.norm(value[s] - v)
value[s] = v
info['gap'].append(delta_v)
if delta_v < self.error_tol:
info['converge'] = steps
break
else:
steps += 1
return info
Prioirtized sweeping value iteration 수행
info_ps_vi = asyncDPagent.prioritized_sweeping_vi()
5. Ex 3 ) Partial Sweeping value iteration
update_prob
: 얼마의 확률로 state를 update할지?- ex)
update_prob=1.0
: 모든 \(s\)를 다 udpate- 반드시 update를 다 하지 않아도, 수렴은 보장된다 ( 여러 번 반복할 경우 )
def in_place_vi_partial_update(self,v_init=None,update_prob=0.5,
num_iters: int = 100):
if v_init is not None:
value = v_init
else:
value = np.zeros(self.ns)
info = dict()
info['v'] = list()
info['pi'] = list()
info['gap'] = list()
for steps in range(num_iters):
delta_v = 0
# Update할 지 말지 결정하는 perform_update ( 확률 : update_prob )
for s in range(self.ns):
perform_update = np.random.binomial(size=1, n=1, p=update_prob)
if not perform_update:
continue
# 이하 과정은 위와 동일
qs = self.compute_q_from_v(value)[:, s]
v = qs.max(axis=0)
delta_v += np.linalg.norm(value[s] - v)
value[s] = v
info['gap'].append(delta_v)
info['v'].append(value.copy())
pi = self.construct_policy_from_v(value)
info['pi'].append(pi)
return info
Parital sweeping VI 수행
- update probability : 0.2
- 총 iteration 횟수 : 100
info_ip_partial_vi = asyncDPagent.in_place_vi_partial_update(update_prob=0.2,
num_iters=100)