import random
from itertools import product

import numpy as np

class GridWorld:
    def __init__(self, size=5):
        self.size = size
        self.actions = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # Right, Left, Down, Up
        self.features = np.zeros((size, size, 4))  # 4 features per state
        self._generate_features()

    def _generate_features(self):
        # Feature 1: negative distance from the grid center
        center = self.size // 2
        for i, j in product(range(self.size), range(self.size)):
            self.features[i, j, 0] = -np.sqrt((i - center) ** 2 + (j - center) ** 2)
        # Feature 2: negative distance to the nearest corner
        for i, j in product(range(self.size), range(self.size)):
            min_corner_dist = min(
                np.sqrt(i**2 + j**2),                                     # Top-left
                np.sqrt(i**2 + (self.size - 1 - j)**2),                   # Top-right
                np.sqrt((self.size - 1 - i)**2 + j**2),                   # Bottom-left
                np.sqrt((self.size - 1 - i)**2 + (self.size - 1 - j)**2)  # Bottom-right
            )
            self.features[i, j, 1] = -min_corner_dist
        # Feature 3: negative distance to the nearest edge
        for i, j in product(range(self.size), range(self.size)):
            edge_dist = min(i, j, self.size - 1 - i, self.size - 1 - j)
            self.features[i, j, 2] = -edge_dist
        # Feature 4: fixed random terrain (a seeded local generator keeps the grid
        # reproducible without touching NumPy's global random state)
        rng = np.random.RandomState(42)
        self.features[:, :, 3] = rng.randn(self.size, self.size)

    def get_features(self, state):
        return self.features[state[0], state[1]]

    def get_next_state(self, state, action):
        # Moves that would leave the grid keep the agent in place
        next_state = (state[0] + action[0], state[1] + action[1])
        if 0 <= next_state[0] < self.size and 0 <= next_state[1] < self.size:
            return next_state
        return state
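
# Example (assumed usage): each state is a (row, col) tuple mapped to a
# 4-dimensional feature vector, e.g.
#     env = GridWorld(size=5)
#     phi = env.get_features((2, 2))   # shape (4,), features of the center cell
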
class MaxMarginIRL:
    def __init__(self, env, gamma=0.99, learning_rate=0.01):
        self.env = env
        self.gamma = gamma
        self.lr = learning_rate
        self.n_features = env.features.shape[2]
        self.weights = np.random.randn(self.n_features)

    def compute_state_values(self, policy):
        # Iterative policy evaluation: V(s) = r(s) + gamma * V(s') under the given
        # deterministic policy and the current reward weights
        V = np.zeros((self.env.size, self.env.size))
        theta = 0.01  # convergence threshold
        while True:
            delta = 0
            for i, j in product(range(self.env.size), range(self.env.size)):
                v = V[i, j]
                action = policy[i, j]
                next_state = self.env.get_next_state((i, j), action)
                reward = np.dot(self.weights, self.env.get_features((i, j)))
                V[i, j] = reward + self.gamma * V[next_state[0], next_state[1]]
                delta = max(delta, abs(v - V[i, j]))
            if delta < theta:
                break
        return V
    def update(self, expert_trajectories, n_iterations=100):
        # Expert feature expectations: average per-trajectory feature counts.
        # They do not change across iterations, so compute them once up front.
        expert_fe = np.zeros(self.n_features)
        for trajectory in expert_trajectories:
            for state, _ in trajectory:
                expert_fe += self.env.get_features(state)
        expert_fe /= len(expert_trajectories)

        for _ in range(n_iterations):
            # Greedy policy under the current reward weights
            policy = self.find_optimal_policy()

            # Learner feature expectations, estimated by rolling out that policy
            n_sampled = 10
            learner_fe = np.zeros(self.n_features)
            states_visited = self.sample_trajectories(policy, n_trajectories=n_sampled)
            for state in states_visited:
                learner_fe += self.env.get_features(state)
            # Average over sampled trajectories (not visited states) so the scale
            # matches the expert estimate above
            learner_fe /= n_sampled

            # Move the reward weights toward features the expert visits more often
            gradient = expert_fe - learner_fe
            self.weights += self.lr * gradient
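
    def sample_trajectories(self, policy, n_trajectories=10, horizon=20):
        # Called by update() but not defined in the original listing; this is a
        # minimal sketch, assuming fixed-horizon rollouts of the deterministic
        # greedy policy from uniformly random start states. It returns the flat
        # list of visited states.
        states_visited = []
        for _ in range(n_trajectories):
            state = (random.randrange(self.env.size), random.randrange(self.env.size))
            for _ in range(horizon):
                states_visited.append(state)
                action = tuple(policy[state[0], state[1]])
                state = self.env.get_next_state(state, action)
        return states_visited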
    def find_optimal_policy(self):
        # One-step greedy policy: in each state, pick the action whose successor
        # state has the highest immediate reward under the current weights.
        # (It ignores long-run returns; a full solver would use value iteration.)
        policy = np.zeros((self.env.size, self.env.size, 2), dtype=int)
        for i, j in product(range(self.env.size), range(self.env.size)):
            max_value = float('-inf')
            best_action = self.env.actions[0]
            for action in self.env.actions:
                next_state = self.env.get_next_state((i, j), action)
                value = np.dot(self.weights, self.env.get_features(next_state))
                if value > max_value:
                    max_value = value
                    best_action = action
            policy[i, j] = best_action
        return policy
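
# A minimal end-to-end sketch (assumed usage; the "expert" here is a hypothetical
# stand-in that simply walks toward the grid center, and each trajectory is a
# list of (state, action) pairs as expected by update()).
if __name__ == "__main__":
    env = GridWorld(size=5)
    irl = MaxMarginIRL(env)

    def expert_step(state, center=2):
        # Move one step toward the center, adjusting the row first
        if state[0] != center:
            return (1, 0) if state[0] < center else (-1, 0)
        if state[1] != center:
            return (0, 1) if state[1] < center else (0, -1)
        return (0, 1)  # arbitrary action once at the center

    expert_trajectories = []
    for start in [(0, 0), (0, 4), (4, 0), (4, 4)]:
        trajectory, state = [], start
        for _ in range(10):
            action = expert_step(state)
            trajectory.append((state, action))
            state = env.get_next_state(state, action)
        expert_trajectories.append(trajectory)

    irl.update(expert_trajectories, n_iterations=50)
    print("Learned reward weights:", irl.weights)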