1. 问题根源:共享可变状态

并行流会将数据拆分到多个线程处理,如果多个线程同时修改同一个变量或集合,且没有同步控制,结果就会不一致。

示例代码(错误写法):

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> result = new ArrayList<>();

list.parallelStream()
    .forEach(result::add); // 多个线程并发修改 ArrayList,导致数据丢失或异常

问题原因:

  • ArrayList 是非线程安全集合,多个线程同时调用 add 方法会导致内部数据覆盖或数组越界。


2. 解决方案

方案 1:使用线程安全集合

使用 java.util.concurrent 包下的线程安全容器(如 CopyOnWriteArrayList):

List<Integer> result = new CopyOnWriteArrayList<>();
list.parallelStream()
    .forEach(result::add);
方案 2:使用正确的归约操作(推荐)

避免直接修改外部变量,而是通过 collect 或 reduce 方法进行归约,确保线程安全。

使用 Collectors.toList()

List<Integer> result = list.parallelStream()
    .collect(Collectors.toList());

自定义归约操作:

// 求和示例
int sum = list.parallelStream()
    .reduce(0, Integer::sum); // 初始值 + 累加方式

复杂归约(使用 Collectors):

Map<Integer, Long> countMap = list.parallelStream()
    .collect(Collectors.groupingByConcurrent(
        Function.identity(), // 键
        Collectors.counting() // 值
    ));

3. 避免副作用(Side-Effects)

确保在流操作中 不依赖外部可变状态,所有操作应独立处理数据。

错误示例:

int[] sum = {0}; // 外部变量
list.parallelStream()
    .forEach(x -> sum[0] += x); // 竞态条件:多个线程同时修改 sum

正确写法:

int sum = list.parallelStream()
    .mapToInt(Integer::intValue)
    .sum();

4. 检查数据源是否线程安全

如果数据源(如 ArrayListHashSet)在并行流中被修改,也可能导致问题:

List<Integer> dynamicList = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
dynamicList.parallelStream()
    .filter(x -> x % 2 == 0)
    .forEach(dynamicList::remove); // 并发修改数据源,抛出 ConcurrentModificationException

解决方案:

  • 使用并发集合(如 ConcurrentHashMap)或在处理前创建数据副本。


5. 确保合并函数(Combiner)正确

在自定义 reduce 或 collect 操作时,需确保合并函数能正确处理并发结果。

错误示例:

List<Integer> result = list.parallelStream()
    .reduce(
        new ArrayList<>(), // 错误:所有线程共享同一个 ArrayList
        (acc, x) -> { acc.add(x); return acc; },
        (list1, list2) -> { list1.addAll(list2); return list1; }
    );

正确写法(每次生成新列表):

List<Integer> result = list.parallelStream()
    .collect(
        ArrayList::new, // Supplier:每个线程创建新列表
        ArrayList::add, // Accumulator:安全累加
        ArrayList::addAll // Combiner:合并结果
    );

总结

  • 避免在 parallelStream 中修改共享可变状态。

  • 优先使用无副作用操作和内置归约方法(如 collectreduce)。

  • 选择线程安全的集合或归约容器。

  • 如果数据量小或操作简单,可能不需要并行流(并行拆分/合并的开销可能抵消性能优势)。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐