( 참고 : Fastcampus 강의 )

[ 11. Asynchronous(비동기적) DP 실습 ]

  • 동기적 DP : 모든 state에 대해 연산 \(\rightarrow\) 높은 계산량

  • 비동기적 DP : 모든 state를 다 거치지 않음 \(\rightarrow\) 하지만, 수렴성은 보장됨!


1. 환경 설정하기

1-1. Import Packages

import sys; sys.path.append('..')
import matplotlib.pyplot as plt

from src.part2.async_dp import AsyncDP
from src.common.gridworld import GridworldEnv
from src.common.grid_visualization import visualize_value_function, visualize_policy


1-2. Make Environment

nx = 5
ny = 5
env = GridworldEnv([ny, nx])


1-3. Environment 소개

print(env.nS) ## 25
print(env.nA) ## 4
print(env.P_tensor.shape) # 4x25x25
print(env.R_tensor.shape) # 25x4


2. Agent 초기화

asyncDPagent = AsyncDP()
asyncDPagent.set_env(env)

Agent의 input

  • \(\gamma\) : 할인율 ( default = 1.0 )
  • error_tol : 수렴의 기준 ( default : 1e-5 )


3. Ex 1 ) Inplace Value Iteration

Key Point : “\(V(s)\)만을 유지하고, \(V(s')\) 계산할 때 \(V(s)\)을 참조한다.”

in_place_vi의 input

  • v_init: (optional) value function에 대한 guesstimation
def in_place_vi(self, v_init=None):
    if v_init is not None:
        value = v_init
    else:
        value = np.zeros(self.ns)
      
    # info : 각종 값들을 저장할 dictionary
    info = dict()
    info['v'] = list()
    info['pi'] = list()
    info['converge'] = None
    info['gap'] = list()

    steps = 0
    while True:
        delta_v = 0 # 현재-이전의 value 차이
        info['v'].append(value)
        pi = self.construct_policy_from_v(value)
        info['pi'].append(pi)

		"""
        임의의 순서로 Sweeping (FUll Sweeping)
        순서에 따라 속도에 차이는 있지만, 수렴한다는 사실은 불변
        여기서는 "state 순서 0~24"로 수행할 것
		"""
        for s in range(self.ns):
            # (1) V_s를 통해, 현재 state에 대한 Q_s값 계산
            qs = self.compute_q_from_v(value)[:, s]
            # (2) Q_s값 최대화하는 V_s 계산
            v = qs.max(axis=0)  
			# (3) 차이 계산
            delta_v += np.linalg.norm(value[s] - v)
            value[s] = v
            
        info['gap'].append(delta_v)
        if delta_v < self.error_tol:
            info['converge'] = steps
            break
        else:
            steps += 1
    return info


Inplace Value Iteration 수행

info_ip_vi = asyncDPagent.in_place_vi()


4. Ex 2 ) Prioirtized sweeping value iteration

Prioirtized sweeping = (1) + (2)

  • (1) 우선순위를 준 (Prioirtized)
    • Bellman Error에따라 우선순위 결정
    • $$e(s) = \max_{a \in \cal{A}}(R_s^a+ \gamma \sum_{s’\in \cal{S}} P_{ss’}^a V(s’)) - V(s) $$.
  • (2) 가치값 업데이트하기 (sweeping)


def prioritized_sweeping_vi(self, v_init=None):
    if v_init is not None:
        value = v_init
    else:
        value = np.zeros(self.ns)

    info = dict()
    info['v'] = list()
    info['pi'] = list()
    info['converge'] = None
    info['gap'] = list()

    steps = 0
    while True:
        # (1) Bellman Error 계산
        bellman_errors = value - (self.R.T + self.P.dot(value)).max(axis=0)
        state_indices = range(self.ns)

        # (2) 우선순위 queue
        # error앞에 (-) 붙이는 이유?
        # error가 클 수록 높은 순위!
        priority_queue = PriorityQueue()
        for bellman_error, s_idx in zip(bellman_errors, state_indices):
            priority_queue.put((-bellman_error, s_idx))

        info['v'].append(value)
        pi = self.construct_policy_from_v(value)
        info['pi'].append(pi)
        delta_v = 0
		
        # (3) queue가 empty될 때 까지
        while not priority_queue.empty():
            be, s = priority_queue.get()
            qs = self.compute_q_from_v(value)[:, s]
            v = qs.max(axis=0) 
            delta_v += np.linalg.norm(value[s] - v)
            value[s] = v
        info['gap'].append(delta_v)

        if delta_v < self.error_tol:
            info['converge'] = steps
            break
        else:
            steps += 1
    return info


Prioirtized sweeping value iteration 수행

info_ps_vi = asyncDPagent.prioritized_sweeping_vi()


5. Ex 3 ) Partial Sweeping value iteration

  • update_prob : 얼마의 확률로 state를 update할지?
  • ex) update_prob=1.0 : 모든 \(s\)를 다 udpate
    • 반드시 update를 다 하지 않아도, 수렴은 보장된다 ( 여러 번 반복할 경우 )
def in_place_vi_partial_update(self,v_init=None,update_prob=0.5,
                                   num_iters: int = 100):
        if v_init is not None:
            value = v_init
        else:
            value = np.zeros(self.ns)
        info = dict()
        info['v'] = list()
        info['pi'] = list()
        info['gap'] = list()

        for steps in range(num_iters):
            delta_v = 0
            # Update할 지 말지 결정하는 perform_update ( 확률 : update_prob )
            for s in range(self.ns):
                perform_update = np.random.binomial(size=1, n=1, p=update_prob)
                if not perform_update:
                    continue
				# 이하 과정은 위와 동일
                qs = self.compute_q_from_v(value)[:, s]
                v = qs.max(axis=0)  
                delta_v += np.linalg.norm(value[s] - v)
                value[s] = v
                
            info['gap'].append(delta_v)
            info['v'].append(value.copy())
            pi = self.construct_policy_from_v(value)
            info['pi'].append(pi)

        return info


Parital sweeping VI 수행

  • update probability : 0.2
  • 총 iteration 횟수 : 100
info_ip_partial_vi = asyncDPagent.in_place_vi_partial_update(update_prob=0.2, 
                                                             num_iters=100)