簡介
在實時數(shù)據(jù)開發(fā)過程中,大家經(jīng)常會用 Flink SQL 或者 Flink DataStream API 來做數(shù)據(jù)加工。通常情況下選用2者都能加工出想要的數(shù)據(jù),但是總會有 Flink SQL 覆蓋不了的問題,但 SQL 的易用性又難以讓人釋懷。所以有些場景在使用 FLink SQL 開始就與需要額外注意,下面就介紹一種多表關(guān)聯(lián)時存在部分列更新(partial Update)場景,在 DataStream API 和 Flink SQL 開發(fā)時都容易忽視的情況而導(dǎo)致的問題。為了簡化問題描述,采用了Flink SQL 來闡述此類問題。
場景介紹
多表關(guān)聯(lián)時表 A 關(guān)聯(lián)表 B, 表 A 具有pk1, field1, field2, field3字段,表 B 具有 pk2, field4, field5, field6 字段,表 A 通過 pk1 關(guān)聯(lián)表B pk2。使用 Flink SQL 會如下實現(xiàn):
CREATE TABLE jdq_source( pk1 INT, field1 STIRNG, field2 STIRNG, field3 STIRNG, PRIMARY KEY(pk1) NOT ENFORCED ) WITH(...); CREATE TABLE sr_sink( pk1 INT, field1 STRING, field2 STRING, field3 STRING, field4 STRING, field5 STRING, field6 STRING, PRIMARY KEY(pk2) NOT ENFORCED ) WITH (...); INSERT INTO C SELECT A.pk1,A.field1,A.field2,A.field3,B.pk2,B.field4,B.field5,B.field6 FROM jdq_source A INNER JOIN sr_sink B ON A.pk1 = B.pk2;
上述實例中有明顯特征:使用了Join 關(guān)聯(lián), 且需要注意的是寫入的數(shù)據(jù)庫 sink 是 StarRocks。StarRocks 存在如下特性:當表是主鍵表時是不支持部分列更新( Partial Update)的,實際上大部分時候大家都用的是主鍵表。
然后在一個SQL查詢數(shù)據(jù)的接口就遇到了如下問題:每次從接口查詢返回的結(jié)果都不穩(wěn)定,同樣的查詢條件不同時機返回的結(jié)果不一樣。SQL查詢語句如下:
select C.field1,C.field2,C.field3 FROM C group by field1,field2,field3;
為什么SQL查詢的結(jié)果會不一致呢?起初排查原因發(fā)現(xiàn) group by 返回結(jié)果有多條,而在SQL 中也沒有使用 order by 對數(shù)據(jù)進行排序,所以導(dǎo)致了結(jié)果不穩(wěn)定。后又排查為什么會出現(xiàn)多條結(jié)果呢?于是懷疑 field1, field2, field3 有不符合預(yù)期的數(shù)據(jù)。如:
20240530, 2, 3
20240530, 2, null
20240531, 2, 4
其中第2條是多余的,不應(yīng)該出現(xiàn)。結(jié)果發(fā)現(xiàn)可能是如下原因?qū)е碌模哼@3個字段 filed1, field2, filed3 在StarRocks數(shù)據(jù)庫中會一直在變化,不停的寫入新值。導(dǎo)致 SQL 查詢時可以查到 field3 為 null 的數(shù)據(jù)。
為什么field3為不斷變化呢?究其原因是:StarRocks 主鍵表不支持部分列更新(Partial Update)。當field3 為null時,同樣會被寫入 StarRocks。我們在通過JDQ讀取表A field1, field2, field3 數(shù)據(jù)給表C寫入數(shù)據(jù)時,當JDQ 消息隊列中表A的記錄存在亂序場景且field3 字段可能為null時,最終寫入StarRocks的field3 字段會出現(xiàn)時而為null,時而不為null。 所以SQL查詢接口中 group by的結(jié)果會出現(xiàn)不穩(wěn)定。
總結(jié)
為什么在開發(fā)的時候當時沒有發(fā)現(xiàn) StarRocks 主鍵表這個問題呢?原因:1. 大家所關(guān)注的部分列更新,多數(shù)是關(guān)注insert into table_C(field1, field2, field3) 中不包含的字段field4,field5...等被更新為null,而當前場景是會把 field3 為null的值也寫入SR數(shù)據(jù)庫中,這不是我們期望的結(jié)果。2.表A作為主表,通常不會出現(xiàn)開始field3有值后來又沒有值(null)的場景。出現(xiàn)這個現(xiàn)象大概率是因為上游JDQ消息隊列中的數(shù)據(jù)亂序了,導(dǎo)致field3 為null的后出現(xiàn)了。而這種問題又比較難發(fā)現(xiàn)。
什么情況下會出現(xiàn)此類問題呢?寫入的數(shù)據(jù)庫不支持部分列更新場景時會出現(xiàn)。如StarRocks, Doris。因為MySQL, ES,ClickHouse的部分表引擎支持部分列更新,所以在MySQL, ES,ClickHouse中不會出現(xiàn)。
同理在 DataStream API 中如果表 A,表 B 關(guān)聯(lián)后的數(shù)據(jù)直接寫入StarRocks 的話,也會出現(xiàn)此類問題。
以上這個問題在 Flink SQL 中無法解決,在 Flink DataStream API 中可以模擬部分列更新來避免此類問題。具體方法:在DatStream 任務(wù)中增加一個MapState, 用來在新數(shù)據(jù)到來時從MapState拿出緩存的數(shù)據(jù),并和新到來的數(shù)據(jù)進行合并,來實現(xiàn)部分列更新功能,最后再寫入 StarRocks。
雖然問題不是Flink SQL導(dǎo)致的,但是上面的問題可以通過Flink DataStream API來規(guī)避。
審核編輯 黃宇
-
SQL
+關(guān)注
關(guān)注
1文章
760瀏覽量
44080
發(fā)布評論請先 登錄
相關(guān)推薦
評論