Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务
·
目录
Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务
在上一篇文章里,我们介绍了如何优雅封装 ThreadPoolExecutor 来处理 I/O 密集型任务(比如文件读取、网络请求、日志解析)。
但是,当任务本身计算量很大(例如大规模数值运算、复杂模拟)时,线程池往往帮不上忙,甚至可能变慢。原因在于:Python 的 GIL(全局解释器锁)限制了同一进程内的多线程并行计算。
这时,就需要请出 ProcessPoolExecutor —— 一个真正能发挥多核 CPU 性能的利器。
1. 什么是 CPU 密集型任务?
在计算机科学里,常将任务分为两类:
-
I/O 密集型 (I/O-bound)
等待外部资源的时间远多于计算时间。
典型场景:爬虫、数据库操作、文件读写。 -
CPU 密集型 (CPU-bound)
主要消耗 CPU 的算力,计算时间远大于 I/O 时间。
典型场景:- 大规模矩阵运算(线性代数、深度学习前向传播)
- 图像处理(滤波、卷积)
- CFD 数值模拟、风电场流体计算
- 加密解密、压缩解压
对于 CPU 密集型任务,线程池的多线程无法真正并行,因为受 GIL 限制。同一时间只会有一个线程运行 Python 字节码。
解决办法:使用 多进程 —— 每个进程都有自己的 Python 解释器和 GIL,可以真正利用多核 CPU 并行执行。
2. ProcessPoolExecutor 的基本用法
from concurrent.futures import ProcessPoolExecutor
import math
def cpu_task(n):
"""模拟 CPU 密集型任务:计算大量平方根"""
return sum(math.sqrt(i) for i in range(n))
if __name__ == "__main__":
numbers = [10_00000, 20_00000, 30_00000, 40_00000]
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_task, numbers))
print(results)
运行效果:
- 串行执行:可能需要 10+ 秒。
- 多进程执行:4 个核同时计算,总耗时大约缩短到 3 秒左右。
3. 封装成通用函数
与之前的 run_in_threads 类似,我们可以写一个 run_in_processes:
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Iterable, Any
def run_in_processes(
func: Callable[..., Any],
tasks: Iterable[tuple],
max_workers: int = 4,
show_log: bool = True
) -> list:
"""
使用多进程池并行执行 CPU 密集型任务。
"""
results = [None] * len(tasks)
if show_log:
print(f"⚡ 开始多进程计算(进程数 = {max_workers},任务数 = {len(tasks)})...")
with ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_idx = {
executor.submit(func, *args): idx
for idx, args in enumerate(tasks)
}
for future in as_completed(future_to_idx):
idx = future_to_idx[future]
try:
results[idx] = future.result()
except Exception as e:
results[idx] = f"❌ 任务 {idx} 出错 → {e}"
return results
4. 使用示例
示例一:大规模运算
import math
def heavy_calc(n):
return sum(math.sqrt(i) for i in range(n))
tasks = [(50_00000,), (60_00000,), (70_00000,), (80_00000,)]
results = run_in_processes(heavy_calc, tasks, max_workers=4)
print(results)
示例二:图像处理
from PIL import Image, ImageFilter
def blur_image(path):
img = Image.open(path)
img = img.filter(ImageFilter.GaussianBlur(5))
return f"{path} 完成"
tasks = [(f"img_{i}.jpg",) for i in range(10)]
results = run_in_processes(blur_image, tasks, max_workers=4)
print(results)
5. ThreadPool vs ProcessPool 对比
| 特性 | ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|---|
| 适用场景 | I/O 密集型(网络请求、磁盘读写) | CPU 密集型(数值计算、图像处理) |
| 受 GIL 限制 | ✅ 是 | ❌ 否 |
| 启动开销 | 小 | 大(进程启动慢,内存消耗多) |
| 共享内存 | 方便(同一进程内) | 不方便(进程间需要序列化传输) |
| 速度优势 | 等待多的任务(I/O-bound) | 计算多的任务(CPU-bound) |
👉 简单总结:
- I/O-bound → ThreadPoolExecutor
- CPU-bound → ProcessPoolExecutor
6. 注意事项
- 必须放在
if __name__ == "__main__":下
否则在 Windows/Mac 上会出现无限递归创建进程的问题。 - 数据传输开销大
多进程间要通过序列化传输数据,避免传递超大对象(如几 GB 的数组)。
建议用 共享内存(multiprocessing.Array / numpy.memmap) 解决。 - 进程数不宜过多
一般等于 CPU 核心数或核心数 - 1。
7. 总结
- CPU 密集型任务:计算量大,主要消耗 CPU 时间。例子:矩阵运算、CFD 模拟、图像处理。
- 适合工具:
ProcessPoolExecutor,真正利用多核 CPU 并行,性能大幅提升。 - 与线程池区别:线程适合等待多、计算少的场景;进程适合计算多、等待少的场景。
📌 推荐实践:
- 任务多但轻 → 用 线程池。
- 任务重且算力需求大 → 用 进程池。
- 混合任务 → 可以考虑 线程池 + 进程池混合,甚至引入
asyncio。
下一篇 “线程池 + 进程池混合应用实战”(例如风电 CFD 模拟里:多进程算流场 + 多线程读写文件)
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐



所有评论(0)