1 前言
說到線程池八股文背的很熟的肯定知道無非就這幾個考點:
(1)線程池三大核心參數(shù) corePoolSize、maximumPoolSize、workQueue 的含義
(2)線程池核心線程數(shù)制定策略
(3)建議通過 ThreadPoolExecutor 的構造函數(shù)來聲明,避免使用 Executors 創(chuàng)建線程池
以上考點作為線程池面試幾乎必問的內(nèi)容,大部分人應該都是如數(shù)家珍,張口就來,但是懂了面試八股文真的就不一定在實際運用中真的就會把線程池用好 。且看下面這次真實生產(chǎn)事故還原
2 事故還原
某次一位研發(fā)同事寫出了下面類似的代碼:
Listitems=getFromDb(); List >completableFutures=items.stream().map(item->CompletableFuture.supplyAsync(()->{ AppMapStationDatadata=mapper.copy(item); //發(fā)起價格信息查詢的RPC調(diào)用 data.setPriceInfo(priceApi.getPriceInfoById(item.getId())) returndata; },apiExecutor)).collect(Collectors.toList()); result=completableFutures.stream().map(e->{ returne.get(); }).filter(Objects::nonNull).collect(Collectors.toList());
上面的代碼中,代碼首先從數(shù)據(jù)庫里面查出來一堆對象,然后對每一個對象進行模型轉(zhuǎn)換,由于要獲取每個對象的價格信息發(fā)起了一次RPC調(diào)用,由于RPC服務沒有提供批量接口,所以代碼里面用了線程池并發(fā)請求,以求得接口盡可能快的返回數(shù)據(jù)。
使用的是CompletableFuture 而且自定義了線程池,線程池指定了10個核心線程,20個最大線程,這段代碼在上線后的一段時間確實沒有任何問題,但是在灰度放量用戶量多起來之后發(fā)現(xiàn)接口經(jīng)常超時告警。
請問為什么上面的代碼在用戶量稍微大一點的時候就運行緩慢了呢?
實際代碼問題出現(xiàn)在了這個get方法中,這個get方法沒有指定超時時間,當getPriceInfoById這個接口響應變慢的時候,這個主線程的代碼get又沒有指定超時時間,這時候問題就來了。
由于某次業(yè)務查詢查到了非常多的數(shù)據(jù),每條數(shù)據(jù)就是個模型轉(zhuǎn)換任務,這個任務就會在隊列排隊,get方法沒有指定超時時間的情況下,其最終耗時就取決于整個線程池中執(zhí)行最慢的那一個任務,所以當從DB中查出來的數(shù)據(jù)量越來越大的時候這個轉(zhuǎn)換任務的最大耗時就會逐漸增加,進而引發(fā)接口超時。
所以這里改進上述問題需要做到兩個點:
1、數(shù)據(jù)庫中查出來的數(shù)據(jù)集合必須分頁
2、get方法必須設置超時時間
此外需要知道get方法設置超時時間的計算方式也需要留意,考慮下面這種場景
提交兩個任務 A 和 B 到線程池,A 任務耗時 3 秒,B 任務耗時 4 秒,F(xiàn)uture 以 2 秒為超時時間獲取任務結果
代碼如下:
ExecutorServiceexecutorService=Executors.newFixedThreadPool(2); CallabletaskA=()->{ sleep(3); return"A"; }; Callable taskB=()->{ sleep(4); return"B"; }; List >futures=Stream.of(taskA,taskB) .map(executorService::submit) .collect(Collectors.toList()); for(Future future:futures){ try{ Strings=future.get(2,TimeUnit.SECONDS); System.out.println(s); }catch(Exceptione){ continue; } }
實際運行情況是第一個任務會超時但是第二個不會 ,看起來是不是還有點不可思議,耗時時間長的任務B反而沒超時。原因就在于 Future.get(long timeout, TimeUnit unit) ,調(diào)用 get 時才開始計時,而非任務加入線程池的時間
從圖上就可以看出來,在獲取B的任務執(zhí)行結果的時候B任務已經(jīng)執(zhí)行了兩秒,所以在等待兩秒的情況下可以獲取到結果
3 線程池不當使用舉例
(1)不區(qū)分業(yè)務一把梭哈,全用一個線程池
曾經(jīng)有一個項目,對接多個租戶,每個租戶都有各自的任務需要執(zhí)行,代碼中不區(qū)分租戶的將所有租戶的任務全部丟到一個線程池中執(zhí)行,結果一個租戶的任務提交過多導致線程池執(zhí)行緩慢,但是由于線程池是同一個,影響了所有租戶接口的響應時間。如果說上面說的這個場景用一個線程池產(chǎn)生了租戶互相影響的問題還不夠嚴重,那么下面的這種場景就問題更大了。
曾經(jīng)有一段這樣的場景,因為共用線程池直接導致線程池任務永遠完成不了,請看下面的這種情況:
首先向線程池中提交了一個任務,然后在這個任務的內(nèi)部實現(xiàn)中又往同一個線程池中再次提交了一個任務,相當于父子任務在同一個線程池中執(zhí)行,這時候極有可出現(xiàn)線程死鎖也就是循環(huán)等待的情況
如上圖所示,父任務全部處于執(zhí)行狀態(tài),這時候子任務想要執(zhí)行需要等父任務執(zhí)行完成,但是父任務都執(zhí)行不完,因為還有個子任務沒完成,即父任務等待子任務執(zhí)行完成,而子任務等待父任務釋放線程池資源,這也就造成了 "死鎖"
所以綜上所述,在代碼中應該避免各種任務都往一個線程池中投放,對每個線程池指定好線程名稱,做好分類比較合適,這里在日常開發(fā)中比較推薦使用Guava的工具類,來指定線程名稱前綴,這樣使用jstack分析線程問題也方便排查。
ThreadFactorythreadFactory=newThreadFactoryBuilder() .setNameFormat(threadNamePrefix+"-%d") .setDaemon(true).build(); ExecutorServicethreadPool=newThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
(2)@Async注解不自己定義線程池
@Async用在方法上標識這是一個異步方法,如果不自己指定線程池這個方法將直接新建一個線程執(zhí)行,可以翻看spring實現(xiàn)源碼知道這個點
@Async的實現(xiàn)其實非常簡單就是利用AOP,容器啟動的時候會掃描所有被打上@Async注解的方法,并代理這些方法的執(zhí)行,在執(zhí)行這個方法的時候,生成Callable任務丟到線程池中執(zhí)行(核心代碼位于org.springframework.aop.interceptor.AsyncExecutionInterceptor)
@Override @Nullable publicObjectinvoke(finalMethodInvocationinvocation)throwsThrowable{ Class>targetClass=(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null); MethodspecificMethod=ClassUtils.getMostSpecificMethod(invocation.getMethod(),targetClass); finalMethoduserDeclaredMethod=BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutorexecutor=determineAsyncExecutor(userDeclaredMethod); if(executor==null){ thrownewIllegalStateException( "NoexecutorspecifiedandnodefaultexecutorsetonAsyncExecutionInterceptoreither"); } //將方法調(diào)用封裝成Callable實例丟入線程池中執(zhí)行 Callable
如果不指定線程池這里就會啟用默認的線程池 SimpleAsyncTaskExecutor 然后我們看下這個類的注釋
那這個問題就很嚴重了,假定你的方法執(zhí)行速度慢,而且qps大,這時候線程數(shù)就會直接爆炸,所以建議寫一個類繼承 AsyncConfigurer接口并復寫getAsyncExecutor方法,然后在使用注解的時候指定線程池的名稱
//使用注解時指定線程池的Bean名稱 @Async("apiExecutor")
(3)線程池遇上ThreadLocal
線程池和 ThreadLocal 共用,可能會導致線程從 ThreadLocal 獲取到的是舊值/臟數(shù)據(jù)。這是因為線程池會復用線程對象,與線程對象綁定的類的靜態(tài)屬性 ThreadLocal 變量也會被重用,這就導致一個線程可能獲取到其他線程的 ThreadLocal 值。
比較常規(guī)的做法是在任務執(zhí)行完畢之后的finally代碼塊里面做清理工作
Runnablerunnable=()->{ try{ BizThreadLocal.set("xxxx"); //dosth }finally{ BizThreadLocal.remove(); } };
但是其實finally的代碼塊其實也不是百分百一定執(zhí)行,事實上Thread#stop() 方法打斷線程執(zhí)行的時候 finally代碼塊中的內(nèi)容就不會執(zhí)行,比較推薦的還是# TransmittableThreadLocal
4 再談線程池,幾個關鍵要點
(1)為什么默認線程池的隊列長度不能動態(tài)調(diào)整?
曾經(jīng)面對生產(chǎn)環(huán)境線程池的參數(shù)設定問題,我曾經(jīng)想到一個方案,既然線程池的參數(shù)不好定,那咱們直接動態(tài)修改就行不行呢,線程池本身提供了很多的set方法可以做到參數(shù)修改,比如我們在springBoot項目往往去使用ThreadPoolTaskExecutor 作為線程池,從下圖的set方法列表中可以看出存在很多修改線程池參數(shù)的方法
然后實際使用的時候發(fā)現(xiàn)核心線程數(shù)和最大線程數(shù)都能動態(tài)修改 但是隊列長度卻不能 ,為什么隊列長度不能調(diào)用setQueueCapacity方法進行動態(tài)修改呢?
首先我們可以簡單理解為spring的ThreadPoolTaskExecutor是Java原生ThreadPoolExecutor的封裝,觀察這個類的setMaxPoolSize和setQueueCapacity代碼實現(xiàn)我們就能發(fā)現(xiàn)setQueueCapacity 實際就是一個賦值僅在第一次實例化線程池的使用到了這個參數(shù)。
publicvoidsetMaxPoolSize(intmaxPoolSize){ synchronized(this.poolSizeMonitor){ if(this.threadPoolExecutor!=null){ this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); } this.maxPoolSize=maxPoolSize; } } publicvoidsetQueueCapacity(intqueueCapacity){ this.queueCapacity=queueCapacity; } protectedBlockingQueuecreateQueue(intqueueCapacity){ return(BlockingQueue)(queueCapacity>0?newLinkedBlockingQueue(queueCapacity):newSynchronousQueue()); }
從上面的源碼我們還可以看出spring的ThreadPoolTaskExecutor使用的隊列是LinkedBlockingQueue ,那么為啥線程池ThreadPoolExecutor不支持修改隊列長度呢?這個原因就很簡單了因為這個隊列的capacity是final類型的,自然不能修改。
那如果我一定要修改這個隊列長度應該怎么處理?那完全就可以仿照美團的方式,自定義了一個叫做 ResizableCapacityLinkedBlockIngQueue 的隊列(把LinkedBlockingQueue的capacity 字段的final關鍵字修飾給去掉了,讓它變?yōu)榭勺兊模┦遣皇且彩呛芎唵巍?/p>
(2)再談核心線程數(shù)的參數(shù)設置
核心線程池的參數(shù)設置一般各種網(wǎng)絡資料中比較推崇的是N+1和2N法,即:
CPU 密集型任務(N+1) :這種任務消耗的主要是 CPU 資源,可以將線程數(shù)設置為 N(CPU 核心數(shù))+1。比 CPU 核心數(shù)多出來的一個線程是為了防止線程偶發(fā)的缺頁中斷,或者其它原因?qū)е碌娜蝿諘和6鴰淼挠绊憽R坏┤蝿諘和?,CPU 就會處于空閑狀態(tài),而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閑時間。
I/O 密集型任務(2N) :這種任務應用起來,系統(tǒng)會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內(nèi)不會占用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些線程,具體的計算方法是 2N。
如何判斷是 CPU 密集任務還是 IO 密集任務?
CPU 密集型 :簡單理解就是利用 CPU 計算能力的任務比如你在內(nèi)存中對大量數(shù)據(jù)進行排序。
IO 密集型 :涉及到網(wǎng)絡讀取,文件讀取這類都是 IO 密集型,這類任務的特點是 CPU 計算耗費時間相比于等待 IO 操作完成的時間來說很少,大部分時間都花在了等待 IO 操作完成上。
但是實際上比較科學的線程數(shù)計算方式是:
最佳線程數(shù) = N(CPU 核心數(shù))?(1+WT(線程等待時間)/ST(線程計算時間))
WT(線程等待時間)= 線程運行總時間 - ST(線程計算時間)
線程等待時間所占比例越高,需要越多線程。線程計算時間所占比例越高,需要越少線程。(我們可以通過 JDK 自帶的工具 VisualVM 來查看 WT/ST 比例)
CPU 密集型任務的 WT/ST 接近或者等于 0,因此, 線程數(shù)可以設置為 N(CPU 核心數(shù))?(1+0)= N,和我們上面說的 N(CPU 核心數(shù))+1 差不多。IO 密集型任務下,幾乎全是線程等待時間,從理論上來說,你就可以將線程數(shù)設置為 2N。
這里額外說一句早先我雖然知道線程池核心線程數(shù)應該和CPU核心線程數(shù)有關,但是悲劇的是我并不知道怎么查Linux系統(tǒng)的核心數(shù),這里把查詢命令貼出來供參考:
#總核數(shù)=物理CPU個數(shù)X每顆物理CPU的核數(shù) #查看物理CPU個數(shù) cat/proc/cpuinfo|grep"physicalid"|sort|uniq|wc-l #查看每個物理CPU中core的個數(shù)(即核數(shù)) cat/proc/cpuinfo|grep"cpucores"|uniq
(3)為什么不太推薦你用ParallelStream?
曾經(jīng)某次代碼評審中,有位同學寫出了下面類似的代碼
//從數(shù)據(jù)庫中查找學生對象 Liststudents=searchDataFromDb(); //使用并行流進行模型轉(zhuǎn)換 List res=newArrayList(); students.parallelStream().forEach(student->{ StudentVovo=newStudentVo(student) res.add(student); });
結果測試過程中返回給前端的數(shù)據(jù)總是莫名其妙的少很多和數(shù)據(jù)庫中的真實數(shù)據(jù)條數(shù)對不上,相信大家都看出來了原因是List并不是線程安全的容器,所以導致了最后結果不對,其實這不能算是parallelStream的問題,但是很多人寫代碼時,以為并行流就快為了追求效率,不假思索就寫了這樣的代碼,但是往往在線程池的環(huán)境下大家又仿佛繃緊了并發(fā)神經(jīng),又能考慮到了并發(fā)問題。
此外parallelStream的默認線程池遇上ThreadLocal同樣也存在一些問題,其實如果不做額外線程池指定,代碼中的 parallelStream 都是共用同一個線程池的,ParallelStream 底層使用了 ForkJoinPool,當 Stream 流中元素較多時,整個運行效率也會大大降低。
5 總結
本文通過一次生產(chǎn)事故,進一步總結了線程池在日常開發(fā)中需要注意的一些要點,希望對大家有所幫助。
審核編輯:湯梓紅
-
函數(shù)
+關注
關注
3文章
4304瀏覽量
62429 -
代碼
+關注
關注
30文章
4747瀏覽量
68348 -
線程池
+關注
關注
0文章
57瀏覽量
6834
原文標題:【避坑】線程池沒用好,直接出現(xiàn)了生產(chǎn)事故....
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關推薦
評論