Spring Boot 中实现异步批量调用并等待所有结果

在 Spring Boot 中实现异步循环调用接口并等待所有结果返回后继续执行后续代码,可以通过以下步骤实现:

1. 启用异步支持

在主应用类上添加 @EnableAsync 注解:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class YourApplication {
    public static void main(String[] args) {
        SpringApplication.run(YourApplication.class, args);
    }
}

2. 创建异步服务

创建一个服务类,使用 @Async 注解标记异步方法:

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.CompletableFuture;

@Service
public class AsyncService {

    // 返回 CompletableFuture 示例
    @Async("asyncExecutor")  // 指定线程池(可选)
    public CompletableFuture<String> callApi(String param) {
        // 模拟接口调用
        try {
            Thread.sleep(1000);  // 模拟耗时操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return CompletableFuture.completedFuture("结果: " + param);
    }
}

3. 配置线程池(可选但推荐)

在配置类中定义异步线程池:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);      // 核心线程数
        executor.setMaxPoolSize(20);      // 最大线程数
        executor.setQueueCapacity(100);   // 队列容量
        executor.setThreadNamePrefix("async-");
        executor.initialize();
        return executor;
    }
}

4. 编写异步调用逻辑

在服务或控制器中调用异步方法并等待所有结果:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Service
public class MyService {

    @Autowired
    private AsyncService asyncService;

    public void processAsyncCalls() throws ExecutionException, InterruptedException {
        List<String> params = List.of("param1", "param2", "param3");
        List<CompletableFuture<String>> futures = new ArrayList<>();

        // 异步调用接口
        for (String param : params) {
            CompletableFuture<String> future = asyncService.callApi(param);
            futures.add(future);
        }

        // 等待所有任务完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
        );

        // 获取所有结果
        CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> {
            return futures.stream()
                    .map(future -> {
                        try {
                            return future.get();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .toList();
        });

        // 阻塞直到所有结果返回
        List<String> results = allResults.get();

        // 继续执行后续代码
        System.out.println("所有异步任务已完成,结果: " + results);
        // 处理结果...
    }
}

5. 异常处理

在实际应用中,建议添加异常处理:

// 在调用时添加异常处理
CompletableFuture<String> future = asyncService.callApi(param)
        .exceptionally(ex -> {
            System.err.println("接口调用失败: " + ex.getMessage());
            return "默认结果";  // 或返回 null
        });

6. 使用场景示例

@RestController
@RequestMapping("/api")
public class MyController {

    @Autowired
    private MyService myService;

    @GetMapping("/batch")
    public String batchProcess() throws ExecutionException, InterruptedException {
        myService.processAsyncCalls();
        return "批量处理完成";
    }
}

关键点说明

  1. 异步方法返回类型:使用 CompletableFuture<T> 作为返回类型,便于组合多个异步任务。
  2. 线程池配置:通过自定义线程池控制并发数量,避免资源耗尽。
  3. 等待所有结果:使用 CompletableFuture.allOf() 等待所有任务完成。
  4. 结果获取:使用 thenApply() 收集所有任务的结果。

其他实现方式

1. 使用 @Async 和 Future
@Async
public Future<String> callApi(String param) {
    // 实现逻辑
    return new AsyncResult<>("结果");
}

// 调用代码
List<Future<String>> futures = new ArrayList<>();
for (String param : params) {
    futures.add(asyncService.callApi(param));
}

// 等待所有结果
for (Future<String> future : futures) {
    String result = future.get();  // 阻塞直到结果返回
}
2. 使用 Java 8 的 ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<String>> futures = new ArrayList<>();

for (String param : params) {
    Future<String> future = executor.submit(() -> {
        return callApi(param);
    });
    futures.add(future);
}

// 关闭线程池
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);

注意事项

  1. 异步方法必须在外部调用@Async 方法不能在同一个类中被调用,否则不会生效。
  2. 异常处理:建议为每个 CompletableFuture 添加 exceptionally() 处理异常。
  3. 超时控制:可使用 CompletableFuture.orTimeout() 设置最大等待时间。

通过以上方法,您可以在 Spring Boot 中实现高效的异步批量调用,并在所有结果返回后继续执行后续逻辑。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐