Informer源码解析2——数据处理
数据是informer代码里自带的数据集。可以看到第一列是时间,以日期和小时为单位;后面的几列都是数据。我们要根据所有数据去预测最后一列的值,即WetBulbCelsius。
数据一览
数据是informer代码里自带的数据集。可以看到第一列是时间,以日期和小时为单位;后面的几列都是数据。我们要根据所有数据去预测最后一列的值,即WetBulbCelsius。
data_parser
data_parser = {
'ETTh1':{'data':'ETTh1.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
'ETTh2':{'data':'ETTh2.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
'ETTm1':{'data':'ETTm1.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
'ETTm2':{'data':'ETTm2.csv','T':'OT','M':[7,7,7],'S':[1,1,1],'MS':[7,7,1]},
'WTH':{'data':'WTH.csv','T':'WetBulbCelsius','M':[12,12,12],'S':[1,1,1],'MS':[12,12,1]},
'ECL':{'data':'ECL.csv','T':'MT_320','M':[321,321,321],'S':[1,1,1],'MS':[321,321,1]},
'Solar':{'data':'solar_AL.csv','T':'POWER_136','M':[137,137,137],'S':[1,1,1],'MS':[137,137,1]},
}
参数 | 解释 |
data | 数据文件名(如 WTH.csv 表示气象数据集文件) |
T | 目标列名称 |
比如这次我们训练使用的数据集是WTH.csv。我们目标预测的变量是WetBulbCelsius。
后面一个参数是预测模式维度配置:
模式 | 编码器输入 | 解码器输入 | 输出维度 | 说明 |
---|---|---|---|---|
M | 多变量→多变量 | 多变量→多变量 | 多变量→多变量 | 如 ECL 的 M: [321,321,321] 表示 321 维输入输出 |
S | 单变量→单变量 | 单变量→单变量 | 单变量→单变量 | 所有数据集 S: [1,1,1] 表示单一特征预测 |
MS | 多变量→单变量 | 多变量→单变量 | 多变量→单变量 | 如 Solar 的 MS: [137,137,1] 表示 137 维输入预测单变量 |
我们训练使用的WTH.csv里面有12种物理变量。在预测单变量的时候就是'MS':[12,12,1]。
dataloader
train_data, train_loader = self._get_data(flag = 'train')
vali_data, vali_loader = self._get_data(flag = 'val')
test_data, test_loader = self._get_data(flag = 'test')
在代码里使用_get_data方法来获取dataset和dataloader。
def _get_data(self, flag):
args = self.args
# 数据集类型映射字典:将数据集名称映射到对应的Dataset类
data_dict = {
'ETTh1':Dataset_ETT_hour, # ETT小时数据集(电力数据)
'ETTh2':Dataset_ETT_hour, # 另一个ETT小时数据集
'ETTm1':Dataset_ETT_minute, # ETT分钟数据集(更高采样频率)
'ETTm2':Dataset_ETT_minute, # 另一个ETT分钟数据集
'WTH':Dataset_Custom, # 气象数据集使用自定义加载器
'ECL':Dataset_Custom, # 电力消耗数据
'Solar':Dataset_Custom, # 太阳能发电数据
'custom':Dataset_Custom, # 用户自定义数据集入口
}
# 时间编码标记:当使用'timeF'时间特征编码方式时启用时间编码(0/1标志位)
timeenc = 0 if args.embed != 'timeF' else 1 # embed参数控制编码类型
# 根据数据阶段配置加载参数
if flag == 'test':
# 测试集配置:不打乱顺序,丢弃不完整批次,使用标准批次大小
shuffle_flag = False; drop_last = True; batch_size = args.batch_size; freq = args.freq
elif flag == 'pred':
# 预测阶段配置:不打乱顺序,保留不完整批次,批次大小为1(单样本预测)
shuffle_flag = False; drop_last = False; batch_size = 1; freq = args.detail_freq
Data = Dataset_Pred # 使用专门设计的预测数据集类
else:
# 训练/验证配置:打乱顺序,丢弃不完整批次,使用标准批次大小
shuffle_flag = True; drop_last = True; batch_size = args.batch_size; freq = args.freq
# 实例化数据集对象
data_set = Data(
root_path=args.root_path, # 数据集根目录(如'./data/')
data_path=args.data_path, # 数据文件名(如'WTH.csv')
flag=flag, # 数据阶段标记(train/test/val/pred)
size=[args.seq_len, args.label_len, args.pred_len], # 时序参数配置
features=args.features, # 特征模式(M/S/MS)
target=args.target, # 预测目标列名(如'OT')
inverse=args.inverse, # 是否对输出做逆标准化(True/False)
timeenc=timeenc, # 是否应用时间编码(0/1)
freq=freq, # 时间编码频率(如'h'/'t'等)
cols=args.cols # 指定使用的数据列(None表示全选)
)
print(flag, len(data_set)) # 打印当前阶段名称及数据集样本数量
# 创建数据加载器
data_loader = DataLoader(
data_set,
batch_size=batch_size, # 根据阶段设置批次大小
shuffle=shuffle_flag, # 是否打乱顺序
num_workers=args.num_workers, # 多线程加载数(0表示禁用)
drop_last=drop_last # 是否丢弃不完整批次
)
return data_set, data_loader # 返回数据集对象和数据加载器(前者可用于分析,后者用于迭代)
数据集配置
# 根据数据阶段配置加载参数
if flag == 'test':
# 测试集配置:不打乱顺序,丢弃不完整批次,使用标准批次大小
shuffle_flag = False; drop_last = True; batch_size = args.batch_size; freq = args.freq
elif flag == 'pred':
# 预测阶段配置:不打乱顺序,保留不完整批次,批次大小为1(单样本预测)
shuffle_flag = False; drop_last = False; batch_size = 1; freq = args.detail_freq
Data = Dataset_Pred # 使用专门设计的预测数据集类
else:
# 训练/验证配置:打乱顺序,丢弃不完整批次,使用标准批次大小
shuffle_flag = True; drop_last = True; batch_size = args.batch_size; freq = args.freq
drop_last参数的用法:
drop_last=True → 丢弃尾部数据(保持批次完整)
drop_last=False → 保留不完整批次(适用于预测全部数据)
比如9050个数据,每个batch_size是1000,分出9个batch时还剩50个数据,就把这50个丢弃掉。
这里data使用的是custom类型。我们看下对应Dataset_Pred类怎么实现的。
if size == None:
self.seq_len = 24*4*4
self.label_len = 24*4
self.pred_len = 24*4
else:
self.seq_len = size[0]
self.label_len = size[1]
self.pred_len = size[2]
这里有时间序列预测中三个核心长度参数,序列长度,标签长度,预测长度。
seq_len(输入序列长度)
模型观察的历史窗口长度,也是输入给编码器的序列长度。
假设每小时记录一次电力负荷数据,设置 seq_len=72, 表示模型每次预测时会查看过去72小时的历史数据。
label_len(标签引导长度)
解码器可观察的真实标签长度,当 label_len=24 时,模型在预测未来时会先看到前24小时的真实负荷值。
pred_len(预测长度)
需要预测的未来时间步数,设置 pred_len=48 表示,模型将生成未来48小时的负荷预测。
对于编码器的输入长度是72,对于解码器的输入长度是48+24 =72。为什么不直接预测48小时的数据?因为解码器如果不输入一些真实数据,预测的结果表现不太好。
数据标准化
def __read_data__(self):
# 初始化标准化器(Z-Score标准化)
self.scaler = StandardScaler()
# 从CSV文件加载原始数据(路径拼接:根目录+数据文件名)
df_raw = pd.read_csv(os.path.join(self.root_path, self.data_path))
'''
原始数据结构:
df_raw.columns: ['date', ...(其他特征列), 目标特征列]
'''
# 特征列筛选逻辑
if self.cols:
# 当指定特征列时:复制列列表并移除目标列(避免重复)
cols = self.cols.copy()
cols.remove(self.target)
else:
# 未指定时:自动获取所有列,排除日期列和目标列
cols = list(df_raw.columns)
cols.remove(self.target)
cols.remove('date')
# 重组DataFrame列顺序:[日期列, 特征列..., 目标列]
df_raw = df_raw[['date'] + cols + [self.target]]
# 计算数据截取边界(用于时序滑动窗口)
border1 = len(df_raw) - self.seq_len # 有效数据起始位置(保留足够的历史长度)
border2 = len(df_raw) # 数据终止位置(取到最新数据)
# 特征模式选择
if self.features == 'M' or self.features == 'MS':
# 多变量模式:使用所有特征列(排除日期列)
cols_data = df_raw.columns[1:] # 第一列是日期,后续为特征
df_data = df_raw[cols_data]
elif self.features == 'S':
# 单变量模式:仅使用目标列
df_data = df_raw[[self.target]]
# 数据标准化处理
if self.scale:
# 训练标准化器并转换数据(fit_transform)
self.scaler.fit(df_data.values)
data = self.scaler.transform(df_data.values)
else:
# 直接使用原始数据
data = df_data.values
# 时间戳处理(用于时间特征编码)
tmp_stamp = df_raw[['date']][border1:border2] # 截取有效时间范围
tmp_stamp['date'] = pd.to_datetime(tmp_stamp.date) # 转换为datetime类型
# 生成预测时间段的时间戳(从最后一个已知时间点延伸)
pred_dates = pd.date_range(
tmp_stamp.date.values[-1], # 从最后一个有效时间点开始
periods=self.pred_len + 1, # 生成长度=预测长度+1(包含起始点)
freq=self.freq # 按数据频率生成(如'h'=每小时)
)
# 合并历史时间戳和预测时间戳
df_stamp = pd.DataFrame(columns=['date'])
df_stamp.date = list(tmp_stamp.date.values) + list(pred_dates[1:]) # 排除重复的起始点
# 生成时间特征(如小时、星期等编码)
data_stamp = time_features(
df_stamp,
timeenc=self.timeenc, # 时间编码方式(0=数值编码,1=周期编码)
freq=self.freq[-1:] # 频率单位(取最后一个字符,如'h'/'t')
)
# 存储最终数据
self.data_x = data[border1:border2] # 输入特征序列(最后seq_len个时间步)
# 输出数据处理(逆标准化控制)
if self.inverse:
# 保留原始数值(用于后续逆变换)
self.data_y = df_data.values[border1:border2]
else:
# 使用标准化后的数值
self.data_y = data[border1:border2]
# 存储时间特征矩阵
self.data_stamp = data_stamp
scaler
class StandardScaler():
def __init__(self):
# 初始化标准化参数(未训练时默认均值为0,标准差为1)
# 注意:必须在调用fit方法后才能正确使用
self.mean = 0. # 存储特征维度的均值(numpy数组格式)
self.std = 1. # 存储特征维度的标准差(numpy数组格式)
def fit(self, data):
"""学习数据的标准化参数
Args:
data: 原始输入数据,支持numpy数组或pandas DataFrame
形状为(样本数, 特征数),按列计算统计量
"""
# 沿样本维度计算各特征的均值和标准差
# 保留numpy数组格式以便后续转换
self.mean = data.mean(0) # 均值向量 shape=(特征数,)
self.std = data.std(0) # 标准差向量 shape=(特征数,)
def transform(self, data):
"""应用标准化转换
Args:
data: 输入数据,可以是numpy数组或PyTorch张量
Returns:
标准化后的数据,保持与输入相同的数据类型和设备
"""
# 设备与类型兼容性处理
if torch.is_tensor(data):
# 将统计参数转换为与data同设备同类型的张量
mean = torch.from_numpy(self.mean).type_as(data).to(data.device)
std = torch.from_numpy(self.std).type_as(data).to(data.device)
else:
# 保持numpy数组格式用于非张量数据
mean = self.mean
std = self.std
# 执行标准化公式:(x - μ) / σ
return (data - mean) / std
def inverse_transform(self, data):
"""逆标准化转换(将标准化数据还原)
Args:
data: 标准化后的数据,支持numpy数组或PyTorch张量
Returns:
原始尺度数据,保持与输入相同的数据类型和设备
"""
# 设备与类型兼容性处理(与transform逻辑一致)
if torch.is_tensor(data):
mean = torch.from_numpy(self.mean).type_as(data).to(data.device)
std = torch.from_numpy(self.std).type_as(data).to(data.device)
else:
mean = self.mean
std = self.std
# 执行逆变换公式:x * σ + μ
return (data * std) + mean
上面代码的核心是做标准化处理和逆标准化处理的。
# 执行标准化公式:(x - μ) / σ
return (data - mean) / std
通过pandas将csv数据导入。
df_raw = pd.read_csv(os.path.join(self.root_path, self.data_path))

将数据里的目标列和日期列去掉,因为他们俩不是特征
cols.remove(self.target); cols.remove('date')
后面有句代码刚看到的时候感觉很奇怪,这样重组是为了啥?
df_raw = df_raw[['date']+cols+[self.target]]
研究了一下,这样做有几个好处。
结构化数据布局
通过强制指定列顺序为 [日期列, 特征列..., 目标列]
,实现以下目标:
- 时间列独立:将
date
(时间戳)放在首位,便于后续提取时间特征。 - 特征与目标分离:所有输入特征集中在中间列,目标变量单独放在末尾,符合机器学习任务中
X
(特征)和y
(目标)的规范格式。
特征筛选控制
通过cols
动态选择需要的特征列,把不需要的特征去掉。
数据划分
数据集划分逻辑
代码按 7:2:1(训练:测试:验证) 的比例划分数据集
- 训练集:前70% = 24,544条
- 测试集:后20% = 7,012条
- 验证集:中间剩余部分 = 35,064 - 24,544 - 7,012 = 3,508条
数据集划分的边界处理
代码中通过 border1s
和 border2s
动态计算各数据集的起始和结束位置,关键逻辑如下:
-
训练集:
[0, 24544)
直接使用前70%数据,无需回退序列长度(因训练数据足够从头开始切割完整序列)。 -
验证集:
[24544-96, 24544+3508)
起始位置回退96步(24544-96=24448
),确保第一个验证样本能包含完整的历史输入序列。 -
测试集:
[35064-7012-96, 35064)
起始位置回退96步(35064-7012-96=27956
),道理同上。
以训练集为例,训练集的开始位置就是border1s[0],结束位置是border2s[0]。
归一化
if self.scale:
train_data = df_data[border1s[0]:border2s[0]]
self.scaler.fit(train_data.values)
data = self.scaler.transform(df_data.values)
数据划分完之后做归一化处理,归一化的同时转成tensor形式。归一化前:
归一化后:
时间特征提取
df_stamp = df_raw[['date']][border1:border2]
df_stamp['date'] = pd.to_datetime(df_stamp.date)
data_stamp = time_features(df_stamp, timeenc=self.timeenc, freq=self.freq)
首先用pandas将时间戳这一列做下转换。核心处理是这句:
dates = pd.to_datetime(dates.date.values)
return np.vstack([feat(dates) for feat in time_features_from_frequency_str(freq)]).transpose(1,0)
上面的写法太难读了,把写法更改一下:
# 原始数据中的日期列转换为pandas的datetime类型
dates_series = dates.date.values # 提取原始日期列的值(假设是字符串格式)
datetime_objs = pd.to_datetime(dates_series) # 转换为datetime对象
# 获取时间特征生成函数列表
# 假设 time_features_from_frequency_str(freq) 返回类似 [Year, Month, Day...] 的特征函数
feature_funcs = time_features_from_frequency_str(freq)
# 逐个应用特征函数生成特征数组
feature_arrays = [feature_func(datetime_objs) for feature_func in feature_funcs]
# 将特征堆叠为二维数组并转置
# 假设原特征形状为 (num_features, num_samples),转置后为 (num_samples, num_features)
feature_matrix = np.vstack(feature_arrays) # 垂直堆叠
feature_matrix = feature_matrix.transpose(1, 0) # 行列转置
return feature_matrix
1. 日期格式转换
将原始日期字符串(如 "2023-01-01"
)转换为 pandas 的datetime类型对象
2. 获取时间特征函数
feature_funcs = time_features_from_frequency_str(freq)
这行代码的作用是,根据时间频率 freq
(如 "D"
表示天,"H"
表示小时)生成一组特征提取函数。输入h的话会返回4个特征提取函数。
3.生成时间特征
feature_arrays = [feature_func(datetime_objs) for feature_func in feature_funcs]
每个函数对 datetime_objs
处理,返回一个特征数组。比如 get_month(datetime_objs)
可能返回 [1, 1, 3, ...]
(月份数组)。
4. 堆叠与转置
feature_matrix = np.vstack(feature_arrays) # 形状变为 (3, 100)
feature_matrix = feature_matrix.transpose(1, 0) # 形状变为 (100, 3)
np.vstack
:将多个一维数组垂直堆叠成二维数组-
transpose
:将特征维度从第一维调整到第二维,符合机器学习输入格式(样本数,特征数)
最后的数据如上图所示,原先代表时间戳的序列经过四个时间特征提取方法,得到了四组时间序列数据。可以看到图片里第一行数据都是有变化的,这是因为第一行表示的是小时与天的关系;后面的数据没变化,是因为他们都在同一天,对于周,月,年来说特征是相同的。
具体的计算代码如下:
class HourOfDay(TimeFeature):
"""Hour of day encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return index.hour / 23.0 - 0.5
class DayOfWeek(TimeFeature):
"""Hour of day encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return index.dayofweek / 6.0 - 0.5
class DayOfMonth(TimeFeature):
"""Day of month encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return (index.day - 1) / 30.0 - 0.5
class DayOfYear(TimeFeature):
"""Day of year encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return (index.dayofyear - 1) / 365.0 - 0.5
可以看到,4个时间特征都被压缩到了同一区间[-0.5,0.5]内,主要有两个好处:
- 周期性特征表达:通过将离散时间属性转换为连续值,帮助模型捕捉周期性规律(如小时、星期循环。
- 数值稳定性:归一化到[-0.5, 0.5]区间可避免不同尺度特征对模型训练的影响
最后根据dataset生成dataloader。
data_loader = DataLoader(
data_set,
batch_size=batch_size,
shuffle=shuffle_flag,
num_workers=args.num_workers,
drop_last=drop_last)
get_item
def __getitem__(self, index):
# 获取索引对应的数据样本
# index: 当前样本的索引,通常由DataLoader迭代时传入
# 计算输入序列的起始和结束位置
s_begin = index # 输入序列的起始位置等于索引
s_end = s_begin + self.seq_len # 输入序列的结束位置(编码器输入长度)
# 计算解码器相关序列的起始和结束位置
# Informer的Decoder需要两部分输入:
# 1. label_len: 解码器的已知输入序列(通常是编码器末尾的部分序列)
# 2. pred_len: 需要预测的未来序列长度
r_begin = s_end - self.label_len # 解码器输入的起始位置(从编码器序列末尾回退label_len长度)
r_end = r_begin + self.label_len + self.pred_len # 解码器输入+预测的总结束位置
# 获取编码器输入序列(历史数据)
# seq_x.shape: [seq_len, n_features]
seq_x = self.data_x[s_begin:s_end] # 编码器输入:从s_begin到s_end的序列
# 获取解码器目标序列
if self.inverse:
# 当需要逆变换时(例如数据被标准化后需要还原):
# 解码器的输入部分取原始数据(data_x),预测部分取目标数据(data_y)
# 这种模式常见于推理时,需要将预测结果逆变换到原始数据空间
seq_y = np.concatenate([
self.data_x[r_begin:r_begin+self.label_len], # 前label_len步用原始数据
self.data_y[r_begin+self.label_len:r_end] # 后pred_len步用目标数据(可能包含空值或占位符)
], axis=0)
else:
# 常规模式下直接使用目标数据
# seq_y包含解码器的输入(label_len)和预测目标(pred_len)
# seq_y.shape: [label_len + pred_len, n_features]
seq_y = self.data_y[r_begin:r_end]
# 获取时间戳特征(例如月份、星期、节假日等)
# seq_x_mark: 编码器输入的时间特征
# seq_y_mark: 解码器输入及预测区间的时间特征
# data_stamp通常包含时间相关的one-hot编码或数值特征
seq_x_mark = self.data_stamp[s_begin:s_end] # 编码器时间特征
seq_y_mark = self.data_stamp[r_begin:r_end] # 解码器时间特征(包含预测区间)
# 返回的四个对象:
# seq_x: 编码器输入序列(历史数据)
# seq_y: 解码器输入(前label_len) + 预测目标(后pred_len)的拼接序列
# seq_x_mark: 编码器时间特征
# seq_y_mark: 解码器时间特征(含预测区间)
return seq_x, seq_y, seq_x_mark, seq_y_mark
编码器的输入比较简单,因为编码器不处理预测行为,所以输入序列长度就是我们超参数里定义的序列长度seq_len。
解码器的输入就比较讲究了,解码器的输入长度取决于标签长度label_len和预测长度pred_len。其中标签长度的数据从编码器输出的末尾取得。因此解码器序列的开始位置是send-label_len。

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)