Python Pandas数据清洗实战:10个高频场景完整代码,数据分析必备技能
前言:为什么数据清洗是数据分析最重要的技能?
在实际数据分析工作中,有一个不成文的规律:数据清洗占用了80%的时间,数据分析只占20%。
原始数据往往是混乱的——缺失值、重复记录、格式不统一、异常值……这些问题如果不处理,后续的任何分析结论都是不可信的。
本文整理了 Pandas 数据清洗的 10个高频场景,每个场景都有完整可运行的代码示例,适合数据分析师日常参考使用。
环境要求:Python 3.8+,pandas 1.3+,numpy 1.21+
import pandas as pd
import numpy as np
print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")
场景1:处理缺失值(Missing Values)
缺失值是数据清洗中最常见的问题。Pandas 中缺失值通常表示为 NaN(Not a Number)或 None。
1.1 检测缺失值
import pandas as pd
import numpy as np
# 创建示例数据
data = {
'name': ['Alice', 'Bob', None, 'Diana', 'Eve'],
'age': [25, np.nan, 30, 28, np.nan],
'salary': [8000, 12000, np.nan, 9500, 11000],
'department': ['技术', '产品', '技术', None, '运营']
}
df = pd.DataFrame(data)
# 检查缺失值数量
print("=== 各列缺失值数量 ===")
print(df.isnull().sum())
# 缺失值比例
print("\n=== 各列缺失值比例 ===")
print(df.isnull().mean().round(3))
# 可视化缺失情况
print("\n=== 缺失值矩阵 ===")
print(df.isnull())
输出结果:
=== 各列缺失值数量 ===
name 1
age 2
salary 1
department 1
dtype: int64
=== 各列缺失值比例 ===
name 0.2
age 0.4
salary 0.2
department 0.2
dtype: float64
1.2 填充缺失值
# 方法1:用固定值填充
df['department'].fillna('未知部门', inplace=True)
# 方法2:用均值/中位数填充数值列
df['age'].fillna(df['age'].mean(), inplace=True)
df['salary'].fillna(df['salary'].median(), inplace=True)
# 方法3:前向填充(ffill)
df_time = pd.DataFrame({
'date': pd.date_range('2026-01-01', periods=5),
'value': [10, np.nan, np.nan, 40, 50]
})
df_time['value_filled'] = df_time['value'].fillna(method='ffill')
print(df_time)
# 方法4:插值填充(适合时序数据)
df_time['value_interp'] = df_time['value'].interpolate(method='linear')
print(df_time)
1.3 删除含缺失值的行/列
# 删除有任意缺失值的行
df_clean = df.dropna()
# 删除所有值都是NaN的行
df_clean2 = df.dropna(how='all')
# 只在关键列有缺失时才删除
df_clean3 = df.dropna(subset=['name', 'age'])
# 缺失值超过阈值的列直接删除
df_clean4 = df.dropna(axis=1, thresh=int(len(df) * 0.7)) # 保留至少70%非空的列
场景2:处理重复数据
import pandas as pd
data = {
'user_id': [1, 2, 3, 2, 4, 1],
'name': ['Alice', 'Bob', 'Charlie', 'Bob', 'Diana', 'Alice'],
'action': ['buy', 'view', 'buy', 'view', 'buy', 'buy'],
'amount': [100, 0, 200, 0, 150, 100]
}
df = pd.DataFrame(data)
# 检测重复行
print(f"重复行数量: {df.duplicated().sum()}")
# 按特定列检测重复
print(f"user_id重复数量: {df.duplicated(subset=['user_id']).sum()}")
# 删除完全重复行
df_dedup = df.drop_duplicates()
print(f"去重后行数: {len(df_dedup)}")
# 保留某列的第一次/最后一次出现
df_first = df.drop_duplicates(subset=['user_id'], keep='first')
df_last = df.drop_duplicates(subset=['user_id'], keep='last')
print("保留首次出现:")
print(df_first)
场景3:数据类型转换
数据类型错误是导致分析报错最常见的原因之一。
import pandas as pd
data = {
'date': ['2026-01-01', '2026-01-02', '2026-01-03'],
'sales': ['1000', '2000', '1500'], # 字符串格式的数字
'flag': ['True', 'False', 'True'], # 字符串格式的布尔值
'category': ['A', 'B', 'A'],
'price': ['¥100.5', '¥200.0', '¥150.3'] # 带特殊字符的数字
}
df = pd.DataFrame(data)
print("原始数据类型:")
print(df.dtypes)
# 转换日期
df['date'] = pd.to_datetime(df['date'])
# 转换数值
df['sales'] = pd.to_numeric(df['sales'])
# 转换布尔值
df['flag'] = df['flag'].map({'True': True, 'False': False})
# 处理带特殊字符的数值
df['price'] = df['price'].str.replace('¥', '').astype(float)
# 转换为分类类型(节省内存)
df['category'] = df['category'].astype('category')
print("\n转换后数据类型:")
print(df.dtypes)
print("\n转换后数据:")
print(df)
场景4:字符串清洗
import pandas as pd
data = {
'name': [' Alice ', 'BOB', 'charlie', ' Diana_Smith '],
'phone': ['138-1234-5678', '139 8765 4321', '(010)12345678', '400-800-1234'],
'email': ['ALICE@EXAMPLE.COM', 'bob@test.com', 'Charlie@Gmail.COM', 'invalid_email']
}
df = pd.DataFrame(data)
# 去除首尾空格并统一大小写
df['name'] = df['name'].str.strip().str.title()
# 标准化手机号(只保留数字)
df['phone_clean'] = df['phone'].str.replace(r'[\s\-\(\)]', '', regex=True)
# 邮箱转小写
df['email'] = df['email'].str.lower()
# 验证邮箱格式
import re
def is_valid_email(email):
pattern = r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, str(email)))
df['email_valid'] = df['email'].apply(is_valid_email)
print(df[['name', 'phone_clean', 'email', 'email_valid']])
场景5:处理异常值(Outliers)
import pandas as pd
import numpy as np
np.random.seed(42)
data = {
'salary': np.concatenate([
np.random.normal(15000, 3000, 95), # 正常薪资
[100000, 200000, -5000, 0, 999999] # 异常值
])
}
df = pd.DataFrame(data)
# 方法1:基于IQR(四分位距)检测异常值
Q1 = df['salary'].quantile(0.25)
Q3 = df['salary'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers_iqr = df[(df['salary'] < lower_bound) | (df['salary'] > upper_bound)]
print(f"IQR方法检测到 {len(outliers_iqr)} 个异常值")
print(f"异常值范围: < {lower_bound:.0f} 或 > {upper_bound:.0f}")
# 方法2:基于Z-Score检测(标准差3倍)
from scipy import stats
z_scores = np.abs(stats.zscore(df['salary']))
outliers_zscore = df[z_scores > 3]
print(f"\nZ-Score方法检测到 {len(outliers_zscore)} 个异常值")
# 处理异常值
# 策略1:删除异常值
df_clean = df[(df['salary'] >= lower_bound) & (df['salary'] <= upper_bound)]
# 策略2:截断(Winsorize)
df['salary_winsor'] = df['salary'].clip(lower=lower_bound, upper=upper_bound)
# 策略3:用中位数替换
median_salary = df['salary'].median()
df['salary_fixed'] = df['salary'].where(
(df['salary'] >= lower_bound) & (df['salary'] <= upper_bound),
median_salary
)
print(f"\n原始数据量: {len(df)}")
print(f"删除异常值后: {len(df_clean)}")
print(f"截断后均值: {df['salary_winsor'].mean():.0f}")
场景6:数据标准化与归一化
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler, StandardScaler
data = {
'age': [22, 35, 28, 45, 30, 55],
'salary': [8000, 25000, 12000, 35000, 15000, 50000],
'experience': [1, 8, 3, 15, 5, 25]
}
df = pd.DataFrame(data)
# 方法1:Min-Max归一化(值缩放到[0,1])
scaler_minmax = MinMaxScaler()
df_normalized = pd.DataFrame(
scaler_minmax.fit_transform(df),
columns=[f'{col}_norm' for col in df.columns]
)
# 方法2:Z-Score标准化(均值0,标准差1)
scaler_std = StandardScaler()
df_standardized = pd.DataFrame(
scaler_std.fit_transform(df),
columns=[f'{col}_std' for col in df.columns]
)
# 手动实现(理解原理)
df['salary_manual_norm'] = (df['salary'] - df['salary'].min()) / (df['salary'].max() - df['salary'].min())
df['salary_manual_std'] = (df['salary'] - df['salary'].mean()) / df['salary'].std()
result = pd.concat([df, df_normalized, df_standardized], axis=1)
print(result[['salary', 'salary_norm', 'salary_std', 'salary_manual_norm', 'salary_manual_std']].round(3))
场景7:数据格式统一(日期、时间)
import pandas as pd
# 各种日期格式统一处理
date_data = {
'date_str': [
'2026-01-15',
'15/01/2026',
'20260115',
'Jan 15, 2026',
'2026年1月15日'
]
}
df = pd.DataFrame(date_data)
# 统一转换为datetime(自动推断格式)
df['date_parsed'] = pd.to_datetime(df['date_str'], infer_datetime_format=True, errors='coerce')
# 中文日期需要预处理
df['date_str2'] = df['date_str'].str.replace('年', '-').str.replace('月', '-').str.replace('日', '')
df['date_parsed2'] = pd.to_datetime(df['date_str2'], errors='coerce')
# 从datetime提取各部分
df['year'] = df['date_parsed'].dt.year
df['month'] = df['date_parsed'].dt.month
df['day'] = df['date_parsed'].dt.day
df['weekday'] = df['date_parsed'].dt.day_name()
df['quarter'] = df['date_parsed'].dt.quarter
print(df[['date_str', 'date_parsed', 'year', 'month', 'day', 'weekday', 'quarter']])
场景8:处理不一致的分类数据
import pandas as pd
data = {
'gender': ['男', 'male', 'M', '女', 'Female', 'f', '男性', 'MALE', '未知'],
'city': ['北京', '上海', 'beijing', 'Shanghai', '北京市', '上海市', 'BJ', 'SH', None]
}
df = pd.DataFrame(data)
# 性别标准化
gender_mapping = {
'男': 'M', 'male': 'M', 'm': 'M', '男性': 'M', 'MALE': 'M',
'女': 'F', 'Female': 'F', 'female': 'F', 'f': 'F', '女性': 'F',
'未知': 'Unknown'
}
df['gender_clean'] = df['gender'].str.strip().map(
lambda x: gender_mapping.get(x, gender_mapping.get(x.capitalize() if isinstance(x, str) else x, 'Unknown'))
)
# 城市标准化
city_mapping = {
'北京': '北京', '北京市': '北京', 'beijing': '北京', 'BJ': '北京',
'上海': '上海', '上海市': '上海', 'Shanghai': '上海', 'SH': '上海'
}
df['city_clean'] = df['city'].map(city_mapping).fillna('其他')
print(df[['gender', 'gender_clean', 'city', 'city_clean']])
场景9:数据一致性校验
import pandas as pd
import numpy as np
data = {
'order_id': ['O001', 'O002', 'O003', 'O004', 'O005'],
'quantity': [5, -2, 0, 3, 10], # -2 异常
'unit_price': [100, 200, 150, 0, 300], # 0 可能异常
'total_amount': [500, -400, 0, 300, 2500], # 应等于 quantity * unit_price
'discount': [0.1, 0.2, 0.05, 0.3, 1.5] # 1.5 异常,折扣率超过1
}
df = pd.DataFrame(data)
# 规则1:数量不能为负
invalid_quantity = df[df['quantity'] <= 0]
print(f"数量异常记录: {len(invalid_quantity)} 条")
print(invalid_quantity[['order_id', 'quantity']])
# 规则2:折扣率必须在[0,1]范围内
invalid_discount = df[(df['discount'] < 0) | (df['discount'] > 1)]
print(f"\n折扣率异常记录: {len(invalid_discount)} 条")
# 规则3:总金额 = 数量 × 单价
df['calculated_amount'] = df['quantity'] * df['unit_price']
df['amount_diff'] = abs(df['total_amount'] - df['calculated_amount'])
invalid_amount = df[df['amount_diff'] > 0.01] # 允许浮点误差
print(f"\n金额校验失败记录: {len(invalid_amount)} 条")
print(invalid_amount[['order_id', 'quantity', 'unit_price', 'total_amount', 'calculated_amount']])
# 生成数据质量报告
quality_report = {
'总记录数': len(df),
'数量异常': len(invalid_quantity),
'折扣率异常': len(invalid_discount),
'金额不一致': len(invalid_amount),
'数据质量分': f"{(1 - (len(invalid_quantity) + len(invalid_discount) + len(invalid_amount)) / len(df) / 3) * 100:.1f}%"
}
print("\n=== 数据质量报告 ===")
for k, v in quality_report.items():
print(f" {k}: {v}")
场景10:构建可复用的数据清洗Pipeline
将以上操作封装成可复用的清洗流水线,适合生产环境使用:
import pandas as pd
import numpy as np
from typing import List, Dict, Optional
class DataCleaner:
"""可复用的数据清洗Pipeline"""
def __init__(self, df: pd.DataFrame):
self.df = df.copy()
self.report = []
def remove_duplicates(self, subset: Optional[List[str]] = None, keep: str = 'first'):
"""去除重复行"""
before = len(self.df)
self.df = self.df.drop_duplicates(subset=subset, keep=keep)
after = len(self.df)
self.report.append(f"去重: 删除 {before - after} 行重复记录")
return self
def fill_missing(self, strategy: Dict):
"""
填充缺失值
strategy: {'列名': '策略'}
策略: 'mean', 'median', 'mode', '固定值'
"""
for col, method in strategy.items():
if col not in self.df.columns:
continue
missing_count = self.df[col].isnull().sum()
if missing_count == 0:
continue
if method == 'mean':
self.df[col].fillna(self.df[col].mean(), inplace=True)
elif method == 'median':
self.df[col].fillna(self.df[col].median(), inplace=True)
elif method == 'mode':
self.df[col].fillna(self.df[col].mode()[0], inplace=True)
else:
self.df[col].fillna(method, inplace=True)
self.report.append(f"缺失值填充: {col} 列填充 {missing_count} 个缺失值(策略: {method})")
return self
def remove_outliers(self, columns: List[str], method: str = 'iqr', threshold: float = 1.5):
"""去除异常值"""
for col in columns:
if col not in self.df.columns:
continue
if method == 'iqr':
Q1 = self.df[col].quantile(0.25)
Q3 = self.df[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - threshold * IQR
upper = Q3 + threshold * IQR
mask = (self.df[col] >= lower) & (self.df[col] <= upper)
removed = (~mask).sum()
self.df = self.df[mask | self.df[col].isnull()]
self.report.append(f"异常值处理: {col} 列删除 {removed} 个异常值")
return self
def standardize_strings(self, columns: List[str], case: str = 'lower'):
"""标准化字符串列"""
for col in columns:
if col not in self.df.columns:
continue
self.df[col] = self.df[col].astype(str).str.strip()
if case == 'lower':
self.df[col] = self.df[col].str.lower()
elif case == 'upper':
self.df[col] = self.df[col].str.upper()
elif case == 'title':
self.df[col] = self.df[col].str.title()
self.report.append(f"字符串标准化: {col} 列({case})")
return self
def convert_types(self, type_map: Dict):
"""转换数据类型"""
for col, dtype in type_map.items():
if col not in self.df.columns:
continue
if dtype == 'datetime':
self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
elif dtype == 'numeric':
self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
elif dtype == 'category':
self.df[col] = self.df[col].astype('category')
else:
self.df[col] = self.df[col].astype(dtype, errors='ignore')
self.report.append(f"类型转换: {col} 列 → {dtype}")
return self
def get_result(self):
"""返回清洗后的DataFrame和报告"""
print("=== 数据清洗报告 ===")
for item in self.report:
print(f" ✅ {item}")
print(f" 📊 最终数据量: {len(self.df)} 行 × {len(self.df.columns)} 列")
return self.df
# 使用示例
np.random.seed(42)
raw_data = {
'user_id': range(100),
'name': ['User_' + str(i) for i in range(100)],
'age': np.concatenate([np.random.randint(18, 65, 95), [200, -5, 0, np.nan, np.nan]]),
'salary': np.concatenate([np.random.randint(5000, 50000, 95), [1000000, -1000, np.nan, np.nan, np.nan]]),
'join_date': pd.date_range('2020-01-01', periods=100).astype(str),
'department': np.random.choice(['技术', '产品', '运营', '市场', None], 100)
}
raw_df = pd.DataFrame(raw_data)
# 一行搞定数据清洗
clean_df = (DataCleaner(raw_df)
.remove_duplicates(subset=['user_id'])
.fill_missing({'age': 'mean', 'salary': 'median', 'department': '未知'})
.remove_outliers(columns=['age', 'salary'], method='iqr')
.convert_types({'join_date': 'datetime', 'department': 'category'})
.get_result()
)
输出结果:
=== 数据清洗报告 ===
✅ 缺失值填充: age 列填充 2 个缺失值(策略: mean)
✅ 缺失值填充: salary 列填充 3 个缺失值(策略: median)
✅ 缺失值填充: department 列填充 X 个缺失值(策略: 未知)
✅ 异常值处理: age 列删除 3 个异常值
✅ 异常值处理: salary 列删除 2 个异常值
✅ 类型转换: join_date 列 → datetime
✅ 类型转换: department 列 → category
📊 最终数据量: 95 行 × 6 列
总结:数据清洗核心思路
做了这么多年数据分析,船长总结了数据清洗的核心思路:
① 先诊断,再处理:不要盲目清洗,先用 df.info()、df.describe()、df.isnull().sum() 全面了解数据状态。
② 理解业务含义:缺失值不一定要填充,可能"缺失"本身就是一种信息。异常值不一定是错误,可能是真实的极端案例。
③ 保留原始数据:清洗时永远操作副本 df.copy(),不要直接修改原始数据。
④ 记录清洗过程:养成写数据清洗日志的习惯,方便后续排查和复现。
⑤ 自动化流程:用 Pipeline 封装重复操作,节省时间,减少人为错误。
掌握这10个场景,日常数据清洗工作至少覆盖90%的情况。如果你在工作中遇到了其他复杂场景,欢迎评论区留言,我来补充。
---
相关推荐:
- SQL窗口函数实战:5个高频场景完整代码
- Python数据分析全流程:从读取到可视化
- Pandas性能优化:10个让代码快10倍的技巧
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)