Hindsight Experience Replay (HER)
1 Overview
Hindsight Experience Replay (HER) was proposed by M. Andrychowicz et al. (https://arxiv.org/abs/1707.01495).
The algorithm performs well in goal-oriented environments with sparse binary rewards, without manual reward shaping. (Moreover, reward shaping harms HER performance.) HER utilizes unsuccessful episodes as if the policy had reached their goals, by relabeling the goals afterwards.
There are 4 strategies to pick pseudo goals:
- final
  - The final state of the episode becomes the goal.
- future
  - k goals are randomly selected from later states in the same episode.
- episode
  - k goals are randomly selected from states in the same episode.
- random
  - k goals are randomly selected from all the states in the replay buffer.
Except for the “final” strategy, the hyperparameter k defines the number of
additional goals for each transition. According to the authors, the
“future” strategy with k=4 or k=8 performs best for the “pushing”,
“sliding”, and “pick-and-place” environments.
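To make the relabeling idea concrete, here is a rough sketch of the “future” strategy in plain Python. This is only an illustration of the pseudo-goal selection (cpprb performs the relabeling internally), and `compute_reward` is an assumed helper, not part of any API:

```python
import random

def relabel_future(episode, k, compute_reward):
    """Sketch of the "future" strategy for one finished episode.

    ``episode`` is a list of (state, action, next_state) tuples and
    ``compute_reward(achieved, goal)`` returns the sparse reward.
    """
    relabeled = []
    for t, (s, a, s_next) in enumerate(episode):
        # Pseudo goals are drawn from states observed after this transition.
        future_states = [step[2] for step in episode[t:]]
        for goal in random.choices(future_states, k=k):
            relabeled.append((s, a, s_next, goal, compute_reward(s_next, goal)))
    return relabeled
```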
cpprb provides the HindsightReplayBuffer class. The class can be
constructed with the following parameters:
| Parameter | Type | Description |
|---|---|---|
| size | int | Buffer size |
| env_dict | dict | Specifying environment. (See here) "goal" and "rew" are added automatically; the user should not specify them. |
| max_episode_len | int | Maximum episode length |
| reward_func | Callable[[np.ndarray, np.ndarray, np.ndarray], np.ndarray] | Batch reward function of \(S \times A \times G \to R\) |
| goal_func=None | Callable[[np.ndarray], np.ndarray] | Batch function for goal from state: \(S \to G\). If None (default), the state itself is considered the goal. |
| goal_shape=None | Iterable[int] | Goal shape. If None (default), the state shape is used. |
| state="obs" | str | State name in env_dict |
| action="act" | str | Action name in env_dict |
| next_state="next_obs" | str | Next state name in env_dict |
| strategy="future" | "future", "episode", "random", or "final" | Goal sampling strategy |
| additional_goals=4 | int | The number of additional goals per transition |
| prioritized=True | bool | Whether to use PER |
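For instance, if the goal were only a slice of the state (say, its first three components), a batched reward_func and goal_func could look like the following sketch. The slicing is purely an assumption for illustration:

```python
import numpy as np

def goal_func(s):
    # Achieved goal extracted from a batch of states (here: the first three components).
    return s[:, :3]

def reward_func(s, a, g):
    # Sparse binary reward over a batch: 0 where the achieved goal matches g, otherwise -1.
    return -1 * (goal_func(s) != g).any(axis=1)
```

These would then be passed as reward_func=reward_func, goal_func=goal_func, and goal_shape=(3,) when constructing the buffer.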
The usage is similar to other Replay Buffer classes. One of the major
differences in usage is passing the goal to the on_episode_end method:

```python
hrb.on_episode_end(goal)
```

2 Example Usage
The following pseudo code shows example usage:

```python
from cpprb import HindsightReplayBuffer

buffer_size = 1e+6
nsteps = int(1e+6)
max_episode_len = 100
nwarmup = 100
batch_size = 32

hrb = HindsightReplayBuffer(buffer_size,
                            {"obs": {}, "act": {}, "next_obs": {}},
                            max_episode_len=max_episode_len,
                            reward_func=lambda s, a, g: -1*(s != g))

env = # Initialize Environment
policy = # Initialize Policy

obs = env.reset()
goal = # Set Goal
ep_step = 0  # Step count within the current episode

for i in range(nsteps):
    act = policy(obs, goal)
    next_obs, _, done, _ = env.step(act)
    ep_step += 1

    hrb.add(obs=obs, act=act, next_obs=next_obs)

    if done or (ep_step >= max_episode_len):
        hrb.on_episode_end(goal)
        obs = env.reset()
        goal = # Set Goal
        ep_step = 0
    else:
        obs = next_obs

    if hrb.get_stored_size() < nwarmup:
        continue

    sample = hrb.sample(batch_size)
    policy.train(sample)
```

The full example code is as follows:
```python
import os
import datetime

import numpy as np

import gym
from gym.spaces import Box, Discrete

import tensorflow as tf
from tensorflow.keras.models import Sequential, clone_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.summary import create_file_writer

from cpprb import HindsightReplayBuffer


class BitFlippingEnv(gym.Env):
    """
    bit-flipping environment: https://arxiv.org/abs/1707.01495

    * Environment has n-bit state.
    * Initial state and goal state are randomly selected.
    * Action is one of the 0, ..., n-1, which flips single bit
    * Reward is 0 if state == goal, otherwise reward is -1. (Sparse Binary Reward)

    Simple RL algorithms tend to fail for large ``n`` like ``n > 40``
    """
    def __init__(self, n):
        seeds = np.random.SeedSequence().spawn(3)
        self.np_random = np.random.default_rng(seeds[0])
        self.observation_space = Box(low=0, high=1, shape=(n,), dtype=int)
        self.action_space = Discrete(n)
        self.observation_space.seed(seeds[1].entropy)
        self.action_space.seed(seeds[2].entropy)

    def step(self, action):
        action = int(action)
        self.bit[action] = 1 - self.bit[action]
        done = (self.bit == self.goal).all()
        rew = 0 if done else -1
        return self.bit.copy(), rew, done, {}

    def reset(self):
        self.bit = self.np_random.integers(low=0, high=1, size=self.action_space.n,
                                           endpoint=True, dtype=int)
        self.goal = self.np_random.integers(low=0, high=1, size=self.action_space.n,
                                            endpoint=True, dtype=int)
        return self.bit.copy()
gamma = 0.99
batch_size = 64

N_iteration = int(1.5e+4)
nwarmup = 100

target_update_freq = 1000
eval_freq = 100

egreedy = 0.1

max_episode_len = 100

nbit = 10

# Log
dir_name = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = os.path.join("logs", dir_name)
writer = create_file_writer(logdir + "/metrics")
writer.set_as_default()

# Env
env = BitFlippingEnv(nbit)
eval_env = BitFlippingEnv(nbit)

# The network takes the concatenation of state and goal as input.
model = Sequential([Dense(64, activation='relu',
                          input_shape=(env.observation_space.shape[0] * 2,)),
                    Dense(64, activation='relu'),
                    Dense(env.action_space.n)])
target_model = clone_model(model)

# Loss Function
@tf.function
def Huber_loss(absTD):
    return tf.where(absTD > 1.0, absTD, tf.math.square(absTD))

@tf.function
def MSE(absTD):
    return tf.math.square(absTD)

loss_func = Huber_loss

optimizer = Adam()

buffer_size = 1e+6
env_dict = {"obs": {"shape": env.observation_space.shape},
            "act": {"shape": 1, "dtype": np.ubyte},
            "next_obs": {"shape": env.observation_space.shape}}
discount = tf.constant(gamma)

# Prioritized Experience Replay: https://arxiv.org/abs/1511.05952
# See https://ymd_h.gitlab.io/cpprb/features/per/
prioritized = True

# Hindsight Experience Replay: https://arxiv.org/abs/1707.01495
# See https://ymd_h.gitlab.io/cpprb/features/her/
rb = HindsightReplayBuffer(buffer_size, env_dict,
                           max_episode_len=max_episode_len,
                           reward_func=lambda s, a, g: -1*((s != g).any(axis=1)),
                           prioritized=prioritized)

if prioritized:
    # Beta linear annealing
    beta = 0.4
    beta_step = (1 - beta)/N_iteration

def sg(state, goal):
    # Concatenate state and goal into a single network input.
    state = state.reshape((state.shape[0], -1))
    goal = goal.reshape((goal.shape[0], -1))
    return tf.constant(np.concatenate((state, goal), axis=1), dtype=tf.float32)

@tf.function
def Q_func(model, obs, act, act_shape):
    return tf.reduce_sum(model(obs) * tf.one_hot(act, depth=act_shape), axis=1)

@tf.function
def DQN_target_func(model, target, next_obs, rew, done, gamma, act_shape):
    return gamma*tf.reduce_max(target(next_obs), axis=1)*(1.0-done) + rew

@tf.function
def Double_DQN_target_func(model, target, next_obs, rew, done, gamma, act_shape):
    """
    Double DQN: https://arxiv.org/abs/1509.06461
    """
    act = tf.math.argmax(model(next_obs), axis=1)
    return gamma*tf.reduce_sum(target(next_obs)*tf.one_hot(act, depth=act_shape), axis=1)*(1.0-done) + rew

target_func = Double_DQN_target_func


def evaluate(model, env):
    obs = env.reset()
    goal = env.goal.copy().reshape((1, -1))

    n_episode = 20
    i_episode = 0
    success = 0
    ep = 0

    while i_episode < n_episode:
        Q = tf.squeeze(model(sg(obs.reshape((1, -1)), goal)))
        act = np.argmax(Q)
        obs, _, done, _ = env.step(act)
        ep += 1

        if done or (ep >= max_episode_len):
            if done:
                success += 1
            obs = env.reset()
            goal = env.goal.copy().reshape((1, -1))
            i_episode += 1
            ep = 0

    return success / n_episode


# Start Experiment
n_episode = 0
obs = env.reset()
goal = env.goal.copy().reshape((1, -1))
ep = 0

for n_step in range(N_iteration):
    if np.random.rand() < egreedy:
        act = env.action_space.sample()
    else:
        Q = tf.squeeze(model(sg(obs.reshape(1, -1), goal)))
        act = np.argmax(Q)

    next_obs, _, done, info = env.step(act)
    ep += 1

    rb.add(obs=obs,
           act=act,
           next_obs=next_obs)

    if done or (ep >= max_episode_len):
        obs = env.reset()
        goal = env.goal.copy().reshape((1, -1))
        rb.on_episode_end(goal)
        n_episode += 1
        ep = 0
    else:
        obs = next_obs

    if rb.get_stored_size() < nwarmup:
        continue

    if prioritized:
        sample = rb.sample(batch_size, beta)
        beta += beta_step
    else:
        sample = rb.sample(batch_size)

    weights = sample["weights"].ravel() if prioritized else tf.constant(1.0)

    with tf.GradientTape() as tape:
        tape.watch(model.trainable_weights)
        Q = Q_func(model,
                   sg(sample["obs"], sample["goal"]),
                   tf.constant(sample["act"].ravel()),
                   tf.constant(env.action_space.n))
        sample_rew = tf.constant(sample["rew"].ravel())
        # The sparse reward is -1 until the goal is reached and 0 at the goal,
        # so ``done`` can be recovered as 1 + reward.
        sample_done = 1.0 + sample_rew
        target_Q = tf.stop_gradient(target_func(model, target_model,
                                                sg(sample["next_obs"], sample["goal"]),
                                                sample_rew,
                                                sample_done,
                                                discount,
                                                tf.constant(env.action_space.n)))
        absTD = tf.math.abs(target_Q - Q)
        loss = tf.reduce_mean(loss_func(absTD)*weights)

    grad = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grad, model.trainable_weights))
    tf.summary.scalar("Loss vs training step", data=loss, step=n_step)

    if prioritized:
        Q = Q_func(model,
                   sg(sample["obs"], sample["goal"]),
                   tf.constant(sample["act"].ravel()),
                   tf.constant(env.action_space.n))
        absTD = tf.math.abs(target_Q - Q)
        rb.update_priorities(sample["indexes"], absTD)

    if n_step % target_update_freq == 0:
        target_model.set_weights(model.get_weights())

    if n_step % eval_freq == eval_freq-1:
        eval_rew = evaluate(model, eval_env)
        tf.summary.scalar("success rate vs training step",
                          data=eval_rew, step=n_step)
```

3 Notes
The number of stored transitions (get_stored_size()) is increased by
calling on_episode_end(goal).
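A minimal sketch of what this means in practice, reusing hrb, obs, act, next_obs, and goal from the pseudo code above:

```python
# Transitions added during the episode are kept aside until the episode is closed.
hrb.add(obs=obs, act=act, next_obs=next_obs)
print(hrb.get_stored_size())  # Not increased yet by this add()

# Closing the episode relabels the goals and moves the episode's
# transitions into the main buffer.
hrb.on_episode_end(goal)
print(hrb.get_stored_size())  # Increased here
```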
4 Technical Detail
HindsightReplayBuffer internally contains two replay buffers. One
(rb) is the main buffer; the other (episode_rb) holds the current
episode. Transitions are first inserted into episode_rb. At the
episode end (on_episode_end), goal relabeling is executed and the
transitions are moved to rb.
This architecture might be helpful for similar needs such as reward-to-go, as sketched below.
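As an illustration of that remark, a similar two-buffer pattern could compute reward-to-go (the discounted return from each step to the episode end) before transitions reach the main buffer. The sketch below is not part of cpprb's API; episode_rb is assumed to be a plain list of transition dicts, and rb any object with an add() method:

```python
def flush_with_reward_to_go(episode_rb, rb, gamma=0.99):
    """Move an episode's transitions into the main buffer,
    attaching the discounted reward-to-go to each of them."""
    rtg = 0.0
    annotated = []
    # Walk backwards so each step accumulates the discounted future rewards.
    for transition in reversed(episode_rb):
        rtg = transition["rew"] + gamma * rtg
        annotated.append({**transition, "reward_to_go": rtg})
    # Restore chronological order and move everything to the main buffer.
    for transition in reversed(annotated):
        rb.add(**transition)
    episode_rb.clear()
```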