背景
最近在重构 KoP 的部分代码,结合之前阅读 Pulsar 的代码,总结了一套 CompletableFuture
的最坏实践(worst practice)。(其实本来想写 Kafka group coordinator 相关的,比较费时间,先鸽了)
本文以实践为主,因此这里先贴出一个我喜欢用的实用方法。
private static void print(String msg) {
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName() + " " + msg);
}
最坏实践 1:缓存 future 然后注册回调
public class Producer {
public void send(String msg) {
print("send " + msg);
}
}
final var producers = new ConcurrentHashMap<String, CompletableFuture<Producer>>();
final var future = new CompletableFuture<Producer>();
producers.computeIfAbsent("A", __ -> future).thenAccept(producer -> producer.send("1"));
producers.computeIfAbsent("A", __ -> future).thenAccept(producer -> producer.send("2"));
future.complete(new Producer());
上述代码模拟 Producer
创建比较费时,但是为了防止阻塞当前线程,同时又避免为同一个 key 重复创建 Producer
,往往会缓存 future,然后从缓存中取出 future 调用 thenXxx
注册回调。
上述代码输出顺序依次是 2 和 1,也就是逆序输出。 惊不惊喜意不意外?
原因是 thenAccept
方法最终会调用 uniPush
:
final void unipush(Completion c) {
if (c != null) {
while (!tryPushStack(c)) {
Completion
对象封装了回调函数,而 tryPushStack
正如字面意思,它将 c
入栈,因此 future 没有完成时,回调是先进后出。
正常来说,要注册多个回调的正确手法是链式调用,比如 future.thenAccept(f1).thenAccept(f2)
,因为每个 thenXxx
都会返回一个 future。但是这样做就不能缓存 future 了。
当然,这个最坏实践的后半句不可忽略,如果不注册回调,实际上没什么风险,比如 Pulsar 的 ServerCnx
缓存了一组 producer future:
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
但在 handleSend
处理 Send
请求时是这样使用的:
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());
if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
/* ... */
close();
return;
}
Producer producer = producerFuture.getNow(null);
future 没有完成就直接关闭连接。完成了就把它取出来。
最坏实践 2:无脑调用 Async 方法注册回调
有时候我们不想用想要在单线程中执行回调以保证回调执行的顺序性,或者通过这种方式来避免竞态条件。此时会用 xxxAsync
的第二个参数指定一个 executor,往往是单线程的 executor。
final var executor = Executors.newSingleThreadExecutor();
final var future = new CompletableFuture<Integer>();
final var future2 = future.thenApplyAsync(i -> {
print("1: " + i);
return i + 10;
}, executor).thenApplyAsync(i -> {
print("2: " + i);
return i + 100;
}, executor).thenAcceptAsync(i -> {
print("3: " + i);
}, executor);
future.complete(43);
future2.join();
executor.shutdown();
输出:
1686839486255 pool-1-thread-1 1: 43
1686839486282 pool-1-thread-1 2: 53
1686839486282 pool-1-thread-1 3: 153
可以看到回调都在一个线程里执行,正确性是有保障的。因此很多人写代码无脑在 Async 方法传个 executor 就完事。
从正确性讲,没什么问题,但是,future 的这些回调执行的时候,是通过 Executor#execute
执行的,见 CompletableFuture#uniApplyNow
方法:
if (e != null) {
e.execute(new UniApply<T,V>(null, d, this, f));
} else {
@SuppressWarnings("unchecked") T t = (T) r;
d.result = d.encodeValue(f.apply(t));
}
上述代码 e
是 Executor
,即 xxxAsync
方法指定的。如果是用对应的不带 Async
的方法,e
是 null
。明明前一个 future 就是在目标线程中完成的,不直接继续执行而是选择丢给 Executor
去执行。无脑把任务扔给线程池多了,就别怪负载大的情况下 CPU 占用高了。
最坏实践 3:直接检查 exceptionally 异常类型
final var future = new CompletableFuture<Integer>();
final var future2 = future.thenAccept(i -> print("msg-" + i)).exceptionally(e -> {
if (e instanceof MyException) {
print("The exception is expected");
} else {
print("Unexpected exception " + e.getClass().getName() + ": " + e.getMessage());
}
return null;
});
future.completeExceptionally(new MyException("failed"));
future2.join();
输出:
Unexpected exception java.util.concurrent.CompletionException: org.example.Main$MyException: failed
早期 Pulsar 的 admin API 相当烂,各种 503 错误,真正的错误码根本没被返回给客户端。原因和上述代码一样。调用 exceptionally
的对象是 future.thenAccept
返回的新 future 而非 future
本身,因此 exceptionally
拿到的是 Completion
,需要判断 getCause()
的类型。
那么问题来了,假如 exceptionally
前面有两次 thenXxx
调用呢?
final var future2 = future.thenAccept(i -> print("msg-" + i))
.thenAccept(__ -> print("xxx")).exceptionally(e -> {
if (e.getCause() instanceof MyException) {
事实上上述判断是没问题的,e.getCause()
仍然是 MyException
。但是踩过 Completion
异常这个坑的人,可能会更加谨慎,这里给一段某 ZooKeeper/BookKeeper/Pulsar 三重 PMC 成员的一段代码:
CompletableFuture<Schema> schema = schemaStorage.createSchemaVersion(subject,
payload.schemaType, payload.schema, true);
return schema.thenApply(s -> new CreateSchemaResponse(s.getId())).exceptionally(err -> {
while (err instanceof CompletionException) {
err = err.getCause();
}
都被逼出 while 循环来了。虽然我之前也觉得这个解法挺好的,我就是不懂 CompletableFuture
,不管几层,我全部解除套娃,所以我也在某些代码里学着这么干过了。
最坏实践 4:自信假定回调不抛异常
我在相当一段时间是推崇 whenComplete
的,简单直接,但是有次开会交流时,我听到了一个说辞,那就是假如 whenComplete
抛出异常了,没有兜底的。
final var future = new CompletableFuture<Integer>();
final var future2 = future.whenComplete((i, e) -> {
if (i == 0) {
throw new RuntimeException("failed");
}
});
future.complete(0);
future2.join();
上述代码直接在 join
时抛异常了。所以当时某同事给我推荐的是 thenXxx
和 exceptionally
,比如:
final var future = new CompletableFuture<Integer>();
final var future2 = future.thenAccept(i -> {
if (i == 0) {
throw new RuntimeException("failed");
}
}).exceptionally(e -> {
print(e.getCause().getClass() + ": " + e.getCause().getMessage());
return null;
});
future.complete(0);
future2.join();
输出:
class java.lang.RuntimeException: failed
而且无论是 future
本身异常完成还是 future.thenAccept
的回调抛出异常,exceptionally
都能兜底。我们在 Pulsar 的代码中也看到了很多这样的使用场景。而在 KoP 里我也印象有一次是 whenComplete
内部抛出了异常没有被捕获导致线程挂了,这里再给个比较隐晦的例子:
final var future2 = future.whenComplete((i, e) -> {
if (e == null) {
System.out.println(f(i));
} else {
System.err.println(e.getMessage());
}
});
如果 f(i)
抛出异常,那么这个异常就会反馈到 future2
。某种程度上 exceptionally
类似于 catch (Throwable e)
,对于一般的逻辑,直接捕获 Throwable
显然是糟糕的,但是对于不能够快速失败的场景,这种兜底的异常处理方式还是值得使用的。
总结
一门好的语言往往用起来不需要了解很多细节,CompletableFuture
的出现确实避免了回调地狱。笔者在维护 Pulsar C++ SDK 的过程中深有体会,C++ 只有功能非常贫弱的 std::future
和 std::promise
,所以暴露出的接口都是基于回调的。
然而 CompletableFuture
看似简单,实际上连 Pulsar 的很多核心开发者都没搞透,像 Java SDK 那样暴露基于 CompletableFuture
异步 API 给用户去用,可能还并没有传一个回调来得简单。当然,如果有了 async
/await
语义,异步 API 会更加好用。
转载于:https://mp.weixin.qq.com/s/n-TWHFvIXEhNFujoQ6867Q