数据一览

数据是informer代码里自带的数据集。可以看到第一列是时间,以日期和小时为单位;后面的几列都是数据。我们要根据所有数据去预测最后一列的值,即WetBulbCelsius。

data_parser

data_parser = {
    'ETTh1':{'data':'ETTh1.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
    'ETTh2':{'data':'ETTh2.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
    'ETTm1':{'data':'ETTm1.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
    'ETTm2':{'data':'ETTm2.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
    'WTH':{'data':'WTH.csv','T':'WetBulbCelsius','M':[12,12,12],'S':[1,1,1],'MS':[12,12,1]},
    'ECL':{'data':'ECL.csv','T':'MT_320','M':[321,321,321],'S':[1,1,1],'MS':[321,321,1]},
    'Solar':{'data':'solar_AL.csv','T':'POWER_136','M':[137,137,137],'S':[1,1,1],'MS':[137,137,1]},
}
参数 解释
data 数据文件名(如 WTH.csv 表示气象数据集文件)
T 目标列名称

 比如这次我们训练使用的数据集是WTH.csv。我们目标预测的变量是WetBulbCelsius。

后面一个参数是预测模式维度配置:

模式 编码器输入 解码器输入 输出维度 说明
M 多变量→多变量 多变量→多变量 多变量→多变量 ECLM: [321,321,321] 表示 321 维输入输出
S 单变量→单变量 单变量→单变量 单变量→单变量 所有数据集 S: [1,1,1] 表示单一特征预测
MS 多变量→单变量 多变量→单变量 多变量→单变量 SolarMS: [137,137,1] 表示 137 维输入预测单变量

我们训练使用的WTH.csv里面有12种物理变量。在预测单变量的时候就是'MS':[12,12,1]。

dataloader

        train_data, train_loader = self._get_data(flag = 'train')
        vali_data, vali_loader = self._get_data(flag = 'val')
        test_data, test_loader = self._get_data(flag = 'test')

在代码里使用_get_data方法来获取dataset和dataloader。

def _get_data(self, flag):
   
    args = self.args
    
    # 数据集类型映射字典:将数据集名称映射到对应的Dataset类
    data_dict = {
        'ETTh1':Dataset_ETT_hour,    # ETT小时数据集(电力数据)
        'ETTh2':Dataset_ETT_hour,    # 另一个ETT小时数据集
        'ETTm1':Dataset_ETT_minute,   # ETT分钟数据集(更高采样频率)
        'ETTm2':Dataset_ETT_minute,   # 另一个ETT分钟数据集
        'WTH':Dataset_Custom,         # 气象数据集使用自定义加载器
        'ECL':Dataset_Custom,         # 电力消耗数据
        'Solar':Dataset_Custom,       # 太阳能发电数据
        'custom':Dataset_Custom,      # 用户自定义数据集入口
    }
    
    # 时间编码标记:当使用'timeF'时间特征编码方式时启用时间编码(0/1标志位)
    timeenc = 0 if args.embed != 'timeF' else 1  # embed参数控制编码类型

    # 根据数据阶段配置加载参数
    if flag == 'test':
        # 测试集配置:不打乱顺序,丢弃不完整批次,使用标准批次大小
        shuffle_flag = False; drop_last = True; batch_size = args.batch_size; freq = args.freq
    elif flag == 'pred':
        # 预测阶段配置:不打乱顺序,保留不完整批次,批次大小为1(单样本预测)
        shuffle_flag = False; drop_last = False; batch_size = 1; freq = args.detail_freq
        Data = Dataset_Pred  # 使用专门设计的预测数据集类
    else:
        # 训练/验证配置:打乱顺序,丢弃不完整批次,使用标准批次大小
        shuffle_flag = True; drop_last = True; batch_size = args.batch_size; freq = args.freq

    # 实例化数据集对象
    data_set = Data(
        root_path=args.root_path,      # 数据集根目录(如'./data/')
        data_path=args.data_path,      # 数据文件名(如'WTH.csv')
        flag=flag,                     # 数据阶段标记(train/test/val/pred)
        size=[args.seq_len, args.label_len, args.pred_len],  # 时序参数配置
        features=args.features,       # 特征模式(M/S/MS)
        target=args.target,            # 预测目标列名(如'OT')
        inverse=args.inverse,          # 是否对输出做逆标准化(True/False)
        timeenc=timeenc,               # 是否应用时间编码(0/1)
        freq=freq,                     # 时间编码频率(如'h'/'t'等)
        cols=args.cols                 # 指定使用的数据列(None表示全选)
    )
    print(flag, len(data_set))  # 打印当前阶段名称及数据集样本数量

    # 创建数据加载器
    data_loader = DataLoader(
        data_set,
        batch_size=batch_size,        # 根据阶段设置批次大小
        shuffle=shuffle_flag,          # 是否打乱顺序
        num_workers=args.num_workers,  # 多线程加载数(0表示禁用)
        drop_last=drop_last           # 是否丢弃不完整批次
    )

    return data_set, data_loader  # 返回数据集对象和数据加载器(前者可用于分析,后者用于迭代)

数据集配置

    # 根据数据阶段配置加载参数
    if flag == 'test':
        # 测试集配置:不打乱顺序,丢弃不完整批次,使用标准批次大小
        shuffle_flag = False; drop_last = True; batch_size = args.batch_size; freq = args.freq
    elif flag == 'pred':
        # 预测阶段配置:不打乱顺序,保留不完整批次,批次大小为1(单样本预测)
        shuffle_flag = False; drop_last = False; batch_size = 1; freq = args.detail_freq
        Data = Dataset_Pred  # 使用专门设计的预测数据集类
    else:
        # 训练/验证配置:打乱顺序,丢弃不完整批次,使用标准批次大小
        shuffle_flag = True; drop_last = True; batch_size = args.batch_size; freq = args.freq

drop_last参数的用法:
drop_last=True → 丢弃尾部数据(保持批次完整)
drop_last=False → 保留不完整批次(适用于预测全部数据)

比如9050个数据,每个batch_size是1000,分出9个batch时还剩50个数据,就把这50个丢弃掉。

这里data使用的是custom类型。我们看下对应Dataset_Pred类怎么实现的。

        if size == None:
            self.seq_len = 24*4*4
            self.label_len = 24*4
            self.pred_len = 24*4
        else:
            self.seq_len = size[0]
            self.label_len = size[1]
            self.pred_len = size[2]

 这里有时间序列预测中三个核心长度参数,序列长度,标签长度,预测长度

seq_len(输入序列长度)

‌模型观察的历史窗口长度,也是输入给编码器的序列长度。

假设每小时记录一次电力负荷数据,设置 seq_len=72, 表示模型每次预测时会查看过去72小时的历史数据。

label_len(标签引导长度)

‌解码器可观察的真实标签长度,当 label_len=24 时,模型在预测未来时会先看到前24小时的‌真实负荷值‌

pred_len(预测长度)

‌需要预测的未来时间步数,设置 pred_len=48 表示,模型将生成未来48小时的负荷预测。

对于编码器的输入长度是72,对于解码器的输入长度是48+24 =72。为什么不直接预测48小时的数据?因为解码器如果不输入一些真实数据,预测的结果表现不太好。

数据标准化

def __read_data__(self):
    # 初始化标准化器(Z-Score标准化)
    self.scaler = StandardScaler()
    
    # 从CSV文件加载原始数据(路径拼接:根目录+数据文件名)
    df_raw = pd.read_csv(os.path.join(self.root_path, self.data_path))
    
    '''
    原始数据结构:
    df_raw.columns: ['date', ...(其他特征列), 目标特征列]
    '''
    
    # 特征列筛选逻辑
    if self.cols:
        # 当指定特征列时:复制列列表并移除目标列(避免重复)
        cols = self.cols.copy()
        cols.remove(self.target)
    else:
        # 未指定时:自动获取所有列,排除日期列和目标列
        cols = list(df_raw.columns)
        cols.remove(self.target)
        cols.remove('date')
    
    # 重组DataFrame列顺序:[日期列, 特征列..., 目标列]
    df_raw = df_raw[['date'] + cols + [self.target]]
    
    # 计算数据截取边界(用于时序滑动窗口)
    border1 = len(df_raw) - self.seq_len  # 有效数据起始位置(保留足够的历史长度)
    border2 = len(df_raw)                 # 数据终止位置(取到最新数据)
    
    # 特征模式选择
    if self.features == 'M' or self.features == 'MS':
        # 多变量模式:使用所有特征列(排除日期列)
        cols_data = df_raw.columns[1:]  # 第一列是日期,后续为特征
        df_data = df_raw[cols_data]
    elif self.features == 'S':
        # 单变量模式:仅使用目标列
        df_data = df_raw[[self.target]]
    
    # 数据标准化处理
    if self.scale:
        # 训练标准化器并转换数据(fit_transform)
        self.scaler.fit(df_data.values)
        data = self.scaler.transform(df_data.values)
    else:
        # 直接使用原始数据
        data = df_data.values
    
    # 时间戳处理(用于时间特征编码)
    tmp_stamp = df_raw[['date']][border1:border2]  # 截取有效时间范围
    tmp_stamp['date'] = pd.to_datetime(tmp_stamp.date)  # 转换为datetime类型
    
    # 生成预测时间段的时间戳(从最后一个已知时间点延伸)
    pred_dates = pd.date_range(
        tmp_stamp.date.values[-1],  # 从最后一个有效时间点开始
        periods=self.pred_len + 1,  # 生成长度=预测长度+1(包含起始点)
        freq=self.freq  # 按数据频率生成(如'h'=每小时)
    )
    
    # 合并历史时间戳和预测时间戳
    df_stamp = pd.DataFrame(columns=['date'])
    df_stamp.date = list(tmp_stamp.date.values) + list(pred_dates[1:])  # 排除重复的起始点
    
    # 生成时间特征(如小时、星期等编码)
    data_stamp = time_features(
        df_stamp, 
        timeenc=self.timeenc,  # 时间编码方式(0=数值编码,1=周期编码)
        freq=self.freq[-1:]    # 频率单位(取最后一个字符,如'h'/'t')
    )
    
    # 存储最终数据
    self.data_x = data[border1:border2]  # 输入特征序列(最后seq_len个时间步)
    
    # 输出数据处理(逆标准化控制)
    if self.inverse:
        # 保留原始数值(用于后续逆变换)
        self.data_y = df_data.values[border1:border2]
    else:
        # 使用标准化后的数值
        self.data_y = data[border1:border2]
    
    # 存储时间特征矩阵
    self.data_stamp = data_stamp

scaler

class StandardScaler():
    def __init__(self):
        # 初始化标准化参数(未训练时默认均值为0,标准差为1)
        # 注意:必须在调用fit方法后才能正确使用
        self.mean = 0.    # 存储特征维度的均值(numpy数组格式)
        self.std = 1.     # 存储特征维度的标准差(numpy数组格式)

    def fit(self, data):
        """学习数据的标准化参数
        Args:
            data: 原始输入数据,支持numpy数组或pandas DataFrame
                 形状为(样本数, 特征数),按列计算统计量
        """
        # 沿样本维度计算各特征的均值和标准差
        # 保留numpy数组格式以便后续转换
        self.mean = data.mean(0)   # 均值向量 shape=(特征数,)
        self.std = data.std(0)     # 标准差向量 shape=(特征数,)

    def transform(self, data):
        """应用标准化转换
        Args:
            data: 输入数据,可以是numpy数组或PyTorch张量
        Returns:
            标准化后的数据,保持与输入相同的数据类型和设备
        """
        # 设备与类型兼容性处理
        if torch.is_tensor(data):
            # 将统计参数转换为与data同设备同类型的张量
            mean = torch.from_numpy(self.mean).type_as(data).to(data.device)
            std = torch.from_numpy(self.std).type_as(data).to(data.device)
        else:
            # 保持numpy数组格式用于非张量数据
            mean = self.mean
            std = self.std
            
        # 执行标准化公式:(x - μ) / σ
        return (data - mean) / std

    def inverse_transform(self, data):
        """逆标准化转换(将标准化数据还原)
        Args:
            data: 标准化后的数据,支持numpy数组或PyTorch张量
        Returns:
            原始尺度数据,保持与输入相同的数据类型和设备
        """
        # 设备与类型兼容性处理(与transform逻辑一致)
        if torch.is_tensor(data):
            mean = torch.from_numpy(self.mean).type_as(data).to(data.device)
            std = torch.from_numpy(self.std).type_as(data).to(data.device)
        else:
            mean = self.mean
            std = self.std
            
        # 执行逆变换公式:x * σ + μ
        return (data * std) + mean

上面代码的核心是做标准化处理和逆标准化处理的。

# 执行标准化公式:(x - μ) / σ
        return (data - mean) / std

通过pandas将csv数据导入。 

df_raw = pd.read_csv(os.path.join(self.root_path, self.data_path)) 

将数据里的目标列和日期列去掉,因为他们俩不是特征

cols.remove(self.target); cols.remove('date')

后面有句代码刚看到的时候感觉很奇怪,这样重组是为了啥?

df_raw = df_raw[['date']+cols+[self.target]]

研究了一下,这样做有几个好处。

结构化数据布局

通过强制指定列顺序为 [日期列, 特征列..., 目标列],实现以下目标:

  • 时间列独立‌:将 date(时间戳)放在首位,便于后续提取时间特征。
  • 特征与目标分离‌:所有输入特征集中在中间列,目标变量单独放在末尾,符合机器学习任务中 X(特征)和 y(目标)的规范格式。

特征筛选控制

通过cols动态选择需要的特征列,把不需要的特征去掉。

数据划分

数据集划分逻辑‌

代码按 ‌7:2:1(训练:测试:验证)‌ 的比例划分数据集

  • 训练集‌:前70% = 24,544条
  • 测试集‌:后20% = 7,012条
  • 验证集‌:中间剩余部分 = 35,064 - 24,544 - 7,012 = ‌3,508条‌

数据集划分的边界处理‌

代码中通过 border1sborder2s 动态计算各数据集的起始和结束位置,关键逻辑如下:

  • 训练集‌:[0, 24544)
    直接使用前70%数据,无需回退序列长度(因训练数据足够从头开始切割完整序列)。

  • 验证集‌:[24544-96, 24544+3508)
    起始位置回退96步(24544-96=24448),确保第一个验证样本能包含完整的历史输入序列。

  • 测试集‌:[35064-7012-96, 35064)
    起始位置回退96步(35064-7012-96=27956),道理同上。

以训练集为例,训练集的开始位置就是border1s[0],结束位置是border2s[0]。

归一化

        if self.scale:
            train_data = df_data[border1s[0]:border2s[0]]
            self.scaler.fit(train_data.values)
            data = self.scaler.transform(df_data.values)

数据划分完之后做归一化处理,归一化的同时转成tensor形式。归一化前:

归一化后:

时间特征提取

        df_stamp = df_raw[['date']][border1:border2]
        df_stamp['date'] = pd.to_datetime(df_stamp.date)
        data_stamp = time_features(df_stamp, timeenc=self.timeenc, freq=self.freq)

首先用pandas将时间戳这一列做下转换。核心处理是这句:

 dates = pd.to_datetime(dates.date.values)
        return np.vstack([feat(dates) for feat in time_features_from_frequency_str(freq)]).transpose(1,0)

上面的写法太难读了,把写法更改一下:

# 原始数据中的日期列转换为pandas的datetime类型
dates_series = dates.date.values  # 提取原始日期列的值(假设是字符串格式)
datetime_objs = pd.to_datetime(dates_series)  # 转换为datetime对象

# 获取时间特征生成函数列表
# 假设 time_features_from_frequency_str(freq) 返回类似 [Year, Month, Day...] 的特征函数
feature_funcs = time_features_from_frequency_str(freq)

# 逐个应用特征函数生成特征数组
feature_arrays = [feature_func(datetime_objs) for feature_func in feature_funcs]

# 将特征堆叠为二维数组并转置
# 假设原特征形状为 (num_features, num_samples),转置后为 (num_samples, num_features)
feature_matrix = np.vstack(feature_arrays)  # 垂直堆叠
feature_matrix = feature_matrix.transpose(1, 0)  # 行列转置

return feature_matrix

 1. ‌日期格式转换

将原始日期字符串(如 "2023-01-01")转换为 pandas 的datetime类型对象

2. ‌获取时间特征函数

feature_funcs = time_features_from_frequency_str(freq)

 这行代码的作用是,根据时间频率 freq(如 "D" 表示天,"H" 表示小时)生成一组特征提取函数。输入h的话会返回4个特征提取函数。

3.生成时间特征

feature_arrays = [feature_func(datetime_objs) for feature_func in feature_funcs]

 每个函数对 datetime_objs 处理,返回一个特征数组。比如 get_month(datetime_objs) 可能返回 [1, 1, 3, ...](月份数组)。

4. ‌堆叠与转置

feature_matrix = np.vstack(feature_arrays)  # 形状变为 (3, 100)
feature_matrix = feature_matrix.transpose(1, 0)  # 形状变为 (100, 3)
  • np.vstack‌:将多个一维数组垂直堆叠成二维数组
  • transpose‌:将特征维度从第一维调整到第二维,符合机器学习输入格式 (样本数,特征数)

 

最后的数据如上图所示,原先代表时间戳的序列经过四个时间特征提取方法,得到了四组时间序列数据。可以看到图片里第一行数据都是有变化的,这是因为第一行表示的是小时与天的关系;后面的数据没变化,是因为他们都在同一天,对于周,月,年来说特征是相同的。

具体的计算代码如下:

class HourOfDay(TimeFeature):
    """Hour of day encoded as value between [-0.5, 0.5]"""
    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        return index.hour / 23.0 - 0.5

class DayOfWeek(TimeFeature):
    """Hour of day encoded as value between [-0.5, 0.5]"""
    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        return index.dayofweek / 6.0 - 0.5

class DayOfMonth(TimeFeature):
    """Day of month encoded as value between [-0.5, 0.5]"""
    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        return (index.day - 1) / 30.0 - 0.5

class DayOfYear(TimeFeature):
    """Day of year encoded as value between [-0.5, 0.5]"""
    def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
        return (index.dayofyear - 1) / 365.0 - 0.5

可以看到,4个时间特征都被压缩到了同一区间[-0.5,0.5]内,主要有两个好处:

  • 周期性特征表达‌:通过将离散时间属性转换为连续值,帮助模型捕捉周期性规律(如小时、星期循环。
  • 数值稳定性‌:归一化到[-0.5, 0.5]区间可避免不同尺度特征对模型训练的影响

 最后根据dataset生成dataloader。

        data_loader = DataLoader(
            data_set,
            batch_size=batch_size,
            shuffle=shuffle_flag,
            num_workers=args.num_workers,
            drop_last=drop_last)

get_item

def __getitem__(self, index):
    # 获取索引对应的数据样本
    # index: 当前样本的索引,通常由DataLoader迭代时传入
    
    # 计算输入序列的起始和结束位置
    s_begin = index  # 输入序列的起始位置等于索引
    s_end = s_begin + self.seq_len  # 输入序列的结束位置(编码器输入长度)
    
    # 计算解码器相关序列的起始和结束位置
    # Informer的Decoder需要两部分输入:
    # 1. label_len: 解码器的已知输入序列(通常是编码器末尾的部分序列)
    # 2. pred_len: 需要预测的未来序列长度
    r_begin = s_end - self.label_len  # 解码器输入的起始位置(从编码器序列末尾回退label_len长度)
    r_end = r_begin + self.label_len + self.pred_len  # 解码器输入+预测的总结束位置
    
    # 获取编码器输入序列(历史数据)
    # seq_x.shape: [seq_len, n_features]
    seq_x = self.data_x[s_begin:s_end]  # 编码器输入:从s_begin到s_end的序列
    
    # 获取解码器目标序列
    if self.inverse:
        # 当需要逆变换时(例如数据被标准化后需要还原):
        # 解码器的输入部分取原始数据(data_x),预测部分取目标数据(data_y)
        # 这种模式常见于推理时,需要将预测结果逆变换到原始数据空间
        seq_y = np.concatenate([
            self.data_x[r_begin:r_begin+self.label_len],  # 前label_len步用原始数据
            self.data_y[r_begin+self.label_len:r_end]     # 后pred_len步用目标数据(可能包含空值或占位符)
        ], axis=0)
    else:
        # 常规模式下直接使用目标数据
        # seq_y包含解码器的输入(label_len)和预测目标(pred_len)
        # seq_y.shape: [label_len + pred_len, n_features]
        seq_y = self.data_y[r_begin:r_end]
    
    # 获取时间戳特征(例如月份、星期、节假日等)
    # seq_x_mark: 编码器输入的时间特征
    # seq_y_mark: 解码器输入及预测区间的时间特征
    # data_stamp通常包含时间相关的one-hot编码或数值特征
    seq_x_mark = self.data_stamp[s_begin:s_end]    # 编码器时间特征
    seq_y_mark = self.data_stamp[r_begin:r_end]    # 解码器时间特征(包含预测区间)
    
    # 返回的四个对象:
    # seq_x: 编码器输入序列(历史数据)
    # seq_y: 解码器输入(前label_len) + 预测目标(后pred_len)的拼接序列
    # seq_x_mark: 编码器时间特征
    # seq_y_mark: 解码器时间特征(含预测区间)
    return seq_x, seq_y, seq_x_mark, seq_y_mark

 编码器的输入比较简单,因为编码器不处理预测行为,所以输入序列长度就是我们超参数里定义的序列长度seq_len。

解码器的输入就比较讲究了,解码器的输入长度取决于标签长度label_len和预测长度pred_len。其中标签长度的数据从编码器输出的末尾取得。因此解码器序列的开始位置是send-label_len。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐