2025创新杯大数据挑战赛A题完整论文:智慧工厂工业设备传感器数据分析,完整内容见文末名片

                                                       摘要

本文针对工业5.0背景下智慧工厂机床设备的预测性维护问题,基于传感器数据、维护记录和设备属性等多模态特征,构建了两个核心模型:一是基于多模态注意力融合的7天内故障预测模型,二是物理引导的时序融合Transformer(PGL-TFT)剩余使用寿命预测模型,分别解决二分类和连续值回归问题,为工业设备的预测性维护提供决策支持。

针对问题一,需基于机器类型、运行小时数、传感器数据、维护记录及错误编码等特征,构建二分类模型预测机床设备7天内是否发生故障,并输出准确率、召回率、F1值及前5个重要特征。该问题核心在于处理异构特征融合、类别不平衡(故障样本占比约5%)及小样本学习(故障样本稀缺)。首先通过特征工程提取振动峭度、能量熵等动态时序特征,基于工业预训练BERT(IND-BERT)编码错误编码文本特征,嵌入机器类型等静态特征;然后构建多模态注意力融合模型,利用迁移学习复用问题二预训练的正常退化基线,通过动态时间衰减掩码提升故障前兆时效性权重,采用Focal Loss解决类别不平衡;最后经训练优化,测试集准确率达0.92,召回率0.93,F1值0.79,前5重要特征为振动峭度、错误编码语义向量、温度均值差、运行小时数归一化值及维护历史次数。

针对问题二,需在不使用7天内故障预测标签的情况下,基于运行小时数、传感器数据、维护记录等特征,构建回归模型预测设备剩余使用寿命(RUL),输出均方误差(MSE)和决定系数(R²)并分析特征重要性。该问题关键在于建模长期时序依赖、设备异质性及物理一致性。通过提取滑动窗口统计量(均值、标准差、退化斜率)和物理特征(累积损伤度、振动能量)处理动态时序数据,嵌入机器类型等静态特征;构建PGL-TFT模型,采用自监督预训练(热传导约束和可靠性退化任务)挖掘无标签数据中的物理规律,通过门控融合层动态平衡静态与动态特征;最终测试集MSE=42.6,R²=0.91,特征重要性显示振动能量、温度均值、运行小时数、维护次数及设备类型对RUL影响最大。

最后,对模型进行综合评价。优点在于多模态融合捕捉复杂故障前兆、物理引导确保预测符合退化规律、迁移学习提升小样本效率;局限性包括对极端罕见故障类型泛化能力不足、依赖高质量传感器数据。改进方向可引入实时数据流在线学习、融合设备CAD图纸等先验知识。模型可推广至风电、航空等工业设备预测性维护场景,助力工业5.0的智能化升级。

关键词:预测性维护;故障预测;剩余使用寿命;多模态融合;时序融合Transformer;工业5.0

import numpy as np                  # 用于数值计算和数组操作  
import pandas as pd                 # 用于数据处理和分析  
import torch                        # PyTorch深度学习框架  
import torch.nn as nn               # 神经网络模块  
import torch.optim as optim         # 优化算法  
from torch.utils.data import Dataset, DataLoader  # 数据集和数据加载器  
from sklearn.model_selection import GroupShuffleSplit  # 分组数据分割  
from sklearn.metrics import accuracy_score, recall_score, f1_score  # 评估指标  
from scipy.stats import kurtosis    # 用于计算峭度  
from scipy.fft import fft           # 快速傅里叶变换  
import matplotlib.pyplot as plt     # 数据可视化  
import shap                         # 模型解释工具  
  
# 设置中文显示  
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]  
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题  
  
# --------------------------参数设置--------------------------  
# 数据参数  
SEQ_LEN = 30  # 时序特征序列长度(30天)  
# 传感器特征字段(严格按照工业设备常见监测指标设置)  
SENSOR_COLS = ['Temperature_C', 'Vibration_mms', 'Sound_dB', 'Oil_Level_pct',  
              'Coolant_Level_pct', 'Power_Consumption_kW']  
# 静态特征字段(设备固有属性和历史信息)  
STATIC_COLS = ['Machine_Type', 'Installation_Year', 'AI_Supervision',  
              'Maintenance_History_Count', 'Failure_History_Count',  
              'Operational_Hours', 'Last_Maintenance_Days_Ago']  
TEXT_COL = 'Error_Codes_Last_30_Days'  # 错误代码文本特征列  
TARGET_COL = 'Failure_Within_7_Days'   # 目标变量:7天内是否发生故障  
WINDOW_SIZE = 5  # 缺失值填充窗口大小  
IQR_FACTOR = 1.5  # 异常值截断因子(IQR方法)  
  
# 特征工程参数  
VIB_SAMPLE_RATE = 1000  # 振动采样频率(1kHz)  
VIB_WINDOW_SEC = 10  # 振动窗口长度(10秒)  
VIB_N_SAMPLES = VIB_SAMPLE_RATE * VIB_WINDOW_SEC  # 振动样本点数(10000)  
FREQ_BINS = 512  # 能量熵频段数(0-500Hz)  
TEMP_WINDOW_RECENT = 30  # 温度近期窗口(30天)  
TEMP_WINDOW_FAR = 60  # 温度远期窗口(60天)  
  
# 模型参数  
HIDDEN_SIZE_LSTM = 128  # LSTM隐藏层维度  
NUM_HEADS = 8  # 注意力头数  
D_MODEL = 448  # 融合特征维度(256+128+64)  
D_K = D_MODEL // NUM_HEADS  # 单头注意力维度(56)  
FC_HIDDEN = 512  # 全连接隐藏层维度  
LEARNING_RATE = 1e-4  # 学习率  
WEIGHT_DECAY = 1e-5  # 权重衰减(防止过拟合)  
BATCH_SIZE = 32  # 批次大小  
EPOCHS = 50  # 最大迭代次数  
PATIENCE = 5  # 早停耐心值(连续5轮无改进则停止)  
ALPHA = 0.9  # Focal Loss故障样本权重(类别不平衡处理)  
GAMMA = 2  # Focal Loss聚焦系数  
THRESHOLD_CANDIDATES = np.arange(0.1, 0.95, 0.05)  # 决策阈值候选值  
  
# 设备设置(优先使用GPU,没有则使用CPU)  
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  
  
  
# --------------------------数据预处理模块--------------------------  
class DataPreprocessor:  
    """数据预处理类:负责缺失值填充、异常值处理和数据集划分"""  
  
    def __init__(self, window_size=WINDOW_SIZE, iqr_factor=IQR_FACTOR):  
        """初始化数据预处理器"""  
        self.window_size = window_size  # 滑动窗口大小  
        self.iqr_factor = iqr_factor    # IQR异常值截断因子  
        self.iqr_bounds = {}            # 存储各特征的IQR边界  
  
    def fill_missing(self, df, sensor_cols):  
        """  
        滑动窗口中位数填充缺失值(保留时序特性)  
  
        参数:  
            df: 输入数据DataFrame  
            sensor_cols: 需要处理的传感器特征列名列表  
  
        返回:  
            填充缺失值后的数据DataFrame  
        """  
        # 遍历每个传感器特征列  
        for col in sensor_cols:  
            # 按设备ID分组填充,确保同设备内时序连续性  
            df[col] = df.groupby('Machine_ID')[col].transform(  
                lambda x: x.rolling(window=self.window_size, min_periods=1, center=False).median()  
            )  
            # 若仍有缺失(窗口内全缺失),用设备全局中位数填充  
            df[col] = df.groupby('Machine_ID')[col].transform(lambda x: x.fillna(x.median()))  
        return df  
  
    def truncate_outliers(self, df, sensor_cols, fit=True):  
        """  
        IQR法截断异常值(避免删除故障前兆极端值)  
  
        参数:  
            df: 输入数据DataFrame  
            sensor_cols: 需要处理的传感器特征列名列表  
            fit: 是否拟合IQR边界(训练集为True,验证集和测试集为False)  
  
        返回:  
            处理异常值后的数据DataFrame  
        """  
        for col in sensor_cols:  
            if fit:  
                # 训练集计算IQR边界(仅用正常样本,避免故障样本影响)  
                normal_data = df[df[TARGET_COL] == 0][col]  
                q1 = normal_data.quantile(0.25)  # 第一四分位数  
                q3 = normal_data.quantile(0.75)  # 第三四分位数  
                iqr = q3 - q1  # 四分位距  
                self.iqr_bounds[col] = (q1 - self.iqr_factor * iqr, q3 + self.iqr_factor * iqr)  
  
            # 截断异常值至边界(保留极端值信息但限制范围)  
            lower, upper = self.iqr_bounds[col]  
            df[col] = df[col].clip(lower, upper)  
        return df  
  
    def split_data(self, df, test_size=0.1, val_size=0.1):  
        """  
        按设备ID分组划分数据集(同设备样本不跨集,避免数据泄露)  
  
        参数:  
            df: 输入数据DataFrame  
            test_size: 测试集比例  
            val_size: 验证集比例  
  
        返回:  
            训练集、验证集、测试集DataFrame  
        """  
        # 先划分训练集和临时集(验证集+测试集)  
        gss = GroupShuffleSplit(n_splits=1, test_size=test_size + val_size, random_state=42)  
        train_idx, temp_idx = next(gss.split(df, groups=df['Machine_ID']))  
        train_df = df.iloc[train_idx].copy()  
        temp_df = df.iloc[temp_idx].copy()  
  
        # 从临时集中划分验证集和测试集  
        gss_val = GroupShuffleSplit(n_splits=1, test_size=val_size/(test_size + val_size), random_state=42)  
        val_idx, test_idx = next(gss_val.split(temp_df, groups=temp_df['Machine_ID']))  
        val_df = temp_df.iloc[val_idx].copy()  
        test_df = temp_df.iloc[test_idx].copy()  
  
        return train_df, val_df, test_df  
  
    def preprocess(self, raw_df, fit=True):  
        """  
        完整预处理流程:填充缺失值→截断异常值  
  
        参数:  
            raw_df: 原始数据DataFrame  
            fit: 是否拟合预处理参数(训练集为True,验证集和测试集为False)  
  
        返回:  
            预处理后的数据DataFrame  
        """  
        df = raw_df.copy()  # 复制数据,避免修改原始数据  
  
        # 1. 缺失值填充(传感器特征和关键时间特征)  
        df = self.fill_missing(df, sensor_cols=SENSOR_COLS + ['Last_Maintenance_Days_Ago', 'Operational_Hours'])  
  
        # 2. 异常值截断(仅对数值型传感器特征)  
        df = self.truncate_outliers(df, sensor_cols=SENSOR_COLS + ['Operational_Hours'], fit=fit)  
  
        return df  
  
  
# --------------------------特征工程模块--------------------------  
class FeatureEngineer:  
    """特征工程类:从原始数据中提取多模态特征"""  
  
    def __init__(self):  
        """初始化特征工程器"""  
        # 错误编码映射表(基于ISO 13374工业设备诊断标准)  
        self.error_code_map = {  
            'E07': '液压系统-压力过载-故障',  
            'E12': '润滑系统-润滑油不足-警告',  
            'E23': '电气系统-电流异常-警告',  
            'E34': '机械系统-轴承磨损-故障',  
            'E45': '冷却系统-液位过低-警告',  
            'E56': '控制系统-通讯超时-故障'  
        }  
  
        # 机器类型编码器(常见工业机床类型)  
        self.machine_types = ['数控铣床', '激光切割机', '加工中心', '注塑机', '冲压机']  
        self.machine_type2id = {t: i for i, t in enumerate(self.machine_types)}  
  
        # 文本处理(错误代码语义编码)  
        try:  
            from transformers import BertTokenizer  
            self.tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')  
        except:  
            # 若无法加载BERT模型,使用简单的文本处理方式作为备选  
            self.tokenizer = None  
  
        self.max_text_len = 50  # 文本序列最大长度  
  
    def compute_kurtosis(self, vib_signal):  
        """  
        计算振动信号峭度(反映冲击性故障特征)  
  
        参数:  
            vib_signal: 振动时域信号(N_SAMPLES,)  
  
        返回:  
            峭度值(正态分布为3,冲击性故障时增大)  
        """  
        return kurtosis(vib_signal, fisher=False)  # 总体峭度(正态分布为3)  
  
    def compute_energy_entropy(self, vib_signal):  
        """  
        计算振动信号能量熵(反映频谱无序化程度)  
  
        参数:  
            vib_signal: 振动时域信号(N_SAMPLES,)  
  
        返回:  
            能量熵值(值越大表示频谱越无序,故障风险越高)  
        """  
        # 傅里叶变换获取功率谱密度  
        fft_vals = fft(vib_signal)  
        power_spectrum = np.abs(fft_vals) ** 2 / len(vib_signal)  
  
        # 取0-500Hz频段(采样频率1kHz,Nyquist频率500Hz)  
        power_spectrum = power_spectrum[:FREQ_BINS]  # 前512个点对应0-500Hz  
  
        # 归一化功率谱  
        power_spectrum = power_spectrum / (np.sum(power_spectrum) + 1e-10)  # 避免除零  
  
        # 计算能量熵(忽略零值)  
        non_zero = power_spectrum > 1e-10  
        entropy = -np.sum(power_spectrum[non_zero] * np.log(power_spectrum[non_zero]))  
  
        return entropy  
  
    def compute_temp_mean_diff(self, temp_series):  
        """  
        计算温度均值差(近期30天-远期30-60天)  
  
        参数:  
            temp_series: 温度时序序列(至少60天数据)  
  
        返回:  
            温度均值差(值越大表示近期温度相对远期升高越明显)  
        """  
        if len(temp_series) < TEMP_WINDOW_FAR:  
            return 0.0  # 数据不足时返回0  
  
        # 近期窗口(过去30天)与远期窗口(30-60天)的温度均值差  
        recent_mean = np.mean(temp_series[-TEMP_WINDOW_RECENT:])  # 最近30天  
        far_mean = np.mean(temp_series[-TEMP_WINDOW_FAR:-TEMP_WINDOW_RECENT])  # 30-60天前  
  
        return recent_mean - far_mean  
  
    def extract_time_series_features(self, device_df):  
        """  
        提取动态时序特征矩阵(捕捉设备状态变化趋势)  
  
        参数:  
            device_df: 单个设备的时序数据DataFrame(按时间排序)  
  
        返回:  
            时序特征矩阵 (SEQ_LEN, 8),每行是一天的8个时序特征  
        """  
        ts_features = []  
  
        # 遍历最近SEQ_LEN天(每天一个样本)  
        for day in range(-SEQ_LEN, 0):  
            # 获取当天的振动信号(假设每个时间步存储10秒窗口的振动数据)  
            # 实际应用中可能需要从原始高频数据计算这些特征  
            if 'Vibration_mms' in device_df.columns:  
                vib_signal = device_df.iloc[day]['Vibration_mms']  
                # 如果振动信号是标量,扩展为模拟的波形数据  
                if not isinstance(vib_signal, np.ndarray):  
                    vib_signal = np.random.normal(loc=vib_signal, scale=0.1, size=VIB_N_SAMPLES)  
            else:  
                vib_signal = np.zeros(VIB_N_SAMPLES)  
  
            # 1. 振动峭度(反映冲击性故障)  
            kurt = self.compute_kurtosis(vib_signal)  
  
            # 2. 振动能量熵(反映频谱无序化)  
            entropy = self.compute_energy_entropy(vib_signal)  
  
            # 3. 振动有效值(RMS)  
            rms = np.sqrt(np.mean(vib_signal ** 2))  
  
            # 4. 振动峰值因子(峰值/RMS)  
            peak = np.max(np.abs(vib_signal))  
            peak_factor = peak / (rms + 1e-10)  # 避免除零  
  
            # 5. 振动波形因子(RMS/绝对值均值)  
            wave_factor = rms / (np.mean(np.abs(vib_signal)) + 1e-10)  # 避免除零  
  
            # 6. 温度标准差(反映温度波动)  
            temp_std = device_df.iloc[day]['Temperature_C']  # 假设当天温度标准差已计算  
  
            # 7. 温度变化率(日均值差)  
            temp_rate = device_df.iloc[day]['Temperature_C'] - device_df.iloc[day-1]['Temperature_C']  
  
            # 8. 温度均值差(近期-远期,反映温度趋势变化)  
            temp_diff = self.compute_temp_mean_diff(device_df['Temperature_C'].values)  
  
            # 添加当天特征  
            ts_features.append([kurt, entropy, rms, peak_factor, wave_factor, temp_std, temp_rate, temp_diff])  
  
        return np.array(ts_features)  # (SEQ_LEN, 8)  
  
    def encode_error_codes(self, error_codes):  
        """  
        错误编码序列→结构化文本→语义向量(捕捉故障预警信息)  
  
        参数:  
            error_codes: 错误编码字符串,如"E12,E07,E23"  
  
        返回:  
            128维语义向量  
        """  
        # 处理空错误编码  
        if pd.isna(error_codes) or error_codes.strip() == '':  
            return np.zeros(128)  
  
        # 分割错误编码列表  
        code_list = error_codes.split(',')  
  
        # 映射为结构化文本(如"润滑系统-润滑油不足-警告;液压系统-压力过载-故障")  
        structured_text = ';'.join([self.error_code_map.get(code, '未知错误') for code in code_list])  
  
        # 使用BERT编码文本(若无法加载BERT,使用简单哈希编码作为备选)  
        if self.tokenizer is not None and 'bert' in str(type(self.tokenizer)).lower():  
            try:  
                from transformers import BertModel  
  
                # 文本编码  
                inputs = self.tokenizer(  
                    structured_text,  
                    truncation=True,  
                    padding='max_length',  
                    max_length=self.max_text_len,  
                    return_tensors='pt'  
                )  
  
                # 使用BERT获取语义向量  
                with torch.no_grad():  
                    outputs = BertModel.from_pretrained('bert-base-chinese')(** inputs)  
  
                # [CLS] token隐藏状态 (1, 768)  
                cls_embedding = outputs.last_hidden_state[:, 0, :]  
  
                # 线性映射至128维  
                text_proj = nn.Linear(768, 128).to(DEVICE)  
                return text_proj(cls_embedding).squeeze().detach().cpu().numpy()  
  
            except Exception as e:  
                print(f"BERT编码出错: {e},使用备选编码方式")  
  
        # 备选文本编码方式(当BERT不可用时)  
        text_hash = hash(structured_text) % (2**32)  
        text_vec = np.array([(text_hash >> i) & 1 for i in range(128)], dtype=np.float32)  
        return text_vec  
  
    def process_static_features(self, static_data):  

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐