前言:为什么数据清洗是数据分析最重要的技能?

在实际数据分析工作中,有一个不成文的规律:数据清洗占用了80%的时间,数据分析只占20%

原始数据往往是混乱的——缺失值、重复记录、格式不统一、异常值……这些问题如果不处理,后续的任何分析结论都是不可信的。

本文整理了 Pandas 数据清洗的 10个高频场景,每个场景都有完整可运行的代码示例,适合数据分析师日常参考使用。

环境要求:Python 3.8+,pandas 1.3+,numpy 1.21+


import pandas as pd
import numpy as np
print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")

场景1:处理缺失值(Missing Values)

缺失值是数据清洗中最常见的问题。Pandas 中缺失值通常表示为 NaN(Not a Number)或 None

1.1 检测缺失值


import pandas as pd
import numpy as np

# 创建示例数据
data = {
    'name': ['Alice', 'Bob', None, 'Diana', 'Eve'],
    'age': [25, np.nan, 30, 28, np.nan],
    'salary': [8000, 12000, np.nan, 9500, 11000],
    'department': ['技术', '产品', '技术', None, '运营']
}
df = pd.DataFrame(data)

# 检查缺失值数量
print("=== 各列缺失值数量 ===")
print(df.isnull().sum())

# 缺失值比例
print("\n=== 各列缺失值比例 ===")
print(df.isnull().mean().round(3))

# 可视化缺失情况
print("\n=== 缺失值矩阵 ===")
print(df.isnull())

输出结果:


=== 各列缺失值数量 ===
name          1
age           2
salary        1
department    1
dtype: int64

=== 各列缺失值比例 ===
name          0.2
age           0.4
salary        0.2
department    0.2
dtype: float64

1.2 填充缺失值


# 方法1:用固定值填充
df['department'].fillna('未知部门', inplace=True)

# 方法2:用均值/中位数填充数值列
df['age'].fillna(df['age'].mean(), inplace=True)
df['salary'].fillna(df['salary'].median(), inplace=True)

# 方法3:前向填充(ffill)
df_time = pd.DataFrame({
    'date': pd.date_range('2026-01-01', periods=5),
    'value': [10, np.nan, np.nan, 40, 50]
})
df_time['value_filled'] = df_time['value'].fillna(method='ffill')
print(df_time)

# 方法4:插值填充(适合时序数据)
df_time['value_interp'] = df_time['value'].interpolate(method='linear')
print(df_time)

1.3 删除含缺失值的行/列


# 删除有任意缺失值的行
df_clean = df.dropna()

# 删除所有值都是NaN的行
df_clean2 = df.dropna(how='all')

# 只在关键列有缺失时才删除
df_clean3 = df.dropna(subset=['name', 'age'])

# 缺失值超过阈值的列直接删除
df_clean4 = df.dropna(axis=1, thresh=int(len(df) * 0.7))  # 保留至少70%非空的列

场景2:处理重复数据


import pandas as pd

data = {
    'user_id': [1, 2, 3, 2, 4, 1],
    'name': ['Alice', 'Bob', 'Charlie', 'Bob', 'Diana', 'Alice'],
    'action': ['buy', 'view', 'buy', 'view', 'buy', 'buy'],
    'amount': [100, 0, 200, 0, 150, 100]
}
df = pd.DataFrame(data)

# 检测重复行
print(f"重复行数量: {df.duplicated().sum()}")

# 按特定列检测重复
print(f"user_id重复数量: {df.duplicated(subset=['user_id']).sum()}")

# 删除完全重复行
df_dedup = df.drop_duplicates()
print(f"去重后行数: {len(df_dedup)}")

# 保留某列的第一次/最后一次出现
df_first = df.drop_duplicates(subset=['user_id'], keep='first')
df_last = df.drop_duplicates(subset=['user_id'], keep='last')

print("保留首次出现:")
print(df_first)

场景3:数据类型转换

数据类型错误是导致分析报错最常见的原因之一。


import pandas as pd

data = {
    'date': ['2026-01-01', '2026-01-02', '2026-01-03'],
    'sales': ['1000', '2000', '1500'],      # 字符串格式的数字
    'flag': ['True', 'False', 'True'],       # 字符串格式的布尔值
    'category': ['A', 'B', 'A'],
    'price': ['¥100.5', '¥200.0', '¥150.3'] # 带特殊字符的数字
}
df = pd.DataFrame(data)
print("原始数据类型:")
print(df.dtypes)

# 转换日期
df['date'] = pd.to_datetime(df['date'])

# 转换数值
df['sales'] = pd.to_numeric(df['sales'])

# 转换布尔值
df['flag'] = df['flag'].map({'True': True, 'False': False})

# 处理带特殊字符的数值
df['price'] = df['price'].str.replace('¥', '').astype(float)

# 转换为分类类型(节省内存)
df['category'] = df['category'].astype('category')

print("\n转换后数据类型:")
print(df.dtypes)
print("\n转换后数据:")
print(df)

场景4:字符串清洗


import pandas as pd

data = {
    'name': ['  Alice  ', 'BOB', 'charlie', '  Diana_Smith '],
    'phone': ['138-1234-5678', '139 8765 4321', '(010)12345678', '400-800-1234'],
    'email': ['ALICE@EXAMPLE.COM', 'bob@test.com', 'Charlie@Gmail.COM', 'invalid_email']
}
df = pd.DataFrame(data)

# 去除首尾空格并统一大小写
df['name'] = df['name'].str.strip().str.title()

# 标准化手机号(只保留数字)
df['phone_clean'] = df['phone'].str.replace(r'[\s\-\(\)]', '', regex=True)

# 邮箱转小写
df['email'] = df['email'].str.lower()

# 验证邮箱格式
import re
def is_valid_email(email):
    pattern = r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, str(email)))

df['email_valid'] = df['email'].apply(is_valid_email)

print(df[['name', 'phone_clean', 'email', 'email_valid']])

场景5:处理异常值(Outliers)


import pandas as pd
import numpy as np

np.random.seed(42)
data = {
    'salary': np.concatenate([
        np.random.normal(15000, 3000, 95),  # 正常薪资
        [100000, 200000, -5000, 0, 999999]    # 异常值
    ])
}
df = pd.DataFrame(data)

# 方法1:基于IQR(四分位距)检测异常值
Q1 = df['salary'].quantile(0.25)
Q3 = df['salary'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outliers_iqr = df[(df['salary'] < lower_bound) | (df['salary'] > upper_bound)]
print(f"IQR方法检测到 {len(outliers_iqr)} 个异常值")
print(f"异常值范围: < {lower_bound:.0f} 或 > {upper_bound:.0f}")

# 方法2:基于Z-Score检测(标准差3倍)
from scipy import stats
z_scores = np.abs(stats.zscore(df['salary']))
outliers_zscore = df[z_scores > 3]
print(f"\nZ-Score方法检测到 {len(outliers_zscore)} 个异常值")

# 处理异常值
# 策略1:删除异常值
df_clean = df[(df['salary'] >= lower_bound) & (df['salary'] <= upper_bound)]

# 策略2:截断(Winsorize)
df['salary_winsor'] = df['salary'].clip(lower=lower_bound, upper=upper_bound)

# 策略3:用中位数替换
median_salary = df['salary'].median()
df['salary_fixed'] = df['salary'].where(
    (df['salary'] >= lower_bound) & (df['salary'] <= upper_bound),
    median_salary
)

print(f"\n原始数据量: {len(df)}")
print(f"删除异常值后: {len(df_clean)}")
print(f"截断后均值: {df['salary_winsor'].mean():.0f}")

场景6:数据标准化与归一化


import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler, StandardScaler

data = {
    'age': [22, 35, 28, 45, 30, 55],
    'salary': [8000, 25000, 12000, 35000, 15000, 50000],
    'experience': [1, 8, 3, 15, 5, 25]
}
df = pd.DataFrame(data)

# 方法1:Min-Max归一化(值缩放到[0,1])
scaler_minmax = MinMaxScaler()
df_normalized = pd.DataFrame(
    scaler_minmax.fit_transform(df),
    columns=[f'{col}_norm' for col in df.columns]
)

# 方法2:Z-Score标准化(均值0,标准差1)
scaler_std = StandardScaler()
df_standardized = pd.DataFrame(
    scaler_std.fit_transform(df),
    columns=[f'{col}_std' for col in df.columns]
)

# 手动实现(理解原理)
df['salary_manual_norm'] = (df['salary'] - df['salary'].min()) / (df['salary'].max() - df['salary'].min())
df['salary_manual_std'] = (df['salary'] - df['salary'].mean()) / df['salary'].std()

result = pd.concat([df, df_normalized, df_standardized], axis=1)
print(result[['salary', 'salary_norm', 'salary_std', 'salary_manual_norm', 'salary_manual_std']].round(3))

场景7:数据格式统一(日期、时间)


import pandas as pd

# 各种日期格式统一处理
date_data = {
    'date_str': [
        '2026-01-15',
        '15/01/2026',
        '20260115',
        'Jan 15, 2026',
        '2026年1月15日'
    ]
}
df = pd.DataFrame(date_data)

# 统一转换为datetime(自动推断格式)
df['date_parsed'] = pd.to_datetime(df['date_str'], infer_datetime_format=True, errors='coerce')

# 中文日期需要预处理
df['date_str2'] = df['date_str'].str.replace('年', '-').str.replace('月', '-').str.replace('日', '')
df['date_parsed2'] = pd.to_datetime(df['date_str2'], errors='coerce')

# 从datetime提取各部分
df['year'] = df['date_parsed'].dt.year
df['month'] = df['date_parsed'].dt.month
df['day'] = df['date_parsed'].dt.day
df['weekday'] = df['date_parsed'].dt.day_name()
df['quarter'] = df['date_parsed'].dt.quarter

print(df[['date_str', 'date_parsed', 'year', 'month', 'day', 'weekday', 'quarter']])

场景8:处理不一致的分类数据


import pandas as pd

data = {
    'gender': ['男', 'male', 'M', '女', 'Female', 'f', '男性', 'MALE', '未知'],
    'city': ['北京', '上海', 'beijing', 'Shanghai', '北京市', '上海市', 'BJ', 'SH', None]
}
df = pd.DataFrame(data)

# 性别标准化
gender_mapping = {
    '男': 'M', 'male': 'M', 'm': 'M', '男性': 'M', 'MALE': 'M',
    '女': 'F', 'Female': 'F', 'female': 'F', 'f': 'F', '女性': 'F',
    '未知': 'Unknown'
}
df['gender_clean'] = df['gender'].str.strip().map(
    lambda x: gender_mapping.get(x, gender_mapping.get(x.capitalize() if isinstance(x, str) else x, 'Unknown'))
)

# 城市标准化
city_mapping = {
    '北京': '北京', '北京市': '北京', 'beijing': '北京', 'BJ': '北京',
    '上海': '上海', '上海市': '上海', 'Shanghai': '上海', 'SH': '上海'
}
df['city_clean'] = df['city'].map(city_mapping).fillna('其他')

print(df[['gender', 'gender_clean', 'city', 'city_clean']])

场景9:数据一致性校验


import pandas as pd
import numpy as np

data = {
    'order_id': ['O001', 'O002', 'O003', 'O004', 'O005'],
    'quantity': [5, -2, 0, 3, 10],          # -2 异常
    'unit_price': [100, 200, 150, 0, 300],   # 0 可能异常
    'total_amount': [500, -400, 0, 300, 2500], # 应等于 quantity * unit_price
    'discount': [0.1, 0.2, 0.05, 0.3, 1.5]   # 1.5 异常,折扣率超过1
}
df = pd.DataFrame(data)

# 规则1:数量不能为负
invalid_quantity = df[df['quantity'] <= 0]
print(f"数量异常记录: {len(invalid_quantity)} 条")
print(invalid_quantity[['order_id', 'quantity']])

# 规则2:折扣率必须在[0,1]范围内
invalid_discount = df[(df['discount'] < 0) | (df['discount'] > 1)]
print(f"\n折扣率异常记录: {len(invalid_discount)} 条")

# 规则3:总金额 = 数量 × 单价
df['calculated_amount'] = df['quantity'] * df['unit_price']
df['amount_diff'] = abs(df['total_amount'] - df['calculated_amount'])
invalid_amount = df[df['amount_diff'] > 0.01]  # 允许浮点误差
print(f"\n金额校验失败记录: {len(invalid_amount)} 条")
print(invalid_amount[['order_id', 'quantity', 'unit_price', 'total_amount', 'calculated_amount']])

# 生成数据质量报告
quality_report = {
    '总记录数': len(df),
    '数量异常': len(invalid_quantity),
    '折扣率异常': len(invalid_discount),
    '金额不一致': len(invalid_amount),
    '数据质量分': f"{(1 - (len(invalid_quantity) + len(invalid_discount) + len(invalid_amount)) / len(df) / 3) * 100:.1f}%"
}
print("\n=== 数据质量报告 ===")
for k, v in quality_report.items():
    print(f"  {k}: {v}")

场景10:构建可复用的数据清洗Pipeline

将以上操作封装成可复用的清洗流水线,适合生产环境使用:


import pandas as pd
import numpy as np
from typing import List, Dict, Optional

class DataCleaner:
    """可复用的数据清洗Pipeline"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.report = []
    
    def remove_duplicates(self, subset: Optional[List[str]] = None, keep: str = 'first'):
        """去除重复行"""
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        after = len(self.df)
        self.report.append(f"去重: 删除 {before - after} 行重复记录")
        return self
    
    def fill_missing(self, strategy: Dict):
        """
        填充缺失值
        strategy: {'列名': '策略'} 
        策略: 'mean', 'median', 'mode', '固定值'
        """
        for col, method in strategy.items():
            if col not in self.df.columns:
                continue
            missing_count = self.df[col].isnull().sum()
            if missing_count == 0:
                continue
            
            if method == 'mean':
                self.df[col].fillna(self.df[col].mean(), inplace=True)
            elif method == 'median':
                self.df[col].fillna(self.df[col].median(), inplace=True)
            elif method == 'mode':
                self.df[col].fillna(self.df[col].mode()[0], inplace=True)
            else:
                self.df[col].fillna(method, inplace=True)
            
            self.report.append(f"缺失值填充: {col} 列填充 {missing_count} 个缺失值(策略: {method})")
        return self
    
    def remove_outliers(self, columns: List[str], method: str = 'iqr', threshold: float = 1.5):
        """去除异常值"""
        for col in columns:
            if col not in self.df.columns:
                continue
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - threshold * IQR
                upper = Q3 + threshold * IQR
                mask = (self.df[col] >= lower) & (self.df[col] <= upper)
                removed = (~mask).sum()
                self.df = self.df[mask | self.df[col].isnull()]
                self.report.append(f"异常值处理: {col} 列删除 {removed} 个异常值")
        return self
    
    def standardize_strings(self, columns: List[str], case: str = 'lower'):
        """标准化字符串列"""
        for col in columns:
            if col not in self.df.columns:
                continue
            self.df[col] = self.df[col].astype(str).str.strip()
            if case == 'lower':
                self.df[col] = self.df[col].str.lower()
            elif case == 'upper':
                self.df[col] = self.df[col].str.upper()
            elif case == 'title':
                self.df[col] = self.df[col].str.title()
            self.report.append(f"字符串标准化: {col} 列({case})")
        return self
    
    def convert_types(self, type_map: Dict):
        """转换数据类型"""
        for col, dtype in type_map.items():
            if col not in self.df.columns:
                continue
            if dtype == 'datetime':
                self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
            elif dtype == 'numeric':
                self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
            elif dtype == 'category':
                self.df[col] = self.df[col].astype('category')
            else:
                self.df[col] = self.df[col].astype(dtype, errors='ignore')
            self.report.append(f"类型转换: {col} 列 → {dtype}")
        return self
    
    def get_result(self):
        """返回清洗后的DataFrame和报告"""
        print("=== 数据清洗报告 ===")
        for item in self.report:
            print(f"  ✅ {item}")
        print(f"  📊 最终数据量: {len(self.df)} 行 × {len(self.df.columns)} 列")
        return self.df


# 使用示例
np.random.seed(42)
raw_data = {
    'user_id': range(100),
    'name': ['User_' + str(i) for i in range(100)],
    'age': np.concatenate([np.random.randint(18, 65, 95), [200, -5, 0, np.nan, np.nan]]),
    'salary': np.concatenate([np.random.randint(5000, 50000, 95), [1000000, -1000, np.nan, np.nan, np.nan]]),
    'join_date': pd.date_range('2020-01-01', periods=100).astype(str),
    'department': np.random.choice(['技术', '产品', '运营', '市场', None], 100)
}
raw_df = pd.DataFrame(raw_data)

# 一行搞定数据清洗
clean_df = (DataCleaner(raw_df)
    .remove_duplicates(subset=['user_id'])
    .fill_missing({'age': 'mean', 'salary': 'median', 'department': '未知'})
    .remove_outliers(columns=['age', 'salary'], method='iqr')
    .convert_types({'join_date': 'datetime', 'department': 'category'})
    .get_result()
)

输出结果:


=== 数据清洗报告 ===
  ✅ 缺失值填充: age 列填充 2 个缺失值(策略: mean)
  ✅ 缺失值填充: salary 列填充 3 个缺失值(策略: median)
  ✅ 缺失值填充: department 列填充 X 个缺失值(策略: 未知)
  ✅ 异常值处理: age 列删除 3 个异常值
  ✅ 异常值处理: salary 列删除 2 个异常值
  ✅ 类型转换: join_date 列 → datetime
  ✅ 类型转换: department 列 → category
  📊 最终数据量: 95 行 × 6 列

总结:数据清洗核心思路

做了这么多年数据分析,船长总结了数据清洗的核心思路:

① 先诊断,再处理:不要盲目清洗,先用 df.info()df.describe()df.isnull().sum() 全面了解数据状态。

② 理解业务含义:缺失值不一定要填充,可能"缺失"本身就是一种信息。异常值不一定是错误,可能是真实的极端案例。

③ 保留原始数据:清洗时永远操作副本 df.copy(),不要直接修改原始数据。

④ 记录清洗过程:养成写数据清洗日志的习惯,方便后续排查和复现。

⑤ 自动化流程:用 Pipeline 封装重复操作,节省时间,减少人为错误。

掌握这10个场景,日常数据清洗工作至少覆盖90%的情况。如果你在工作中遇到了其他复杂场景,欢迎评论区留言,我来补充。

---

相关推荐:

- SQL窗口函数实战:5个高频场景完整代码

- Python数据分析全流程:从读取到可视化

- Pandas性能优化:10个让代码快10倍的技巧

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐