一、數(shù)據(jù)傾斜的基本概念
01 什么是數(shù)據(jù)傾斜?
用最通俗易懂的話來說,數(shù)據(jù)傾斜無非就是大量的相同key被partition分配到一個分區(qū)里,造成了'一個人累死,其他人閑死'的情況,這種情況是我們不能接受的,這也違背了并行計算的初衷,首先一個節(jié)點要承受著巨大的壓力,而其他節(jié)點計算完畢后要一直等待這個忙碌的節(jié)點,也拖累了整體的計算時間,可以說效率是十分低下的。
02? 數(shù)據(jù)傾斜發(fā)生時的現(xiàn)象?
(1)絕大多數(shù)task執(zhí)行得都非常快,但個別task執(zhí)行的極慢。
(2)原本能正常執(zhí)行的Spark作業(yè),某天突然爆出OOM(內(nèi)存溢出)異常。觀察異常棧,是我們寫的業(yè)務(wù)代碼造成的。
03 通用的常規(guī)解決方案
(1)增加jvm內(nèi)存,這適用于第一種情況(唯一值非常少,極少數(shù)值有非常多的記錄值(唯一值少于幾千)),這種情況下,往往只能通過硬件的手段來進行調(diào)優(yōu),增加jvm內(nèi)存可以顯著的提高運行效率。
(2)增加reduce的個數(shù),這適用于第二種情況(唯一值比較多,這個字段的某些值有遠遠多于其他值的記錄數(shù),但是它的占比也小于百分之一或千分之一),我們知道,這種情況下,最容易造成的結(jié)果就是大量相同key被partition到一個分區(qū),從而一個reduce執(zhí)行了大量的工作,而如果我們增加了reduce的個數(shù),這種情況相對來說會減輕很多,畢竟計算的節(jié)點多了,就算工作量還是不均勻的,那也要小很多。
(3)自定義分區(qū),這需要用戶自己繼承partition類,指定分區(qū)策略,這種方式效果比較顯著。
(4)重新設(shè)計key,有一種方案是在map階段時給key加上一個隨機數(shù),有了隨機數(shù)的key就不會被大量的分配到同一節(jié)點(小幾率),待到reduce后再把隨機數(shù)去掉即可。
(5)使用combinner合并,combinner是在map階段,reduce之前的一個中間階段,在這個階段可以選擇性的把大量的相同key數(shù)據(jù)先進行一個合并,可以看做是local reduce,然后再交給reduce來處理,這樣做的好。
04 通用定位發(fā)生數(shù)據(jù)傾斜的代碼
(1)數(shù)據(jù)傾斜只會發(fā)生在shuffle中,下面是常用的可能會觸發(fā)shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現(xiàn)數(shù)據(jù)傾斜時,可能就是代碼中使用了這些算子的原因。
(2)通過觀察spark UI,定位數(shù)據(jù)傾斜發(fā)生在第幾個stage中,如果是用yarn-client模式提交,那么本地是可以直接看到log的,可以在log中找到當前運行到了第幾個stage;如果用yarn-cluster模式提交,可以通過Spark Web UI 來查看當前運行到了第幾個stage。此外,無論是使用了yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI 上深入看一下當前這個stage各個task分配的數(shù)據(jù)量,從而進一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜。
二、 Hive數(shù)據(jù)傾斜
1、Hive的執(zhí)行是分階段的,map處理數(shù)據(jù)量的差異取決于上一個stage的reduce輸出,所以如何將數(shù)據(jù)均勻的分配到各個reduce中,就是解決數(shù)據(jù)傾斜的根本所在。
2 、造成數(shù)據(jù)傾斜的原因
1)、key分布不均勻
2)、業(yè)務(wù)數(shù)據(jù)本身的特性
3)、建表時考慮不周
4)、某些SQL語句本身就有數(shù)據(jù)傾斜
3 、數(shù)據(jù)傾斜的表現(xiàn):
數(shù)據(jù)傾斜出現(xiàn)在SQL算子中包含join/group by/等聚合操作時,大量的相同KEY被分配到少量的reduce去處理。導(dǎo)致絕大多數(shù)TASK執(zhí)行得都非常快,但個別TASK執(zhí)行的極慢,原本能正常執(zhí)行的作業(yè),某天突然爆出OOM(內(nèi)存溢出)異常。任務(wù)進度長時間維持在99%(或100%)。任務(wù)監(jiān)控頁面,發(fā)現(xiàn)只有少量(1個或幾個)reduce子任務(wù)未完成。因為其處理的數(shù)據(jù)量和其他reduce差異過大。單一reduce的記錄數(shù)與平均記錄數(shù)差異過大,通常可能達到3倍甚至更多。 最長時長遠大于平均時長。可以查看具體job的reducer counter計數(shù)器協(xié)助定位。
4、數(shù)據(jù)傾斜的解決方案:
1)參數(shù)調(diào)節(jié):
hive.map.aggr=true(是否在Map端進行聚合,默認為true),這個設(shè)置可以將頂層的聚合操作放在Map階段執(zhí)行,從而減輕清洗階段數(shù)據(jù)傳輸和Reduce階段的執(zhí)行時間,提升總體性能 Set hive.groupby.skewindata=true(hive自動進行負載均衡)
2)SQL語句調(diào)節(jié)
a、如何Join: 關(guān)于驅(qū)動表的選取,選用join key分布最均勻的表作為驅(qū)動表。 做好列裁剪和filter操作,以達到兩表做join的時候,數(shù)據(jù)量相對變小的效果,避免笛卡爾積。 Hive中進行表的關(guān)聯(lián)查詢時,盡可能將較大的表放在Join之后。
b、大小表Join,開啟mapjoin
mapjoin的原理: MapJoin 會把小表全部讀入內(nèi)存中,在map階段直接拿另外一個表的數(shù)據(jù)和內(nèi)存中表數(shù)據(jù)做匹配,由于在map是進行了join操作,省去了reduce 階段,運行的效率就會高很多。參與連接的小表的行數(shù),以不超過2萬條為宜,大小不超過25M。
設(shè)置參數(shù)
set hive.auto.convert.join=true; hive.mapjoin.smalltable.filesize=25000000( 即25M)?手動指定
-- a 表是大表,數(shù)據(jù)量是百萬級別
-- b 表是小表,數(shù)據(jù)量在百級別,mapjion括號中的b就是指定哪張表為小表
select /*+mapjoin(b)*/ a.field1asfield1, b.field2asfield2, b.field3asfield3 fromaleftjoinb on a.field1 = b.field1;c、大表Join大表:
null值不參與連接,簡單舉例
select field1,field2,field3… fromlogaleftjoinuserbona.useridisnotnullanda.userid=b.userid unionselectfield1,field2,field3fromlogwhereuseridisnull;
將熱點key打散,但是需要注意,盡量不要在join時,對關(guān)聯(lián)key使用rand()函數(shù)。因為在hive中當遇到map失敗重算時,就會出現(xiàn)數(shù)據(jù)重復(fù)(數(shù)據(jù)丟失)的問題,spark引擎使用rand容易導(dǎo)致task失敗重新計算的時候偶發(fā)不一致的問題。可以使用md5加密唯一維度值的方式替代rand(), 比如: md5(concat(coalesce(sku_id, 0), '_', coalesce(dim_store_num, 0), '_', coalesce(store_id, 0), '_',coalesce(delv_center_id, 0))),其中concat的字段是表的唯一粒度;也可以使用hash。
d、count distinct大量相同特殊值,使用sum...group by代替count(distinct ) 例如
selecta,count(distinctb)fromtgroupbya 可以寫成selecta,sum(1)from(selecta,bfromtgroupbya,b)groupbya;
select count (distinct key) from a 可以寫成 Select sum(1) from (Select key from a group by key) t特殊情況特殊處理:在業(yè)務(wù)邏輯優(yōu)化效果的不大情況下,有些時候是可以將傾斜的數(shù)據(jù)單獨拿出來處理。最后union回去
e、 不管是join還是groupby 請先在內(nèi)層先進行數(shù)據(jù)過濾,建議只保留需要的key值
f、 取最大最小值盡量使用min/max;不要采用row_number
g、 不要直接select * ;在內(nèi)層做好數(shù)據(jù)過濾
h、 盡量使用sort by替換order by
i、 明確數(shù)據(jù)源,有上層匯總的就不要使用基礎(chǔ)fdm或明細表
J、join避免多對多關(guān)聯(lián)
在join鏈接查詢時,確認是否存在多對多的關(guān)聯(lián),起碼保證有一個表的結(jié)果集的關(guān)聯(lián)字段不重復(fù)。
5、典型的業(yè)務(wù)場景舉例
(1)空值產(chǎn)生的數(shù)據(jù)傾斜
場景:如日志中,常會有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關(guān)聯(lián),會碰到數(shù)據(jù)傾斜的問題。
解決方法1: user_id為空的不參與關(guān)聯(lián)
select * from log a join users b on a.user_id is not null and a.user_id = b.user_idunion allselect * from log a where a.user_id is null;(2)不同數(shù)據(jù)類型關(guān)聯(lián)產(chǎn)生數(shù)據(jù)傾斜
場景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時,默認的Hash操作會按int型的id來進行分配,這樣會導(dǎo)致所有string類型id的記錄都分配到一個Reducer中。
解決方法:把數(shù)字類型轉(zhuǎn)換成字符串類型
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)(3)小表不小不大,怎么用 map join 解決傾斜問題
使用 map join 解決小表(記錄數(shù)少)關(guān)聯(lián)大表的數(shù)據(jù)傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到map join會出現(xiàn)bug或異常,這時就需要特別的處理 。
select * from log a left outer join users b on a.user_id = b.user_id;users 表有 600w+ 的記錄,把 users 分發(fā)到所有的 map 上也是個不小的開銷,而且 map join 不支持這么大的小表。如果用普通的 join,又會碰到數(shù)據(jù)傾斜的問題。 解決方法:
select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;log里user_id有上百萬個,這就又回到原來map join問題。所幸,每日的會員uv不會太多,有交易的會員不會太多,有點擊的會員不會太多,有傭金的會員不會太多等等。所以這個方法能解決很多場景下的數(shù)據(jù)傾斜問題。
(4)業(yè)務(wù)邏輯突發(fā)熱key的處理(真實線上問題) 業(yè)務(wù)場景舉例:
流量數(shù)據(jù)多個設(shè)備號對應(yīng)了一個安裝id,突發(fā)某幾個安裝id數(shù)量級特別大。在歸一環(huán)節(jié)中,按照安裝id進行分發(fā)reduce,再進行處理,異常熱key會造成單一節(jié)點處理數(shù)據(jù)量大,由于數(shù)據(jù)傾斜從而導(dǎo)致任務(wù)卡死的情況。
解決方案:基于小時任務(wù),提前設(shè)置一個異常范圍,把異常安裝id和對應(yīng)的aid撈出來,寫到維表里面。按照歸一邏輯,優(yōu)先使用aid值作為歸一結(jié)果,所以在歸一任務(wù)中,讀取異常值,隨機分發(fā)到reduce中,并將aid賦值給歸一字段,這樣就避免了熱點處理。
總結(jié):
1、對于join,在判斷小表不大于1G的情況下,使用map join
2、對于group by或distinct,設(shè)定 hive.groupby.skewindata=true
3、盡量使用上述的SQL語句調(diào)節(jié)進行優(yōu)化
6、數(shù)據(jù)傾斜的監(jiān)控預(yù)防
(1)測試的時候需要關(guān)注數(shù)據(jù)分布,針對不同日期、關(guān)鍵指標、重點key、枚舉值等
(2)增加數(shù)據(jù)質(zhì)量監(jiān)控,數(shù)據(jù)計算的每層任務(wù)增加數(shù)據(jù)質(zhì)量監(jiān)控。
(3)L0任務(wù),大數(shù)據(jù)平臺需要有健康度巡檢,對資源、參數(shù)配置,數(shù)據(jù)傾斜、穩(wěn)定性等做任務(wù)健康度打分,從而發(fā)現(xiàn)數(shù)據(jù)傾斜的趨勢,及早檢查任務(wù)
三、spark數(shù)據(jù)傾斜
Spark優(yōu)化數(shù)據(jù)傾斜的思路,join方式從SMJ方式改成BMJ的方式,但是只適合大小表的情況。優(yōu)化思路一般是: 改join方式,開啟spark自適應(yīng)框架,優(yōu)化sql。
1、開啟sparksql的數(shù)據(jù)傾斜時的自適應(yīng)關(guān)聯(lián)優(yōu)化
spark.shuffle.statistics.verbose=true打開后MapStatus會采集每個partition條數(shù)的信息,用于傾斜處理。
2 、Sortmergejoin 改成 BroadcastHashJoin。調(diào)大BroadcastHashJoin的閾值。
在某些場景下可以把SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin而避免shuffle產(chǎn)生的數(shù)據(jù)傾斜。 增加參數(shù):
spark.sql.autoBroadcastJoinThreshold=524288000將BHJ的閾值提高到500M
3、優(yōu)化sql同hive
4、傾斜KEY查找
需要結(jié)合實際業(yè)務(wù)代碼,查找到引起Shuffle的算子,并按照以下兩種方式查找大KEY。?
方式一:通過SQL抽樣傾斜KEY
適用場景:如果數(shù)據(jù)量比較小的情況下,通過SQL的方式驗證比較便捷 。
操作步驟:
1、針對KEY進行數(shù)量統(tǒng)計
2、按照數(shù)量從大到小進行排序
3、直接取 limit N 即可?
方式二:通過sample抽樣傾斜KEY
適用場景:如果數(shù)據(jù)量很大,可以通過抽樣進行抽取大KEY。能否抽取到大KEY一般和抽取數(shù)據(jù)比例有關(guān)系。
操作步驟:
1、對KEY賦值為1,便于下一步進行計數(shù)
2、對KEY進行累計
3、對KEY和VALUE交換
4、針對KEY按照字典進行倒排
5、將KEY和VAlUE位置交換,還原到真實的
6、從已排序的RDD中,直接取前N條
數(shù)據(jù)傾斜一般由Shuffle時數(shù)據(jù)不均勻?qū)е拢话阌腥愃阕訒a(chǎn)生Shuffle:Aggregation (groupBy)、Join、Window。 01 Aggregation
建議打散key進行二次聚合:采用對 非constant值、與key無關(guān) 的列進行hash取模,不要使用rand類函數(shù)。
dataframe .groupBy(col("key"),pmod(hash(col("some_col")),100)).agg(max("value").as("partial_max")) .groupBy(col("key")).agg(max("partial_max").as("max"))02? Window
目前支持該模式下的傾斜window,(僅支持3.0)
select (... row_number() over(partition by ... order by ...) as rn) wherern[==|<=|<]?k?and?other?conditionsspark.sql.rankLimit.enabled=true?(目前支持基于row_number的topK計算邏輯)03? Shuffled Join
Spark 2.4開啟參數(shù)
spark.sql.adaptive.enabled=true spark.shuffle.statistics.verbose=true spark.sql.adaptive.skewedJoin.enabled=true spark.sql.adaptive.allowAdditionalShuffle=true如果不能處理,建議用戶自行定位熱點數(shù)據(jù)進行處理 Spark 3.0
spark.sql.adaptive.enabled=true spark.sql.adaptive.skewJoin.enabled=true spark.sql.adaptive.skewJoin.enhance.enabled=true (通用傾斜算法,可處理更多場景) spark.sql.adaptive.forceOptimizeSkewedJoin=true(允許插入額外shuffle,可處理更多場景)
其他參數(shù):
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (默認為256MB,分區(qū)大小超過該閾值才可被識別為傾斜分區(qū),如果希望調(diào)整的傾斜分區(qū)小于該閾值,可以酌情調(diào)小)?
spark.sql.adaptive.skewJoin.skewedPartitionFactor (默認為5,分區(qū)大小超過中位數(shù)Xfactor才可被識別為傾斜分區(qū),一般不需要調(diào)整)? spark.sql.adaptive.skewJoin.enhance.maxJoins (默認5,通用傾斜算法中,如果shuffled join超過此閾值則不處理,一般不需要調(diào)整)? spark.sql.adaptive.skewJoin.enhance.maxSplitsPerPartition (默認1000,通用傾斜算法中,盡量使得每個傾斜分區(qū)的劃分不超過該閾值,一般不需要調(diào)整)?
04 數(shù)據(jù)膨脹(Join)
spark.sql.adaptive.skewJoin.inflation.enabled=true(默認false,由于采樣計算會導(dǎo)致性能回歸,正常任務(wù)不要開啟) spark.sql.adaptive.skewJoin.inflation.factor=50(默認為100,預(yù)估的分區(qū)輸出大小超過中位數(shù)Xfactor才可被識別為膨脹分區(qū),由于預(yù)估算法存在誤差,一般不要低于50) spark.sql.adaptive.shuffle.sampleSizePerPartition=500(默認100,每個Task中的采樣數(shù),基于該采樣數(shù)據(jù)預(yù)估Join之后的分區(qū)大小,如果Task數(shù)量不大,可以酌情調(diào)大)05 傾斜key檢測(Join)
由于Join語義限制,對于A left join skewed B之類的場景,無法對B進行劃分處理,否則會導(dǎo)致數(shù)據(jù)正確性問題,這也是Spark項目所面臨的難題。如果開啟以上功能依然不能處理數(shù)據(jù)傾斜,可以通過開啟傾斜key檢測功能來定位是哪些key導(dǎo)致了傾斜或膨脹,繼而進行過濾等處理。
spark.sql.adaptive.shuffle.detectSkewness=true(默認false,由于采樣計算會導(dǎo)致性能回歸,正常任務(wù)不要開啟)其他參數(shù):
spark.sql.adaptive.shuffle.sampleSizePerPartition=100(默認100,每個Task中的采樣數(shù),如果Task數(shù)量不大,可以酌情調(diào)大)
審核編輯機:劉清
-
計數(shù)器
+關(guān)注
關(guān)注
32文章
2254瀏覽量
94372 -
SQL
+關(guān)注
關(guān)注
1文章
760瀏覽量
44081 -
RDD
+關(guān)注
關(guān)注
0文章
7瀏覽量
7967 -
JVM
+關(guān)注
關(guān)注
0文章
157瀏覽量
12210
原文標題:淺談離線數(shù)據(jù)傾斜
文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論