数据清洗
1、处理缺失值
2、处理重复值
3、处理异常值
4、文本数据清洗

数据处理
1、特征工程支持
2、数据格式标准化
3、数据合并与拆分

一、含义

1、处理缺失值

  • 检测:自动识别数据中的空值、NaN、占位符(如"NULL"、“NA”)。
  • 解决方案:
    删除:直接删除含有缺失值的行或列。这种方法适用于缺失值较少且对整体数据分析影响不大的情况。
    插值:根据其他样本的值估计缺失值。常用的插值方法包括均值插值、中位数插值、众数插值等。
    使用默认值:为缺失值设定一个合理的默认值,如0、平均值或某个特定代码。
    预测:根据数据的趋势预测缺失值。

2、处理重复值

  • 检测:基于完整行或关键列(如ID)去重。
  • 解决方案:
    保留首行/末行:在存在重复行的情况下,选择保留每组重复行的首行或末行数据,并删除其余行。
    自定义:根据业务需求,定义自定义方法来处理重复项,如合并重复项中的某些字段。

3、处理异常值

  • 检测:
    统计方法:通过计算数据的均值、标准差等统计量来判断哪些值偏离正常范围。例如,将超出均值±3倍标准差的值视为异常值。
    可视化方法:利用箱线图、散点图等可视化工具直观地发现异常值。
  • 解决方案:
    修正(如截断到合理范围)。
    可以删除异常值,或者将异常值替换为正常范围内的值(如均值、阈值或中位数)。

4、文本数据清洗

  • 基础处理:
    去除HTML标签、特殊字符、停用词、敏感词或无意义词。
    正则表达式匹配替换(如邮箱、电话号码脱敏)。
  • 高级功能:
    拼写纠正、词干提取(英文)、分词(中文)。
    情感符号/表情处理。

5、特征工程支持

  • 数值特征:
    归一化(Min-Max)、标准化(Z-score)。
    分箱、对数变换。
  • 分类特征:
    处理类别不平衡(过采样/欠采样)。
    稀有类别合并。

6、数据格式标准化

  • 统一格式:日期时间(如YYYY-MM-DD)。文本(大小写、空格、缩写标准化)。数值(单位统一,如千克→克)。
  • 类型转换:字符串转数值。

7、数据合并与拆分

  • 将多个数据源的数据合并到一起,或者将一个数据集拆分成多个子集(如训练集、验证集和测试集)。
  • 拆分数据时,可以根据比例(如 70% 训练集、15% 验证集、15% 测试集)随机拆分,也可以根据特定的条件(如时间顺序)进行拆分。

二、工具

  • 工具:在这里插入图片描述

  • 地址:https://www.spsspro.com/analysis/index

1、导入本地数据

在这里插入图片描述

2、数据处理

通用方法:异常值处理、无效样本处理、缺失值处理、样本均衡等;
自定义数据处理:编写代码,直接对数据处理。

  • 通用方法
    在这里插入图片描述
  • 自定义算法
    在这里插入图片描述
    在这里插入图片描述
    局限性:
    1、限制导入文件的格式,只能导入Excel、CSV、SPSS等格式,无法导入TXT文件;
    2、默认文件首行为表头;
    3、通用方法主要是对数值型数据进行处理;
    4、自定义方法需编写代码,对非技术用户不友好。

三、代码

from flask import Flask, request, jsonify
import pandas as pd
import numpy as np
import re
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import jieba  # 中文分词
from spellchecker import SpellChecker
import logging
from typing import Union, Dict, List
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

# 初始化工具
stemmer = PorterStemmer()
spell = SpellChecker()

# 获取停用词
try:
    stop_words = set(stopwords.words('english'))
except:
    import nltk

    nltk.download('stopwords')
    stop_words = set(stopwords.words('english'))


def convert_to_dataframe(data: Union[Dict, List[Dict]]) -> pd.DataFrame:
    """将输入数据转换为DataFrame"""
    if isinstance(data, dict):
        return pd.DataFrame([data])
    elif isinstance(data, list):
        return pd.DataFrame(data)
    else:
        raise ValueError("输入数据必须是字典或字典列表")


@app.route('/clean', methods=['POST'])
def data_cleaning():
    try:
        # 获取前端数据
        input_data = request.json
        df = convert_to_dataframe(input_data['data'])
        operations = input_data.get('operations', {})

        # 1. 处理缺失值
        if operations.get('handle_missing'):
            missing_config = operations['handle_missing']
            # 首先识别占位符并转换为NaN
            placeholders = missing_config.get('placeholders', ['NULL', 'NA', 'NaN', ''])
            for col in df.columns: # 遍历列名
                df[col] = df[col].replace(placeholders, np.nan)

            strategy = missing_config.get('strategy', 'drop') # "mean"
            if strategy == 'drop':
                how = missing_config.get('how', 'any') # 'any'(默认):如果任何一个值是 NaN,就删除该行或列。 'all':只有全部值是 NaN 时才删除该行或列。
                axis = missing_config.get('axis', 0) # axis 参数控制删除方向:0 或 'index'(默认):删除行;1 或 'columns':删除列
                df = df.dropna(how=how, axis=axis)
            elif strategy in ['mean', 'median', 'most_frequent', 'constant']: # 均值/中位数/众数/常量
                imputer = SimpleImputer(
                    strategy=strategy,
                    fill_value=missing_config.get('fill_value') # fill_value(仅当 strategy='constant' 时有效),获取用户指定的填充值
                )
                df = pd.DataFrame(imputer.fit_transform(df), columns=df.columns) # fit:计算每列的均值/中位数/众数,transform:用计算出的值填充 NaN
            elif strategy == 'knn': # 基于K近邻的插值
                imputer = KNNImputer(n_neighbors=missing_config.get('n_neighbors', 5)) # n_neighbors指定用于计算插值的邻居数量
                df = pd.DataFrame(imputer.fit_transform(df), columns=df.columns)

        # 2. 处理重复值
        if operations.get('handle_duplicates'):
            dup_config = operations['handle_duplicates']
            subset = dup_config.get('subset', None) # None(默认):比较整行是否完全相同
            keep = dup_config.get('keep', 'first') # 'first'(默认):保留第一条;'last':保留最后一条;False:删除所有重复行(只保留完全不重复的行)
            df = df.drop_duplicates(subset=subset, keep=keep)

        # 3. 处理异常值
        if operations.get('handle_outliers'):
            outlier_config = operations['handle_outliers']
            method = outlier_config.get('method', 'zscore') # 异常值检测方法,默认 'zscore'(Z分数法),可选 'iqr'(IQR四分位距法)
            columns = outlier_config.get('columns', df.select_dtypes(include=np.number).columns) # 默认自动选择所有数值型列(np.number 类型)

            for col in columns:
                if method == 'zscore': # Z-score(标准差法):数据点与均值的距离超过 threshold 倍标准差时,视为异常值。适用场景:数据近似服从正态分布时效果较好
                    mean = df[col].mean()
                    std = df[col].std()
                    threshold = outlier_config.get('threshold', 3)
                    lower, upper = mean - threshold * std, mean + threshold * std
                elif method == 'iqr': # 异常值定义为低于 Q1 - 1.5*IQR 或高于 Q3 + 1.5*IQR 的值。适用于非正态分布数据,抗干扰性强。
                    Q1 = df[col].quantile(0.25) # 第一四分位数
                    Q3 = df[col].quantile(0.75) # 第三四分位数
                    IQR = Q3 - Q1 # 四分位距
                    lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR

                action = outlier_config.get('action', 'clip') # 'clip'(默认):将异常值截断到边界值;'remove':直接删除包含异常值的行;'replace':用指定值替换异常值。
                if action == 'clip':
                    df[col] = df[col].clip(lower, upper)
                elif action == 'remove':
                    df = df[(df[col] >= lower) & (df[col] <= upper)]
                elif action == 'replace':
                    replace_with = outlier_config.get('replace_with', 'mean')
                    if replace_with == 'mean':
                        replacement = df[col].mean()
                    elif replace_with == 'median':
                        replacement = df[col].median()
                    else:
                        replacement = replace_with
                    df.loc[(df[col] < lower) | (df[col] > upper), col] = replacement

        # 4. 文本数据清洗
        if operations.get('text_cleaning'):
            text_config = operations['text_cleaning']
            columns = text_config.get('columns', df.select_dtypes(include='object').columns) # 默认选择所有 object 类型的列(通常是字符串)。

            for col in columns:
                # 基础处理
                if text_config.get('remove_html'):
                    df[col] = df[col].apply(lambda x: re.sub(r'<[^>]+>', '', str(x))) # 匹配所有HTML标签(如 <b>, </p>)
                if text_config.get('remove_special_chars'):
                    df[col] = df[col].apply(lambda x: re.sub(r'[^\w\s]', '', str(x))) # 匹配所有非字母、数字、下划线或空格的字符
                if text_config.get('lowercase'): # 转换为小写
                    df[col] = df[col].str.lower()
                if text_config.get('remove_stopwords'): # 去除停用词
                    df[col] = df[col].apply(
                        lambda x: ' '.join([word for word in str(x).split() if word not in stop_words]))
                if text_config.get('stemming') and text_config.get('language') == 'en': # 词干提取
                    df[col] = df[col].apply(lambda x: ' '.join([stemmer.stem(word) for word in str(x).split()])) # 将单词还原为词干形式(如 "running" → "run")
                if text_config.get('spell_check'): # 拼写检查,纠正拼写错误(如 "helo" → "hello"),但可能误判专有名词或缩写。
                    df[col] = df[col].apply(lambda x: ' '.join([spell.correction(word) for word in str(x).split()]))
                if text_config.get('chinese_segmentation') and text_config.get('language') == 'zh': # 中文分词,将连续的中文文本分割为词语
                    df[col] = df[col].apply(lambda x: ' '.join(jieba.cut(str(x))))
                if text_config.get('desensitize'): # 敏感信息脱敏
                    # 邮箱脱敏
                    df[col] = df[col].apply(lambda x: re.sub(r'(\w+)(@\w+\.\w+)', r'*****\2', str(x))) # 匹配邮箱,替换为:*****@example.com(保留域名)
                    # 电话号码脱敏
                    df[col] = df[col].apply(lambda x: re.sub(r'(\d{3})\d{4}(\d{4})', r'\1****\2', str(x))) # 匹配11位手机号,替换保留前3位和后4位)

        # 5. 特征工程
        if operations.get('feature_engineering'):
            fe_config = operations['feature_engineering']

            # 数值特征处理
            if fe_config.get('numeric_features'):
                num_config = fe_config['numeric_features']
                columns = num_config.get('columns', df.select_dtypes(include=np.number).columns) # 指定要处理的数值列,默认选择所有数值列。

                # 归一化
                if num_config.get('normalization') == 'minmax': # 最小-最大归一化,结果范围:[0, 1]
                    scaler = MinMaxScaler()
                    df[columns] = scaler.fit_transform(df[columns])
                elif num_config.get('normalization') == 'zscore': # Z-score标准化,结果范围:均值为 0,标准差为 1
                    scaler = StandardScaler()
                    df[columns] = scaler.fit_transform(df[columns])

                # 分箱,将连续数值离散化为若干个区间(如年龄分组)
                if num_config.get('binning'):
                    for col, bins in num_config['binning'].items():
                        if col in df.columns:
                            df[col + '_binned'] = pd.cut(df[col], bins=bins, labels=False) # bins:分箱边界(如 [0, 18, 35, 60, 100]),labels=False:返回箱的编号(0, 1, 2...)
                '''
                示例:
                输入:age = [15, 25, 45, 70]
                分箱边界:[0, 18, 35, 60, 100]
                输出:[0, 1, 2, 3](分别对应[0 - 18), [18 - 35), [35 - 60), [60 - 100))
                '''

                if num_config.get('log_transform'): # 对数变换
                    for col in num_config['log_transform']:
                        if col in df.columns:
                            df[col + '_log'] = np.log1p(df[col]) # log1p(x) = log(1 + x)(避免 x=0 时出错)

            # 分类特征处理)
            if fe_config.get('categorical_features'):
                cat_config = fe_config['categorical_features']
                columns = cat_config.get('columns', df.select_dtypes(include='object').columns)

                # 稀有类别合并
                if cat_config.get('rare_category_handling') == 'merge':  # 将出现频率低于 threshold(默认5%)的类别合并为 'RARE',防止稀有类别导致过拟合。
                    threshold = cat_config.get('threshold', 0.05)
                    for col in columns:
                        value_counts = df[col].value_counts(normalize=True)
                        rare_categories = value_counts[value_counts < threshold].index
                        df[col] = df[col].replace(rare_categories, 'RARE')

                # 类别不平衡处理
                for col in columns:
                    if cat_config.get(f'{col}_balance') == 'oversample':  # 过采样
                        ros = RandomOverSampler()
                        df, _ = ros.fit_resample(df, df[col]) # 对数据进行过采样,使每个类别的样本数量相同。
                    elif cat_config.get(f'{col}_balance') == 'undersample':  # 欠采样
                        rus = RandomUnderSampler()
                        df, _ = rus.fit_resample(df, df[col]) # 对数据进行欠采样,使每个类别的样本数量相同。
                '''
                cat_config = {
                    'columns': ['category1', 'category2'],
                    'rare_category_handling': 'merge',
                    'threshold': 0.05,
                    'category1_balance': 'oversample',
                    'category2_balance': 'undersample'
                }
                '''

        # 6. 数据格式标准化
        if operations.get('standardize_format'):
            format_config = operations['standardize_format']

            # 将不同格式的日期统一转换为 YYYY-MM-DD 格式。
            if format_config.get('date_columns'):
                for col, fmt in format_config['date_columns'].items():
                    if col in df.columns:
                        df[col] = pd.to_datetime(df[col], format=fmt).dt.strftime('%Y-%m-%d') # col:日期列名; fmt:原始日期格式

            # 文本数据标准化
            if format_config.get('text_columns'):
                for col, action in format_config['text_columns'].items():
                    if col in df.columns:
                        if action == 'lower': # 转换为小写
                            df[col] = df[col].str.lower()
                        elif action == 'upper': # 转换为大写
                            df[col] = df[col].str.upper()
                        elif action == 'strip': # 去除首尾空格
                            df[col] = df[col].str.strip()
            # 数值单位转换
            if format_config.get('numeric_conversion'):
                for col, numeric in format_config['numeric_conversion'].items():
                    if col in df.columns:  # 数值单位转换
                        df[col] = df[col] * float(numeric)

            # 类型转换
            if format_config.get('type_conversion'):
                for col, unit in format_config['type_conversion'].items():
                    if col in df.columns: # 类型转换
                        df[col] = df[col].astype(unit)

        # 7. 数据合并与拆分
        if operations.get('split_data'):
            split_config = operations['split_data']
            if split_config.get('split_type') == 'random':
                train_size = split_config.get('train_size', 0.7)
                test_size = split_config.get('test_size', 0.15)
                val_size = split_config.get('val_size', 0.15)

                # 先拆分训练集和剩余集
                train, remaining = train_test_split(df, train_size=train_size, random_state=42)
                # 再拆分验证集和测试集
                test_size_adjusted = test_size / (1 - train_size)
                val, test = train_test_split(remaining, test_size=test_size_adjusted, random_state=42)

                result = {
                    'train': train.to_dict(orient='records'),
                    'val': val.to_dict(orient='records'),
                    'test': test.to_dict(orient='records')
                }
                return jsonify({'status': 'success', 'result': result})

        # 返回清洗后的数据
        return jsonify({
            'status': 'success',
            'result': df.to_dict(orient='records')
        })

    except Exception as e:
        logging.error(f"数据处理错误: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 400


if __name__ == '__main__':
    app.run(debug=True)


'''
处理缺失值
{
    "data": [
        {"id": 1, "age": 25, "income": 50000, "email": "test1@example.com"},
        {"id": 2, "age": null, "income": null, "email": "NULL"},
        {"id": 3, "age": 30, "income": 60000, "email": "test3@example.com"}
    ],
    "operations": {
        "handle_missing": {
            "placeholders": ["NULL", "NA", "NaN", ""],
            "strategy": "mean",
            "columns": ["age", "income"]
        }
    }
}

处理重复值
{
    "data": [
        {"id": 1, "name": "Alice", "score": 85},
        {"id": 1, "name": "Alice", "score": 85},
        {"id": 2, "name": "Bob", "score": 90}
    ],
    "operations": {
        "handle_duplicates": {
            "subset": ["id"],
            "keep": "first"
        }
    }
}

文本清洗
{
    "data": [
        {"id": 1, "comment": "This is a <b>great</b> product! I love it!!!"},
        {"id": 2, "comment": "The service was TERRIBLE. Never again."}
    ],
    "operations": {
        "text_cleaning": {
            "columns": ["comment"],
            "remove_html": true,
            "remove_special_chars": true,
            "lowercase": true,
            "remove_stopwords": true,
            "stemming": true,
            "language": "en"
        }
    }
}

数据拆分
{
    "data": [
        {"id": 1, "feature": 0.5, "label": 1},
        {"id": 2, "feature": 0.8, "label": 1},
        {"id": 3, "feature": 0.2, "label": 0},
        {"id": 4, "feature": 0.4, "label": 0},
        {"id": 5, "feature": 0.9, "label": 1}
    ],
    "operations": {
        "split_data": {
            "split_type": "random",
            "train_size": 0.6,
            "test_size": 0.2,
            "val_size": 0.2
        }
    }
}

'''
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐