桂林医院网站建设,咨询公司管理制度,建筑企业和建设企业区别,保定头条新闻最新今天大家好#xff0c;今天和各位分享一下深度强化学习中的近端策略优化算法#xff08;proximal policy optimization#xff0c;PPO#xff09;#xff0c;并借助 OpenAI 的 gym 环境完成一个小案例#xff0c;完整代码可以从我的 GitHub 中获得#xff1a;
https://gith…大家好今天和各位分享一下深度强化学习中的近端策略优化算法proximal policy optimizationPPO并借助 OpenAI 的 gym 环境完成一个小案例完整代码可以从我的 GitHub 中获得
https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model 1. 算法原理
PPO 算法之所以被提出根本原因在于 Policy Gradient 在处理连续动作空间时 Learning rate 取值抉择困难。Learning rate 取值过小就会导致深度强化学习收敛性较差陷入完不成训练的局面取值过大则导致新旧策略迭代时数据不一致造成学习波动较大或局部震荡。除此之外Policy Gradient 因为在线学习的性质进行迭代策略时原先的采样数据无法被重复利用每次迭代都需要重新采样
同样地置信域策略梯度算法Trust Region Policy OptimizationTRPO虽然利用重要性采样Important-sampling、共轭梯度法求解提升了样本效率、训练速率等但在处理函数的二阶近似时会面临计算量过大以及实现过程复杂、兼容性差等缺陷。
PPO 算法具备 Policy Gradient、TRPO 的部分优点采样数据和使用随机梯度上升方法优化代替目标函数之间交替进行虽然标准的策略梯度方法对每个数据样本执行一次梯度更新但 PPO 提出新目标函数可以实现小批量更新。
鉴于上述问题该算法在迭代更新时观察当前策略在 t 时刻智能体处于状态 s 所采取的行为概率与之前策略所采取行为概率 计算概率的比值来控制新策略更新幅度比值 记作 若新旧策略差异明显且优势函数较大则适当增加更新幅度若 比值越接近 1表明新旧策略差异越小。
优势函数代表在状态 s 下行为 a 相对于均值的偏差。在论文中优势函数 使用 GAEgeneralized advantage estimation来计算 PPO 算法可依据 Actor 网络的更新方式细化为含有自适应 KL-散度KL Penalty的 PPO-Penalty 和含有 Clippped Surrogate Objective 函数的 PPO-Clip。
1PPO-Penalty 基于KL 惩罚项优化目标函数实验证明惩罚项系数 在迭代过程中并非固定值需要动态调整惩罚权重其目标函数 L 可以定义为 惩罚项 的初始值的选择对算法几乎无影响原因是它能在每次迭代时依据新旧策略的 KL 散度做适宜调整首先设置 KL 散度阈值 再通过下面的表达式计算 如果 时证明散度较小需要弱化惩罚力度 调整为
如果 时证明散度较大需要增强惩罚力度 调整为 。
2PPO-Clip 直接对新旧策略比例进行一定程度的 Clip 操作以约束变化幅度。其目标函数的计算方式如下 其中 代表截断超参数一般设定值为 0.2 表示截断函数负责限制比例 在 区间之内以保证收敛性最终 借助 函数选取未截断与截断目标之间的更小值形成目标下限。 可以分为优势函数 A 为正数和负数两种情况其变化趋势如下图所示 如果优势函数为正数需要增大新旧策略比值 然而当 时将不提供额外的激励如果优势函数是负数需要减少新旧策略比值 但在 时不提供额外的激励这使得新旧策略的差异被限制在合理范围内。PPO 本质上基于 Actor-Critic 框架算法流程如下 PPO 算法主要由 Actor 和 Critic 两部分构成Critic 部分更新方式与其他Actor-Critic 类型相似通常采用计算 TD error时序差分误差形式。对于 Actor 的更新方式PPO 可在KLPENL 、CLIPL 之间选择对于当前实验环境稳定性适用性更强的目标函数经过 OpenAI 研究团队实验论证PPO- Clip 比 PPO- Penalty有更好的数据效率和可行性。 2. 代码实现
下面我就采用 Clip 形式的 PPO。模型构建代码如下。下面的模型适用于 action 是离散的情况连续情况的代码可以从我的 GitHub 中获取。
# 代码用于离散环境的模型
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F# ----------------------------------- #
# 构建策略网络--actor
# ----------------------------------- #class PolicyNet(nn.Module):def __init__(self, n_states, n_hiddens, n_actions):super(PolicyNet, self).__init__()self.fc1 nn.Linear(n_states, n_hiddens)self.fc2 nn.Linear(n_hiddens, n_actions)def forward(self, x):x self.fc1(x) # [b,n_states]--[b,n_hiddens]x F.relu(x)x self.fc2(x) # [b, n_actions]x F.softmax(x, dim1) # [b, n_actions] 计算每个动作的概率return x# ----------------------------------- #
# 构建价值网络--critic
# ----------------------------------- #class ValueNet(nn.Module):def __init__(self, n_states, n_hiddens):super(ValueNet, self).__init__()self.fc1 nn.Linear(n_states, n_hiddens)self.fc2 nn.Linear(n_hiddens, 1)def forward(self, x):x self.fc1(x) # [b,n_states]--[b,n_hiddens]x F.relu(x)x self.fc2(x) # [b,n_hiddens]--[b,1] 评价当前的状态价值state_valuereturn x# ----------------------------------- #
# 构建模型
# ----------------------------------- #class PPO:def __init__(self, n_states, n_hiddens, n_actions,actor_lr, critic_lr, lmbda, epochs, eps, gamma, device):# 实例化策略网络self.actor PolicyNet(n_states, n_hiddens, n_actions).to(device)# 实例化价值网络self.critic ValueNet(n_states, n_hiddens).to(device)# 策略网络的优化器self.actor_optimizer torch.optim.Adam(self.actor.parameters(), lractor_lr)# 价值网络的优化器self.critic_optimizer torch.optim.Adam(self.critic.parameters(), lr critic_lr)self.gamma gamma # 折扣因子self.lmbda lmbda # GAE优势函数的缩放系数self.epochs epochs # 一条序列的数据用来训练轮数self.eps eps # PPO中截断范围的参数self.device device# 动作选择def take_action(self, state):# 维度变换 [n_state]--tensor[1,n_states]state torch.tensor(state[np.newaxis, :]).to(self.device)# 当前状态下每个动作的概率分布 [1,n_states]probs self.actor(state)# 创建以probs为标准的概率分布action_list torch.distributions.Categorical(probs)# 依据其概率随机挑选一个动作action action_list.sample().item()return action# 训练def learn(self, transition_dict):# 提取数据集states torch.tensor(transition_dict[states], dtypetorch.float).to(self.device)actions torch.tensor(transition_dict[actions]).to(self.device).view(-1,1)rewards torch.tensor(transition_dict[rewards], dtypetorch.float).to(self.device).view(-1,1)next_states torch.tensor(transition_dict[next_states], dtypetorch.float).to(self.device)dones torch.tensor(transition_dict[dones], dtypetorch.float).to(self.device).view(-1,1)# 目标下一个状态的state_value [b,1]next_q_target self.critic(next_states)# 目标当前状态的state_value [b,1]td_target rewards self.gamma * next_q_target * (1-dones)# 预测当前状态的state_value [b,1]td_value self.critic(states)# 目标值和预测值state_value之差 [b,1]td_delta td_target - td_value# 时序差分值 tensor--numpy [b,1]td_delta td_delta.cpu().detach().numpy()advantage 0 # 优势函数初始化advantage_list []# 计算优势函数for delta in td_delta[::-1]: # 逆序时序差分值 axis1轴上倒着取 [], [], []# 优势函数GAE的公式advantage self.gamma * self.lmbda * advantage deltaadvantage_list.append(advantage)# 正序advantage_list.reverse()# numpy -- tensor [b,1]advantage torch.tensor(advantage_list, dtypetorch.float).to(self.device)# 策略网络给出每个动作的概率根据action得到当前时刻下该动作的概率old_log_probs torch.log(self.actor(states).gather(1, actions)).detach()# 一组数据训练 epochs 轮for _ in range(self.epochs):# 每一轮更新一次策略网络预测的状态log_probs torch.log(self.actor(states).gather(1, actions))# 新旧策略之间的比例ratio torch.exp(log_probs - old_log_probs)# 近端策略优化裁剪目标函数公式的左侧项surr1 ratio * advantage# 公式的右侧项ratio小于1-eps就输出1-eps大于1eps就输出1epssurr2 torch.clamp(ratio, 1-self.eps, 1self.eps) * advantage# 策略网络的损失函数actor_loss torch.mean(-torch.min(surr1, surr2))# 价值网络的损失函数当前时刻的state_value - 下一时刻的state_valuecritic_loss torch.mean(F.mse_loss(self.critic(states), td_target.detach()))# 梯度清0self.actor_optimizer.zero_grad()self.critic_optimizer.zero_grad()# 反向传播actor_loss.backward()critic_loss.backward()# 梯度更新self.actor_optimizer.step()self.critic_optimizer.step() 3. 案例演示
基于 OpenAI 的 gym 环境完成一个推车游戏一个离散的环境目标是左右移动小车将黄色的杆子保持竖直。动作维度为2属于离散值状态维度为 4分别是坐标、速度、角度、角速度。 import numpy as np
import matplotlib.pyplot as plt
import gym
import torch
from RL_brain import PPOdevice torch.device(cuda) if torch.cuda.is_available() \else torch.device(cpu)# ----------------------------------------- #
# 参数设置
# ----------------------------------------- #num_episodes 100 # 总迭代次数
gamma 0.9 # 折扣因子
actor_lr 1e-3 # 策略网络的学习率
critic_lr 1e-2 # 价值网络的学习率
n_hiddens 16 # 隐含层神经元个数
env_name CartPole-v1
return_list [] # 保存每个回合的return# ----------------------------------------- #
# 环境加载
# ----------------------------------------- #env gym.make(env_name, render_modehuman)
n_states env.observation_space.shape[0] # 状态数 4
n_actions env.action_space.n # 动作数 2# ----------------------------------------- #
# 模型构建
# ----------------------------------------- #agent PPO(n_statesn_states, # 状态数n_hiddensn_hiddens, # 隐含层数n_actionsn_actions, # 动作数actor_lractor_lr, # 策略网络学习率critic_lrcritic_lr, # 价值网络学习率lmbda 0.95, # 优势函数的缩放因子epochs 10, # 一组序列训练的轮次eps 0.2, # PPO中截断范围的参数gammagamma, # 折扣因子device device)# ----------------------------------------- #
# 训练--回合更新 on_policy
# ----------------------------------------- #for i in range(num_episodes):state env.reset()[0] # 环境重置done False # 任务完成的标记episode_return 0 # 累计每回合的reward# 构造数据集保存每个回合的状态数据transition_dict {states: [],actions: [],next_states: [],rewards: [],dones: [],}while not done:action agent.take_action(state) # 动作选择next_state, reward, done, _, _ env.step(action) # 环境更新# 保存每个时刻的状态\动作\...transition_dict[states].append(state)transition_dict[actions].append(action)transition_dict[next_states].append(next_state)transition_dict[rewards].append(reward)transition_dict[dones].append(done)# 更新状态state next_state# 累计回合奖励episode_return reward# 保存每个回合的returnreturn_list.append(episode_return)# 模型训练agent.learn(transition_dict)# 打印回合信息print(fiter:{i}, return:{np.mean(return_list[-10:])})# -------------------------------------- #
# 绘图
# -------------------------------------- #plt.plot(return_list)
plt.title(return)
plt.show()
训练100回合绘制每回合的 return