获取双异步返回值时,如何保证主线程不阻塞?
一、前情提要
在上一篇文章中,使用双异步后,如何保证数据一致性?,通过Future获取异步返回值,轮询判断Future状态,如果执行完毕或已取消,则通过get()获取返回值,get()是阻塞的方法,因此会阻塞当前线程,如果通过new Runnable()执行get()方法,那么还是需要返回AsyncResult,然后再通过主线程去get()获取异步线程返回结果。
写法很繁琐,还会阻塞主线程。
下面是FutureTask异步执行流程图:
二、JDK8的CompletableFuture
1、ForkJoinPool
Java8中引入了CompletableFuture,它实现了对Future的全面升级,可以通过回调的方式,获取异步线程返回值。
CompletableFuture的异步执行通过ForkJoinPool实现, 它使用守护线程去执行任务。
ForkJoinPool在于可以充分利用多核CPU的优势,把一个任务拆分成多个小任务,把多个小任务放到多个CPU上并行执行,当多个小任务执行完毕后,再将其执行结果合并起来。
Future的异步执行是通过ThreadPoolExecutor实现的。
2、从ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的区别
- ForkJoinPool中的每个线程都会有一个队列,而ThreadPoolExecutor只有一个队列,并根据queue类型不同,细分出各种线程池;
- ForkJoinPool在使用过程中,会创建大量的子任务,会进行大量的gc,但是ThreadPoolExecutor不需要,因为ThreadPoolExecutor是任务分配平均的;
- ThreadPoolExecutor中每个异步线程之间是相互独立的,当执行速度快的线程执行完毕后,它就会一直处于空闲的状态,等待其它线程执行完毕;
- ForkJoinPool中每个异步线程之间并不是绝对独立的,在ForkJoinPool线程池中会维护一个队列来存放需要执行的任务,当线程自身任务执行完毕后,它会从其它线程中获取未执行的任务并帮助它执行,直至所有线程执行完毕。
因此,在多线程任务分配不均时,ForkJoinPool的执行效率更高。但是,如果任务分配均匀,ThreadPoolExecutor的执行效率更高,因为ForkJoinPool会创建大量子任务,并对其进行大量的GC,比较耗时。
三、通过CompletableFuture优化 “通过Future获取异步返回值”
1、通过Future获取异步返回值关键代码
(1)将异步方法的返回值改为Future,将返回值放到new AsyncResult();中
@Async("async-executor")
public void readXls(String filePath, String filename) {
try {
// 此代码为简化关键性代码
List futureList = new ArrayList();
for (int time = 0; time < times; time++) {
Future sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
futureList.add(sumFuture);
}
}catch (Exception e){
logger.error("readXlsCacheAsync---插入数据异常:",e);
}
}
@Async("async-executor")
public Future readXlsCacheAsync() {
try {
// 此代码为简化关键性代码
return new AsyncResult(sum);
}catch (Exception e){
return new AsyncResult(0);
}
}
(2)通过Future.get()获取返回值
public static boolean getFutureResult(List futureList, int excelRow) {
int[] futureSumArr = new int[futureList.size()];
for (int i = 0;i {// 回调方法
return thenApplyTest2(result);// supplyAsync返回值 * 1
}).thenApply((result) -> {
return thenApplyTest5(result);// thenApply返回值 * 1
}).exceptionally((e) -> { // 如果执行异常:
logger.error("CompletableFuture.supplyAsync----异常:", e);
return null;
});
completableFutureList.add(completableFuture);
}
}
@Async("async-executor")
public int readXlsCacheAsync() {
try {
// 此代码为简化关键性代码
return sum;
}catch (Exception e){
return -1;
}
}
(2)通过completableFuture.get()获取返回值
public static boolean getCompletableFutureResult(List list, int excelRow){
logger.info("通过completableFuture.get()获取每个异步线程的插入结果----开始");
int sum = 0;
for (int i = 0; i < list.size(); i++) {
Integer result = list.get(i).get();
sum += result;
}
boolean insertFlag = excelRow == sum;
logger.info("全部执行完毕,excelRow={},入库={}, 数据是否一致={}",excelRow,sum,insertFlag);
return insertFlag;
}
3、效率对比
(1)测试环境
- 12个逻辑处理器的电脑;
- Excel中包含10万条数据;
- Future的自定义线程池,核心线程数为24;
- ForkJoinPool的核心线程数为24;
(2)统计四种情况下10万数据入库时间
- 不获取异步返回值
- 通过Future获取异步返回值
- 通过CompletableFuture获取异步返回值,默认ForkJoinPool线程池的核心线程数为本机逻辑处理器数量,测试电脑为12;
- 通过CompletableFuture获取异步返回值,修改ForkJoinPool线程池的核心线程数为24。
备注:因为CompletableFuture不阻塞主线程,主线程执行时间只有2秒,表格中统计的是异步线程全部执行完成的时间。
(3)设置核心线程数
将核心线程数CorePoolSize设置成CPU的处理器数量,是不是效率最高的?
// 获取CPU的处理器数量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 测试电脑是24
因为在接口被调用后,开启异步线程,执行入库任务,因为测试机最多同时开启24线程处理任务,故将10万条数据拆分成等量的24份,也就是10万/24 = 4166,那么我设置成4200,是不是效率最佳呢?
测试的过程中发现,好像真的是这样的。
自定义ForkJoinPool线程池
@Autowired
@Qualifier("asyncTaskExecutor")
private Executor asyncTaskExecutor;
@Override
public void readXls(String filePath, String filename) {
List completableFutureList = new ArrayList();
for (int time = 0; time {
try {
int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
} catch (Exception ex) {
return;
}
});
}
自定义线程池
/**
* 自定义异步线程池
*/
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置线程名称
executor.setThreadNamePrefix("asyncTask-Executor");
//设置最大线程数
executor.setMaxPoolSize(200);
//设置核心线程数
executor.setCorePoolSize(24);
//设置线程空闲时间,默认60
executor.setKeepAliveSeconds(200);
//设置队列容量
executor.setQueueCapacity(50);
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
(4)统计分析
效率对比:
③通过CompletableFuture获取异步返回值(12线程) readExcelDbJdk8Service.readXlsCacheAsyncMybatis();