Future and CompletableFuture
2025-01-22 08:19:30

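1. Plain Future

There are two common ways to create a plain thread: (1) extend the Thread class, or (2) implement the Runnable interface.

Neither approach lets the caller obtain a result directly once the thread has finished.

Java 5.0 introduced Callable and Future. A task submitted as a Callable yields a Future, and its result can be retrieved with get() once the task has finished.

The two examples below wrap the work in a Callable, submit it to a thread pool, receive a Future, and read the result through Future.get().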


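1.1. get() example

get() is a blocking method: it returns only once the task has finished. If the task threw an exception, get() rethrows it wrapped in an ExecutionException.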

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// ShardedJedisOperate is a project-specific helper that this post does not show.
public class FutureMainApp01 {

    private static final Logger LOGGER = LoggerFactory.getLogger(FutureMainApp01.class);

    private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            16, 32, 30,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(512),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        FutureMainApp01 futureMainApp01 = new FutureMainApp01();
        futureMainApp01.process();
    }

    private void process() {
        try {
            ShardedJedisOperate shardedJedisOperate = new ShardedJedisOperate();
            AnalysisData analysisData = new AnalysisData(shardedJedisOperate, "1-key", "1-value");
            Future<String> future = threadPoolExecutor.submit(analysisData);
            // Block until the task has finished (no timeout).
            String result = future.get();
            //String result = future.get(300, TimeUnit.MILLISECONDS);
            LOGGER.info("result:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}

class AnalysisData implements Callable<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisData.class);

    private ShardedJedisOperate shardedJedisOperate;
    private String key;
    private String value;

    public AnalysisData(ShardedJedisOperate shardedJedisOperate, String key, String value) {
        this.shardedJedisOperate = shardedJedisOperate;
        this.key = key;
        this.value = value;
    }

    @Override
    public String call() {
        LOGGER.info("set redis info key:{},val:{}", key, value);
        return shardedJedisOperate.set(key, value);
    }
}
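
To make the "blocks until the task finishes or throws" behaviour concrete, here is a minimal sketch that needs no Redis helper. The class FutureGetSketch and its method runAndDescribe are hypothetical names introduced only for this illustration; the point is that an exception thrown inside call() reaches the caller of get() as the cause of an ExecutionException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original example.
final class FutureGetSketch {

    // Submits the task, blocks on get(), and describes what get() produced.
    static String runAndDescribe(Callable<String> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(task);
            return "value=" + future.get();
        } catch (ExecutionException e) {
            // The exception thrown by the task is available as the cause.
            return "failed=" + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndDescribe(() -> "OK"));
        System.out.println(runAndDescribe(() -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```

Catching ExecutionException separately from InterruptedException keeps "the task failed" apart from "the waiting thread was interrupted".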


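1.2. get(timeout, timeUnit) example

This overload takes a maximum wait time. If the result is not available within that time, get() throws a TimeoutException; it does not return null.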

public class FutureMainApp01 {

    private static final Logger LOGGER = LoggerFactory.getLogger(FutureMainApp01.class);

    private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            16, 32, 30,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(512),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        FutureMainApp01 futureMainApp01 = new FutureMainApp01();
        futureMainApp01.process();
    }

    private void process() {
        try {
            ShardedJedisOperate shardedJedisOperate = new ShardedJedisOperate();
            AnalysisData analysisData = new AnalysisData(shardedJedisOperate, "1-key", "1-value");
            Future<String> future = threadPoolExecutor.submit(analysisData);
            // Wait at most 300 ms; a TimeoutException is thrown (and logged below) if the task is not done by then.
            String result = future.get(300, TimeUnit.MILLISECONDS);
            LOGGER.info("result:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}

class AnalysisData implements Callable<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AnalysisData.class);

    private ShardedJedisOperate shardedJedisOperate;
    private String key;
    private String value;

    public AnalysisData(ShardedJedisOperate shardedJedisOperate, String key, String value) {
        this.shardedJedisOperate = shardedJedisOperate;
        this.key = key;
        this.value = value;
    }

    @Override
    public String call() {
        LOGGER.info("set redis info key:{},val:{}", key, value);
        return shardedJedisOperate.set(key, value);
    }
}
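
The timeout behaviour can be checked in isolation with a small sketch. FutureTimeoutSketch and waitFor are hypothetical names, and the sleep stands in for a slow Redis call; the sketch shows that a timed get() ends with a TimeoutException when the task is too slow.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original example.
final class FutureTimeoutSketch {

    // Runs a task that sleeps for taskMillis and waits at most waitMillis for it.
    static String waitFor(long taskMillis, long waitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(taskMillis);
                return "OK";
            });
            return future.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The result was not ready in time; the task itself keeps running unless stopped.
            return "timeout";
        } catch (InterruptedException | ExecutionException e) {
            return "error";
        } finally {
            // shutdownNow() interrupts the sleeping task so the demo exits promptly.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(waitFor(2000, 100));
        System.out.println(waitFor(0, 2000));
    }
}
```

Note that a timeout only stops the wait. If the task should stop as well, it has to be cancelled or the pool shut down, as the finally block does here.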

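Future also offers a few other methods, for example:

  • isDone(): reports whether the task has completed (normally, with an exception, or through cancellation).

  • cancel(boolean mayInterruptIfRunning): attempts to cancel the task. A task that has not started yet will never run; a running task is interrupted only if mayInterruptIfRunning is true, and it actually stops only if it responds to interruption.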
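
A minimal sketch of these two methods, assuming a task that simply sleeps (FutureCancelSketch and cancelLongTask are hypothetical names used only here):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
final class FutureCancelSketch {

    // Cancels a long-running task and returns {cancel accepted, isCancelled, isDone}.
    static boolean[] cancelLongTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                TimeUnit.SECONDS.sleep(30);
                return "never returned";
            });
            // true = interrupt the worker thread if the task is already running.
            boolean accepted = future.cancel(true);
            return new boolean[] {accepted, future.isCancelled(), future.isDone()};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] state = cancelLongTask();
        System.out.println("accepted=" + state[0] + ", cancelled=" + state[1] + ", done=" + state[2]);
    }
}
```

A cancelled Future counts as done, so isDone() alone does not tell success apart from cancellation.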


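1.3. Limitations

Work started through Thread or Runnable runs asynchronously, but the caller cannot get a return value back from it.

With a Future, the submitted Callable's call() method runs on a pool thread. When the caller invokes Future.get(), the calling thread blocks until call() returns a result or throws.

Future does make asynchronous execution possible, but retrieving the result is still awkward: the only options are to block on get() or to poll isDone().

Handling the results of several tasks together is also cumbersome.

In one real project, a public-facing API had to fill in product information by calling four or five external services at the same time. It was first written with plain Future objects, and combining the results of several Futures was tedious, so it was later reworked with CompletableFuture.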
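
The following sketch illustrates the kind of hand-written merging this refers to. It is not the project's code: MultiFutureSketch, fetchAll, and the three lambda "lookups" are hypothetical stand-ins for the external service calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original example.
final class MultiFutureSketch {

    // Submits several lookups and merges their results by blocking on each Future in turn.
    static String fetchAll() {
        // Stand-ins for external calls; real ones would do network I/O.
        List<Callable<String>> lookups = new ArrayList<>();
        lookups.add(() -> "name=demo");
        lookups.add(() -> "price=9.9");
        lookups.add(() -> "stock=3");

        ExecutorService pool = Executors.newFixedThreadPool(lookups.size());
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Callable<String> lookup : lookups) {
                futures.add(pool.submit(lookup));
            }
            // Every get() blocks the caller, and the merge logic is written by hand.
            List<String> parts = new ArrayList<>();
            for (Future<String> future : futures) {
                parts.add(future.get());
            }
            return String.join("|", parts);
        } catch (InterruptedException | ExecutionException e) {
            return "error";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll());
    }
}
```

The tasks do run in parallel, but the caller still blocks once per Future and there is no way to express "when these finish, do that" without more code.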


2. CompletableFuture

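JDK 8 added the CompletableFuture class, which greatly extends Future and simplifies asynchronous programming.


2.1. runAsync, supplyAsync

runAsync and supplyAsync are the static factory methods that CompletableFuture provides for starting an asynchronous operation.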

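| Method | Signature | Notes |
| --- | --- | --- |
| runAsync | CompletableFuture<Void> runAsync(Runnable runnable) | No result |
| runAsync | CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) | No result |
| supplyAsync | <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) | Returns a result |
| supplyAsync | <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) | Returns a result |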

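If no thread pool is specified, the asynchronous code runs on ForkJoinPool.commonPool(); if an Executor is passed in, it runs on that executor instead.

In terms of usage, runAsync and supplyAsync mirror Thread/Runnable and Callable/Future: the first produces no result, the second does.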
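
A short sketch of both factory methods follows. AsyncFactorySketch, its methods, and the thread name "demo-pool-1" are hypothetical and exist only to make the executor choice visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original example.
final class AsyncFactorySketch {

    // Returns the name of the thread that ran the supplier on a custom executor.
    static String threadNameOnCustomPool() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-pool-1"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    // runAsync yields a CompletableFuture<Void>, so join() returns null once the Runnable has run.
    static Object runAsyncResult() {
        return CompletableFuture.runAsync(() -> { }).join();
    }

    public static void main(String[] args) {
        System.out.println("custom pool thread: " + threadNameOnCustomPool());
        System.out.println("runAsync result: " + runAsyncResult());
        // Without an executor argument the default pool is used; the thread name depends on the JVM.
        System.out.println("default pool thread: "
                + CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()).join());
    }
}
```

join() is used instead of get() here only because it throws unchecked exceptions, which keeps the sketch short.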


2.1.1. runAsync

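CompletableFuture.runAsync simply runs a task asynchronously. The task produces no result, but the call still returns a CompletableFuture object.

Use runAsync when the asynchronous operation does not need to return anything, for example a job that runs in the background. (It may also be usable for recombining and orchestrating tasks.)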

public class CFutureMainApp01 {

private static final Logger LOGGER = LoggerFactory.getLogger(CFutureMainApp01.class);

private ShardedJedisOperate shardedJedisOperate = new ShardedJedisOperate();

private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
16, 32, 30,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws ExecutionException, InterruptedException {
CFutureMainApp01 cFutureMainApp01 = new CFutureMainApp01();
cFutureMainApp01.testRunAsync();
}

private void testRunAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(1000);
String result = shardedJedisOperate.set("2-key", "2-value");
LOGGER.info("result:{}", result);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end....");
}, threadPoolExecutor);
voidCompletableFuture.get();
threadPoolExecutor.shutdown();
}
}

2.1.2. supplyAsync

CompletableFuture.supplyAsync runs a task asynchronously and returns its computed result.

private void testSupplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("3-key", "3-value");
LOGGER.info("result:{}", result);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end....");
return result;
}
}, threadPoolExecutor);
String result = completableFuture.get();
LOGGER.info("end result:{}", result);
threadPoolExecutor.shutdown();
}

2.2. whenComplete, exceptionally

whenComplete, whenCompleteAsync, and exceptionally all register a follow-up action that runs after the CompletableFuture has finished or thrown an exception. They act as a post-processing step.

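| Method | Signature | Notes |
| --- | --- | --- |
| whenComplete | whenComplete(BiConsumer<? super T, ? super Throwable> action) | After the task finishes, the thread that completed it goes on to run the whenComplete action (if the task had already finished when whenComplete was called, the calling thread runs it). Runs whether the task succeeded or failed. |
| whenCompleteAsync | whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) | After the task finishes, the whenCompleteAsync action is submitted to a thread pool. Runs whether the task succeeded or failed. |
| whenCompleteAsync | whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) | Same, but the action is submitted to the given executor. |
| exceptionally | exceptionally(Function<Throwable, ? extends T> fn) | Triggered only when the CompletableFuture completes with an exception, that is, only when the task fails. |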
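
The difference between the two kinds of hook can be sketched in a few lines. CompletionHooksSketch and observe are hypothetical names; the sketch shows that whenComplete only observes the outcome, while exceptionally replaces a failure with a fallback value.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example.
final class CompletionHooksSketch {

    // Returns what whenComplete saw, followed by the final value of the chain.
    static String observe(boolean fail) {
        StringBuilder seen = new StringBuilder();
        CompletableFuture<String> stage = CompletableFuture
                .supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("boom");
                    }
                    return "OK";
                })
                // Runs on success and on failure; it cannot change the outcome.
                .whenComplete((value, error) -> seen.append(error == null ? "success;" : "failure;"))
                // Runs only on failure and supplies a replacement value.
                .exceptionally(error -> "fallback");
        String result = stage.join();
        return seen + result;
    }

    public static void main(String[] args) {
        System.out.println(observe(false));
        System.out.println(observe(true));
    }
}
```

Because whenComplete passes the original outcome through unchanged, placing exceptionally after it still sees the failure.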

2.2.1. whenComplete

private void testWhenComplete() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("4-key", "4-value");
LOGGER.info("result:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end....");
//simulate an exception
//String[] a = new String[2];
//LOGGER.info("index:{}", a[100]);
return result;
}, threadPoolExecutor);

CompletableFuture<String> whenComplete = completableFuture.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
if (Objects.nonNull(throwable)) {
LOGGER.error("error message:{}", throwable.getMessage(), throwable);
}
LOGGER.warn("运行结束,关闭资源"); // message means: "run finished, releasing resources"
threadPoolExecutor.shutdown();
}
});
LOGGER.info("end result:{}", whenComplete.get());
}

Output:

[pool-1-thread-1] INFO  org.future.two.CFutureMainApp02 - 83 - result:OK
[pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 87 - ....end....
[pool-1-thread-1] WARN org.future.two.CFutureMainApp02 - 102 - 运行结束,关闭资源
[main] INFO org.future.two.CFutureMainApp02 - 106 - end result:OK

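In the output, the log line emitted at source line 102 (the last one before the main thread's line) shows the same thread name as the lines above it.

Once the task had finished, the same task thread went on to run the whenComplete action.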
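
One caveat is worth a small sketch: if the future has already completed by the time whenComplete is registered, the action runs on the thread that registers it. WhenCompleteThreadSketch and its method are hypothetical names; completedFuture is used to force the "already completed" case.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example.
final class WhenCompleteThreadSketch {

    // Registers whenComplete on an already-completed future and returns the thread that ran the action.
    static String actionThreadForCompletedFuture() {
        String[] actionThread = new String[1];
        CompletableFuture.completedFuture("OK")
                .whenComplete((value, error) -> actionThread[0] = Thread.currentThread().getName());
        return actionThread[0];
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("action thread: " + actionThreadForCompletedFuture());
    }
}
```

When the action must never run on the caller, the Async variant with an explicit executor is the safer choice.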


2.2.2. whenCompleteAsync

private void testWhenCompleteAsync() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("5-key", "5-value");
LOGGER.info("result:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end....");
//simulate an exception
//String[] a = new String[2];
//LOGGER.info("index:{}", a[100]);
return result;
}, threadPoolExecutor);

CompletableFuture<String> whenCompleteAsync = completableFuture.whenCompleteAsync(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
if (Objects.nonNull(throwable)) {
LOGGER.error("error message:{}", throwable.getMessage(), throwable);
}
LOGGER.warn("运行结束,关闭资源"); // message means: "run finished, releasing resources"
threadPoolExecutor.shutdown();
}
});
LOGGER.info("end result:{}", whenCompleteAsync.get());
}

Output:

default [pool-1-thread-1] INFO  org.future.two.CFutureMainApp02 - 116 - result:OK
default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 120 - ....end....
default [ForkJoinPool.commonPool-worker-1] WARN org.future.two.CFutureMainApp02 - 133 - 运行结束,关闭资源
default [main] INFO org.future.two.CFutureMainApp02 - 137 - end result:OK

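In this output, the log line emitted at source line 133 shows a different thread name from the lines above it: because no Executor was passed, the whenCompleteAsync action was submitted to the ForkJoinPool.commonPool() thread pool.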


2.2.3. exceptionally

private void testExceptionally() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("6-key", "6-value");
LOGGER.info("result:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end....");
//simulate an exception
//String[] a = new String[2];
//LOGGER.info("index:{}", a[100]);
return result;
}, threadPoolExecutor);

CompletableFuture<String> exceptionally = completableFuture.exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
LOGGER.error("error message:{}", throwable.getMessage(), throwable);
LOGGER.warn("运行结束,关闭资源"); // message means: "run finished, releasing resources"
threadPoolExecutor.shutdown();
return "error";
}
});
LOGGER.info("end result:{}", exceptionally.get());
}

Output of a normal run:

default [pool-1-thread-1] INFO  org.future.two.CFutureMainApp02 - 147 - result:OK
default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 151 - ....end....
default [main] INFO org.future.two.CFutureMainApp02 - 168 - end result:OK

If an exception is thrown during the run, the exceptionally action executes. (Uncomment the simulated-exception lines to see this.)

default [pool-1-thread-1] INFO  org.future.two.CFutureMainApp02 - 147 - result:OK
default [pool-1-thread-1] INFO org.future.two.CFutureMainApp02 - 151 - ....end....
default [pool-1-thread-1] ERROR org.future.two.CFutureMainApp02 - 161 - error message:java.lang.ArrayIndexOutOfBoundsException: 100
java.util.concurrent.CompletionException: java.lang.ArrayIndexOutOfBoundsException: 100
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 100
at org.future.two.CFutureMainApp02.lambda$testExceptionally$3(CFutureMainApp02.java:154)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 common frames omitted
default [pool-1-thread-1] WARN org.future.two.CFutureMainApp02 - 162 - 运行结束,关闭资源
default [main] INFO org.future.two.CFutureMainApp02 - 167 - end result:error

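2.3. thenApply, handle

When one task depends on the result of another, or when a result needs to be transformed, thenApply and handle can be used to chain the steps so that they run one after the other.

If the previous task fails with an exception, the function passed to thenApply is not executed, whereas handle is still invoked and can process the exception.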

| Method | Signature | Notes |
| --- | --- | --- |
| thenApply | thenApply(Function<? super T,? extends U> fn) | |
| thenApplyAsync | thenApplyAsync(Function<? super T,? extends U> fn) | Uses the ForkJoinPool thread pool |
| thenApplyAsync | thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | Uses a custom thread pool |
| handle | handle(BiFunction<? super T, Throwable, ? extends U> fn) | |
| handleAsync | handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) | Uses the ForkJoinPool thread pool |
| handleAsync | handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) | Uses a custom thread pool |
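
The behaviour on failure can be sketched as follows. ApplyVsHandleSketch and chain are hypothetical names; the flag records whether the thenApply function ran at all.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original example.
final class ApplyVsHandleSketch {

    // Returns "<did thenApply run?>/<value produced by handle>".
    static String chain(boolean fail) {
        AtomicBoolean thenApplyRan = new AtomicBoolean(false);
        CompletableFuture<String> source = CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "OK";
        });
        String handled = source
                // Skipped entirely when the source failed.
                .thenApply(value -> {
                    thenApplyRan.set(true);
                    return value + "-applied";
                })
                // Always invoked: exactly one of value and error carries the outcome.
                .handle((value, error) -> error != null ? "recovered" : value)
                .join();
        return thenApplyRan.get() + "/" + handled;
    }

    public static void main(String[] args) {
        System.out.println(chain(false));
        System.out.println(chain(true));
    }
}
```

Unlike whenComplete, handle returns a new value, so it can turn a failed chain back into a normal result.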

2.3.1. thenApply

private void testThenApply() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("7-key", "7-value");
LOGGER.info("result-1:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-1....");
//simulate an exception
//String[] a = new String[2];
//LOGGER.info("index:{}", a[100]);
return result;
}, threadPoolExecutor);

//test thenApply
//CompletableFuture<String> completableFuture = supplyAsync.thenApply(s -> {
// LOGGER.info("result-2:{}", s);
// if ("OK".equals(s)) {
// s = s + "6666";
// } else {
// s = s + "9999";
// }
// return s;
//});

//test thenApplyAsync ForkJoinPool
//CompletableFuture<String> completableFuture = supplyAsync.thenApplyAsync(s -> {
// LOGGER.info("result-2:{}", s);
// if ("OK".equals(s)) {
// s = s + "5555";
// } else {
// s = s + "6666";
// }
// return s;
//});

//test thenApplyAsync
CompletableFuture<String> completableFuture = supplyAsync.thenApplyAsync(s -> {
LOGGER.info("result-2:{}", s);
if ("OK".equals(s)) {
s = s + "5555";
} else {
s = s + "6666";
}
return s;
}, threadPoolExecutor);

String result = completableFuture.get();
LOGGER.info("end result:{}", result);
}

2.3.2. handle

handle can process not only the previous task's result but also an exception thrown by the previous task.

private void testHandle() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {

String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("8-key", "8-value");
LOGGER.info("result-1:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}

LOGGER.info("....end-1....");
//simulate an exception
//String[] a = new String[2];
//LOGGER.info("index:{}", a[100]);
return result;
}
}, threadPoolExecutor);

//test handle
//CompletableFuture<String> handle = supplyAsync.handle(new BiFunction<String, Throwable, String>() {
// @Override
// public String apply(String s, Throwable throwable) {
//
// LOGGER.info("receive info:{}", s);
// try {
// if (Objects.nonNull(throwable)) {
// LOGGER.warn("errMsg:{}", throwable.getMessage());
// s = "error";
// } else {
// LOGGER.info("info:{}", s);
// if ("OK".equals(s)) {
// s += "3333";
// } else {
// s += "4444";
// }
// }
// } catch (Exception e) {
// LOGGER.error(e.getMessage(), e);
// } finally {
// LOGGER.info("release resources");
// threadPoolExecutor.shutdown();
// }
// return s;
// }
//});

//test handle ForkJoinPool
//CompletableFuture<String> handle = supplyAsync.handleAsync(new BiFunction<String, Throwable, String>() {
// @Override
// public String apply(String s, Throwable throwable) {
//
// LOGGER.info("receive info:{}", s);
// try {
// if (Objects.nonNull(throwable)) {
// LOGGER.warn("errMsg:{}", throwable.getMessage());
// s = "error";
// } else {
// LOGGER.info("info:{}", s);
// if ("OK".equals(s)) {
// s += "3333";
// } else {
// s += "4444";
// }
// }
// } catch (Exception e) {
// LOGGER.error(e.getMessage(), e);
// } finally {
// LOGGER.info("release resources");
// threadPoolExecutor.shutdown();
// }
// return s;
// }
//});

//test handle Executor
CompletableFuture<String> handle = supplyAsync.handleAsync(new BiFunction<String, Throwable, String>() {
@Override
public String apply(String s, Throwable throwable) {

LOGGER.info("receive info:{}", s);
try {
if (Objects.nonNull(throwable)) {
LOGGER.warn("errMsg:{}", throwable.getMessage());
s = "error";
} else {
LOGGER.info("info:{}", s);
if ("OK".equals(s)) {
s += "3333";
} else {
s += "4444";
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
LOGGER.info("release resources");
threadPoolExecutor.shutdown();
}
return s;
}
}, threadPoolExecutor);

String result = handle.get();
LOGGER.info("end result:{}", result);
}

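2.4. thenAccept, thenRun

thenAccept receives the previous task's result and simply consumes it; it produces no result of its own.

thenRun differs from thenAccept in that it ignores the previous task's result: it starts as soon as the previous task has completed. thenAccept, by contrast, is handed that result.

Both can serve as post-processing steps.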

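| Method | Signature | Notes |
| --- | --- | --- |
| thenAccept | thenAccept(Consumer<? super T> action) | Receives the previous task's result. Pure processing, no return value. |
| thenAcceptAsync | thenAcceptAsync(Consumer<? super T> action) | Uses the ForkJoinPool thread pool |
| thenAcceptAsync | thenAcceptAsync(Consumer<? super T> action, Executor executor) | Uses a custom Executor thread pool |
| thenRun | thenRun(Runnable action) | Ignores the previous task's result. Pure processing, no return value. The argument is a Runnable. |
| thenRunAsync | thenRunAsync(Runnable action) | Uses the ForkJoinPool thread pool |
| thenRunAsync | thenRunAsync(Runnable action, Executor executor) | Uses a custom Executor thread pool |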
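
A compact sketch of the two methods side by side, using an already-completed source so the order of the log entries is fixed (AcceptVsRunSketch and consume are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example.
final class AcceptVsRunSketch {

    // Attaches a thenAccept and a thenRun stage to the same source and returns what they logged.
    static String consume() {
        StringBuilder log = new StringBuilder();
        CompletableFuture<String> source = CompletableFuture.completedFuture("OK");

        // thenAccept is handed the source's result.
        CompletableFuture<Void> accepted =
                source.thenAccept(value -> log.append("accept:").append(value).append(';'));
        // thenRun only learns that the source has completed.
        CompletableFuture<Void> ran = source.thenRun(() -> log.append("run;"));

        // Both stages are CompletableFuture<Void>, so join() returns null.
        accepted.join();
        ran.join();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(consume());
    }
}
```

Both stages end a chain as far as values are concerned, so they fit logging, notification, or cleanup steps.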

2.4.1. thenAccept

private void testThenAccept() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("10-key", "10-value");
LOGGER.info("result-1:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-1....");
//simulate an exception
//String[] a = new String[2];
//LOGGER.info("index:{}", a[100]);
return result;
}
}, threadPoolExecutor);

//thenAccept (no extra pool: runs on the thread that completes the previous stage)
//CompletableFuture<Void> thenAccept = supplyAsync.thenAccept(new Consumer<String>() {
// @Override
// public void accept(String s) {
// LOGGER.info("receive msg:{}, simulating a push to Kafka", s);
// }
//});

//thenAcceptAsync ForkJoinPool
//CompletableFuture<Void> thenAccept = supplyAsync.thenAcceptAsync(new Consumer<String>() {
// @Override
// public void accept(String s) {
// LOGGER.info("receive msg:{}, simulating a push to Kafka", s);
// }
//});

//thenAcceptAsync Executor
CompletableFuture<Void> thenAccept = supplyAsync.thenAcceptAsync(new Consumer<String>() {
@Override
public void accept(String s) {
LOGGER.info("receive msg:{}, simulating a push to Kafka", s);
}
}, threadPoolExecutor);

thenAccept.get();
}

2.4.2. thenRun

private void testThenRun() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("11-key", "11-value");
LOGGER.info("result-1:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-1....");
//simulate an exception
//String[] a = new String[2];
//LOGGER.info("index:{}", a[100]);
return result;
}
}, threadPoolExecutor);

//CompletableFuture<Void> thenRun = supplyAsync.thenRun(new Runnable() {
// @Override
// public void run() {
// LOGGER.info("Runnable task; ignores the previous task's result");
// LOGGER.info("post-processing.....");
// }
//});

//CompletableFuture<Void> thenRun = supplyAsync.thenRunAsync(new Runnable() {
// @Override
// public void run() {
// LOGGER.info("Runnable task; ignores the previous task's result");
// LOGGER.info("post-processing.....");
// }
//});

CompletableFuture<Void> thenRun = supplyAsync.thenRunAsync(new Runnable() {
@Override
public void run() {
LOGGER.info("Runnable task; ignores the previous task's result");
LOGGER.info("post-processing.....");
}
}, threadPoolExecutor);

thenRun.get();
}

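2.5. thenCombine, thenAcceptBoth

thenCombine and thenAcceptBoth are used to merge two tasks.

They wait until both CompletionStage tasks have completed and then process the two results together.

The difference is that thenCombine returns a value, while thenAcceptBoth does not.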

| Method | Signature | Notes |
| --- | --- | --- |
| thenCombine | thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | |
| thenCombineAsync | thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | |
| thenCombineAsync | thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) | |
| thenAcceptBoth | thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | |
| thenAcceptBothAsync | thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | Uses the ForkJoinPool thread pool |
| thenAcceptBothAsync | thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) | Uses a custom Executor thread pool |
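
As a compact illustration with lambdas, the sketch below merges two independent results. CombineSketch, its methods, and the sample values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example.
final class CombineSketch {

    // thenCombine: waits for both stages and returns a merged value.
    static String combine() {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "widget");
        CompletableFuture<Integer> stock = CompletableFuture.supplyAsync(() -> 3);
        return name.thenCombine(stock, (n, s) -> n + " x" + s).join();
    }

    // thenAcceptBoth: waits for both stages and consumes the two values without returning one.
    static String acceptBoth() {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "widget");
        CompletableFuture<Integer> stock = CompletableFuture.supplyAsync(() -> 3);
        StringBuilder sink = new StringBuilder();
        name.thenAcceptBoth(stock, (n, s) -> sink.append(n).append(" x").append(s)).join();
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(combine());
        System.out.println(acceptBoth());
    }
}
```

The two source stages can have different result types; the combining function decides the type of the merged value.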

2.5.1. thenCombine

private void testThenCombine() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("12-key", "12-value");
LOGGER.info("result-1:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-1....");
return result;
}, threadPoolExecutor);

CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("13-key", "13-value");
LOGGER.info("result-2:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-2....");
return result;
}, threadPoolExecutor);

// thenCombine
//CompletableFuture<String> thenCombine = supplyAsync1.thenCombine(supplyAsync2, new BiFunction<String, String, String>() {
// @Override
// public String apply(String s, String s2) {
// return s + " 666 " + s2;
// }
//});

// thenCombineAsync
//CompletableFuture<String> thenCombine = supplyAsync1.thenCombineAsync(supplyAsync2, new BiFunction<String, String, String>() {
// @Override
// public String apply(String s, String s2) {
// return s + " 666 " + s2;
// }
//});

// thenCombineAsync Executor
CompletableFuture<String> thenCombine = supplyAsync1.thenCombineAsync(supplyAsync2, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) {
return s + " 666 " + s2;
}
}, threadPoolExecutor);

String result = thenCombine.get();
LOGGER.info("end result:{}", result);
}

2.5.2. thenAcceptBoth

private void testThenAcceptBoth() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("12-key", "12-value");
LOGGER.info("result-1:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-1....");
return result;
}, threadPoolExecutor);

CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep(1000);
result = shardedJedisOperate.set("13-key", "13-value");
LOGGER.info("result-2:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-2....");
return result;
}, threadPoolExecutor);

//thenAcceptBoth
//CompletableFuture<Void> thenAcceptBoth = supplyAsync1.thenAcceptBoth(supplyAsync2, new BiConsumer<String, String>() {
// @Override
// public void accept(String s, String s2) {
// LOGGER.info("receive s:{},s2:{}", s, s2);
// LOGGER.info("merging data, simulating a push to Kafka....");
// }
//});

//thenAcceptBothAsync ForkJoinPool
//CompletableFuture<Void> thenAcceptBoth = supplyAsync1.thenAcceptBothAsync(supplyAsync2, new BiConsumer<String, String>() {
// @Override
// public void accept(String s, String s2) {
// LOGGER.info("receive s:{},s2:{}", s, s2);
// LOGGER.info("merging data, simulating a push to Kafka....");
// }
//});

//thenAcceptBothAsync Executor
CompletableFuture<Void> thenAcceptBoth = supplyAsync1.thenAcceptBothAsync(supplyAsync2, new BiConsumer<String, String>() {
@Override
public void accept(String s, String s2) {
LOGGER.info("receive s:{},s2:{}", s, s2);
LOGGER.info("merging data, simulating a push to Kafka....");
}
}, threadPoolExecutor);

thenAcceptBoth.get();
}

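2.6. applyToEither, acceptEither

As soon as either of the two CompletableFutures completes, fn is executed with that result, and fn's return value becomes the result of the new CompletableFuture.

In other words, given two CompletableFutures, whichever finishes first (returns its result faster) supplies the value that applyToEither processes. acceptEither picks the value the same way but only consumes it.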

| Method | Signature | Notes |
| --- | --- | --- |
| applyToEither | applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) | Returns a value |
| applyToEitherAsync | applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) | Returns a value; uses the ForkJoinPool thread pool |
| applyToEitherAsync | applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) | Returns a value; uses a custom Executor thread pool |
| acceptEither | acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) | No return value |
| acceptEitherAsync | acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) | No return value; uses the ForkJoinPool thread pool |
| acceptEitherAsync | acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) | No return value; uses a custom Executor thread pool |
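
Because a race between two sleeping tasks is not repeatable, the sketch below fixes the winner: one future is already completed and the other is never completed. EitherSketch and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example.
final class EitherSketch {

    // applyToEither: transforms whichever result arrives first.
    static String firstOf() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed in this sketch
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        return slow.applyToEither(fast, value -> "winner=" + value).join();
    }

    // acceptEither: consumes whichever result arrives first and returns nothing.
    static String acceptFirst() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed in this sketch
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        StringBuilder sink = new StringBuilder();
        slow.acceptEither(fast, value -> sink.append("consumed=").append(value)).join();
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(firstOf());
        System.out.println(acceptFirst());
    }
}
```

Both futures must carry compatible result types, since the same function has to accept either value.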

2.6.1. applyToEither

private void testApplyToEither() throws ExecutionException, InterruptedException {

CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500));
result = shardedJedisOperate.set("14-key", "14-value");
result += "555";
LOGGER.info("result-1:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-1....");
return result;
}, threadPoolExecutor);

CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
result = shardedJedisOperate.set("15-key", "15-value");
result += "666";
LOGGER.info("result-2:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-2....");
return result;
}, threadPoolExecutor);

//applyToEither
//CompletableFuture<String> applyToEither = supplyAsync1.applyToEither(supplyAsync2, new Function<String, String>() {
// @Override
// public String apply(String s) {
// LOGGER.info("first result:{}", s);
// return s;
// }
//});

//applyToEitherAsync ForkJoinPool
//CompletableFuture<String> applyToEither = supplyAsync1.applyToEitherAsync(supplyAsync2, new Function<String, String>() {
// @Override
// public String apply(String s) {
// LOGGER.info("first result:{}", s);
// return s;
// }
//});

//applyToEitherAsync Executor
CompletableFuture<String> applyToEither = supplyAsync1.applyToEitherAsync(supplyAsync2, new Function<String, String>() {
@Override
public String apply(String s) {
LOGGER.info("first result:{}", s);
return s;
}
}, threadPoolExecutor);

String result = applyToEither.get();
LOGGER.info("end result:{}", result);
}

2.6.2. acceptEither

private void testAcceptEither() throws ExecutionException, InterruptedException {
CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500));
result = shardedJedisOperate.set("16-key", "16-value");
result += "111";
LOGGER.info("result-1:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-1....");
return result;
}, threadPoolExecutor);

CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
String result = StringUtils.EMPTY;
try {
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
result = shardedJedisOperate.set("17-key", "17-value");
result += "333";
LOGGER.info("result-2:{}", result);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("....end-2....");
return result;
}, threadPoolExecutor);

//acceptEither
//CompletableFuture<Void> acceptEither = supplyAsync1.acceptEither(supplyAsync2, new Consumer<String>() {
// @Override
// public void accept(String s) {
// LOGGER.info("receive msg:{}", s);
// }
//});

//acceptEitherAsync ForkJoinPool
//CompletableFuture<Void> acceptEither = supplyAsync1.acceptEitherAsync(supplyAsync2, new Consumer<String>() {
// @Override
// public void accept(String s) {
// LOGGER.info("receive msg:{}", s);
// }
//});

//acceptEitherAsync Executor
CompletableFuture<Void> acceptEither = supplyAsync1.acceptEitherAsync(supplyAsync2, new Consumer<String>() {
@Override
public void accept(String s) {
LOGGER.info("receive msg:{}", s);
}
}, threadPoolExecutor);

acceptEither.get();
}

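2.7. runAfterEither, runAfterBoth

runAfterEither: given two independent tasks A and B, the Runnable is triggered as soon as either one completes.

runAfterBoth: the next step (a Runnable) runs only after both CompletionStages have completed. There is no return value.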

| Method | Signature | Notes |
| --- | --- | --- |
| runAfterEither | runAfterEither(CompletionStage<?> other, Runnable action) | The action is a Runnable |
| runAfterEitherAsync | runAfterEitherAsync(CompletionStage<?> other, Runnable action) | Uses the ForkJoinPool thread pool |
| runAfterEitherAsync | runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) | Uses a custom Executor thread pool |
| runAfterBoth | runAfterBoth(CompletionStage<?> other, Runnable action) | |
| runAfterBothAsync | - | - |
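
The timing difference between the two can be sketched deterministically by completing one future up front and the other by hand. RunAfterSketch and trace are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example.
final class RunAfterSketch {

    // Records when the runAfterEither and runAfterBoth actions fire.
    static String trace() {
        CompletableFuture<String> a = CompletableFuture.completedFuture("A");
        CompletableFuture<String> b = new CompletableFuture<>(); // completed manually below
        StringBuilder log = new StringBuilder();

        // a is already done, so the "either" action fires right away.
        CompletableFuture<Void> either = a.runAfterEither(b, () -> log.append("either;"));
        // The "both" action has to wait for b.
        CompletableFuture<Void> both = a.runAfterBoth(b, () -> log.append("both;"));
        log.append("bothDoneBeforeB=").append(both.isDone()).append(';');

        b.complete("B"); // now both sources are done and the "both" action fires
        either.join();
        both.join();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

Neither action receives the results of A or B; when the values are needed, thenCombine or applyToEither is the better fit.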

2.7.1. runAfterEither

private void testRunAfterEither() throws ExecutionException, InterruptedException {
    CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500));
            result = shardedJedisOperate.set("18-key", "18-value");
            result += "000";
            LOGGER.info("result-1:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-1....");
        return result;
    }, threadPoolExecutor);

    CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
            result = shardedJedisOperate.set("19-key", "19-value");
            result += "222";
            LOGGER.info("result-2:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-2....");
        return result;
    }, threadPoolExecutor);

    //test runAfterEither
    //CompletableFuture<Void> runAfterEither = supplyAsync1.runAfterEither(supplyAsync2, new Runnable() {
    //    @Override
    //    public void run() {
    //        LOGGER.info("trigger some business logic.....");
    //    }
    //});

    //runAfterEitherAsync ForkJoinPool
    //CompletableFuture<Void> runAfterEither = supplyAsync1.runAfterEitherAsync(supplyAsync2, new Runnable() {
    //    @Override
    //    public void run() {
    //        LOGGER.info("trigger some business logic.....");
    //    }
    //});

    //runAfterEitherAsync Executor
    CompletableFuture<Void> runAfterEither = supplyAsync1.runAfterEitherAsync(supplyAsync2, new Runnable() {
        @Override
        public void run() {
            LOGGER.info("trigger some business logic.....");
        }
    }, threadPoolExecutor);

    runAfterEither.get();
}

2.7.2. runAfterBoth

private void testRunAfterBoth() throws ExecutionException, InterruptedException {
    CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500));
            result = shardedJedisOperate.set("20-key", "20-value");
            result += "777";
            LOGGER.info("result-1:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-1....");
        return result;
    }, threadPoolExecutor);

    CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
            result = shardedJedisOperate.set("21-key", "21-value");
            result += "888";
            LOGGER.info("result-2:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-2....");
        return result;
    }, threadPoolExecutor);

    //test runAfterBoth
    //CompletableFuture<Void> runAfterBoth = supplyAsync1.runAfterBoth(supplyAsync2, new Runnable() {
    //    @Override
    //    public void run() {
    //        LOGGER.info("trigger some business logic.....");
    //    }
    //});

    //runAfterBothAsync ForkJoinPool
    //CompletableFuture<Void> runAfterBoth = supplyAsync1.runAfterBothAsync(supplyAsync2, new Runnable() {
    //    @Override
    //    public void run() {
    //        LOGGER.info("trigger some business logic.....");
    //    }
    //});

    //runAfterBothAsync Executor
    CompletableFuture<Void> runAfterBoth = supplyAsync1.runAfterBothAsync(supplyAsync2, new Runnable() {
        @Override
        public void run() {
            LOGGER.info("trigger some business logic.....");
        }
    }, threadPoolExecutor);

    runAfterBoth.get();
}

2.8. thenCompose

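thenCompose chains several CompletableFutures: the result of the previous stage is passed as the argument to the next computation, so the stages run in a fixed order, one after another.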

private void testThenCompose() throws ExecutionException, InterruptedException {
    CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500));
            result = shardedJedisOperate.set("22-key", "22-value");
            result += "999";
            LOGGER.info("result-1:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-1....");
        return result;
    }, threadPoolExecutor);

    //thenCompose
    //CompletableFuture<String> thenCompose = supplyAsync1.thenCompose(new Function<String, CompletionStage<String>>() {
    //    @Override
    //    public CompletionStage<String> apply(String s) {
    //        LOGGER.info("receive msg:{}", s);
    //        return CompletableFuture.supplyAsync(new Supplier<String>() {
    //            @Override
    //            public String get() {
    //                String result = StringUtils.EMPTY;
    //                try {
    //                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
    //                    result = shardedJedisOperate.set("23-key", "23-value");
    //                    result += "9852631--";
    //                    result += s;
    //                    LOGGER.info("result-2:{}", result);
    //                } catch (Exception e) {
    //                    LOGGER.error(e.getMessage(), e);
    //                }
    //                LOGGER.info("....end-2....");
    //                return result;
    //            }
    //        });
    //    }
    //});

    //thenComposeAsync ForkJoinPool
    CompletableFuture<String> thenCompose = supplyAsync1.thenComposeAsync(new Function<String, CompletionStage<String>>() {
        @Override
        public CompletionStage<String> apply(String s) {
            LOGGER.info("receive msg:{}", s);
            return CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    String result = StringUtils.EMPTY;
                    try {
                        TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
                        result = shardedJedisOperate.set("23-key", "23-value");
                        result += "9852631--";
                        result += s;
                        LOGGER.info("result-2:{}", result);
                    } catch (Exception e) {
                        LOGGER.error(e.getMessage(), e);
                    }
                    LOGGER.info("....end-2....");
                    return result;
                }
            });
        }
    });

    //thenComposeAsync Executor
    //CompletableFuture<String> thenCompose = supplyAsync1.thenComposeAsync(new Function<String, CompletionStage<String>>() {
    //    @Override
    //    public CompletionStage<String> apply(String s) {
    //        LOGGER.info("receive msg:{}", s);
    //        return CompletableFuture.supplyAsync(new Supplier<String>() {
    //            @Override
    //            public String get() {
    //                String result = StringUtils.EMPTY;
    //                try {
    //                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
    //                    result = shardedJedisOperate.set("23-key", "23-value");
    //                    result += "9852631--";
    //                    result += s;
    //                    LOGGER.info("result-2:{}", result);
    //                } catch (Exception e) {
    //                    LOGGER.error(e.getMessage(), e);
    //                }
    //                LOGGER.info("....end-2....");
    //                return result;
    //            }
    //        });
    //    }
    //}, threadPoolExecutor);

    String result = thenCompose.get();
    LOGGER.info("end result:{}", result);
}
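
The anonymous classes above make it hard to see what thenCompose contributes. As a minimal sketch (the class ThenComposeDemo and the helper appendSuffix are hypothetical, not from the original project), it can be compared with thenApply, which leaves a nested future when the function itself returns a future:

```java
import java.util.concurrent.CompletableFuture;

public class ThenComposeDemo {

    // Hypothetical second step: an async computation that depends on the first result.
    static CompletableFuture<String> appendSuffix(String value) {
        return CompletableFuture.supplyAsync(() -> value + "-second");
    }

    static String composed() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "first");
        // thenCompose flattens: the function returns a future, yet the stage is CompletableFuture<String>.
        CompletableFuture<String> flat = first.thenCompose(ThenComposeDemo::appendSuffix);
        return flat.join();
    }

    static String nested() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "first");
        // thenApply with the same function yields a future of a future, which has to be unwrapped twice.
        CompletableFuture<CompletableFuture<String>> nested = first.thenApply(ThenComposeDemo::appendSuffix);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(composed()); // first-second
        System.out.println(nested());   // first-second
    }
}
```

Both methods produce the same value; the difference is only in the type of the intermediate stage, which is why thenCompose is the natural choice when the second step is itself asynchronous.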

2.9. allOf, anyOf

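allOf: the follow-up computation runs only after all of the given CompletableFutures have completed.

anyOf: the follow-up computation runs as soon as any one of the given CompletableFutures completes.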


2.9.1. allOf

private void testAllOf() throws ExecutionException, InterruptedException {

    CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1500));
            result = shardedJedisOperate.set("22-key", "22-value");
            result += "000";
            LOGGER.info("result-1:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-1....");
        return result;
    }, threadPoolExecutor);

    CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
            result = shardedJedisOperate.set("23-key", "23-value");
            result += "222";
            LOGGER.info("result-2:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-2....");
        return result;
    }, threadPoolExecutor);

    List<CompletableFuture<String>> futureList = new ArrayList<>(Arrays.asList(supplyAsync1, supplyAsync2));
    CompletableFuture<List<String>> completableFuture = allOf(futureList);
    List<String> result = completableFuture.get();
    LOGGER.info("end - result:{}", result);
}

// CompletableFuture.allOf only yields CompletableFuture<Void>, so the results are collected from the original futures
public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
    CompletableFuture<Void> allFuturesResult = CompletableFuture
            .allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
    // every future has already completed at this point, so join() returns immediately
    return allFuturesResult.thenApply(v ->
            futuresList.stream().map(CompletableFuture::join).collect(Collectors.<T>toList())
    );
}
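
Two properties of this helper are easy to miss: the results come back in list order rather than completion order, and a failure in any input surfaces when the aggregate is joined. The following sketch illustrates both; the class name AllOfDemo and the values are made up, and the helper is re-declared as a static method only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

public class AllOfDemo {

    // Same shape as the allOf helper above: wait for all, then collect results in list order.
    static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
    }

    static List<String> results() {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            sleep(200);
            return "slow";
        });
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");
        // "slow" finishes last but is still first in the result, because it is first in the list.
        return allOf(Arrays.asList(slow, fast)).join();
    }

    static boolean failurePropagates() {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            allOf(Arrays.asList(ok, failed)).join();
            return false;
        } catch (CompletionException e) {
            // The original exception is available as the cause.
            return e.getCause() instanceof IllegalStateException;
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(results());           // [slow, fast]
        System.out.println(failurePropagates()); // true
    }
}
```

Note that the examples in this post catch exceptions inside each supplier and return an empty string, so their aggregate never fails; the failure path only matters when a task lets its exception escape.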

2.9.2. anyOf

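The future returned by anyOf completes as soon as any one of the given CompletableFutures completes, and it carries that future's result typed as Object.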

private void testAnyOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 3500));
            result = shardedJedisOperate.set("24-key", "24-value");
            result += "000";
            LOGGER.info("result-1:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-1....");
        return result;
    }, threadPoolExecutor);

    CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
        String result = StringUtils.EMPTY;
        try {
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
            result = shardedJedisOperate.set("25-key", "25-value");
            result += "222";
            LOGGER.info("result-2:{}", result);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        LOGGER.info("....end-2....");
        return result;
    }, threadPoolExecutor);

    CompletableFuture<Object> anyOfResult = CompletableFuture.anyOf(supplyAsync1, supplyAsync2);
    Object result = anyOfResult.get();
    LOGGER.info("end result:{}", result);
}
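
Since the example above relies on random sleeps, either task can win. As a deterministic sketch (the class name AnyOfDemo is hypothetical), the futures below are completed by hand to show that the first completion decides the result and that the Object has to be cast back by the caller:

```java
import java.util.concurrent.CompletableFuture;

public class AnyOfDemo {

    static String firstResult() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();

        // anyOf accepts futures of any type, so its result is typed as Object.
        CompletableFuture<Object> any = CompletableFuture.anyOf(slow, fast);

        fast.complete("fast"); // the first completion decides the result
        slow.complete("slow"); // a later completion does not change it

        // The cast is safe here only because both inputs are CompletableFuture<String>.
        return (String) any.join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // fast
    }
}
```

anyOf does not cancel the remaining futures: in testAnyOf the slower task keeps running on the thread pool and still writes its key after the result has been returned.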

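3. Usage example

In one project, a page needs both its menu information and its feature information. To improve performance, the two queries can run in parallel with CompletableFuture.

Although the menu and feature queries run in parallel, the interface itself is synchronous: it cannot return its result until both queries have finished.

Therefore, the caller blocks until both tasks have completed and only then returns the data.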

/**
 * Fetches menu and feature information.
 *
 * @param roleKey           role key
 * @param roleMenuFeatureBO business object that receives the menus and features
 */
private void getMenuAndFeatureInfo(String roleKey, RoleMenuFeatureBO roleMenuFeatureBO) {
    // query menus
    CompletableFuture<Map<String, Object>> menusFuture = menuService.getMenuCompletableFuture(roleKey);
    // query features
    CompletableFuture<Map<String, Object>> featuresFuture = featureService.getFeatureCompletableFuture(roleKey);

    // block until all futures have completed, then collect the results
    Map<String, Object> collect = allOfGetMap(new ArrayList<>(Arrays.asList(menusFuture, featuresFuture)));

    roleMenuFeatureBO.setMenus((List<MenuBO>) collect.get(MENUS_STR));
    roleMenuFeatureBO.setFeatures((List<FeatureBO>) collect.get(FEATURES_STR));
}

@Override
public CompletableFuture<Map<String, Object>> getMenuCompletableFuture(String roleKey) {
    return CompletableFuture.supplyAsync(() -> {
        Map<String, Object> datas = new HashMap<>();
        this.queryByRoleKey(roleKey).ifPresent(menuBoS -> {
            datas.put(MENUS_STR, menuBoS);
        });
        return datas;
    }, toolThreadPool);
}

/**
 * Waits for all futures to complete, then aggregates their return values.
 * <p>
 * API: {@link CompletableFuture#allOf}
 * <p>
 * Use case:
 *
 * <blockquote>
 * Several tasks can run asynchronously (they have no business dependency on each other) but return different types.<br/>
 * Each async task wraps its result in a Map, using the same key for values of the same type; after get(), the caller casts each value according to its key.
 * </blockquote>
 *
 * @param futuresList the futures to wait for
 * @return map
 */
public static Map<String, Object> allOfGetMap(final List<CompletableFuture<Map<String, Object>>> futuresList) {
    CompletableFuture<List<Map<String, Object>>> resultFuture = allOf(futuresList);
    List<Map<String, Object>> getResult = Collections.emptyList();
    try {
        getResult = resultFuture.get();
    } catch (InterruptedException e) {
        // restore the interrupt flag so callers can still observe the interruption
        Thread.currentThread().interrupt();
        log.error(e.getMessage(), e);
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
    return toMap(getResult);
}
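
The toMap helper called above is not shown in this post. The sketch below is one plausible reading of the pattern, assuming toMap simply merges the per-task maps into a single map; the class AllOfGetMapDemo, the key values, and the sample data are all hypothetical stand-ins for the real services.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class AllOfGetMapDemo {

    static final String MENUS_STR = "menus";       // assumed key value
    static final String FEATURES_STR = "features"; // assumed key value

    // Stand-in for menuService.getMenuCompletableFuture (hypothetical data).
    static CompletableFuture<Map<String, Object>> queryMenus() {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Object> data = new HashMap<>();
            data.put(MENUS_STR, Arrays.asList("home", "settings"));
            return data;
        });
    }

    // Stand-in for featureService.getFeatureCompletableFuture (hypothetical data).
    static CompletableFuture<Map<String, Object>> queryFeatures() {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Object> data = new HashMap<>();
            data.put(FEATURES_STR, Arrays.asList("export"));
            return data;
        });
    }

    // Assumed behavior of toMap: merge all maps into one; on duplicate keys the later map wins.
    static Map<String, Object> toMap(List<Map<String, Object>> maps) {
        Map<String, Object> merged = new HashMap<>();
        maps.forEach(merged::putAll);
        return merged;
    }

    static Map<String, Object> allOfGetMap(List<CompletableFuture<Map<String, Object>>> futures) {
        // Block until every future has completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        List<Map<String, Object>> results = new ArrayList<>();
        for (CompletableFuture<Map<String, Object>> future : futures) {
            results.add(future.join()); // already complete, returns immediately
        }
        return toMap(results);
    }

    static Map<String, Object> load() {
        return allOfGetMap(Arrays.asList(queryMenus(), queryFeatures()));
    }

    public static void main(String[] args) {
        Map<String, Object> result = load();
        System.out.println(result.get(MENUS_STR));    // [home, settings]
        System.out.println(result.get(FEATURES_STR)); // [export]
    }
}
```

Unlike the version above, this sketch lets a failed task propagate as a CompletionException from join() instead of logging it and returning an empty map; which behavior is appropriate depends on whether the caller can work with partial data.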

4. Reference