Python 并行计算进阶:ProcessPoolExecutor 处理 CPU 密集型任务

在上一篇文章里,我们介绍了如何优雅封装 ThreadPoolExecutor 来处理 I/O 密集型任务(比如文件读取、网络请求、日志解析)。

但是,当任务本身计算量很大(例如大规模数值运算、复杂模拟)时,线程池往往帮不上忙,甚至可能变慢。原因在于:Python 的 GIL(全局解释器锁)限制了同一进程内的多线程并行计算

这时,就需要请出 ProcessPoolExecutor —— 一个真正能发挥多核 CPU 性能的利器。


1. 什么是 CPU 密集型任务?

在计算机科学里,常将任务分为两类:

  • I/O 密集型 (I/O-bound)
    等待外部资源的时间远多于计算时间。
    典型场景:爬虫、数据库操作、文件读写。

  • CPU 密集型 (CPU-bound)
    主要消耗 CPU 的算力,计算时间远大于 I/O 时间。
    典型场景:

    • 大规模矩阵运算(线性代数、深度学习前向传播)
    • 图像处理(滤波、卷积)
    • CFD 数值模拟、风电场流体计算
    • 加密解密、压缩解压

对于 CPU 密集型任务,线程池的多线程无法真正并行,因为受 GIL 限制。同一时间只会有一个线程运行 Python 字节码。

解决办法:使用 多进程 —— 每个进程都有自己的 Python 解释器和 GIL,可以真正利用多核 CPU 并行执行。


2. ProcessPoolExecutor 的基本用法

from concurrent.futures import ProcessPoolExecutor
import math

def cpu_task(n):
    """模拟 CPU 密集型任务:计算大量平方根"""
    return sum(math.sqrt(i) for i in range(n))

if __name__ == "__main__":
    numbers = [10_00000, 20_00000, 30_00000, 40_00000]

    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_task, numbers))

    print(results)

运行效果:

  • 串行执行:可能需要 10+ 秒。
  • 多进程执行:4 个核同时计算,总耗时大约缩短到 3 秒左右。

3. 封装成通用函数

与之前的 run_in_threads 类似,我们可以写一个 run_in_processes

from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Iterable, Any

def run_in_processes(
    func: Callable[..., Any],
    tasks: Iterable[tuple],
    max_workers: int = 4,
    show_log: bool = True
) -> list:
    """
    使用多进程池并行执行 CPU 密集型任务。
    """
    results = [None] * len(tasks)
    if show_log:
        print(f"⚡ 开始多进程计算(进程数 = {max_workers},任务数 = {len(tasks)})...")

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_idx = {
            executor.submit(func, *args): idx
            for idx, args in enumerate(tasks)
        }
        for future in as_completed(future_to_idx):
            idx = future_to_idx[future]
            try:
                results[idx] = future.result()
            except Exception as e:
                results[idx] = f"❌ 任务 {idx} 出错 → {e}"

    return results

4. 使用示例

示例一:大规模运算

import math

def heavy_calc(n):
    return sum(math.sqrt(i) for i in range(n))

tasks = [(50_00000,), (60_00000,), (70_00000,), (80_00000,)]
results = run_in_processes(heavy_calc, tasks, max_workers=4)
print(results)

示例二:图像处理

from PIL import Image, ImageFilter

def blur_image(path):
    img = Image.open(path)
    img = img.filter(ImageFilter.GaussianBlur(5))
    return f"{path} 完成"

tasks = [(f"img_{i}.jpg",) for i in range(10)]
results = run_in_processes(blur_image, tasks, max_workers=4)
print(results)

5. ThreadPool vs ProcessPool 对比

特性 ThreadPoolExecutor ProcessPoolExecutor
适用场景 I/O 密集型(网络请求、磁盘读写) CPU 密集型(数值计算、图像处理)
受 GIL 限制 ✅ 是 ❌ 否
启动开销 大(进程启动慢,内存消耗多)
共享内存 方便(同一进程内) 不方便(进程间需要序列化传输)
速度优势 等待多的任务(I/O-bound) 计算多的任务(CPU-bound)

👉 简单总结:

  • I/O-bound → ThreadPoolExecutor
  • CPU-bound → ProcessPoolExecutor

6. 注意事项

  1. 必须放在 if __name__ == "__main__":
    否则在 Windows/Mac 上会出现无限递归创建进程的问题。
  2. 数据传输开销大
    多进程间要通过序列化传输数据,避免传递超大对象(如几 GB 的数组)。
    建议用 共享内存(multiprocessing.Array / numpy.memmap) 解决。
  3. 进程数不宜过多
    一般等于 CPU 核心数或 核心数 - 1

7. 总结

  • CPU 密集型任务:计算量大,主要消耗 CPU 时间。例子:矩阵运算、CFD 模拟、图像处理。
  • 适合工具ProcessPoolExecutor,真正利用多核 CPU 并行,性能大幅提升。
  • 与线程池区别:线程适合等待多、计算少的场景;进程适合计算多、等待少的场景。

📌 推荐实践:

  • 任务多但轻 → 用 线程池
  • 任务重且算力需求大 → 用 进程池
  • 混合任务 → 可以考虑 线程池 + 进程池混合,甚至引入 asyncio

下一篇 “线程池 + 进程池混合应用实战”(例如风电 CFD 模拟里:多进程算流场 + 多线程读写文件)

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐