ClickHouse索引采用唯一聚簇索引的方式,即Part內數據按照order by keys有序,在整個查詢計劃中,如果算子能夠有效利用輸入數據的有序性,對算子的執行性能將有巨大的提升。本文討論ClickHouse基于索引的查詢算子優化方式。
在整個查詢計劃中Sort、Distinct、聚合這3個算子相比其他算子比如:過濾、projection等有如下幾個特點:1.算子需要再內存中保存狀態,內存代價高;2.算子計算代價高;3.算子會阻斷執行pipeline,待所有數據計算完整后才會向下游輸出數據。所以上算子往往是整個查詢的瓶頸算子。
本文詳細討論,3個算子基于索引的查詢優化前后,在計算、內存和pipeline阻斷上的影響。
實驗前準備:
后續的討論主要基于實驗進行。
CREATE TABLE test_in_order ( `a` UInt64, `b` UInt64, `c` UInt64, `d` UInt64 ) ENGINE = MergeTree ORDER BY (a, b);
表中總共有3個part,每個part數據量4條。
PS: 用戶可以在插入數據前提前關閉后臺merge,以避免part合并成一個,如果part合并成一個將影響查詢并行度,可能對實驗有影響,以下查詢可以關閉后臺merge:system stop merges test_in_order
一、Sort算子
如果order by查詢的order by字段與表的order by keys的前綴列匹配,那么可以根據數據的有序特性對Sort算子進行優化。
1.Sort算子實現方式
首先看下不能利用主鍵有序性的場景,即對于order by查詢的order by字段與表的order by keys的前綴列不匹配。比如下面的查詢:
query_1: EXPLAIN PIPELINE SELECT b FROM read_in_order ORDER BY b ASC
它的執行計劃如下:
┌─explain───────────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Sorting) │ │ MergingSortedTransform 3 → 1 │ │ MergeSortingTransform × 3 │ │ LimitsCheckingTransform × 3 │ │ PartialSortingTransform × 3 │ │ (Expression) │ │ ExpressionTransform × 3 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 3 0 → 1 │ └───────────────────────────────────────┘
排序算法由3個Transform組成,其中
1)PartialSortingTransform對單個Chunk進行排序;
2)MergeSortingTransform對單個stream進行排序;
3)MergingSortedTransform合并多個有序的stream進行全局sort-merge排序
如果查詢的order by字段與表的order by keys的前綴列匹配,那么可以根據數據的有序特性對查詢進行優化,優化開關:optimize_read_in_order。
2.匹配索引列的查詢
以下查詢的order by字段與表的order by keys的前綴列匹配
query_3: EXPLAIN PIPELINE SELECT b FROM test_in_order ORDER BY a ASC, b ASCSETTINGS optimize_read_in_order = 0 -- 關閉read_in_order優化
查看order by語句的pipeline執行計劃
┌─explain───────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Sorting) │ │ MergingSortedTransform 3 → 1 │ │ MergeSortingTransform × 3 │ │ (Expression) │ │ ExpressionTransform × 3 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 3 0 → 1 │ └───────────────────────────────────┘
此時order by算子的算法
1)首先MergeSortingTransform對輸入的stream進行排序
2)然后MergingSortedTransform將多個排好序的stream進行合并,并輸出一個整體有序的stream,也是最終的排序結果。
這里有個疑問在關閉read_in_order優化的查詢計劃中,系統直接默認了MergeSortingTransform的輸入在Chunk內是有序的,這里其實是一個默認優化,因為order by查詢的order by字段與表的order by keys的前綴列匹配,所以數據在Chunk內部一定是有序的。
3. 開啟優化optimize_read_in_order
┌─explain──────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Sorting) │ │ MergingSortedTransform 3 → 1 │ │ (Expression) │ │ ExpressionTransform × 3 │ │ (ReadFromMergeTree) │ │ MergeTreeInOrder × 3 0 → 1 │ └──────────────────────────────────┘
4. 優化分析
打開optimize_read_in_order后:
1.對于計算方面:算法中只有一個MergingSortedTransform,省略了單個stream內排序的步驟
2.由于內存方面:由于MergeSortingTransform是消耗內存最大的步驟,所以優化后可以節約大量的內存
3.對于poipeline阻塞:MergeSortingTransform會阻塞整個pipeline,所以優化后也消除了對pipeline的阻塞
二、Distinct算子
如果distinct查詢的distinct字段與表的order by keys的前綴列匹配,那么可以根據數據的有序特性對Distinct算子進行優化,優化開關:optimize_distinct_in_order。通過以下實驗進行說明:
1. Distinct算子實現方式
查看distinct語句的pipeline執行計劃
query_2: EXPLAIN PIPELINE SELECT DISTINCT * FROM woo.test_in_order SETTINGS optimize_distinct_in_order = 0 -- 關閉distinct in order優化
┌─explain─────────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Distinct) │ │ DistinctTransform │ │ Resize 3 → 1 │ │ (Distinct) │ │ DistinctTransform × 3 │ │ (Expression) │ │ ExpressionTransform × 3 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 3 0 → 1 │ └─────────────────────────────────────┘
Distinct算子采用兩階段的方式,首先第一個DistinctTransform在內部進行初步distinct,其并行度為3,可以簡單的認為有3個線程在同時執行。然后第二個DistinctTransform進行final distinct。
每個DistinctTransform的計算方式為:首先構建一個HashSet數據結構,然后根據HashSet,構建一個Filter Mask(如果當前key存在于HashSet中,則過濾掉),最后過濾掉不需要的數據。
2.開啟優化optimize_distinct_in_order
┌─explain────────────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Distinct) │ │ DistinctTransform │ │ Resize 3 → 1 │ │ (Distinct) │ │ DistinctSortedChunkTransform × 3 │ │ (Expression) │ │ ExpressionTransform × 3 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 3 0 → 1 │ └────────────────────────────────────────┘
可以看到初步distinct和final distinct采用了不同的transform,DistinctSortedChunkTransform和DistinctTransform。
DistinctSortedChunkTransform:對單個stream內的數據進行distinct操作,因為distinct列跟表的order by keys的前綴列匹配,scan算子讀取數據的時候一個stream只從一個part內讀取數據,那么每個distinct transform輸入的數據就是有序的。所以distinct算法有:
DistinctSortedChunkTransform算法一:
Transform中保留最后一個輸入的數據作為狀態,對于每個輸入的新數據如果跟保留的狀態相同,那么忽略,如果不同則將上一個狀態輸出給上一個算子,然后保留當前的數據最為狀態。這種算法對于在整個stream內部全局去重時間和空間復雜度都有極大的降低。
DistinctSortedStreamTransform算法二:(ClickHouse采用的)
Transform對與每個Chunk(ClickHouse中Transform數據處理的基本單位,默認大約6.5w行),首先將相同的數據劃分成多個Range,并設置一個mask數組,然后將相同的數據刪除掉,最后返回刪除重復數據的Chunk。
3. 優化分析
打開optimize_distinct_in_order后:主要對于第一階段的distinct步驟進行了優化,從基于HashSet過濾的算法到基于連續相同值的算法。
1.對于計算方面:優化后的算法,省去了Hash計算,但多了判斷相等的步驟,在不同數據基數集大小下,各有優劣。
2.由于內存方面:優化后的算法,不需要存儲HashSet
3.對于poipeline阻塞:優化前后都不會阻塞pipeline
三、聚合算子
如果group by查詢的order by字段與表的order by keys的前綴列匹配,那么可以根據數據的有序特性對聚合算子進行優化,優化開關:optimize_aggregation_in_order。
1.聚合算子實現方式
查看group by語句的pipeline執行計劃:
query_4: EXPLAIN PIPELINE SELECT a FROM test_in_order GROUP BY a SETTINGS optimize_aggregation_in_order = 0 -- 關閉read_in_order優化
┌─explain─────────────────────────────┐ │ (Expression) │ │ ExpressionTransform × 8 │ │ (Aggregating) │ │ Resize 3 → 8 │ │ AggregatingTransform × 3 │ │ StrictResize 3 → 3 │ │ (Expression) │ │ ExpressionTransform × 3 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 3 0 → 1 │ └─────────────────────────────────────┘
對于聚合算子的整體算法沒有在執行計劃中完整顯示出來,其宏觀上采用兩階段的聚合算法,其完整算法如下:1.AggregatingTransform進行初步聚合,這一步可以并行計算;2.ConvertingAggregatedToChunksTransform進行第二階段聚合。(PS:為簡化起見,忽略two level HashMap,和spill to disk的介紹)。
2.開啟優化optimize_aggregation_in_order
執行計劃如下:
┌─explain───────────────────────────────────────┐ │ (Expression) │ │ ExpressionTransform × 8 │ │ (Aggregating) │ │ MergingAggregatedBucketTransform × 8 │ │ Resize 1 → 8 │ │ FinishAggregatingInOrderTransform 3 → 1 │ │ AggregatingInOrderTransform × 3 │ │ (Expression) │ │ ExpressionTransform × 3 │ │ (ReadFromMergeTree) │ │ MergeTreeInOrder × 3 0 → 1 │ └───────────────────────────────────────────────┘
可以看到打開optimize_aggregation_in_order后aggregating算法由三個步驟組成:
1)首先AggregatingInOrderTransform會將stream內連續的相同的key進行預聚合,預聚合后在當前stream內相同keys的數據只會有一條;
2)FinishAggregatingInOrderTransform將接收到的多個stream內的數據進行重新分組使得輸出的chunk間數據是有序的,假設前一個chunk中group by keys最大的一條數據是5,當前即將輸出的chunk中沒有大于5的數據;
3)MergingAggregatedBucketTransform的作用是進行最終的merge aggregating。
FinishAggregatingInOrderTransform的分組算法如下:
假設有3個stream當前算子會維護3個Chunk,每一次選取在當前的3個Chunk內找到最后一條數據的最小值,比如初始狀態最小值是5,然后將3個Chunk內所有小于5的數據一次性取走,如此反復如果一個Chunk被取光,需要從改stream內拉取新的Chunk。
這種算法保證了每次FinishAggregatingInOrderTransform向下游輸出的Chunk的最大值小于下一次Chunk的最小值,便于后續步驟的優化。
3.優化分析
打開optimize_aggregation_in_order后:主要對于第一階段的聚合步驟進行了優化,從基于HashMap的算法到基于連續相同值的算法。
1.對于計算方面:優化后的算法,減少了Hash計算,但多了判斷相等的步驟,在不同數據基數集大小下,各有優劣。
2.由于內存方面:優化前后無差別
3.對于poipeline阻塞:優化前后無差別
四、優化小結
在整個查詢計劃中Sort、Distinct、聚合這3個算子算子往往是整個查詢的瓶頸算子,所以值得對其進行深度優化。ClickHouse通過利用算子輸入數據的有序性,優化算子的算法或者選擇不同的算法,在計算、內存和pipeline阻塞三個方面均有不同程度的優化。
審核編輯 黃宇
-
Pipeline
+關注
關注
0文章
28瀏覽量
9345 -
算子
+關注
關注
0文章
16瀏覽量
7252
發布評論請先 登錄
相關推薦
評論