Question: REINFORCEMENT LEARNING
Reinforcement Learning Environment: a GridWorld MDP with a 4x4 grid layout, where an agent learns to navigate to a goal while avoiding obstacles.

The objective is to maximize cumulative reward using two methods: Value Iteration, which computes the optimal policy via Bellman updates, and Q-Learning, a model-free approach that lets the agent learn from interaction with the environment.

The agent receives a reward for reaching the goal (+1), penalties for obstacles (-1), and a step penalty (-0.1). The run is split into a training phase and an evaluation phase, allowing a performance comparison between the two methods.
Install Packages
# Install necessary packages
!pip install gymnasium matplotlib seaborn
Collecting gymnasium
Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Requirement already satisfied: matplotlib in /usr/local/lib/python3.10/dist-packages (3.8.0)
Requirement already satisfied: seaborn in /usr/local/lib/python3.10/dist-packages (0.13.2)
Requirement already satisfied: numpy>=1.21.0 in /usr/local/lib/python3.10/dist-packages (from gymnasium) (1.26.4)
Requirement already satisfied: cloudpickle>=1.2.0 in /usr/local/lib/python3.10/dist-packages (from gymnasium) (3.1.0)
Requirement already satisfied: typing-extensions>=4.3.0 in /usr/local/lib/python3.10/dist-packages (from gymnasium) (4.12.2)
Collecting farama-notifications>=0.0.1 (from gymnasium)
Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Requirement already satisfied: contourpy>=1.0.1 in /usr/local/lib/python3.10/dist-packages (from matplotlib) (1.3.0)
Requirement already satisfied: cycler>=0.10 in /usr/local/lib/python3.10/dist-packages (from matplotlib) (0.12.1)
Requirement already satisfied: fonttools>=4.22.0 in /usr/local/lib/python3.10/dist-packages (from matplotlib) (4.54.1)
Requirement already satisfied: kiwisolver>=1.0.1 in /usr/local/lib/python3.10/dist-packages (from matplotlib) (1.4.7)
Requirement already satisfied: packaging>=20.0 in /usr/local/lib/python3.10/dist-packages (from matplotlib) (24.1)
Requirement already satisfied: pillow>=6.2.0 in /usr/local/lib/python3.10/dist-packages (from matplotlib) (10.4.0)
Requirement already satisfied: pyparsing>=2.3.1 in /usr/local/lib/python3.10/dist-packages (from matplotlib) (3.2.0)
Requirement already satisfied: python-dateutil>=2.7 in /usr/local/lib/python3.10/dist-packages (from matplotlib) (2.8.2)
Requirement already satisfied: pandas>=1.2 in /usr/local/lib/python3.10/dist-packages (from seaborn) (2.2.2)
Requirement already satisfied: pytz>=2020.1 in /usr/local/lib/python3.10/dist-packages (from pandas>=1.2->seaborn) (2024.2)
Requirement already satisfied: tzdata>=2022.7 in /usr/local/lib/python3.10/dist-packages (from pandas>=1.2->seaborn) (2024.2)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.10/dist-packages (from python-dateutil>=2.7->matplotlib) (1.16.0)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 958.1/958.1 kB 19.3 MB/s eta 0:00:00
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Import Libraries and Setup
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
import random
import time
from typing import Dict, Tuple
import gymnasium as gym
Define the GridWorldMDP Class
GridWorld environment implementing MDP principles
States: Grid positions
Actions: Up (0), Right (1), Down (2), Left (3)
Rewards: +1 for goal, -1 for obstacles, -0.1 for steps
class GridWorldMDP:
    def __init__(self, size=4):
        self.size = size
        self.goal = (0, size-1)
        self.obstacles = [(1, 1), (2, 2)]  # Add some obstacles
        self.action_space = 4
        self.state_space = size * size

        # Transition probabilities P(s'|s,a)
        # For simplicity: 0.8 probability of intended action, 0.2 probability of random action
        self.transition_prob = 0.8

        # Initialize state transition and reward matrices
        self.initialize_matrices()

    def initialize_matrices(self):
        states = [(i, j) for i in range(self.size) for j in range(self.size)]
        self.P = {}  # State transition probabilities
        self.R = {}  # Rewards

        for state in states:
            for action in range(self.action_space):
                self.P[(state, action)] = self._get_transition_prob(state, action)
                self.R[(state, action)] = self._get_reward(state)
    def _get_transition_prob(self, state, action):
        transitions = {}
        next_state = self._get_next_state(state, action)

        # Main transition with probability 0.8
        transitions[next_state] = self.transition_prob

        # Random transitions with probability 0.2, split over the other three actions.
        # Accumulate rather than overwrite, since near walls several actions can lead
        # to the same next state.
        other_actions = [a for a in range(self.action_space) if a != action]
        for a in other_actions:
            random_next_state = self._get_next_state(state, a)
            transitions[random_next_state] = transitions.get(random_next_state, 0.0) + (1 - self.transition_prob) / 3

        return transitions
    def _get_next_state(self, state, action):
        x, y = state
        if action == 0:    # up
            x = max(0, x-1)
        elif action == 1:  # right
            y = min(self.size-1, y+1)
        elif action == 2:  # down
            x = min(self.size-1, x+1)
        elif action == 3:  # left
            y = max(0, y-1)
        next_state = (x, y)
        return next_state if next_state not in self.obstacles else state

    def _get_reward(self, state):
        """Get reward for being in a state"""
        if state == self.goal:
            return 1.0
        elif state in self.obstacles:
            return -1.0
        else:
            return -0.1
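As a quick sanity check (illustrative, not part of the original notebook), the snippet below instantiates the environment and inspects one state-action pair; the state (3, 0) and action 1 are arbitrary choices for demonstration.

# Illustrative sanity check: inspect one state-action pair of the MDP
env_check = GridWorldMDP(size=4)
state, action = (3, 0), 1  # bottom-left corner, move right
print("P(s'|s,a):", env_check.P[(state, action)])   # transition distribution over next states
print("R(s,a):", env_check.R[(state, action)])      # reward for the current state
print("Probability mass:", sum(env_check.P[(state, action)].values()))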
Define the ValueIteration Class
Implementation of Value Iteration algorithm using Bellman Equation
V(s) = max_a [R(s,a) + γ * Σ P(s'|s,a) * V(s')]
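Written as the iterative Bellman optimality update implemented below (with $V_k$ the estimate at sweep $k$):

$$V_{k+1}(s) = \max_{a} \Big[ R(s,a) + \gamma \sum_{s'} P(s' \mid s, a)\, V_k(s') \Big]$$

The sweeps stop once $\max_s |V_{k+1}(s) - V_k(s)| < \theta$.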
class ValueIteration:
    def __init__(self, mdp: GridWorldMDP, gamma=0.99, theta=1e-6):
        self.mdp = mdp
        self.gamma = gamma  # Discount factor
        self.theta = theta  # Convergence threshold
        self.V = {(i, j): 0 for i in range(mdp.size) for j in range(mdp.size)}  # Value function
        self.policy = {}  # Optimal policy

    def solve(self, max_iterations=1000):
        """Run value iteration to find optimal value function and policy"""
        for i in range(max_iterations):
            delta = 0
            V_new = self.V.copy()

            # Update value function for each state using Bellman equation
            for state in self.V.keys():
                if state == self.mdp.goal or state in self.mdp.obstacles:
                    continue
                # Calculate value for each action and take maximum
                action_values = []
                for action in range(self.mdp.action_space):
                    transitions = self.mdp.P[(state, action)]
                    value = self.mdp.R[(state, action)]
                    # Apply Bellman equation
                    for next_state, prob in transitions.items():
                        value += self.gamma * prob * self.V[next_state]
                    action_values.append(value)

                # Update value function and track maximum change
                V_new[state] = max(action_values)
                delta = max(delta, abs(V_new[state] - self.V[state]))

            self.V = V_new

            # Check convergence
            if delta < self.theta:
                break

        # Extract optimal policy
        self._extract_policy()

    def _extract_policy(self):
        """Extract optimal policy from value function"""
        for state in self.V.keys():
            if state == self.mdp.goal or state in self.mdp.obstacles:
                continue
            action_values = []
            for action in range(self.mdp.action_space):
                transitions = self.mdp.P[(state, action)]
                value = self.mdp.R[(state, action)]
                for next_state, prob in transitions.items():
                    value += self.gamma * prob * self.V[next_state]
                action_values.append(value)
            self.policy[state] = np.argmax(action_values)
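As an illustrative check (not part of the original notebook), a greedy rollout under the computed policy can be printed directly; the variable names below are placeholders, the start state (3, 0) matches the one used in main(), and the rollout uses deterministic intended moves rather than the 0.8/0.2 stochastic transitions.

# Illustrative check: greedy rollout under the Value Iteration policy
mdp_check = GridWorldMDP(size=4)
vi_check = ValueIteration(mdp_check)
vi_check.solve()

state, path = (3, 0), [(3, 0)]
for _ in range(20):                                       # cap steps to avoid an endless loop
    if state == mdp_check.goal:
        break
    state = mdp_check._get_next_state(state, vi_check.policy[state])  # deterministic step for inspection
    path.append(state)
print("Greedy path to goal:", path)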
Define the QLearningAgent Class
Q-Learning agent with experience replay and improved exploration
Uses Q-learning update: Q(s,a) = Q(s,a) + α[R + γ max_a' Q(s',a') - Q(s,a)]
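In LaTeX form, the tabular update applied to each sampled transition $(s, a, r, s')$ is

$$Q(s,a) \leftarrow Q(s,a) + \alpha \big[ r + \gamma \max_{a'} Q(s', a') - Q(s,a) \big],$$

with the $\gamma \max_{a'} Q(s',a')$ term dropped when $s'$ is terminal, as in the code below.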
class QLearningAgent:
    def __init__(self, state_size, action_size, learning_rate=0.1, gamma=0.99, epsilon=1.0):
        self.state_size = state_size
        self.action_size = action_size
        self.lr = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995

        # Initialize Q-table and experience replay buffer
        self.q_table = {}
        self.experience_buffer = []
        self.max_buffer_size = 1000

    def get_q_value(self, state, action):
        return self.q_table.get((state, action), 0.0)

    def choose_action(self, state):
        # Epsilon-greedy action selection with optimistic initialization
        if random.random() < self.epsilon:
            return random.randint(0, self.action_size-1)
        # Choose best action based on Q-values
        q_values = [self.get_q_value(state, a) for a in range(self.action_size)]
        return np.argmax(q_values)
    def store_experience(self, state, action, reward, next_state, done):
        self.experience_buffer.append((state, action, reward, next_state, done))
        if len(self.experience_buffer) > self.max_buffer_size:
            self.experience_buffer.pop(0)

    def learn(self, batch_size=32):
        if len(self.experience_buffer) < batch_size:
            return

        # Sample batch of experiences
        batch = random.sample(self.experience_buffer, batch_size)
        for state, action, reward, next_state, done in batch:
            # Get best next action Q-value
            next_q_values = [self.get_q_value(next_state, a) for a in range(self.action_size)]
            next_max_q = max(next_q_values)

            # Q-learning update (Bellman equation)
            current_q = self.get_q_value(state, action)
            new_q = current_q + self.lr * (reward + self.gamma * next_max_q * (not done) - current_q)
            self.q_table[(state, action)] = new_q

        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
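As a hand-computed example of the update rule (illustrative, not from the notebook): with α = 0.1, γ = 0.99, Q(s,a) = 0, reward -0.1, and max_a' Q(s',a') = 0.5 for a non-terminal transition, the new value is 0 + 0.1 · (-0.1 + 0.99 · 0.5 - 0) = 0.0395. The snippet below reproduces this with the agent's own learn() method; the state tuples are placeholders.

# Illustrative single-transition update using the agent defined above
agent_check = QLearningAgent(state_size=16, action_size=4)
agent_check.q_table[((0, 1), 2)] = 0.5               # assume the next state already has a known action value
agent_check.store_experience((0, 0), 1, -0.1, (0, 1), False)
agent_check.learn(batch_size=1)                      # a one-sample batch performs a single Q-update
print(agent_check.get_q_value((0, 0), 1))            # 0.1 * (-0.1 + 0.99 * 0.5) = 0.0395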
Plotting Functions
def plot_value_function(V, size):
    # Visualize the value function
    plt.figure(figsize=(8, 6))
    values = np.zeros((size, size))
    for (x, y), value in V.items():
        values[x, y] = value
    sns.heatmap(values, annot=True, fmt='.2f', cmap='RdYlBu_r')
    plt.title('State Value Function')
    plt.show()
def plot_policy(policy, size):
    # Visualize the policy as one arrow per grid cell
    plt.figure(figsize=(8, 6))
    arrows = ['↑', '→', '↓', '←']
    policy_grid = np.full((size, size), ' ', dtype='<U1')  # one arrow character per cell
    for (x, y), action in policy.items():
        policy_grid[x, y] = arrows[action]
    sns.heatmap(np.zeros((size, size)), annot=policy_grid, fmt='', cbar=False, linewidths=0.5, cmap='Blues')
    plt.title('Optimal Policy')
    plt.show()
Main Method
def main():
    # Initialize the GridWorld environment
    env = GridWorldMDP(size=4)

    # Initialize and solve using Value Iteration
    value_iter = ValueIteration(env)
    print("Running Value Iteration...")
    value_iter.solve()

    # Visualize the results of Value Iteration
    plot_value_function(value_iter.V, env.size)
    plot_policy(value_iter.policy, env.size)

    # Train the Q-Learning agent
    q_agent = QLearningAgent(env.size, env.action_space)
    print("\nTraining Q-Learning Agent...")

    episodes = 1000
    for episode in range(episodes):
        state = (env.size - 1, 0)  # Start state (bottom-left corner)
        total_reward = 0
        done = False

        while not done:
            action = q_agent.choose_action(state)
            next_state = env._get_next_state(state, action)
            reward = env._get_reward(next_state)
            done = next_state == env.goal

            # Store experience and learn
            q_agent.store_experience(state, action, reward, next_state, done)
            q_agent.learn()

            total_reward += reward
            state = next_state

        if (episode + 1) % 100 == 0:
            print(f"Episode {episode + 1}, Total Reward: {total_reward:.2f}, Epsilon: {q_agent.epsilon:.2f}")

main()
Running Value Iteration...
Training Q-Learning Agent...
Episode 100, Total Reward: 0.50, Epsilon: 0.02
Episode 200, Total Reward: 0.50, Epsilon: 0.01
Episode 300, Total Reward: 0.50, Epsilon: 0.01
Episode 400, Total Reward: 0.50, Epsilon: 0.01
Episode 500, Total Reward: 0.50, Epsilon: 0.01
Episode 600, Total Reward: 0.50, Epsilon: 0.01
Episode 700, Total Reward: 0.50, Epsilon: 0.01
Episode 800, Total Reward: 0.50, Epsilon: 0.01
Episode 900, Total Reward: 0.50, Epsilon: 0.01
Episode 1000, Total Reward: 0.50, Epsilon: 0.01
<Figure size 800x600 with 0 Axes>
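main() trains both methods but does not report a direct comparison; a minimal evaluation sketch (not part of the original run) could roll out each greedy policy under the stochastic transitions in env.P and print the average return. The helper names evaluate_policy, vi_policy_fn, and q_policy_fn below are hypothetical, and the Q-Learning agent is assumed to have been trained as in main().

# Illustrative evaluation sketch: average return of a greedy policy over stochastic rollouts
def evaluate_policy(env, policy_fn, episodes=100, max_steps=50):
    """Average return of a policy under the transition distribution stored in env.P."""
    returns = []
    for _ in range(episodes):
        state, total = (env.size - 1, 0), 0.0
        for _ in range(max_steps):
            transitions = env.P[(state, policy_fn(state))]
            state = random.choices(list(transitions.keys()), weights=list(transitions.values()))[0]
            total += env._get_reward(state)
            if state == env.goal:
                break
        returns.append(total)
    return np.mean(returns)

env = GridWorldMDP(size=4)
vi = ValueIteration(env); vi.solve()
agent = QLearningAgent(env.state_space, env.action_space)
# ... assume agent has been trained as in main() before evaluating ...
vi_policy_fn = lambda s: vi.policy[s]
q_policy_fn = lambda s: int(np.argmax([agent.get_q_value(s, a) for a in range(env.action_space)]))
print("Value Iteration avg return:", evaluate_policy(env, vi_policy_fn))
print("Q-Learning avg return:", evaluate_policy(env, q_policy_fn))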