This post is also available in English and alternative languages.1. 普通Future
普通线程的创建方式有两种,①继承Thread类,②实现Runnable接口。
这两种方式创建的线程,在运行结束后,都无法直接获取结果。
Java 5.0 引入 Callable 和 Future,通过它们创建的线程,在运行结束后可以通过 get()方法 获取结果。
下面两个示例,通过 Callable 创建线程,提交到线程池中运行,然后获得 Future 对象,通过 Future .get() 获取运行结果。
1.1. get() 方法示例
get()方法是阻塞方法,一直到线程运行完毕(抛出异常)才能返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| public class FutureMainApp01 {
private static final Logger LOGGER = LoggerFactory.getLogger(FutureMainApp01.class);
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 16, 32, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) { FutureMainApp01 futureMainApp01 = new FutureMainApp01(); futureMainApp01.process(); }
private void process() { try { ShardedJedisOperate shardedJedisOperate = new ShardedJedisOperate(); AnalysisData analysisData = new AnalysisData(shardedJedisOperate, "1-key", "1-value"); Future<String> future = threadPoolExecutor.submit(analysisData); String result = future.get(300,TimeUnit.MILLISECONDS); LOGGER.info("result:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { threadPoolExecutor.shutdown(); } } }
class AnalysisData implements Callable<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisData.class);
private ShardedJedisOperate shardedJedisOperate; private String key; private String value;
public AnalysisData(ShardedJedisOperate shardedJedisOperate, String key, String value) { this.shardedJedisOperate = shardedJedisOperate; this.key = key; this.value = value; }
@Override public String call() { LOGGER.info("set redis info key:{},val:{}", key, value); return shardedJedisOperate.set(key, value); } }
|
1.2. get(timeout, timeUnit) 方法示例
该方法可以设定时间,在指定时间内没有获取到结果,则直接返回null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class FutureMainApp01 {
private static final Logger LOGGER = LoggerFactory.getLogger(FutureMainApp01.class);
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 16, 32, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) { FutureMainApp01 futureMainApp01 = new FutureMainApp01(); futureMainApp01.process(); }
private void process() { try { ShardedJedisOperate shardedJedisOperate = new ShardedJedisOperate(); AnalysisData analysisData = new AnalysisData(shardedJedisOperate, "1-key", "1-value"); Future<String> future = threadPoolExecutor.submit(analysisData); String result = future.get(300,TimeUnit.MILLISECONDS); LOGGER.info("result:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { threadPoolExecutor.shutdown(); } } }
class AnalysisData implements Callable<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisData.class);
private ShardedJedisOperate shardedJedisOperate; private String key; private String value;
public AnalysisData(ShardedJedisOperate shardedJedisOperate, String key, String value) { this.shardedJedisOperate = shardedJedisOperate; this.key = key; this.value = value; }
@Override public String call() { LOGGER.info("set redis info key:{},val:{}", key, value); return shardedJedisOperate.set(key, value); } }
|
Future 还有其他一些方法,比如:
isDone()
方法:检查线程是否运行完成。
cancel()
方法:停止线程的执行。
1.3. 局限性
Thread、Runnable 线程的使用,属于异步计算,并不能从线程中得到返回结果。
Future 线程 可以监视目标线程调用Call()方法,当调用 Future.get()方法以获取结果时,当前线程就开始阻塞,直至call()方法结束返回结果。
虽然 Future 提供了异步执行任务的能力,但获取结果还是很不方便,只有通过阻塞或轮询得到结果。
而且,如果要处理多个线程结果很不方便。
在实际业务中有一个对外接口,要补足商品信息,同时要调用四五个外部接口,之前使用的是普通Future,多个Future结果处理比较麻烦,后来使用 CompletableFuture 进行优化。
2. CompletableFuture
JDK8 中新增了一个类 CompletableFuture ,对 Future 进行很大的扩展,简化异步编程。
2.1. runAsync、supplyAsync
runAsync、supplyAsync 方法是 CompletableFuture 提供的创建异步操作的方法。
方法 | 入参 | 注意 |
---|
runAsync | CompletableFuturerunAsync(Runnable runnable)
CompletableFuturerunAsync(Runnable runnable,Executor executor) | 无返回结果 |
supplyAsync | <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
<U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) | 有返回结果 |
如果没有指定 线程池,将会使用 ForkJoinPool.commonPool()
作为线程池执行异步代码;如果指定线程池,则使用指定的线程池运行。
runAsync、supplyAsync 两个方法从使用层面上看,就好像是 Thread / Runnable、Callable / Future,一个没有结果一个有结果。
2.1.1. runAsync
CompletableFuture.runAsync
只是执行一个异步线程,它不会返回结果,但返回一个 CompletableFuture 对象。
如果异步操作不需要返回结果,可以使用runAsync。比如需要后台运行一些任务。(也可以使用它进行重新组装和调配?)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CFutureMainApp01 {
private static final Logger LOGGER = LoggerFactory.getLogger(CFutureMainApp01.class);
private ShardedJedisOperate shardedJedisOperate = new ShardedJedisOperate();
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 16, 32, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException { CFutureMainApp01 cFutureMainApp01 = new CFutureMainApp01(); cFutureMainApp01.testRunAsync(); }
private void testRunAsync() throws ExecutionException, InterruptedException { CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(1000); String result = shardedJedisOperate.set("2-key", "2-value"); LOGGER.info("result:{}", result); } catch (InterruptedException e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end...."); }, threadPoolExecutor); voidCompletableFuture.get(); threadPoolExecutor.shutdown(); } }
|
2.1.2. supplyAsync
CompletableFuture.supplyAsync 执行异步线程 并 返回计算结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private void testSupplyAsync() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("3-key", "3-value"); LOGGER.info("result:{}", result); } catch (InterruptedException e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end...."); return result; } }, threadPoolExecutor); String result = completableFuture.get(); LOGGER.info("end result:{}", result); threadPoolExecutor.shutdown(); }
|
2.2. whenComplete、exceptionally
三个方法 whenComplete、whenCompleteAsync、exceptionally 都是 CompletableFuture 运行完成 或 抛出异常 后继续执行的方法(任务),相当一个后置处理。
方法 | 入参 | 注意 |
---|
whenComplete | whenComplete(BiConsumer<? super T, ? super Throwable> action) | 任务运行完成后,当前任务的线程继续执行 whenComplete 任务。
任务运行 成功/失败 都会执行 whenComplete。 |
whenCompleteAsync | whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) | 任务运行完成后,whenCompleteAsync任务 提交给线程池进行处理。
任务运行 成功/失败 都会执行 whenCompleteAsync。 |
exceptionally | exceptionally(Function<Throwable, ? extends T> fn) | 只有当 CompletableFuture 抛出异常的时候,才会触发 exceptionally 任务。
只有任务运行失败,才会执行。 |
2.2.1. whenComplete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| private void testWhenComplete() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("4-key", "4-value"); LOGGER.info("result:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end...."); return result; }, threadPoolExecutor);
CompletableFuture<String> whenComplete = completableFuture.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String s, Throwable throwable) { if (Objects.nonNull(throwable)) { LOGGER.error("error message:{}", throwable.getMessage(), throwable); } LOGGER.warn("运行结束,关闭资源"); threadPoolExecutor.shutdown(); } }); LOGGER.info("end result:{}", whenComplete.get()); }
|
运行结果:
1 2 3 4
| [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 83 - result:OK [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 87 - ....end.... [pool-1-thread-1] WARN org.future.two.CFutureMainApp02 - 102 - 运行结束,关闭资源 [main] INFO org.future.two.CFutureMainApp02 - 106 - end result:OK
|
输出结果中,最后 102 行日志打印的线程号,和上面的线程号是一样的。
任务执行完以后,当前任务线程继续执行 whenComplete 任务。
2.2.2. whenCompleteAsync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| private void testWhenCompleteAsync() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("5-key", "5-value"); LOGGER.info("result:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end...."); return result; }, threadPoolExecutor);
CompletableFuture<String> whenCompleteAsync = completableFuture.whenCompleteAsync(new BiConsumer<String, Throwable>() { @Override public void accept(String s, Throwable throwable) { if (Objects.nonNull(throwable)) { LOGGER.error("error message:{}", throwable.getMessage(), throwable); } LOGGER.warn("运行结束,关闭资源"); threadPoolExecutor.shutdown(); } }); LOGGER.info("end result:{}", whenCompleteAsync.get()); }
|
运行结果:
1 2 3 4
| default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 116 - result:OK default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 120 - ....end.... default [ForkJoinPool.commonPool-worker-1] WARN org.future.two.CFutureMainApp02 - 133 - 运行结束,关闭资源 default [main] INFO org.future.two.CFutureMainApp02 - 137 - end result:OK
|
输出结果中,133 行日志打印的线程号,和上面的线程号不一致,whenCompleteAsync 中的任务 提交给 ForkJoinPool.commonPool() 线程池运行。
2.2.3. exceptionally
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private void testExceptionally() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("6-key", "6-value"); LOGGER.info("result:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end...."); return result; }, threadPoolExecutor);
CompletableFuture<String> exceptionally = completableFuture.exceptionally(new Function<Throwable, String>() { @Override public String apply(Throwable throwable) { LOGGER.error("error message:{}", throwable.getMessage(), throwable); LOGGER.warn("运行结束,关闭资源"); threadPoolExecutor.shutdown(); return "error"; } }); LOGGER.info("end result:{}", exceptionally.get()); }
|
正常的运行结果
1 2 3
| default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 147 - result:OK default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 151 - ....end.... default [main] INFO org.future.two.CFutureMainApp02 - 168 - end result:OK
|
如果运行中发生了异常,exceptionally 任务就会执行。(将模拟异常注释放开)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 147 - result:OK default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 151 - ....end.... default [pool-1-thread-1] ERROR org.future.two.CFutureMainApp02 - 161 - error message:java.lang.ArrayIndexOutOfBoundsException: 100 java.util.concurrent.CompletionException: java.lang.ArrayIndexOutOfBoundsException: 100 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ArrayIndexOutOfBoundsException: 100 at org.future.two.CFutureMainApp02.lambda$testExceptionally$3(CFutureMainApp02.java:154) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 3 common frames omitted default [pool-1-thread-1] WARN org.future.two.CFutureMainApp02 - 162 - 运行结束,关闭资源 default [main] INFO org.future.two.CFutureMainApp02 - 167 - end result:error
|
2.3. thenApply、handle
当一个线程依赖另一个线程的结果时,或对线程结果进行转换。可以使用 thenApply、handle 两个方法将其串行化。
当上一个线程运行异常,thenApply方法不会执行,而handle方法则可以处理异常信息。
方法 | 入参 | 注意 |
---|
thenApply | thenApply( Function<? super T,? extends U> fn) | |
thenApplyAsync | thenApplyAsync(Function<? super T,? extends U> fn)
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | 使用ForkJoinPool线程池 使用自定义线程池 |
handle | handle(BiFunction<? super T, Throwable, ? extends U> fn) | |
handleAsync | handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) | 使用ForkJoinPool线程池 使用自定义线程池 |
2.3.1. thenApply
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| private void testThenApply() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("7-key", "7-value"); LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> completableFuture = supplyAsync.thenApplyAsync(s -> { LOGGER.info("result-2:{}", s); if ("OK".equals(s)) { s = s + "5555"; } else { s = s + "6666"; } return s; }, threadPoolExecutor);
String result = completableFuture.get(); LOGGER.info("end result:{}", result); }
|
2.3.2. handle
handle 方法不仅可以处理上一线程的结果,还能处理上一线程中的异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
| private void testHandle() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() {
String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("8-key", "8-value"); LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); }
LOGGER.info("....end-1...."); return result; } }, threadPoolExecutor);
CompletableFuture<String> handle = supplyAsync.handleAsync(new BiFunction<String, Throwable, String>() { @Override public String apply(String s, Throwable throwable) {
LOGGER.info("receive info:{}", s); try { if (Objects.nonNull(throwable)) { LOGGER.warn("errMsg:{}", throwable.getMessage()); s = "error"; } else { LOGGER.info("info:{}", s); if ("OK".equals(s)) { s += "3333"; } else { s += "4444"; } } } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { LOGGER.info("关闭资源"); threadPoolExecutor.shutdown(); } return s; } }, threadPoolExecutor);
String result = handle.get(); LOGGER.info("end result:{}", result); }
|
2.4. thenAccept、thenRun
接收上一个线程的结果,纯消费、处理 ,无结果返回。
thenRun 跟 thenAccept 方法不一样的是,thenRun 不关心上一线程的处理结果。只要上一线程执行完成,就开始执行 thenRun。而 thenAccept 能接收上一线程的结果。
可以当做后置处理。
方法 | 入参 | 注意 |
---|
thenAccept | thenAccept(Consumer<? super T> action) | 接收上一线程的结果。纯处理,无返回结果。 |
thenAcceptAsync | thenAcceptAsync(Consumer<? super T> action)
thenAcceptAsync(Consumer<? super T> action,Executor executor) | 使用 ForkJoinPool 线程池
使用 Executor 自定义线程池 |
thenRun | thenRun(Runnable action) | 不关心上一线程结果。纯处理,无返回结果。入参是runnable线程。 |
thenRunAsync | thenRunAsync(Runnable action)
thenRunAsync(Runnable action,Executor executor) | 使用 ForkJoinPool 线程池
使用 Executor 自定义线程池 |
2.4.1. thenAccept
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| private void testThenAccept() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("10-key", "10-value"); LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; } }, threadPoolExecutor);
CompletableFuture<Void> thenAccept = supplyAsync.thenAcceptAsync(new Consumer<String>() { @Override public void accept(String s) { LOGGER.info("receive msg:{},模拟推送kafka", s); } }, threadPoolExecutor);
thenAccept.get(); }
|
2.4.2. thenRun
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| private void testThenRun() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("11-key", "11-value"); LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; } }, threadPoolExecutor);
CompletableFuture<Void> thenRun = supplyAsync.thenRunAsync(new Runnable() { @Override public void run() { LOGGER.info("runnable线程,不关心上一线程结果"); LOGGER.info("后置处理....."); } }, threadPoolExecutor);
thenRun.get(); }
|
2.5. thenCombine、thenAcceptBoth
thenCombine、thenAcceptBoth 用来合并任务。
等待两个 CompletionStage 的任务都执行完成后,把两个任务的结果一并来处理。
区别在于 thenCombine 有返回值;thenAcceptBoth 无返回值
方法 | 入参 | 注意 |
---|
thenCombine | thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) | |
thenCombineAsync | thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) | |
thenAcceptBoth | thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) | |
thenAcceptBothAsync | thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) | 使用 ForkJoinPool 线程池
使用 Executor 自定义线程池 |
2.5.1. thenCombine
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| private void testThenCombine() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("12-key", "12-value"); LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("13-key", "13-value"); LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; }, threadPoolExecutor);
CompletableFuture<String> thenCombine = supplyAsync1.thenCombineAsync(supplyAsync2, new BiFunction<String, String, String>() { @Override public String apply(String s, String s2) { return s + " 666 " + s2; } }, threadPoolExecutor);
String result = thenCombine.get(); LOGGER.info("end result:{}", result); }
|
2.5.2. thenAcceptBoth
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| private void testThenAcceptBoth() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("12-key", "12-value"); LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep(1000); result = shardedJedisOperate.set("13-key", "13-value"); LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; }, threadPoolExecutor);
CompletableFuture<Void> thenAcceptBoth = supplyAsync1.thenAcceptBothAsync(supplyAsync2, new BiConsumer<String, String>() { @Override public void accept(String s, String s2) { LOGGER.info("receive s:{},s2:{}", s, s2); LOGGER.info("合并数据,模拟推送kafka...."); } }, threadPoolExecutor);
thenAcceptBoth.get(); }
|
2.6. applyToEither 、acceptEither
当任意一个 CompletableFuture 完成的时候,fn会被执行,它的返回值会当作新的 CompletableFuture 的计算结果。
两个 CompletableFuture ,任意一个 CompletableFuture 执行完(返回结果快),就取哪个 CompletableFuture 的结果,调用 applyToEither 处理。
方法 | 入参 | 注意 |
---|
applyToEither | applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) | 有返回值 |
applyToEitherAsync | applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor) | 有返回值,使用 ForkJoinPool 线程池
有返回值,使用 Executor 自定义线程池 |
acceptEither | acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) | 无返回值 |
acceptEitherAsync | acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor) | 无返回值,使用 ForkJoinPool 线程池
无返回值,使用 Executor 自定义线程池 |
2.6.1. applyToEither
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| private void testApplyToEither() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500)); result = shardedJedisOperate.set("14-key", "14-value"); result += "555"; LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000)); result = shardedJedisOperate.set("15-key", "15-value"); result += "666"; LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; }, threadPoolExecutor);
CompletableFuture<String> applyToEither = supplyAsync1.applyToEitherAsync(supplyAsync2, new Function<String, String>() { @Override public String apply(String s) { LOGGER.info("first result:{}", s); return s; } }, threadPoolExecutor);
String result = applyToEither.get(); LOGGER.info("end result:{}", result); }
|
2.6.2. acceptEither
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| private void testAcceptEither() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500)); result = shardedJedisOperate.set("16-key", "16-value"); result += "111"; LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000)); result = shardedJedisOperate.set("17-key", "17-value"); result += "333"; LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; }, threadPoolExecutor);
CompletableFuture<Void> acceptEither = supplyAsync1.acceptEitherAsync(supplyAsync2, new Consumer<String>() { @Override public void accept(String s) { LOGGER.info("receive msg:{}", s); } }, threadPoolExecutor);
acceptEither.get(); }
|
2.7. runAfterEither 、runAfterBoth
runAfterEither:两个互不相干的任务A和B,只要任何一个完成就触发执行线程Runnable
两个 CompletionStage,都完成了计算才会执行下一步的操作(Runnable),无返回值。
方法 | 入参 | 注意 |
---|
runAfterEither | runAfterEither(CompletionStage<?> other,Runnable action) | Runnable线程 |
runAfterEitherAsync | runAfterEitherAsync(CompletionStage</?> other,Runnable action)
runAfterEitherAsync(CompletionStage</?> other,Runnable action,Executor executor) | 使用 ForkJoinPool 线程池
使用 Executor 自定义线程池 |
runAfterBoth | runAfterBoth(CompletionStage<?> other,Runnable action) | |
runAfterBothAsync | - | - |
2.7.1. runAfterEither
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| private void testRunAfterEither() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500)); result = shardedJedisOperate.set("18-key", "18-value"); result += "000"; LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000)); result = shardedJedisOperate.set("19-key", "19-value"); result += "222"; LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; }, threadPoolExecutor);
CompletableFuture<Void> runAfterEither = supplyAsync1.runAfterEitherAsync(supplyAsync2, new Runnable() { @Override public void run() { LOGGER.info("触发某些业务....."); } }, threadPoolExecutor);
runAfterEither.get(); }
|
2.7.2. runAfterBoth
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| private void testRunAfterBoth() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500)); result = shardedJedisOperate.set("20-key", "20-value"); result += "777"; LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000)); result = shardedJedisOperate.set("21-key", "21-value"); result += "888"; LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; }, threadPoolExecutor);
CompletableFuture<Void> runAfterBoth = supplyAsync1.runAfterBothAsync(supplyAsync2, new Runnable() { @Override public void run() { LOGGER.info("触发某些业务....."); } }, threadPoolExecutor); runAfterBoth.get(); }
|
2.8. thenCompose
组合多个CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| private void testThenCompose() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500)); result = shardedJedisOperate.set("22-key", "22-value"); result += "999"; LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> thenCompose = supplyAsync1.thenComposeAsync(new Function<String, CompletionStage<String>>() { @Override public CompletionStage<String> apply(String s) { LOGGER.info("receive msg:{}", s); return CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000)); result = shardedJedisOperate.set("23-key", "23-value"); result += "9852631--"; result += s; LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; } }); } });
String result = thenCompose.get(); LOGGER.info("end result:{}", result); }
|
2.9. allOf、anyOf
allOf:所有的CompletableFuture都执行完后执行计算
anyOf:任意一个CompletableFuture执行完后就会执行计算
2.9.1. allOf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| private void testAllOf() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500)); result = shardedJedisOperate.set("22-key", "22-value"); result += "000"; LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000)); result = shardedJedisOperate.set("23-key", "23-value"); result += "222"; LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; }, threadPoolExecutor);
List<CompletableFuture<String>> futureList = new ArrayList<>(Arrays.asList(supplyAsync1, supplyAsync2)); CompletableFuture<List<String>> completableFuture = allOf(futureList); List<String> result = completableFuture.get(); LOGGER.info("end - result:{}", result); }
public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) { CompletableFuture<Void> allFuturesResult = CompletableFuture .allOf(futuresList.toArray(new CompletableFuture[futuresList.size()])); return allFuturesResult.thenApply(v -> futuresList.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()) ); }
|
2.9.2. anyOf
任意一个CompletableFuture执行完后就会执行计算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| private void testAnyOf() throws ExecutionException, InterruptedException { CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 3500)); result = shardedJedisOperate.set("24-key", "24-value"); result += "000"; LOGGER.info("result-1:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-1...."); return result; }, threadPoolExecutor);
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> { String result = StringUtils.EMPTY; try { TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000)); result = shardedJedisOperate.set("25-key", "25-value"); result += "222"; LOGGER.info("result-2:{}", result); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } LOGGER.info("....end-2...."); return result; }, threadPoolExecutor);
CompletableFuture<Object> anyOfResult = CompletableFuture.anyOf(supplyAsync1, supplyAsync2); Object result = anyOfResult.get(); LOGGER.info("end result:{}", result); }
|
3. 使用示例
项目中,需要查询页面的菜单和功能信息;为了提高性能,可以使用CompletableFuture
并行查询。
虽然可以并行查询菜单和功能信息,但是接口逻辑同步的,需要等待菜单、功能信息全部查询完毕,才能返回结果数据。
因此,阻塞等待两个线程都完成了以后,再返回数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
private void getMenuAndFeatureInfo(String roleKey, RoleMenuFeatureBO roleMenuFeatureBO) { CompletableFuture<Map<String, Object>> menusFuture = menuService.getMenuCompletableFuture(roleKey); CompletableFuture<Map<String, Object>> featuresFuture = featureService.getFeatureCompletableFuture(roleKey);
Map<String, Object> collect = allOfGetMap(new ArrayList<>(Arrays.asList(menusFuture, featuresFuture)));
roleMenuFeatureBO.setMenus((List<MenuBO>) collect.get(MENUS_STR)); roleMenuFeatureBO.setFeatures((List<FeatureBO>) collect.get(FEATURES_STR)); }
@Override public CompletableFuture<Map<String, Object>> getMenuCompletableFuture(String roleKey) { return CompletableFuture.supplyAsync(() -> { Map<String, Object> datas = new HashMap<>(); this.queryByRoleKey(roleKey).ifPresent(menuBoS -> { datas.put(MENUS_STR, menuBoS); }); return datas; }, toolThreadPool); }
public static Map<String, Object> allOfGetMap(final List<CompletableFuture<Map<String, Object>>> futuresList) { CompletableFuture<List<Map<String, Object>>> resultFuture = allOf(futuresList); List<Map<String, Object>> getResult = Collections.emptyList(); try { getResult = resultFuture.get(); } catch (Exception e) { log.error(e.getMessage(), e); } return toMap(getResult); }
|
4. Reference