我不是很理解,為什么越來越多的項目打著高性能的旗號,迷信般的使用響應式編程框架,然后把代碼搞的亂七八糟。響應式編程真的那么香么?還是“天下苦響應式編程久已”,在迫害我們的祖國花朵?在我看來,響應式編程至少犯了三宗罪:1. 易造成復雜;2. 調試困難;3. 性能迷霧。 鑒于此,我希望開發同學們在選擇編程范式的時候,能擦亮自己的眼睛,選一個真正適合自己和團隊的編程范式。
罪一、易造成復雜
響應式編程的代碼通常比傳統的命令式編程更復雜。它本質上是回調的封裝,需要將一步一步的操作轉換為一個一個的回調。因為底層采用的是觀察者模式,需要我們把所有的業務操作都注冊到Publisher里面,然后通過通知的模式去接收數據流動。為了發揮異步的效用,這根鏈條不能斷,這就導致開發人員很容易寫出有很多的點、點、點、點….可讀性差、易出錯的代碼。如下所示,這是一段真實的項目代碼示例:
private Mono<HyperClusterSwitch> ensureSwitchConfigured(String parentJobId, HyperClusterPort port) {
String switchIp = Optional.ofNullable(port.getLocation()).map(HyperClusterPort.Location::getSwitchIp)
.orElse(null);
Assert.hasText(switchIp, String.format("switchIp of port %s is blank", port.getId()));
return Mono.fromCallable(() -> {
HyperClusterSubnet subnet = subnetRepository.select(port.getHyperClusterSubnetId());
if (Objects.isNull(subnet)) {
String message = String.format("get subnet %s from redis failed", port.getHyperClusterSubnetId());
log.error(message);
throw new XlinkException(message);
}
return subnet;
}).retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) // 并發優化
.doOnError(e -> log.error("XLINK.ALARM: get subnet {} from redis failed",
port.getHyperClusterSubnetId(), e))
.flatMap(subnet -> Mono
.fromCallable(() -> switchInfoRepository.select(port.getHyperClusterSubnetId(), switchIp))
.switchIfEmpty(Mono.fromCallable(() -> {
// switch加鎖
String switchIpLock = String.format("xlink:hyper_cluster_switch:%s", switchIp);
redisLock.lock(switchIpLock, port.getId());
HyperClusterSwitch switchInfo = new HyperClusterSwitch();
switchInfo.setSwitchIp(switchIp);
switchInfo.setSwitchType(port.getLocation().getSwitchType());
// 為新關聯的tor交換機分配vlan
Integer vlanId = allocateVlan(switchInfo);
switchInfo.setVlanId(vlanId);
// 將分配的vlan寫入redis
switchInfoRepository.update(port.getHyperClusterSubnetId(), switchInfo);
// 釋放switch鎖
redisLock.unlock(switchIpLock, port.getId());
return switchInfo;
}).flatMap(switchInfo -> {
// 上報vlan分配結果到manager, 下發交換機本地vlan配置
return reportVlanToManager(port.getHyperClusterSubnetId(), switchInfo)
.then(sendSwitchAclConfigMsg(parentJobId, Constant.CREATE, switchInfo, subnet))
.then(sendSwitchVlanifConfigMsg(parentJobId, Constant.CREATE, switchInfo, subnet))
.thenReturn(switchInfo);
})).retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1))
.filter(ex -> ex instanceof XlinkException.LockError)));
}
說實話,這還不算糟糕的,比這個更長、更爛的Reactive代碼比比皆是。可以說,但凡采用Reactive編程的項目,基本就是這樣的調調。WTF!究其背后原因,我想這可能是因為響應式編程鼓勵函數式編程,導致很多應該被對象封裝的邏輯得不到封裝和業務顯性化的表達。從而導致長面條代碼,可讀性可理解性差。另外,因為是鏈式調用,多級回調之間的變量共享和傳遞也是隱式的,不直觀。對于多個變量的傳遞只能用tuple之類的完全沒有業務語義的對象。這樣的代碼從頭貫穿到尾,一環套一環,就像一口氣要唱完一首歌,給人透不過來氣的感覺!再加上Reactive自身有非常多的操作符,其認知成本高和學習曲線長,導致很多同學很難精通,能把邏輯跑通就謝天謝地了,什么clean code、可讀性、面向對象設計統統要給“這玩意”讓路。
就我個人而言,所有導致代碼可讀性、可理解性、可維護性下降的行為都是大罪! 我最不能容忍的也正是響應式編程的這一罪狀。有一說一,我并不排斥函數式,只是要分場景,比如大數據場景下的流式數據處理就非常合適用Reactive風格的函數式編程范式。我反對的是不分青紅皂白的認為這個技術NB(NB是因為我寫的代碼別人看不懂?),濫用響應式編程污染我們的代碼庫。對于大部分的業務代碼而言,用簡單直觀的方式,顯性化的表達業務語義,讓他人能看懂易理解,才是程序員最大的“善”。
罪二、調試困難
在響應式編程中,回調的堆棧里無法看到是誰放置了這個回調。這導致在排查問題時變得非常麻煩,因為無法準確追蹤回調的調用關系。傳統的堆棧,不管是調試時打的斷點,還是日志中的異常棧,都是能看到哪個函數出錯了,并向上逐級回溯調用方。但是響應式編程,在這個callback的堆棧里面是看不到誰放置了這個callback。
比如下面的代碼:
return Mono
.fromSupplier(() -> SingleResponse.of(String.valueOf(current)))
.doOnNext(e -> log.info("before delay: " + new Date()))
.delayElement(Duration.ofSeconds(2)) //模擬業務停頓三秒
.doOnNext(e -> log.info("after delay: " + new Date())) // 斷點處
.doOnNext(e -> {throw new RuntimeException("test");}); // 拋出異常處
如果我在“after delay”上面打上斷點,你將看到下面所示的stack,我根本看不到我的前序步驟是什么,只能看到一大堆“無意義”的框架調用鏈。這種調用上下文的丟失對我們troubleshooting造成了極大的困難。
同樣,對于上面代碼中拋出的Exception,其異常堆棧是這樣的,完全看不到我從哪里來,WTF!
java.lang.RuntimeException: test
at com.huawei.demo.adapter.ChargeController.lambda$pureReactiveTest$3(ChargeController.java:72)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
at reactor.core.publisher.MonoDelayElement$DelayElementSubscriber.lambda$onNext$0(MonoDelayElement.java:125)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
這種丟失調用方上下文的行為,是響應式編程的第二宗罪!
罪三、性能迷霧
使用響應式編程同學的最大理由就是性能提升。關于這一點,我自己親自做了性能測試,事實證明想用好Reactive達到性能提升的目的,也并非易事,需要我們對其底層線程模型有非常深刻的理解,否則性能不僅不會提升還可能惡化。測試的硬件環境不重要,因為主要是對比。軟件是這樣的,Web服務器是Tomcat 9.0.82。壓測工具是用JMeter發起1000個并發,每隔1秒發送一次,總共發送5次。我總共測試了4種情況:
1)情況一,使用普通的Spring MVC
實驗代碼如下:
@GetMapping("pressureTest")
public Response pressureTest() {
long start = System.currentTimeMillis();
log.info("pressureTest : " + start);
sleep("normalPressureTest", 2000); //模擬業務停頓2秒
long end = System.currentTimeMillis();
log.info("Pressure test, use time : " + (end - start));
return SingleResponse.of(String.valueOf(start));
}
我們用線程sleep 2秒來模擬業務處理時間,其測試結果如下。因為Tomcat的默認最大線程數是200,當壓測開始時,200個線程會被全部啟動。因為SpringMVC是thread-per-request模式,所以其處理的極限也就是100/S(因為業務處理需要2s,只有200個線程,所以每秒能處理的最大并發是200/2,也就是100),實測的結果是97/sec,可以理解。平均響應時間是10S怎么理解呢?這是因為服務器雖然同時收到了1000個request,但只有100/sec的處理能力,剩下的都得在緩存里排隊,那么最后排到的那一波,可不就要10s才能返回么。如果并發量再大,超過Tomcat默認最多接收10000個connection的上線,緩存里放不下了,request就會直接被丟掉,或者等待時間過長,導致response time太長,發生TimeOut錯誤。
這里我們如果要做性能優化的話,最簡單的方式就是加大線程數,比如我們可以在application.yml中調整最大線程數到400
server:
port: 8080
tomcat:
threads:
max: 400
按照我們上面的計算邏輯,同樣是sleep 2秒,400個線程的極限值應該是200,實測結果是178/sec,也差不多
2)情況二,使用Spring WebFlux的reactive
接下來,我們把普通的MVC,改成WebFlux,看看情況怎么樣,測試代碼如下:
@GetMapping("reactiveThenTest")
public Mono<Response> reactiveThenTest() {
return Mono.fromCallable(() -> step1())
.doOnNext(i -> {
step2();
})
.doOnNext(i -> {
step3();
})
.thenReturn(Response.buildSuccess());
}
private Mono step1() {
sleep("step1", 600);
return Mono.empty();
}
private Mono step2() {
sleep("step2", 600);
return Mono.empty();
}
private Mono step3() {
sleep("step3", 800);
return Mono.empty();
}
我們把2s拆成3個step,分別讓線程sleep 600ms、600ms和800ms,加起來也是2S。你們覺得吞吐率會怎樣?實測結果如下:
同樣是400個線程的配置,和SpringMVC的并發量基本是一樣的。這是因為我們是直接在exec線程上使用了sleep,而Mono的操作又是同步順序操作的,所以其效果是和SpringMVC一樣的。這就是我說的,如果你不了解WebFlux的底層線程模型,用了Reactive也不一定就能提升性能,甚至還可能導致性能惡化,后面會提到。3)情況三,正確的使用異步處理能力
上面之所以性能沒有提升,是因為我們的sleep操作block了exec線程,導致異步能力不能發揮,正確的delay方式應該是這樣:
@GetMapping("pureReactivePressureTest")
public Mono<SingleResponse<String>> pureReactiveTest() {
Date current = new Date();
log.info("pureReactiveTest : " + current);
return Mono
.fromSupplier(() -> SingleResponse.of(String.valueOf(current)))
.doOnNext(e -> log.info("before delay: " + new Date())) // delay之前,在exec線程執行
.delayElement(Duration.ofSeconds(2)) //模擬業務停頓二秒
.doOnNext(e -> log.info("after delay: " + new Date())); // delay之后,在parallel線程執行
}
為什么說這才是正確的方式呢?我們先來看一下壓測的結果,可以看到通過這種方式,我們的QPS達到了452/sec,平均Response Time是2S,性能翻倍了,這個收益還是很可觀的。但是,前提是我們要用對。
之所以能達到這樣的效果,是因為通過delayElement我們把延遲操作異步化,Reactor的delay實現是有專門的parallel線程來負責,然后等到delay時間到了以后,再通過事件機制callback,這樣就不會阻塞exec線程的執行,相當于有400個exec線程一直在接客。關于這一點,我們可以通過如下的日志得到證實:
“before delay”是在exec線程中執行
16:14:57 INFO [http-nio-8080-exec-493] c.a.demo.adapter.ChargeController: before delay: Sat May 11 16:14:57 CST 2024
“after delay”是在parallel線程中執行
16:14:57 INFO [parallel-4] c.a.demo.adapter.ChargeController: after delay: Sat May 11 16:14:57 CST 2024
4)情況四,手動并行化
最后,我們來看一個可怕的情況。響應式編程本身是concurrency-agnostic的,其并發模型是開發人員自己控制的。因此我們可以手動設置parallel模式,以期達到并行處理的目的,我們不妨用一個Flux來試一試,其代碼如下
@GetMapping("reactivePressureTest")
public Mono<Response> reactivePressureTest() {
log.info("Start reactivePressureTest");
return Flux.range(1, 3)
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(i -> {
execute(i);
})
.then()
.thenReturn(Response.buildSuccess());
}
private Mono execute(int i) {
int sleepTime = 600;
if (i == 3) sleepTime = 800;
sleep("parallelStep" + i, sleepTime);
return Mono.empty();
}
上面代碼的意圖是說通過增加parallel線程,讓execute函數可以并行被執行,當我們用Postman發送一個請求的時候,很好,因為并行,本來需要2s的操作,800ms就返回了,這正是我們想要的。然而,當我們啟動和前面實驗一樣的1000個并發壓測時,慘不忍睹的事情發生了:
吞吐量降低到只有37/sec,延遲達到了26s,因為超時造成96%的錯誤率。 這就是我說的,用不好可能導致性能惡化的情況。造成這種情況的原因是,系統的默認的parallel線程數等于cpu的核數,我電腦是8核的,所以這里有8個parallel線程,又因為我們手動block了parallel線程,導致瓶頸點積壓到8個parallel線程身上。盡管在外圍我們有NIO的無阻塞acceptor接收請求,分發給400個exec線程工作,但都被block在8個parallel線程這里了,相當于整個系統只有8個線程在工作,不慢才怪。