from collections import defaultdict

import numpy as np
class MultiAgentEnvironment:
def __init__(self, num_agents=2, grid_size=5):
self.num_agents = num_agents
self.grid_size = grid_size
self.reset()
def reset(self):
# Initialize random positions for agents
self.agent_positions = np.random.randint(0, self.grid_size, size=(self.num_agents, 2))
self.food_position = np.random.randint(0, self.grid_size, size=2)
return self._get_observations()
def _get_observations(self):
observations = []
for agent_idx in range(self.num_agents):
            # Observation: agent's own position (2), other agents' positions
            # (2 per other agent), and the food position (2) -> length 2*num_agents + 2
obs = np.concatenate([
self.agent_positions[agent_idx],
self.agent_positions[np.arange(self.num_agents) != agent_idx].flatten(),
self.food_position
])
observations.append(obs)
return observations
def step(self, actions):
# Actions: 0=up, 1=down, 2=left, 3=right
rewards = np.zeros(self.num_agents)
# Move agents
for agent_idx, action in enumerate(actions):
if action == 0: # Up
self.agent_positions[agent_idx][0] = max(0, self.agent_positions[agent_idx][0] - 1)
elif action == 1: # Down
self.agent_positions[agent_idx][0] = min(self.grid_size-1, self.agent_positions[agent_idx][0] + 1)
elif action == 2: # Left
self.agent_positions[agent_idx][1] = max(0, self.agent_positions[agent_idx][1] - 1)
elif action == 3: # Right
self.agent_positions[agent_idx][1] = min(self.grid_size-1, self.agent_positions[agent_idx][1] + 1)
# Check for food collection (cooperative reward)
for agent_idx in range(self.num_agents):
if np.array_equal(self.agent_positions[agent_idx], self.food_position):
rewards += 1.0 # All agents get reward when any agent reaches food
self.food_position = np.random.randint(0, self.grid_size, size=2)
        # Small shaping penalty proportional to each agent's distance to the food,
        # nudging every agent to move toward it
for agent_idx in range(self.num_agents):
distance_to_food = np.linalg.norm(self.agent_positions[agent_idx] - self.food_position)
rewards[agent_idx] -= 0.1 * distance_to_food
done = False # In this simple environment, episodes don't end
return self._get_observations(), rewards, done
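
# A minimal, illustrative smoke test (not part of the training example itself):
# drive the environment with random actions to check that reset() and step()
# behave as expected. The function name `smoke_test_environment` is hypothetical.
def smoke_test_environment(num_steps=5):
    env = MultiAgentEnvironment(num_agents=2, grid_size=5)
    observations = env.reset()
    print("Initial observations:", observations)
    for _ in range(num_steps):
        # One random action per agent (0=up, 1=down, 2=left, 3=right)
        actions = np.random.randint(0, 4, size=env.num_agents)
        observations, rewards, done = env.step(actions)
        print("actions:", actions, "rewards:", rewards)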
class IndependentQLearningAgent:
    def __init__(self, action_size, learning_rate=0.01):
        self.action_size = action_size
        # Tabular Q-values keyed by the serialized observation (bytes), so any
        # hashable state encoding can index the table without pre-sizing it
        self.q_table = defaultdict(lambda: np.zeros(action_size))
self.lr = learning_rate
self.gamma = 0.95
self.epsilon = 0.1
def select_action(self, state):
if np.random.random() < self.epsilon:
return np.random.randint(self.action_size)
return np.argmax(self.q_table[state])
def learn(self, state, action, reward, next_state):
        old_value = self.q_table[state][action]
        next_max = np.max(self.q_table[next_state])
        new_value = (1 - self.lr) * old_value + self.lr * (reward + self.gamma * next_max)
        self.q_table[state][action] = new_value
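
# For reference, the update in learn() is the standard tabular Q-learning rule
#     Q(s, a) <- (1 - lr) * Q(s, a) + lr * (r + gamma * max_a' Q(s', a'))
# A small worked example under the defaults above (lr=0.01, gamma=0.95), with
# hypothetical byte-string states and an initially all-zero table:
#     agent = IndependentQLearningAgent(action_size=4)
#     agent.learn(b"s0", action=2, reward=1.0, next_state=b"s1")
#     # agent.q_table[b"s0"][2] == 0.99 * 0.0 + 0.01 * (1.0 + 0.95 * 0.0) == 0.01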
# Example usage:
def train_independent_q_learning(num_episodes=1000):
env = MultiAgentEnvironment(num_agents=2, grid_size=5)
agents = [
        IndependentQLearningAgent(action_size=4)
for _ in range(env.num_agents)
]
for episode in range(num_episodes):
states = env.reset()
total_rewards = np.zeros(env.num_agents)
for step in range(100): # Max steps per episode
# Select actions
actions = [agent.select_action(state.tobytes()) for agent, state in zip(agents, states)]
# Environment step
next_states, rewards, done = env.step(actions)
# Learn
for agent_idx in range(env.num_agents):
agents[agent_idx].learn(
states[agent_idx].tobytes(),
actions[agent_idx],
rewards[agent_idx],
next_states[agent_idx].tobytes()
)
total_rewards += rewards
states = next_states
if done:
break
        if episode % 100 == 0:
            print(f"Episode {episode}, Mean Reward per Step: {total_rewards / 100}")
    # Return the environment and trained agents for inspection or greedy evaluation
    return env, agents
# To run:
# env, agents = train_independent_q_learning()
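
# A hedged sketch of greedy evaluation for trained agents: exploration is switched
# off (epsilon=0) so actions are purely exploitative, then restored afterwards.
# `evaluate_agents` is an illustrative helper, not part of the original example;
# it assumes train_independent_q_learning() returns (env, agents) as above.
def evaluate_agents(env, agents, num_steps=100):
    saved_epsilons = [agent.epsilon for agent in agents]
    for agent in agents:
        agent.epsilon = 0.0  # act greedily during evaluation
    states = env.reset()
    total_rewards = np.zeros(env.num_agents)
    for _ in range(num_steps):
        actions = [agent.select_action(state.tobytes()) for agent, state in zip(agents, states)]
        states, rewards, _ = env.step(actions)
        total_rewards += rewards
    for agent, eps in zip(agents, saved_epsilons):
        agent.epsilon = eps  # restore exploration rates
    return total_rewards / num_steps

# To evaluate:
# env, agents = train_independent_q_learning()
# print("Mean reward per step:", evaluate_agents(env, agents))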