做网站用的主机多少合适,discuz下载官网,中英文网站建设方案,下面哪些是用作电子商务网站开发文章目录 1 为何需要并行加载2 并行加载的实现方式2.1 同步模型2.2 NIO异步模型2.3 为什么会选择CompletableFuture#xff1f; 3 CompletableFuture使用与原理3.1 CompletableFuture的背景和定义3.1.1 CompletableFuture解决的问题3.1.2 CompletableFuture的定义 3.2 Complet… 文章目录 1 为何需要并行加载2 并行加载的实现方式2.1 同步模型2.2 NIO异步模型2.3 为什么会选择CompletableFuture 3 CompletableFuture使用与原理3.1 CompletableFuture的背景和定义3.1.1 CompletableFuture解决的问题3.1.2 CompletableFuture的定义 3.2 CompletableFuture的使用3.2.1 零依赖CompletableFuture的创建3.2.2 一元依赖依赖一个CF3.2.3 二元依赖依赖两个CF3.2.4 多元依赖依赖多个CF 3.3 CompletableFuture原理3.3.1 CompletableFuture的设计思想3.3.2 整体流程3.3.3 小结 4 实践总结4.1 线程阻塞问题4.1.1 代码执行在哪个线程上 4.2 线程池须知4.2.1 异步回调要传线程池4.2.2 线程池循环引用会导致死锁4.2.3 异步RPC调用注意不要阻塞IO线程池 4.3 其他4.3.1 异常处理4.3.2 沉淀的工具方法介绍 5 异步化收益6 参考文献7 名词解释及备注附录自定义函数CompletableFuture处理工具类异常提取工具类打印日志日志处理实现类打印日志方式异常情况返回默认值默认返回值应用示例 1 为何需要并行加载
例如: 外卖商家端API服务是典型的I/O密集型I/O Bound服务。除此之外外卖商家端交易业务还有两个比较大的特点
服务端必须一次返回订单卡片所有内容根据商家端和服务端的“增量同步协议注1”服务端必须一次性返回订单的所有信息包含订单主信息、商品、结算、配送、用户信息、骑手信息、餐损、退款、客服赔付参照美团或饿了么订单卡片截图等需要从下游三十多个服务中获取数据。在特定条件下如第一次登录和长时间没登录的情况下客户端会分页拉取多个订单这样发起的远程调用会更多。商家端和服务端交互频繁商家对订单状态变化敏感多种推拉机制保证每次变更能够触达商家导致App和服务端的交互频繁每次变更需要拉取订单最新的全部内容。
在外卖交易链路如此大的流量下为了保证商家的用户体验保证接口的高性能并行从下游获取数据就成为必然。
2 并行加载的实现方式
并行从下游获取数据从IO模型上来讲分为同步模型和异步模型。
2.1 同步模型
从各个服务获取数据最常见的是同步调用如下图所示 在同步调用的场景下接口耗时长、性能差接口响应时长T T1T2T3……Tn这时为了缩短接口的响应时间一般会使用线程池的方式并行获取数据商家端订单卡片的组装正是使用了这种方式。 这种方式由于以下两个原因导致资源利用率比较低
CPU资源大量浪费在阻塞等待上导致CPU资源利用率低。在Java 8之前一般会通过回调的方式来减少阻塞但是大量使用回调又引发臭名昭著的回调地狱问题导致代码可读性和可维护性大大降低。为了增加并发度会引入更多额外的线程池随着CPU调度线程数的增加会导致更严重的资源争用宝贵的CPU资源被损耗在上下文切换上而且线程本身也会占用系统资源且不能无限增加。
同步模型下会导致硬件资源无法充分利用系统吞吐量容易达到瓶颈。
2.2 NIO异步模型
我们主要通过以下两种方式来减少线程池的调度开销和阻塞时间
通过RPC NIO异步调用的方式可以降低线程数从而降低调度上下文切换开销如Dubbo的异步调用可以参考《dubbo调用端异步》一文。通过引入CompletableFuture下文简称CF对业务流程进行编排降低依赖之间的阻塞。本文主要讲述CompletableFuture的使用和原理。
2.3 为什么会选择CompletableFuture
我们首先对业界广泛流行的解决方案做了横向调研主要包括Future、CompletableFuture注2、RxJava、Reactor。它们的特性对比如下
FutureCompletableFutureRxJavaReactorComposable可组合❌✔️✔️✔️Asynchronous异步✔️✔️✔️✔️Operator fusion操作融合❌❌✔️✔️Lazy延迟执行❌❌✔️✔️Backpressure回压❌❌✔️✔️
可组合可以将多个依赖操作通过不同的方式进行编排例如CompletableFuture提供thenCompose、thenCombine等各种then开头的方法这些方法就是对“可组合”特性的支持。操作融合将数据流中使用的多个操作符以某种方式结合起来进而降低开销时间、内存。延迟执行操作不会立即执行当收到明确指示时操作才会触发。例如Reactor只有当有订阅者订阅时才会触发操作。回压某些异步阶段的处理速度跟不上直接失败会导致大量数据的丢失对业务来说是不能接受的这时需要反馈上游生产者降低调用量。
RxJava与Reactor显然更加强大它们提供了更多的函数调用方式支持更多特性但同时也带来了更大的学习成本。而我们本次整合最需要的特性就是“异步”、“可组合”综合考虑后我们选择了学习成本相对较低的CompletableFuture。
3 CompletableFuture使用与原理
3.1 CompletableFuture的背景和定义
3.1.1 CompletableFuture解决的问题
CompletableFuture是由Java 8引入的在Java8之前我们一般通过Future实现异步。
Future用于表示异步计算的结果只能通过阻塞或者轮询的方式获取结果而且不支持设置回调方法Java 8之前若要设置回调一般会使用guava的ListenableFuture回调的引入又会导致臭名昭著的回调地狱下面的例子会通过ListenableFuture的使用来具体进行展示。CompletableFuture对Future进行了扩展可以通过设置回调的方式处理计算结果同时也支持组合操作支持进一步的编排同时一定程度解决了回调地狱的问题。
下面将举例来说明我们通过ListenableFuture、CompletableFuture来实现异步的差异。假设有三个操作step1、step2、step3存在依赖关系其中step3的执行依赖step1和step2的结果。
Future(ListenableFuture)的实现回调地狱如下
ExecutorService executor Executors.newFixedThreadPool(5);
ListeningExecutorService guavaExecutor MoreExecutors.listeningDecorator(executor);
ListenableFutureString future1 guavaExecutor.submit(() - {//step 1System.out.println(执行step 1);return step1 result;
});
ListenableFutureString future2 guavaExecutor.submit(() - {//step 2System.out.println(执行step 2);return step2 result;
});
ListenableFutureListString future1And2 Futures.allAsList(future1, future2);
Futures.addCallback(future1And2, new FutureCallbackListString() {Overridepublic void onSuccess(ListString result) {System.out.println(result);ListenableFutureString future3 guavaExecutor.submit(() - {System.out.println(执行step 3);return step3 result;});Futures.addCallback(future3, new FutureCallbackString() {Overridepublic void onSuccess(String result) {System.out.println(result);} Overridepublic void onFailure(Throwable t) {}}, guavaExecutor);}Overridepublic void onFailure(Throwable t) {}}, guavaExecutor);CompletableFuture的实现如下
ExecutorService executor Executors.newFixedThreadPool(5);
CompletableFutureString cf1 CompletableFuture.supplyAsync(() - {System.out.println(执行step 1);return step1 result;
}, executor);
CompletableFutureString cf2 CompletableFuture.supplyAsync(() - {System.out.println(执行step 2);return step2 result;
});
cf1.thenCombine(cf2, (result1, result2) - {System.out.println(result1 , result2);System.out.println(执行step 3);return step3 result;
}).thenAccept(result3 - System.out.println(result3));显然CompletableFuture的实现更为简洁可读性更好。
3.1.2 CompletableFuture的定义 CompletableFuture实现了两个接口如上图所示Future、CompletionStage。Future表示异步计算的结果CompletionStage用于表示异步执行过程中的一个步骤Stage这个步骤可能是由另外一个CompletionStage触发的随着当前步骤的完成也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合CompletionStage接口正是定义了这样的能力我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。
3.2 CompletableFuture的使用
下面我们通过一个例子来讲解CompletableFuture如何使用使用CompletableFuture也是构建依赖树的过程。一个CompletableFuture的完成会触发另外一系列依赖它的CompletableFuture的执行 如上图所示这里描绘的是一个业务接口的流程其中包括CF1\CF2\CF3\CF4\CF5共5个步骤并描绘了这些步骤之间的依赖关系每个步骤可以是一次RPC调用、一次数据库操作或者是一次本地方法调用等在使用CompletableFuture进行异步化编程时图中的每个步骤都会产生一个CompletableFuture对象最终结果也会用一个CompletableFuture来进行表示。
根据CompletableFuture依赖数量可以分为以下几类零依赖、一元依赖、二元依赖和多元依赖。
3.2.1 零依赖CompletableFuture的创建
我们先看下如何不依赖其他CompletableFuture来创建新的CompletableFuture 如上图红色链路所示接口接收到请求后首先发起两个异步调用CF1、CF2主要有三种方式
ExecutorService executor Executors.newFixedThreadPool(5);
//1、使用runAsync或supplyAsync发起异步调用
CompletableFutureString cf1 CompletableFuture.supplyAsync(() - {return result1;
}, executor);
//2、CompletableFuture.completedFuture()直接创建一个已完成状态的CompletableFuture
CompletableFutureString cf2 CompletableFuture.completedFuture(result2);
//3、先初始化一个未完成的CompletableFuture然后通过complete()、completeExceptionally()完成该CompletableFuture
CompletableFutureString cf new CompletableFuture();
cf.complete(success);第三种方式的一个典型使用场景就是将回调方法转为CompletableFuture然后再依赖CompletableFure的能力进行调用编排示例如下
FunctionalInterface
public interface ThriftAsyncCall {void invoke() throws TException;
}/*** 该方法为rpc注册监听的封装可以作为其他实现的参照* OctoThriftCallback 为thrift回调方法* ThriftAsyncCall 为自定义函数用来表示一次thrift调用定义如上*/public static T CompletableFutureT toCompletableFuture(final OctoThriftCallback?,T callback , ThriftAsyncCall thriftCall) {//新建一个未完成的CompletableFutureCompletableFutureT resultFuture new CompletableFuture();//监听回调的完成并且与CompletableFuture同步状态callback.addObserver(new OctoObserverT() {Overridepublic void onSuccess(T t) {resultFuture.complete(t);}Overridepublic void onFailure(Throwable throwable) {resultFuture.completeExceptionally(throwable);}});if (thriftCall ! null) {try {thriftCall.invoke();} catch (TException e) {resultFuture.completeExceptionally(e);}}return resultFuture;}3.2.2 一元依赖依赖一个CF 如上图红色链路所示CF3CF5分别依赖于CF1和CF2这种对于单个CompletableFuture的依赖可以通过thenApply、thenAccept、thenCompose等方法来实现代码如下所示
CompletableFutureString cf3 cf1.thenApply(result1 - {//result1为CF1的结果//......return result3;
});
CompletableFutureString cf5 cf2.thenApply(result2 - {//result2为CF2的结果//......return result5;
});3.2.3 二元依赖依赖两个CF 如上图红色链路所示CF4同时依赖于两个CF1和CF2这种二元依赖可以通过thenCombine等回调来实现如下代码所示
CompletableFutureString cf4 cf1.thenCombine(cf2, (result1, result2) - {//result1和result2分别为cf1和cf2的结果return result4;
});3.2.4 多元依赖依赖多个CF 如上图红色链路所示整个流程的结束依赖于三个步骤CF3、CF4、CF5这种多元依赖可以通过allOf或anyOf方法来实现区别是当需要多个依赖全部完成时使用allOf当多个依赖中的任意一个完成即可时使用anyOf如下代码所示
CompletableFutureVoid cf6 CompletableFuture.allOf(cf3, cf4, cf5);
CompletableFutureString result cf6.thenApply(v - {//这里的join并不会阻塞因为传给thenApply的函数是在CF3、CF4、CF5全部完成时才会执行 。result3 cf3.join();result4 cf4.join();result5 cf5.join();//根据result3、result4、result5组装最终result;return result;
});3.3 CompletableFuture原理
CompletableFuture中包含两个字段result和stack。result用于存储当前CF的结果stackCompletion表示当前CF完成后需要触发的依赖动作Dependency Actions去触发依赖它的CF的计算依赖动作可以有多个表示有多个依赖它的CF以栈Treiber stack的形式存储stack表示栈顶元素。 这种方式类似“观察者模式”依赖动作Dependency Action都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类Completion本身是观察者的基类。
UniCompletion继承了Completion是一元依赖的基类例如thenApply的实现类UniApply就继承自UniCompletion。BiCompletion继承了UniCompletion是二元依赖的基类同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。 3.3.1 CompletableFuture的设计思想
按照类似“观察者模式”的设计思想原理分析可以从“观察者”和“被观察者”两个方面着手。由于回调种类多但结构差异不大所以这里单以一元依赖中的thenApply为例不再枚举全部回调类型。如下图所示 3.3.1.1 被观察者
每个CompletableFuture都可以被看作一个被观察者其内部有一个Completion类型的链表成员变量stack用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。被观察者CF中的result属性用来存储返回结果数据。这里可能是一次RPC调用的返回值也可能是任意对象在上面的例子中对应步骤fn1的执行结果。
3.3.1.2 观察者
CompletableFuture支持很多回调方法例如thenAccept、thenApply、exceptionally等这些方法接收一个函数类型的参数f生成一个Completion类型的对象即观察者并将入参函数f赋值给Completion的成员变量fn然后检查当前CF是否已处于完成状态即result ! null如果已完成直接触发fn否则将观察者Completion加入到CF的观察者链stack中再次尝试触发如果被观察者未执行完则其执行完毕之后通知触发。
观察者中的dep属性指向其对应的CompletableFuture在上面的例子中dep指向CF2。观察者中的src属性指向其依赖的CompletableFuture在上面的例子中src指向CF1。观察者Completion中的fn属性用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法thenAccept、thenApply、exceptionally等接收的函数类型也不同即fn的类型有很多种在上面的例子中fn指向fn2。
3.3.2 整体流程
3.3.2.1 一元依赖
这里仍然以thenApply为例来说明一元依赖的流程
将观察者Completion注册到CF1此时CF1将Completion压栈。当CF1的操作运行完成时会将结果赋值给CF1中的result属性。依次弹栈通知观察者尝试运行。 初步流程设计如上图所示这里有几个关于注册与通知的并发问题大家可以思考下
Q1在观察者注册之前如果CF已经执行完成并且已经发出通知那么这时观察者由于错过了通知是不是将永远不会被触发呢 A1不会。在注册时检查依赖的CF是否已经完成。如果未完成即result null则将观察者入栈如果已完成result ! null则直接触发观察者操作。
Q2在”入栈“前会有”result null“的判断这两个操作为非原子操作CompletableFufure的实现也没有对两个操作进行加锁完成时间在这两个操作之间观察者仍然得不到通知是不是仍然无法触发 A2不会。入栈之后再次检查CF是否完成如果完成则触发。
Q3当依赖多个CF时观察者会被压入所有依赖的CF的栈中每个CF完成的时候都会进行那么会不会导致一个操作被多次执行呢 如下图所示即当CF1、CF2同时完成时如何避免CF3被多次触发。 A3CompletableFuture的实现是这样解决该问题的观察者在执行之前会先通过CAS操作设置一个状态位将status由0改为1。如果观察者已经执行过了那么CAS操作将会失败取消执行。
通过对以上3个问题的分析可以看出CompletableFuture在处理并行问题时全程无加锁操作极大地提高了程序的执行效率。我们将并行问题考虑纳入之后可以得到完善的整体流程图如下所示 CompletableFuture支持的回调方法十分丰富但是正如上一章节的整体流程图所述他们的整体流程是一致的。所有回调复用同一套流程架构不同的回调监听通过策略模式实现差异化。
3.3.2.2 二元依赖
我们以thenCombine为例来说明二元依赖 thenCombine操作表示依赖两个CompletableFuture。其观察者实现类为BiApply如上图所示BiApply通过src和snd两个属性关联被依赖的两个CFfn属性的类型为BiFunction。与单个依赖不同的是在依赖的CF未完成的情况下thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中每个被依赖的CF完成时都会尝试触发观察者BiApplyBiApply会检查两个依赖是否都完成如果完成则开始执行。这里为了解决重复触发的问题同样用的是上一章节提到的CAS操作执行时会先通过CAS设置状态位避免重复触发。
3.3.2.3 多元依赖
依赖多个CompletableFuture的回调方法包括allOf、anyOf区别在于allOf观察者实现类为BiRelay需要所有被依赖的CF完成后才会执行回调而anyOf观察者实现类为OrRelay任意一个被依赖的CF完成后就会触发。二者的实现方式都是将多个被依赖的CF构建成一棵平衡二叉树执行结果层层通知直到根节点触发回调监听。 3.3.3 小结
本章节为CompletableFuture实现原理的科普旨在尝试不粘贴源码而通过结构图、流程图以及搭配文字描述把CompletableFuture的实现原理讲述清楚。把晦涩的源码翻译为“整体流程”章节的流程图并且将并发处理的逻辑融入便于大家理解。
4 实践总结
在商家端API异步化的过程中我们遇到了一些问题这些问题有的会比较隐蔽下面把这些问题的处理经验整理出来。希望能帮助到更多的同学大家可以少踩一些坑。
4.1 线程阻塞问题
4.1.1 代码执行在哪个线程上
要合理治理线程资源最基本的前提条件就是要在写代码时清楚地知道每一行代码都将执行在哪个线程上。下面我们看一下CompletableFuture的执行线程情况。
CompletableFuture实现了CompletionStage接口通过丰富的回调方法支持各种组合操作每种组合场景都有同步和异步两种方法。
同步方法即不带Async后缀的方法有两种情况。
如果注册时被依赖的操作已经执行完成则直接由当前线程执行。如果注册时被依赖的操作还未执行完则由回调线程执行。
异步方法即带Async后缀的方法可以选择是否传递线程池参数Executor运行在指定线程池中当不传递Executor时会使用ForkJoinPool中的共用线程池CommonPoolCommonPool的大小是CPU核数-1如果是IO密集的应用线程数可能成为瓶颈。
例如
ExecutorService threadPool1 new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(100));
CompletableFutureString future1 CompletableFuture.supplyAsync(() - {System.out.println(supplyAsync 执行线程 Thread.currentThread().getName());//业务操作return ;
}, threadPool1);
//此时如果future1中的业务操作已经执行完毕并返回则该thenApply直接由当前main线程执行否则将会由执行以上业务操作的threadPool1中的线程执行。
future1.thenApply(value - {System.out.println(thenApply 执行线程 Thread.currentThread().getName());return value 1;
});
//使用ForkJoinPool中的共用线程池CommonPool
future1.thenApplyAsync(value - {
//do somethingreturn value 1;
});
//使用指定线程池
future1.thenApplyAsync(value - {
//do somethingreturn value 1;
}, threadPool1);4.2 线程池须知
4.2.1 异步回调要传线程池
前面提到异步回调方法可以选择是否传递线程池参数Executor这里我们建议强制传线程池且根据实际情况做线程池隔离。
当不传递线程池时会使用ForkJoinPool中的公共线程池CommonPool这里所有调用将共用该线程池核心线程数处理器数量-1单核核心线程数为1所有异步回调都会共用该CommonPool核心与非核心业务都竞争同一个池中的线程很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数并且可以给不同的业务分配不同的线程池以求资源隔离减少不同业务之间的相互干扰。
4.2.2 线程池循环引用会导致死锁
public Object doGet() {ExecutorService threadPool1 new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(100));CompletableFuture cf1 CompletableFuture.supplyAsync(() - {//do sthreturn CompletableFuture.supplyAsync(() - {System.out.println(child);return child;}, threadPool1).join();//子任务}, threadPool1);return cf1.join();
}如上代码块所示doGet方法第三行通过supplyAsync向threadPool1请求线程并且内部子任务又向threadPool1请求线程。threadPool1大小为10当同一时刻有10个请求到达则threadPool1被打满子任务请求线程时进入阻塞队列排队但是父任务的完成又依赖于子任务这时由于子任务得不到线程父任务无法完成。主线程执行cf1.join()进入阻塞状态并且永远无法恢复。
为了修复该问题需要将父任务与子任务做线程池隔离两个任务请求不同的线程池避免循环依赖导致的阻塞。
4.2.3 异步RPC调用注意不要阻塞IO线程池
服务异步化后很多步骤都会依赖于异步RPC调用的结果这时需要特别注意一点如果是使用基于NIO比如Netty的异步RPC则返回结果是由IO线程负责设置的即回调方法由IO线程触发CompletableFuture同步回调如thenApply、thenAccept等无Async后缀的方法如果依赖的异步RPC调用的返回结果那么这些同步回调将运行在IO线程上而整个服务只有一个IO线程池这时需要保证同步回调中不能有阻塞等耗时过长的逻辑否则在这些逻辑执行完成前IO线程将一直被占用影响整个服务的响应。
4.3 其他
4.3.1 异常处理
由于异步执行的任务在其他线程上执行而异常信息存储在线程栈中因此当前线程除非阻塞等待返回结果否则无法通过try\catch捕获异常。CompletableFuture提供了异常捕获回调exceptionally相当于同步调用中的try\catch。使用方法如下所示
Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//内部接口
public CompletableFutureInteger getCancelTypeAsync(long orderId) {CompletableFutureWmOrderOpRemarkResult remarkResultFuture wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务方法内部会发起异步rpc调用return remarkResultFuture.exceptionally(err - {//通过exceptionally 捕获异常打印日志并返回默认值log.error(WmOrderRemarkService.getCancelTypeAsync Exception orderId{}, orderId, err);return 0;});
}有一点需要注意CompletableFuture在回调方法中对异常进行了包装。大部分异常会封装成CompletionException后抛出真正的异常存储在cause属性中因此如果调用链中经过了回调方法处理那么就需要用Throwable.getCause()方法提取真正的异常。但是有些情况下会直接返回真正的异常Stack Overflow的讨论最好使用工具类提取异常如下代码所示
Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//内部接口
public CompletableFutureInteger getCancelTypeAsync(long orderId) {CompletableFutureWmOrderOpRemarkResult remarkResultFuture wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务方法内部会发起异步rpc调用return remarkResultFuture.thenApply(result - {//这里增加了一个回调方法thenApply如果发生异常thenApply内部会通过new CompletionException(throwable) 对异常进行包装//这里是一些业务操作}).exceptionally(err - {//通过exceptionally 捕获异常这里的err已经被thenApply包装过因此需要通过Throwable.getCause()提取异常log.error(WmOrderRemarkService.getCancelTypeAsync Exception orderId{}, orderId, ExceptionUtils.extractRealException(err));return 0;});
}上面代码中用到了一个自定义的工具类ExceptionUtils用于CompletableFuture的异常提取在使用CompletableFuture做异步编程时可以直接使用该工具类处理异常。实现代码如下
public class ExceptionUtils {public static Throwable extractRealException(Throwable throwable) {//这里判断异常类型是否为CompletionException、ExecutionException如果是则进行提取否则直接返回。if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {if (throwable.getCause() ! null) {return throwable.getCause();}}return throwable;}
}4.3.2 沉淀的工具方法介绍
在实践过程中我们沉淀了一些通用的工具方法在使用CompletableFuture开发时可以直接拿来使用详情参见“附录”。
5 异步化收益
通过异步化改造API系统的性能得到明显提升与改造前对比的收益如下
核心接口吞吐量大幅提升其中订单轮询接口改造前TP99为754ms改造后降为408ms。服务器数量减少1/3。
6 参考文献
CompletableFuture (Java Platform SE 8 )java - Does CompletionStage always wrap exceptions in CompletionException? - Stack Overflowexception - Surprising behavior of Java 8 CompletableFuture exceptionally method - Stack Overflow文档 | Apache Dubbo
7 名词解释及备注
注1“增量同步”是指商家客户端与服务端之间的订单增量数据同步协议客户端使用该协议获取新增订单以及状态发生变化的订单。
注2本文涉及到的所有技术点依赖的Java版本为JDK 8CompletableFuture支持的特性分析也是基于该版本。
附录
自定义函数
FunctionalInterface
public interface ThriftAsyncCall {void invoke() throws TException ;
}CompletableFuture处理工具类
/*** CompletableFuture封装工具类*/
Slf4j
public class FutureUtils {
/*** 该方法为rpc注册监听的封装可以作为其他实现的参照* OctoThriftCallback 为thrift回调方法* ThriftAsyncCall 为自定义函数用来表示一次thrift调用定义如上*/
public static T CompletableFutureT toCompletableFuture(final OctoThriftCallback?,T callback , ThriftAsyncCall thriftCall) {CompletableFutureT thriftResultFuture new CompletableFuture();callback.addObserver(new OctoObserverT() {Overridepublic void onSuccess(T t) {thriftResultFuture.complete(t);}Overridepublic void onFailure(Throwable throwable) {thriftResultFuture.completeExceptionally(throwable);}});if (thriftCall ! null) {try {thriftCall.invoke();} catch (TException e) {thriftResultFuture.completeExceptionally(e);}}return thriftResultFuture;
}/*** 设置CF状态为失败*/public static T CompletableFutureT failed(Throwable ex) {CompletableFutureT completableFuture new CompletableFuture();completableFuture.completeExceptionally(ex);return completableFuture;}/*** 设置CF状态为成功*/public static T CompletableFutureT success(T result) {CompletableFutureT completableFuture new CompletableFuture();completableFuture.complete(result);return completableFuture;}/*** 将ListCompletableFutureT 转为 CompletableFutureListT*/public static T CompletableFutureListT sequence(CollectionCompletableFutureT completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture?[0])).thenApply(v - completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()));}/*** 将ListCompletableFutureListT 转为 CompletableFutureListT* 多用于分页查询的场景*/public static T CompletableFutureListT sequenceList(CollectionCompletableFutureListT completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture?[0])).thenApply(v - completableFutures.stream().flatMap( listFuture - listFuture.join().stream()).collect(Collectors.toList()));}/** 将ListCompletableFutureMapK, V 转为 CompletableFutureMapK, V* Param mergeFunction 自定义key冲突时的merge策略*/public static K, V CompletableFutureMapK, V sequenceMap(CollectionCompletableFutureMapK, V completableFutures, BinaryOperatorV mergeFunction) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture?[0])).thenApply(v - completableFutures.stream().map(CompletableFuture::join).flatMap(map - map.entrySet().stream()).collect(Collectors.toMap(Entry::getKey, Entry::getValue, mergeFunction)));}/*** 将ListCompletableFutureT 转为 CompletableFutureListT并过滤调null值*/public static T CompletableFutureListT sequenceNonNull(CollectionCompletableFutureT completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture?[0])).thenApply(v - completableFutures.stream().map(CompletableFuture::join).filter(e - e ! null).collect(Collectors.toList()));}/*** 将ListCompletableFutureListT 转为 CompletableFutureListT并过滤调null值* 多用于分页查询的场景*/public static T CompletableFutureListT sequenceListNonNull(CollectionCompletableFutureListT completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture?[0])).thenApply(v - completableFutures.stream().flatMap( listFuture - listFuture.join().stream().filter(e - e ! null)).collect(Collectors.toList()));}/*** 将ListCompletableFutureMapK, V 转为 CompletableFutureMapK, V* Param filterFunction 自定义过滤策略*/public static T CompletableFutureListT sequence(CollectionCompletableFutureT completableFutures,Predicate? super T filterFunction) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture?[0])).thenApply(v - completableFutures.stream().map(CompletableFuture::join).filter(filterFunction).collect(Collectors.toList()));}/*** 将ListCompletableFutureListT 转为 CompletableFutureListT* Param filterFunction 自定义过滤策略*/public static T CompletableFutureListT sequenceList(CollectionCompletableFutureListT completableFutures,Predicate? super T filterFunction) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture?[0])).thenApply(v - completableFutures.stream().flatMap( listFuture - listFuture.join().stream().filter(filterFunction)).collect(Collectors.toList()));}
/*** 将CompletableFutureMapK,V的list转为 CompletableFutureMapK,V。 多个map合并为一个map。 如果key冲突采用新的value覆盖。*/public static K, V CompletableFutureMapK, V sequenceMap(CollectionCompletableFutureMapK, V completableFutures) {return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture?[0])).thenApply(v - completableFutures.stream().map(CompletableFuture::join).flatMap(map - map.entrySet().stream()).collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) - b)));}}异常提取工具类 public class ExceptionUtils {/*** 提取真正的异常*/public static Throwable extractRealException(Throwable throwable) {if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {if (throwable.getCause() ! null) {return throwable.getCause();}}return throwable;}}打印日志 Slf4jpublic abstract class AbstractLogActionR {protected final String methodName;protected final Object[] args;
public AbstractLogAction(String methodName, Object... args) {this.methodName methodName;this.args args;
}
protected void logResult(R result, Throwable throwable) {if (throwable ! null) {boolean isBusinessError throwable instanceof TBase || (throwable.getCause() ! null throwable.getCause() instanceof TBase);if (isBusinessError) {logBusinessError(throwable);} else if (throwable instanceof DegradeException || throwable instanceof DegradeRuntimeException) {//这里为内部rpc框架抛出的异常使用时可以酌情修改if (RhinoSwitch.getBoolean(isPrintDegradeLog, false)) {log.error({} degrade exception, param:{} , error:{}, methodName, args, throwable);}} else {log.error({} unknown error, param:{} , error:{}, methodName, args, ExceptionUtils.extractRealException(throwable));}} else {if (isLogResult()) {log.info({} param:{} , result:{}, methodName, args, result);} else {log.info({} param:{}, methodName, args);}}
}
private void logBusinessError(Throwable throwable) {log.error({} business error, param:{} , error:{}, methodName, args, throwable.toString(), ExceptionUtils.extractRealException(throwable));
}
private boolean isLogResult() {//这里是动态配置开关用于动态控制日志打印开源动态配置中心可以使用nacos、apollo等如果项目没有使用配置中心则可以删除return RhinoSwitch.getBoolean(methodName _isLogResult, false);
}}日志处理实现类
/*** 发生异常时根据是否为业务异常打印日志。* 跟CompletableFuture.whenComplete配合使用不改变completableFuture的结果正常OR异常*/
Slf4j
public class LogErrorActionR extends AbstractLogActionR implements BiConsumerR, Throwable {
public LogErrorAction(String methodName, Object... args) {super(methodName, args);
}
Override
public void accept(R result, Throwable throwable) {logResult(result, throwable);
}
}打印日志方式
completableFuture
.whenComplete(new LogErrorAction(orderService.getOrder, params));异常情况返回默认值
/*** 当发生异常时返回自定义的值*/
public class DefaultValueHandleR extends AbstractLogActionR implements BiFunctionR, Throwable, R {private final R defaultValue;
/*** 当返回值为空的时候是否替换为默认值*/
private final boolean isNullToDefault;
/*** param methodName 方法名称* param defaultValue 当异常发生时自定义返回的默认值* param args 方法入参*/public DefaultValueHandle(String methodName, R defaultValue, Object... args) {super(methodName, args);this.defaultValue defaultValue;this.isNullToDefault false;}
/*** param isNullToDefault* param defaultValue 当异常发生时自定义返回的默认值* param methodName 方法名称* param args 方法入参*/public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) {super(methodName, args);this.defaultValue defaultValue;this.isNullToDefault isNullToDefault;}
Override
public R apply(R result, Throwable throwable) {logResult(result, throwable);if (throwable ! null) {return defaultValue;}if (result null isNullToDefault) {return defaultValue;}return result;
}
public static R DefaultValueHandle.DefaultValueHandleBuilderR builder() {return new DefaultValueHandle.DefaultValueHandleBuilder();
}
public static class DefaultValueHandleBuilderR {private boolean isNullToDefault;private R defaultValue;private String methodName;private Object[] args;DefaultValueHandleBuilder() {}public DefaultValueHandle.DefaultValueHandleBuilderR isNullToDefault(final boolean isNullToDefault) {this.isNullToDefault isNullToDefault;return this;}public DefaultValueHandle.DefaultValueHandleBuilderR defaultValue(final R defaultValue) {this.defaultValue defaultValue;return this;}public DefaultValueHandle.DefaultValueHandleBuilderR methodName(final String methodName) {this.methodName methodName;return this;}public DefaultValueHandle.DefaultValueHandleBuilderR args(final Object... args) {this.args args;return this;}public DefaultValueHandleR build() {return new DefaultValueHandleR(this.isNullToDefault, this.defaultValue, this.methodName, this.args);}public String toString() {return DefaultValueHandle.DefaultValueHandleBuilder(isNullToDefault this.isNullToDefault , defaultValue this.defaultValue , methodName this.methodName , args Arrays.deepToString(this.args) );}
}默认返回值应用示例
completableFuture.handle(new DefaultValueHandle(orderService.getOrder, Collections.emptyMap(), params));