ARXML Database Parsing

This script parses an AUTOSAR ARXML file, collects each I-SIGNAL together with its PDU mapping, signal group, data type, unit, value range, and compu method, and exports the result to a formatted Excel report.

import pandas as pd
import xml.etree.ElementTree as ET
from typing import List, Dict
from pathlib import Path
from openpyxl.styles import PatternFill
from openpyxl import Workbook
from openpyxl.utils import get_column_letter


# Cell styles: red fill flags missing values ('-'), blue fill marks the header row
RED_FILL = PatternFill(start_color='FFC7CE', end_color='FFC7CE', fill_type='solid')
HEADER_FILL = PatternFill(start_color='D9E1F2', end_color='D9E1F2', fill_type='solid')

def safe_find(element, path: str, ns: dict, attr: str = None):
    """安全查找XML节点"""
    node = element.find(path, ns)
    if node is None:
        return None
    return node.get(attr) if attr else node

def get_text(element, path: str, ns: dict) -> str:
    """安全获取文本内容"""
    node = safe_find(element, path, ns)
    return node.text.strip() if node is not None and node.text else '-'

def parse_arxml(arxml_file: str) -> List[Dict[str, str]]:
    """Parse an ARXML file and return one record per I-SIGNAL."""
    try:
        tree = ET.parse(arxml_file)
        root = tree.getroot()
    except Exception as e:
        raise ValueError(f"Failed to parse ARXML: {e}")

    ns = {
        'ns': 'http://autosar.org/schema/r4.0',
        'xsi': 'http://www.w3.org/2001/XMLSchema-instance'
    }

    # Lookup tables built in the metadata pass and consumed in the signal pass
    metadata = {
        'signal_mappings': {},   # signal name -> {'pdu': ..., 'direction': ...}
        'signal_groups': {},     # PDU name -> {group: [signals]}, plus signal name -> {PDU: group}
        'data_types': {},        # type name -> {'base_type', 'unit', 'constraint'}
        'units': {},             # unit name -> display name
        'constraints': {},       # constraint name -> {'min', 'max'}
        'compu_methods': {}      # method name -> {'category', 'details'}
    }

    # ====== Metadata pass ======
    # Unit definitions
    for unit in root.findall('.//ns:UNIT', ns):
        name = get_text(unit, 'ns:SHORT-NAME', ns)
        if name != '-':
            display_name = get_text(unit, 'ns:DISPLAY-NAME', ns)
            metadata['units'][name] = display_name

    # Data constraints (value ranges)
    for constr in root.findall('.//ns:DATA-CONSTR', ns):
        name = get_text(constr, 'ns:SHORT-NAME', ns)
        if name != '-':
            lower = get_text(constr, './/ns:LOWER-LIMIT', ns)
            upper = get_text(constr, './/ns:UPPER-LIMIT', ns)
            metadata['constraints'][name] = {'min': lower, 'max': upper}

    # Compu methods (linear scaling and text tables)
    for method in root.findall('.//ns:COMPU-METHOD', ns):
        name = get_text(method, 'ns:SHORT-NAME', ns)
        if name != '-':
            category = get_text(method, 'ns:CATEGORY', ns)
            details = {}
            if category == 'LINEAR':
                scale = method.find('.//ns:COMPU-SCALE', ns)
                if scale is not None:
                    num = get_text(scale, 'ns:COMPU-NUMERATOR', ns)
                    den = get_text(scale, 'ns:COMPU-DENOMINATOR', ns)
                    details = f"Linear({num}/{den})"
            elif category == 'TEXTTABLE':
                for scale in method.findall('.//ns:COMPU-SCALE', ns):
                    lower = get_text(scale, 'ns:LOWER-LIMIT', ns)
                    text = get_text(scale, 'ns:COMPU-CONST/ns:VT', ns)
                    details[lower] = text
                details = str(details)
            metadata['compu_methods'][name] = {'category': category, 'details': details}

    # Data types
    for dtype in root.findall('.//ns:DATA-TYPE', ns):
        name = get_text(dtype, 'ns:SHORT-NAME', ns)
        if name != '-':
            props = dtype.find('.//ns:SW-DATA-DEF-PROPS-CONDITIONAL', ns)
            if props is None:
                continue
            base_type = get_text(props, 'ns:BASE-TYPE-REF', ns)
            unit = get_text(props, 'ns:UNIT-REF', ns)
            constraint = get_text(props, 'ns:DATA-CONSTR-REF', ns)
            metadata['data_types'][name] = {
                'base_type': base_type.split('/')[-1] if base_type != '-' else '-',
                'unit': unit.split('/')[-1] if unit != '-' else '-',
                'constraint': constraint.split('/')[-1] if constraint != '-' else '-'
            }

    # ====== Communication layout pass ======
    # Signal-to-PDU mappings (direction + PDU)
    for mapping in root.findall('.//ns:I-SIGNAL-TO-PDU-MAPPING', ns):
        sig_ref = get_text(mapping, 'ns:I-SIGNAL-REF', ns)
        if sig_ref != '-':
            sig_name = sig_ref.split('/')[-1]
            pdu_ref = get_text(mapping, 'ns:PDU-REF', ns).split('/')[-1]
            direction = get_text(mapping, 'ns:COMMUNICATION-DIRECTION', ns)
            metadata['signal_mappings'][sig_name] = {'pdu': pdu_ref, 'direction': direction}



    # Signal groups: map each PDU to its groups and each signal back to its group
    for pdu in root.findall('.//ns:I-PDU', ns):
        pdu_name = get_text(pdu, 'ns:SHORT-NAME', ns)
        if pdu_name == '-':
            continue
        
        metadata['signal_groups'][pdu_name] = {}
        
        # Walk every I-SIGNAL-GROUP in this PDU
        for group in pdu.findall('.//ns:I-SIGNAL-GROUP', ns):
            group_name = get_text(group, 'ns:SHORT-NAME', ns)
            if group_name == '-':
                continue
            
            # Locate the container that holds the signal references
            signal_refs = []
            refs_container = group.find('ns:I-SIGNAL-REFS', ns)
            if refs_container is not None:
                # Collect every I-SIGNAL-REF
                for sig_ref in refs_container.findall('ns:I-SIGNAL-REF', ns):
                    if sig_ref.text:
                        sig_name = sig_ref.text.split('/')[-1]
                        signal_refs.append(sig_name)
                        # Reverse mapping: signal name -> {PDU: group}
                        metadata['signal_groups'].setdefault(sig_name, {}).setdefault(pdu_name, group_name)
            
            metadata['signal_groups'][pdu_name][group_name] = signal_refs

    # ====== Signal pass ======
    signals = []
    total_signals = 0
    for signal in root.findall('.//ns:I-SIGNAL', ns):
        try:
            sig_name = get_text(signal, 'ns:SHORT-NAME', ns)
            if sig_name == '-':
                continue

            total_signals += 1
            print(f"Processing signal [{total_signals}]: {sig_name}")

            # Base record for this signal
            sig_info = {
                'Signal Name': sig_name,
                'Direction': metadata['signal_mappings'].get(sig_name, {}).get('direction', '-'),
                'PDU': metadata['signal_mappings'].get(sig_name, {}).get('pdu', '-'),
                'Signal Group': '|'.join(
                    f"{pdu}:{group}" 
                    for pdu, group in metadata['signal_groups'].get(sig_name, {}).items()
                ) if sig_name in metadata['signal_groups'] else '-',
                'Data Type': '-',
                'Base Type': '-',
                'Length (bits)': get_text(signal, 'ns:LENGTH', ns),
                'Unit': '-',
                'Min Value': '-',
                'Max Value': '-',
                'Compu Method': '-'
            }

            # Resolve data type, unit, and value range
            dtype_ref = get_text(signal, 'ns:DATA-TYPE-REF', ns)
            if dtype_ref != '-':
                dtype_name = dtype_ref.split('/')[-1]
                dtype_info = metadata['data_types'].get(dtype_name, {})
                constraint_name = dtype_info.get('constraint', '-')
                constraint_info = metadata['constraints'].get(constraint_name, {})
                
                sig_info.update({
                    'Data Type': dtype_name,
                    'Base Type': dtype_info.get('base_type', '-'),
                    'Unit': metadata['units'].get(dtype_info.get('unit', '-'), '-'),
                    'Min Value': constraint_info.get('min', '-'),
                    'Max Value': constraint_info.get('max', '-')
                })

            # Resolve compu method
            compu_ref = get_text(signal, 'ns:COMPU-METHOD-REF', ns)
            if compu_ref != '-':
                compu_name = compu_ref.split('/')[-1]
                compu_info = metadata['compu_methods'].get(compu_name, {})
                sig_info['Compu Method'] = f"{compu_info.get('category', '')} {compu_info.get('details', '')}"

            signals.append(sig_info)
        except Exception as e:
            print(f"信号处理失败: {str(e)}")
    print(f"总解析信号数: {total_signals}")
    return signals
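
# For reference, a minimal hypothetical ARXML fragment using the element names the
# parser above queries (SHORT-NAME, LENGTH, DATA-TYPE-REF, I-SIGNAL-TO-PDU-MAPPING, ...).
# Real AUTOSAR R4 files nest these elements under AR-PACKAGES/AR-PACKAGE/ELEMENTS, and
# the exact reference tags can differ between tool exports, so the XPath expressions
# above may need adjusting for your database:
#
#   <I-SIGNAL>
#     <SHORT-NAME>EngineSpeed</SHORT-NAME>
#     <LENGTH>16</LENGTH>
#     <DATA-TYPE-REF>/DataTypes/EngineSpeedType</DATA-TYPE-REF>
#   </I-SIGNAL>
#   <I-SIGNAL-TO-PDU-MAPPING>
#     <I-SIGNAL-REF>/Signals/EngineSpeed</I-SIGNAL-REF>
#     <PDU-REF>/Pdus/EnginePdu</PDU-REF>
#     <COMMUNICATION-DIRECTION>OUT</COMMUNICATION-DIRECTION>
#   </I-SIGNAL-TO-PDU-MAPPING>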


def export_to_excel(data: List[Dict[str, str]], excel_path: str):
    """Export the signal records to a formatted Excel report."""
    df = pd.DataFrame(data)

    # Write the data and apply formatting while the writer is still open;
    # the file is saved automatically when the with-block exits
    with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
        df.to_excel(writer, index=False,
                    sheet_name='DatabaseSignalsDetails',
                    columns=['Signal Group',
                             'Signal Name',
                             'Direction',
                             'Data Type',
                             'Base Type',
                             'Length (bits)',
                             'Unit',
                             'Min Value',
                             'Max Value',
                             'Compu Method'])

        worksheet = writer.sheets['DatabaseSignalsDetails']

        # Header row style
        for cell in worksheet[1]:
            cell.fill = HEADER_FILL

        # Highlight missing values
        for row in worksheet.iter_rows(min_row=2):
            for cell in row:
                if cell.value == '-':
                    cell.fill = RED_FILL

        # Auto-fit column widths, capped at 50 characters
        for col_idx, col in enumerate(worksheet.columns, 1):
            max_length = max(
                (len(str(cell.value)) for cell in col if cell.value is not None),
                default=0
            )
            column_letter = get_column_letter(col_idx)
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)


def main():
    arxml_file = input("Enter the ARXML file path: ").strip('"')
    
    if not Path(arxml_file).exists():
        print(f"错误: 文件 {arxml_file} 不存在")
        return
    
    try:
        print("▌ 正在解析ARXML文件...")
        signal_data = parse_arxml(arxml_file)
    except Exception as e:
        print(f"解析失败: {str(e)}")
        return
    
    if not signal_data:
        print("警告: 未找到任何信号数据")
        return
    
    output_path = Path(arxml_file).with_name(f"{Path(arxml_file).stem}_SignalsReport.xlsx")
    
    try:
        print("▌ 正在生成Excel报告...")
        export_to_excel(signal_data, str(output_path))
        print(f"成功生成报告: {output_path}")
    except Exception as e:
        print(f"导出失败: {str(e)}")

if __name__ == "__main__":
    main()
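
The two functions can also be driven without the interactive prompt, for example from another script or a test that imports parse_arxml and export_to_excel. A minimal sketch, assuming a hypothetical input file named Example.arxml next to the script:

    from pathlib import Path

    arxml = Path("Example.arxml")      # hypothetical input file
    records = parse_arxml(str(arxml))  # list of per-signal dicts
    export_to_excel(records, str(arxml.with_name(f"{arxml.stem}_SignalsReport.xlsx")))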
