系列文章目录

强化学习DDPG手写代码实现过程



前言

随着深度学习的发展,强化学习在复杂决策问题中的应用逐渐扩展到连续控制领域。然而,传统的值函数方法(如 DQN)主要适用于离散动作空间,在机器人控制、自动驾驶、机械臂操作等连续动作场景中难以直接应用。为了解决这一问题,Deep Deterministic Policy Gradient(DDPG)算法被提出,用于处理高维、连续动作空间下的强化学习问题。

DDPG 是一种基于 Actor-Critic 框架的离策略算法,它结合了深度神经网络与确定性策略梯度方法,通过引入目标网络(Target Network)和经验回放(Replay Buffer)机制,提高了训练的稳定性与样本利用效率。该算法通过 Critic 网络估计状态-动作价值函数 Q(s, a),并利用 Actor 网络直接输出连续动作,实现端到端的策略优化。

相比传统策略梯度方法,DDPG 能够在连续控制任务中更高效地逼近最优策略,在机器人控制、物理仿真环境(如 MuJoCo)以及工业控制等领域得到了广泛应用。尽管 DDPG 在训练稳定性和超参数敏感性方面仍存在一定挑战,但它为后续算法(如 TD3、SAC)的发展奠定了重要基础。

因此,研究 DDPG 的原理与实现过程,不仅有助于理解深度强化学习在连续控制问题中的核心思想,也为进一步学习和改进先进算法提供理论与实践基础。


一、实现步骤

1.引入库

import numpy as np
import gymnasium as gym
import torch 
import torch.nn as nn
import torch.optim as optim
from dataclasses import dataclass
import random
from collections import deque

本代码首先导入了实现 DDPG 算法所需的核心库。numpy 用于数值计算和数组操作;gymnasium 用于创建强化学习环境并与环境进行交互;torch 及其子模块 nn 和 optim 用于构建神经网络模型并进行反向传播优化;dataclass 用于定义算法的配置参数结构,使代码更加清晰规范;random 用于生成随机数,例如经验回放采样;collections.deque 用于构建经验回放缓冲区(Replay Buffer),以存储和管理交互数据,提高样本利用效率和训练稳定性。

2.写入超参数

代码如下(示例):

@dataclass
class DDPGconfig:
    seed :int =0
    gamma: float = 0.99
    tau: float = 0.005
    actor_lr: float = 1e-3
    critic_lr: float = 1e-3
    buffer_size: int =200_000
    batch_size: int = 256
    #对于噪音设计
    noise_start: float = 0.3
    noise_end: float = 0.05
    noise_decay_steps: int = 50_000
    #训练步子
    warmup_steps: int = 2_000  # 前面先随机探索填 buffer
    total_steps: int = 120_000

3.创建经验回放池

class ReplayBuffer:
    def __init__(self,buffer_size,state_dim,action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.buffer_size = buffer_size
        self.ptr =0
        self.size = 0
        self.a = np.zeros((buffer_size,action_dim),dtype = np.float32)
        self.s = np.zeros((buffer_size,state_dim),dtype = np.float32)
        self.s_n = np.zeros((buffer_size,state_dim),dtype = np.float32)
        self.r = np.zeros((buffer_size,1),dtype = np.float32)
        self.done = np.zeros((buffer_size,1),dtype = np.float32)

    def add(self, s, a, r, s_n, done):
        self.s[self.ptr] = s
        self.a[self.ptr] = a
        self.r[self.ptr] = r        
        self.s_n[self.ptr] = s_n
        self.done[self.ptr] = done

        self.ptr = (self.ptr+1)%self.buffer_size
        self.size = min(self.size+1,self.buffer_size)

    def sample(self,batch_size):
        index = np.random.randint(0,self.size,batch_size)
        return(
            self.s[index],
            self.a[index],
            self.r[index],
            self.s_n[index],
            self.done[index]
        )

该 ReplayBuffer 类用于实现经验回放机制(Experience Replay),是 DDPG 等离策略强化学习算法中的核心组件。其主要作用是存储智能体与环境交互产生的状态、动作、奖励、下一状态以及终止标志,并在训练时随机采样小批量数据进行更新,从而打破数据之间的时间相关性,提高训练稳定性和样本利用效率。

在初始化阶段,缓冲区根据 buffer_size 预先分配固定大小的 NumPy 数组,用于分别存储状态 s、动作 a、奖励 r、下一状态 s_n 和结束标志 done。ptr 用于指示当前写入位置,采用循环覆盖机制(取模运算)实现先进先出结构;size 用于记录当前已存储的数据量。

add 方法负责将新的交互数据写入缓冲区,并更新指针位置;当缓冲区写满后,新数据会覆盖最旧的数据。sample 方法则从当前已存储的数据中随机采样一个批次(batch),用于后续神经网络训练。随机采样可以有效减少样本间的相关性,使梯度更新更加稳定。

Actor和Critic网络

class Actor(nn.Module):
    def __init__(self,state_dim,action_dim,Hidden=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim,Hidden),
            nn.ReLU(),
            nn.Linear(Hidden,Hidden),
            nn.ReLU(),
            nn.Linear(Hidden,action_dim),
            nn.Tanh()   
        )
    def forward(self,s):
        return self.net(s)
    
class Critic(nn.Module):
    def __init__(self,state_dim,action_dim,Hidden=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear (state_dim+action_dim,Hidden),
            nn.ReLU(),
            nn.Linear(Hidden,Hidden),
            nn.ReLU(),
            nn.Linear(Hidden,1)
        )
    def forward(self,s,a):
        x = torch.cat([s, a], dim=-1)
        return(self.net(x))

5.agent

class DDPGAgent():
    def __init__(self,state_dim,action_dim,action_low,action_high,config,device):
        self.state_dim = state_dim
        self.action_dim = action_dim


        self.cfg = config
        self.device = device

        self.train_step = 0

        self.rb = ReplayBuffer(self.cfg.buffer_size,self.state_dim,self.action_dim)

        self.Actor = Actor(self.state_dim,self.action_dim).to(device)
        self.Critic = Critic(self.state_dim,self.action_dim).to(device)
        self.Actor_t = Actor(self.state_dim,self.action_dim).to(device)
        self.Critic_t = Critic(self.state_dim,self.action_dim).to(device)

        self.action_low_np  = np.asarray(action_low, dtype=np.float32)
        self.action_high_np = np.asarray(action_high, dtype=np.float32)
        self.action_low_t = torch.tensor(self.action_low_np,dtype=torch.float32,device = self.device)
        self.action_high_t = torch.tensor(self.action_high_np,dtype=torch.float32,device = self.device)

        self.Actor_t.load_state_dict(self.Actor.state_dict())
        self.Critic_t.load_state_dict(self.Critic.state_dict())

        self.opt_Actor = optim.Adam(self.Actor.parameters(),lr = self.cfg.actor_lr)
        self.opt_Critic = optim.Adam(self.Critic.parameters(),lr = self.cfg.critic_lr)

    def _scale_np(self, a01_np: np.ndarray) -> np.ndarray:
        # a01_np in [-1, 1]
        return self.action_low_np + (a01_np + 1.0) * 0.5 * (self.action_high_np - self.action_low_np)

    def _scale_torch(self, a01_t: torch.Tensor) -> torch.Tensor:
        # a01_t in [-1, 1]
        return self.action_low_t + (a01_t + 1.0) * 0.5 * (self.action_high_t - self.action_low_t)

    def noise_scale(self, global_step: int) -> float:
        t = min(global_step, self.cfg.noise_decay_steps)
        return float(np.interp(
            t,
            [0, self.cfg.noise_decay_steps],
            [self.cfg.noise_start, self.cfg.noise_end]
        ))

    @torch.no_grad()
    def act(self, s_np, global_step: int, explore: bool = True):
        # 1) numpy state -> torch, add batch dim
        s = torch.tensor(s_np, dtype=torch.float32, device=self.device).unsqueeze(0)

        # 2) actor output in [-1,1], torch -> numpy
        a01 = self.Actor(s).cpu().numpy()[0].astype(np.float32)

        # 3) exploration noise (in [-1,1] space)
        if explore:
            sigma = self.noise_scale(global_step)
            a01 = a01 + sigma * np.random.randn(self.action_dim).astype(np.float32)
            a01 = np.clip(a01, -1.0, 1.0)

        # 4) scale to env action range and clip
        env_a = self._scale_np(a01)
        env_a = np.clip(env_a, self.action_low_np, self.action_high_np)
        return env_a.astype(np.float32)

    def addtransition(self, s, a, r, s_n, done):
        return self.rb.add( s, a, r, s_n,  done)

    def softupdata(self,net,net_n):
        tau = self.cfg.tau
        for p,p_n in zip(net.parameters(),net_n.parameters()):
                p_n.data.mul_(1 - tau)
                p_n.data.add_(tau * p.data)#写错了

    def updata(self):
        if self.rb.size< self.cfg.batch_size:
            return {}
        s,a,r, s_n,don = self.rb.sample(self.cfg.batch_size)
        s = torch.tensor(s,dtype=torch.float32,device = self.device)
        a_env = torch.tensor(a, dtype=torch.float32, device=self.device)
        r = torch.tensor(r, dtype=torch.float32, device=self.device)
        s2 = torch.tensor(s_n, dtype=torch.float32, device=self.device)
        d = torch.tensor(don, dtype=torch.float32, device=self.device)
        with torch.no_grad():
            #进行critic网络参事初始化
            a2_n = self.Actor_t(s2)
            a2_env_n = self._scale_torch(a2_n)
            q_target = self.Critic_t(s2,a2_env_n)
            y = r+self.cfg.gamma*(1.0-d)* q_target
        q = self.Critic(s,a_env)
        critic_loss = nn.functional.mse_loss(q, y)
        self.opt_Critic.zero_grad()
        critic_loss.backward()
        self.opt_Critic.step()
        #对Actor进行
        a01 = self.Actor(s)
        a01_env =self._scale_torch(a01)
        actor_loss = -self.Critic(s, a01_env).mean()

        self.opt_Actor.zero_grad()
        actor_loss.backward()
        self.opt_Actor.step()

        self.softupdata(self.Actor, self.Actor_t)
        self.softupdata(self.Critic, self.Critic_t)

        self.train_step += 1
        return {
            "critic_loss": float(critic_loss.item()),
            "actor_loss": float(actor_loss.item()),
            "q_mean": float(q.mean().item()),
        }




这段 DDPGAgent 代码实现了 DDPG 算法的核心结构和完整训练流程。初始化阶段构建经验回放池,并创建 Actor 和 Critic 两套网络,同时为它们分别建立对应的目标网络,用于稳定训练。Actor 负责根据状态输出连续动作,Critic 负责评估当前状态和动作的价值。目标网络在开始时复制在线网络参数,并在训练过程中通过软更新缓慢跟随,从而减少训练震荡。

在与环境交互时,act 函数会将状态输入 Actor 得到动作,并在探索阶段加入逐渐衰减的随机噪声,以增强探索能力。由于神经网络输出范围固定在 [-1,1],代码通过缩放函数将动作映射到环境真实动作区间。

在训练阶段,从经验回放池中随机采样一批数据。首先更新 Critic,使其学会更准确地评估当前策略下的动作价值;随后更新 Actor,使其倾向于输出能获得更高评价的动作。最后通过软更新机制同步目标网络参数,使训练过程更加平稳。整个结构体现了 DDPG 作为离策略、连续控制强化学习算法的典型设计思想。

main函数执行

def main():
    cfg = DDPGconfig()
    set_seed(cfg.seed)

    env = gym.make("Pendulum-v1")
    obs, _ = env.reset(seed=cfg.seed)

    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    action_low = env.action_space.low
    action_high = env.action_space.high

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    agent = DDPGAgent(
        state_dim,
        action_dim,
        action_low,
        action_high,
        cfg,
        device
    )

    episode_return = 0.0
    episode_len = 0
    episode = 0

    for global_step in range(cfg.total_steps):

        # -------- 选动作 --------
        if global_step < cfg.warmup_steps:
            action = env.action_space.sample().astype(np.float32)
        else:
            action = agent.act(obs, global_step, explore=True)

        # -------- 环境交互 --------
        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # -------- 存入 ReplayBuffer --------
        agent.addtransition(obs, action, reward, next_obs, done)

        obs = next_obs
        episode_return += reward
        episode_len += 1

        # -------- 更新网络 --------
        info = agent.updata()

        # -------- episode 结束 --------
        if done:
            episode += 1

            if info:
                print(
                    f"Episode {episode:4d} | "
                    f"Return {episode_return:8.2f} | "
                    f"Len {episode_len:4d} | "
                    f"Critic {info['critic_loss']:.4f} | "
                    f"Actor {info['actor_loss']:.4f} | "
                    f"Q {info['q_mean']:.4f}"
                )
            else:
                print(
                    f"Episode {episode:4d} | "
                    f"Return {episode_return:8.2f} | "
                    f"Len {episode_len:4d}"
                )

            obs, _ = env.reset()
            episode_return = 0.0
            episode_len = 0

    env.close()


if __name__ == "__main__":
    main()

这段 main() 函数实现了整个 DDPG 训练流程的主程序入口,负责环境创建、智能体初始化以及训练循环的调度。

首先读取配置参数并设置随机种子,然后创建 Pendulum-v1 环境,并根据环境自动获取状态维度、动作维度以及动作上下界。接着根据当前是否有 GPU 选择计算设备,并实例化 DDPGAgent,完成网络和经验回放池的初始化。

进入训练循环后,在前期的 warmup 阶段使用随机动作与环境交互,用于填充经验回放池;之后则使用智能体的策略输出动作,并加入探索噪声。每一步交互都会将状态、动作、奖励和终止标志存入回放池。当样本数量足够时,开始调用 updata() 进行网络训练,更新 Actor 和 Critic。

程序会持续累计每个 episode 的总奖励和长度,当回合结束时打印当前回合的回报、步数以及网络的损失信息,用于观察训练效果。随后重置环境,进入下一回合。训练完成后关闭环境,整个流程结束。

整体上,这个函数承担的是“调度与控制”的角色,将环境交互、数据存储和网络更新串联成完整的 DDPG 训练闭环。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐