一、Flink SQL 背景
Flink SQL 是 Flink 實(shí)時(shí)計(jì)算為簡(jiǎn)化計(jì)算模型,降低用戶使用實(shí)時(shí)計(jì)算門(mén)檻而設(shè)計(jì)的一套符合標(biāo)準(zhǔn) SQL 語(yǔ)義的開(kāi)發(fā)語(yǔ)言。
自 2015 年開(kāi)始,阿里巴巴開(kāi)始調(diào)研開(kāi)源流計(jì)算引擎,最終決定基于 Flink 打造新一代計(jì)算引擎,針對(duì) Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初將最終代碼開(kāi)源,也就是我們熟知的 Blink。Blink 在原來(lái)的 Flink 基礎(chǔ)上最顯著的一個(gè)貢獻(xiàn)就是 Flink SQL 的實(shí)現(xiàn)。
Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計(jì)算領(lǐng)域,比如 Storm、Spark Streaming 都會(huì)提供一些 Function 或者 Datastream API,用戶通過(guò) Java 或 Scala 寫(xiě)業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門(mén)檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。
在這個(gè)背景下,毫無(wú)疑問(wèn),SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因?yàn)槠渚哂袔讉€(gè)非常重要的特點(diǎn):
SQL 屬于設(shè)定式語(yǔ)言,用戶只要表達(dá)清楚需求即可,不需要了解具體做法;
SQL 可優(yōu)化,內(nèi)置多種查詢優(yōu)化器,這些查詢優(yōu)化器可為 SQL 翻譯出最優(yōu)執(zhí)行計(jì)劃;
SQL 易于理解,不同行業(yè)和領(lǐng)域的人都懂,學(xué)習(xí)成本較低;
SQL 非常穩(wěn)定,在數(shù)據(jù)庫(kù) 30 多年的歷史中,SQL 本身變化較少;
流與批的統(tǒng)一,F(xiàn)link 底層 Runtime 本身就是一個(gè)流與批統(tǒng)一的引擎,而 SQL 可以做到 API 層的流與批統(tǒng)一。
二、Flink 的最新特性(1.7.0 和 1.8.0 更新)
2.1 Flink 1.7.0 新特性
在 Flink 1.7.0 中,我們更接近實(shí)現(xiàn)快速數(shù)據(jù)處理和以無(wú)縫方式為 Flink 社區(qū)構(gòu)建數(shù)據(jù)密集型應(yīng)用程序的目標(biāo)。最新版本包括一些新功能和改進(jìn),例如對(duì) Scala 2.12 的支持、一次性 S3 文件接收器、復(fù)雜事件處理與流 SQL 的集成等。
Apache Flink 中對(duì) Scala 2.12 的支持(FLINK-7811)
Apache Flink 1.7.0 是第一個(gè)完全支持 Scala 2.12 的版本。這允許用戶使用較新的 Scala 版本編寫(xiě) Flink 應(yīng)用程序并利用 Scala 2.12 生態(tài)系統(tǒng)。
狀態(tài)演進(jìn)(FLINK-9376)
許多情況下,由于需求的變化,長(zhǎng)期運(yùn)行的 Flink 應(yīng)用程序需要在其生命周期內(nèi)發(fā)展。在不失去當(dāng)前應(yīng)用程序進(jìn)度狀態(tài)的情況下更改用戶狀態(tài)是應(yīng)用程序發(fā)展的關(guān)鍵要求。使用 Flink 1.7.0,社區(qū)添加了狀態(tài)演變,允許您靈活地調(diào)整長(zhǎng)時(shí)間運(yùn)行的應(yīng)用程序的用戶狀態(tài)模式,同時(shí)保持與以前保存點(diǎn)的兼容性。通過(guò)狀態(tài)演變,可以在狀態(tài)模式中添加或刪除列,以便更改應(yīng)用程序部署后應(yīng)用程序捕獲的業(yè)務(wù)功能?,F(xiàn)在,使用 Avro 生成時(shí),狀態(tài)模式演變現(xiàn)在可以立即使用作為用戶狀態(tài)的類,這意味著可以根據(jù) Avro 的規(guī)范來(lái)演變國(guó)家的架構(gòu)。雖然 Avro 類型是 Flink 1.7 中唯一支持模式演變的內(nèi)置類型,但社區(qū)仍在繼續(xù)致力于在未來(lái)的 Flink 版本中進(jìn)一步擴(kuò)展對(duì)其他類型的支持。
MATCH RECOGNIZE Streaming SQL 支持(FLINK-6935)
這是 Apache Flink 1.7.0 的一個(gè)重要補(bǔ)充,它為 Flink SQL 提供了 MATCH RECOGNIZE 標(biāo)準(zhǔn)的初始支持。此功能結(jié)合了復(fù)雜事件處理(CEP)和 SQL,可以輕松地對(duì)數(shù)據(jù)流進(jìn)行模式匹配,從而實(shí)現(xiàn)一整套新的用例。此功能目前處于測(cè)試階段,因此我們歡迎社區(qū)提供任何反饋和建議。
流式 SQL 中的時(shí)態(tài)表和時(shí)間連接(FLINK-9712)
時(shí)態(tài)表是 Apache Flink 中的一個(gè)新概念,它為表的更改歷史提供(參數(shù)化)視圖,并在特定時(shí)間點(diǎn)返回表的內(nèi)容。例如,我們可以使用具有歷史貨幣匯率的表格。隨著時(shí)間的推移,這種表格不斷增長(zhǎng)/發(fā)展,并且增加了新的更新匯率。時(shí)態(tài)表是一種視圖,可以將這些匯率的實(shí)際狀態(tài)返回到任何給定的時(shí)間點(diǎn)。使用這樣的表,可以使用正確的匯率將不同貨幣的訂單流轉(zhuǎn)換為通用貨幣。時(shí)間聯(lián)接允許使用不斷變化/更新的表來(lái)進(jìn)行內(nèi)存和計(jì)算有效的流數(shù)據(jù)連接。
Streaming SQL 的其他功能
除了上面提到的主要功能外,F(xiàn)link 的 Table&SQL API 已經(jīng)擴(kuò)展到更多用例。以下內(nèi)置函數(shù)被添加到 API:TO_BASE64、LOG2、LTRIM、REPEAT、REPLACE、COSH、SINH、TANH SQL Client 現(xiàn)在支持在環(huán)境文件和 CLI 會(huì)話中定義視圖。此外,CLI 中添加了基本的 SQL 語(yǔ)句自動(dòng)完成功能。社區(qū)添加了一個(gè) Elasticsearch 6 表接收器,允許存儲(chǔ)動(dòng)態(tài)表的更新結(jié)果。
Kafka 2.0 連接器(FLINK-10598)
Apache Flink 1.7.0 繼續(xù)添加更多連接器,使其更容易與更多外部系統(tǒng)進(jìn)行交互。在此版本中,社區(qū)添加了 Kafka 2.0 連接器,該連接器允許通過(guò)一次性保證讀取和寫(xiě)入 Kafka 2.0。
本地恢復(fù)(FLINK-9635)
Apache Flink 1.7.0 通過(guò)擴(kuò)展 Flink 的調(diào)度來(lái)完成本地恢復(fù)功能,以便在恢復(fù)時(shí)考慮以前的部署位置。如果啟用了本地恢復(fù),F(xiàn)link 將保留最新檢查點(diǎn)的本地副本任務(wù)運(yùn)行的機(jī)器。通過(guò)將任務(wù)調(diào)度到以前的位置,F(xiàn)link 將通過(guò)從本地磁盤(pán)讀取檢查點(diǎn)狀態(tài)來(lái)最小化恢復(fù)狀態(tài)的網(wǎng)絡(luò)流量。此功能大大提高了恢復(fù)速度。
2.2 Flink 1.8.0 新特性
Flink 1.8.0 引入對(duì)狀態(tài)的清理
使用 TTL(生存時(shí)間)連續(xù)增量清除舊的 Key 狀態(tài) Flink 1.8 引入了對(duì) RocksDB 狀態(tài)后端(FLINK-10471)和堆狀態(tài)后端(FLINK-10473)的舊數(shù)據(jù)的連續(xù)清理。這意味著舊的數(shù)據(jù)將(根據(jù) TTL 設(shè)置)不斷被清理掉。
新增和刪除一些 Table API
1) 引入新的 CSV 格式符(FLINK-9964)
此版本為符合 RFC4180 的 CSV 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能與 Kafka 一起使用。舊描述符 org.apache.flink.table.descriptors.OldCsv 用于文件系統(tǒng)連接器。
2) 靜態(tài)生成器方法在 TableEnvironment(FLINK-11445)上的棄用
為了將 API 與實(shí)際實(shí)現(xiàn)分開(kāi),TableEnvironment.getTableEnvironment() 不推薦使用靜態(tài)方法?,F(xiàn)在推薦使用Batch/StreamTableEnvironment.create()。
3) 表 API Maven 模塊中的更改(FLINK-11064)
之前具有 flink-table 依賴關(guān)系的用戶需要更新其依賴關(guān)系 flink-table-planner,以及正確的依賴關(guān)系 flink-table-api-*,具體取決于是使用 Java 還是 Scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。
Kafka Connector 的修改
引入可直接訪問(wèn) ConsumerRecord 的新KafkaDeserializationSchema(FLINK-8354),對(duì)于 FlinkKafkaConsumers 推出了一個(gè)新的 KafkaDeserializationSchema,可以直接訪問(wèn) KafkaConsumerRecord。
三、Flink SQL 的編程模型
Flink 的編程模型基礎(chǔ)構(gòu)建模塊是流(streams)與轉(zhuǎn)換 (transformations),每一個(gè)數(shù)據(jù)流起始于一個(gè)或多個(gè) source,并終止于一個(gè)或多個(gè) sink。
相信大家對(duì)上面的圖已經(jīng)十分熟悉了,當(dāng)然基于 Flink SQL 編寫(xiě)的 Flink 程序也離不開(kāi)讀取原始數(shù)據(jù),計(jì)算邏輯和寫(xiě)入計(jì)算結(jié)果數(shù)據(jù)三部分。
一個(gè)完整的 Flink SQL 編寫(xiě)的程序包括如下三部分:
Source Operator:Soruce operator 是對(duì)外部數(shù)據(jù)源的抽象, 目前 Apache Flink 內(nèi)置了很多常用的數(shù)據(jù)源實(shí)現(xiàn)例如 MySQL、Kafka 等;
Transformation Operators:算子操作主要完成例如查詢、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多數(shù)傳統(tǒng)數(shù)據(jù)庫(kù)支持的操作;
Sink Operator:Sink operator 是對(duì)外結(jié)果表的抽象,目前 Apache Flink 也內(nèi)置了很多常用的結(jié)果表的抽象,比如 Kafka Sink 等
我們通過(guò)用一個(gè)最經(jīng)典的 WordCount 程序作為入門(mén),看一下傳統(tǒng)的基于 DataSet/DataStream API 開(kāi)發(fā)和基于 SQL 開(kāi)發(fā)有哪些不同?
DataStream/DataSetAPI
public class WordCount { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet
Flink SQL
//省略掉初始化環(huán)境等公共代碼SELECT word, COUNT(word) FROM table GROUP BY word;
我們已經(jīng)可以直觀體會(huì)到,SQL 開(kāi)發(fā)的快捷和便利性了。
四、Flink SQL 的語(yǔ)法和算子
4.1 Flink SQL 支持的語(yǔ)法
Flink SQL 核心算子的語(yǔ)義設(shè)計(jì)參考了 1992、2011 等 ANSI-SQL 標(biāo)準(zhǔn),F(xiàn)link 使用 Apache Calcite 解析 SQL ,Calcite 支持標(biāo)準(zhǔn)的 ANSI SQL。
那么 Flink 自身支持的 SQL 語(yǔ)法有哪些呢?
insert: INSERT INTO tableReference queryquery: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]orderItem: expression [ ASC | DESC ]select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* }projectItem: expression [ [ AS ] columnAlias ] | tableAlias . *tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]joinCondition: ON booleanExpression | USING '(' column [, column ]* ')'tableReference: tablePrimary [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')'values: VALUES expression [, expression ]*groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')'windowRef: windowName | windowSpecwindowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')'
上面 SQL 的語(yǔ)法支持也已經(jīng)表明了 Flink SQL 對(duì)算子的支持,接下來(lái)我們對(duì) Flink SQL 中最常見(jiàn)的算子語(yǔ)義進(jìn)行介紹。
4.2 Flink SQL 常用算子
SELECT
SELECT 用于從 DataSet/DataStream 中選擇數(shù)據(jù),用于篩選出某些列。
示例:
SELECT * FROM Table;// 取出表中的所有列SELECT name,age FROM Table;// 取出表中 name 和 age 兩列
與此同時(shí) SELECT 語(yǔ)句中可以使用函數(shù)和別名,例如我們上面提到的 WordCount 中:
SELECT word, COUNT(word) FROM table GROUP BY word;
WHERE
WHERE 用于從數(shù)據(jù)集/流中過(guò)濾數(shù)據(jù),與 SELECT 一起使用,用于根據(jù)某些條件對(duì)關(guān)系做水平分割,即選擇符合條件的記錄。
示例:
SELECT name,age FROM Table where name LIKE ‘% 小明 %’;SELECT * FROM Table WHERE age = 20;
WHERE 是從原數(shù)據(jù)中進(jìn)行過(guò)濾,那么在 WHERE 條件中,F(xiàn)link SQL 同樣支持 =、<、>、<>、>=、<=,以及 AND、OR 等表達(dá)式的組合,最終滿足過(guò)濾條件的數(shù)據(jù)會(huì)被選擇出來(lái)。并且 WHERE 可以結(jié)合 IN、NOT IN 聯(lián)合使用。舉個(gè)負(fù)責(zé)的例子:
SELECT name, ageFROM TableWHERE name IN (SELECT name FROM Table2)
DISTINCT
DISTINCT 用于從數(shù)據(jù)集/流中去重根據(jù) SELECT 的結(jié)果進(jìn)行去重。
示例:
SELECT DISTINCT name FROM Table;
對(duì)于流式查詢,計(jì)算查詢結(jié)果所需的 State 可能會(huì)無(wú)限增長(zhǎng),用戶需要自己控制查詢的狀態(tài)范圍,以防止?fàn)顟B(tài)過(guò)大。
GROUP BY
GROUP BY 是對(duì)數(shù)據(jù)進(jìn)行分組操作。例如我們需要計(jì)算成績(jī)明細(xì)表中,每個(gè)學(xué)生的總分。
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
UNION和UNION ALL
UNION 用于將兩個(gè)結(jié)果集合并起來(lái),要求兩個(gè)結(jié)果集字段完全一致,包括字段類型、字段順序。不同于 UNION ALL 的是,UNION 會(huì)對(duì)結(jié)果數(shù)據(jù)去重。
示例:
SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
JOIN
JOIN 用于把來(lái)自兩個(gè)表的數(shù)據(jù)聯(lián)合起來(lái)形成結(jié)果表,F(xiàn)link 支持的 JOIN 類型包括:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
這里的 JOIN 的語(yǔ)義和我們?cè)陉P(guān)系型數(shù)據(jù)庫(kù)中使用的 JOIN 語(yǔ)義一致。
示例:
JOIN(將訂單表數(shù)據(jù)和商品表進(jìn)行關(guān)聯(lián))SELECT * FROM Orders INNER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
LEFT JOIN 與 JOIN 的區(qū)別是當(dāng)右表沒(méi)有與左邊相 JOIN 的數(shù)據(jù)時(shí)候,右邊對(duì)應(yīng)的字段補(bǔ) NULL 輸出,RIGHT JOIN 相當(dāng)于 LEFT JOIN 左右兩個(gè)表交互一下位置。FULL JOIN 相當(dāng)于 RIGHT JOIN 和 LEFT JOIN 之后進(jìn)行 UNION ALL 操作。
示例:
SELECT *FROM Orders LEFT JOIN Product ON Orders.productId = [Product.id](http://product.id/)SELECT *FROM Orders RIGHT JOIN Product ON Orders.productId = [Product.id](http://product.id/)SELECT *FROM Orders FULL OUTER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
Group Window
根據(jù)窗口數(shù)據(jù)劃分的不同,目前 Apache Flink 有如下 3 種 Bounded Window:
Tumble,滾動(dòng)窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無(wú)疊加;
Hop,滑動(dòng)窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;
Session,會(huì)話窗口,窗口數(shù)據(jù)沒(méi)有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無(wú)疊加。
Tumble Window
Tumble 滾動(dòng)窗口有固定大小,窗口數(shù)據(jù)不重疊,具體語(yǔ)義如下:
Tumble 滾動(dòng)窗口對(duì)應(yīng)的語(yǔ)法如下:
SELECT [gk], [TUMBLE_START(timeCol, size)], [TUMBLE_END(timeCol, size)], agg1(col1), ... aggn(colN)FROM Tab1GROUP BY [gk], TUMBLE(timeCol, size)
其中:
[gk] 決定了是否需要按照字段進(jìn)行聚合;
TUMBLE_START 代表窗口開(kāi)始時(shí)間;
TUMBLE_END 代表窗口結(jié)束時(shí)間;
timeCol 是流表中表示時(shí)間字段;
size 表示窗口的大小,如 秒、分鐘、小時(shí)、天。
舉個(gè)例子,假如我們要計(jì)算每個(gè)人每天的訂單量,按照 user 進(jìn)行聚合分組:
SELECT user, TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;
Hop Window
Hop 滑動(dòng)窗口和滾動(dòng)窗口類似,窗口有固定的 size,與滾動(dòng)窗口不同的是滑動(dòng)窗口可以通過(guò) slide 參數(shù)控制滑動(dòng)窗口的新建頻率。因此當(dāng) slide 值小于窗口 size 的值的時(shí)候多個(gè)滑動(dòng)窗口會(huì)重疊,具體語(yǔ)義如下:
Hop 滑動(dòng)窗口對(duì)應(yīng)語(yǔ)法如下:
SELECT [gk], [HOP_START(timeCol, slide, size)] , [HOP_END(timeCol, slide, size)], agg1(col1), ... aggN(colN) FROM Tab1GROUP BY [gk], HOP(timeCol, slide, size)
每次字段的意思和 Tumble 窗口類似:
[gk] 決定了是否需要按照字段進(jìn)行聚合;
HOP_START 表示窗口開(kāi)始時(shí)間;
HOP_END 表示窗口結(jié)束時(shí)間;
timeCol 表示流表中表示時(shí)間字段;
slide 表示每次窗口滑動(dòng)的大?。?/p>
size 表示整個(gè)窗口的大小,如 秒、分鐘、小時(shí)、天。
舉例說(shuō)明,我們要每過(guò)一小時(shí)計(jì)算一次過(guò)去 24 小時(shí)內(nèi)每個(gè)商品的銷量:
SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
Session Window
會(huì)話時(shí)間窗口沒(méi)有固定的持續(xù)時(shí)間,但它們的界限由 interval 不活動(dòng)時(shí)間定義,即如果在定義的間隙期間沒(méi)有出現(xiàn)事件,則會(huì)話窗口關(guān)閉。
Seeeion 會(huì)話窗口對(duì)應(yīng)語(yǔ)法如下:
SELECT [gk], SESSION_START(timeCol, gap) AS winStart, SESSION_END(timeCol, gap) AS winEnd, agg1(col1), ... aggn(colN)FROM Tab1GROUP BY [gk], SESSION(timeCol, gap)
[gk] 決定了是否需要按照字段進(jìn)行聚合;
SESSION_START 表示窗口開(kāi)始時(shí)間;
SESSION_END 表示窗口結(jié)束時(shí)間;
timeCol 表示流表中表示時(shí)間字段;
gap 表示窗口數(shù)據(jù)非活躍周期的時(shí)長(zhǎng)。
例如,我們需要計(jì)算每個(gè)用戶訪問(wèn)時(shí)間 12 小時(shí)內(nèi)的訂單量:
SELECT user, SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user
五、Flink SQL 的內(nèi)置函數(shù)
Flink 提供大量的內(nèi)置函數(shù)供我們直接使用,我們常用的內(nèi)置函數(shù)分類如下:
比較函數(shù)
邏輯函數(shù)
算術(shù)函數(shù)
字符串處理函數(shù)
時(shí)間函數(shù)
我們接下來(lái)對(duì)每種函數(shù)舉例進(jìn)行講解。
5.1 比較函數(shù)
比較函數(shù)
描述
value1=value2 | 如果 value1 等于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN |
value1<>value2 | 如果 value1 不等于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN |
value1>value2 | 如果 value1 大于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN |
value1 < value2 | 如果 value1 小于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN |
value IS NULL | 如果 value 為 NULL,則返回 TRUE |
value IS NOT NULL | 如果 value 不為 NULL,則返回 TRUE |
string1 LIKE string2 | 如果 string1 匹配模式 string2,則返回 TRUE ; 如果 string1 或 string2為 NULL,則返回UNKNOWN |
value1 IN (value2, value3…) | 如果給定列表中存在 value1 (value2,value3,…),則返回 TRUE 。當(dāng)(value2,value3,…)包含 NULL,如果可以找到該數(shù)據(jù)元?jiǎng)t返回 TRUE,否則返回 UNKNOWN。如果 value1 為 NULL,則始終返回 UNKNOWN |
5.2 邏輯函數(shù)
邏輯函數(shù)
描述
A OR B | 如果 A 為 TRUE 或 B 為 TRUE,則返回 TRUE |
A AND B | 如果 A 和 B 都為 TRUE,則返回 TRUE |
NOT boolean | 如果 boolean 為 FALSE,則返回 TRUE,否則返回 TRUE。如果 boolean 為 TRUE,則返回 FALSE |
A IS TRUE 或 FALSE | 判斷 A 是否為真 |
— | — |
5.3 算術(shù)函數(shù)
算術(shù)函數(shù)
描述
numeric1 ±*/ numeric2 | 分別代表兩個(gè)數(shù)值加減乘除 |
ABS(numeric) | 返回 numeric 的絕對(duì)值 |
POWER(numeric1, numeric2) | 返回 numeric1 上升到 numeric2 的冪 |
除了上述表中的函數(shù),F(xiàn)link SQL 還支持種類豐富的函數(shù)計(jì)算。
5.4 字符串處理函數(shù)
字符串函數(shù)
描述
UPPER/LOWER | 以大寫(xiě) / 小寫(xiě)形式返回字符串 |
LTRIM(string) | 返回一個(gè)字符串,從去除左空格的字符串, 類似還有 RTRIM |
CONCAT(string1, string2,…) | 返回連接 string1,string2,…的字符串 |
5.5 時(shí)間函數(shù)
時(shí)間函數(shù)
描述
DATE string | 返回以“yyyy-MM-dd”形式從字符串解析的 SQL 日期 |
TIMESTAMP string | 返回以字符串形式解析的 SQL 時(shí)間戳,格式為“yyyy-MM-dd HH:mm:ss [.SSS]” |
CURRENT_DATE | 返回 UTC 時(shí)區(qū)中的當(dāng)前 SQL 日期 |
DATE_FORMAT(timestamp, string) | 返回使用指定格式字符串格式化時(shí)間戳的字符串 |
六、Flink SQL 實(shí)戰(zhàn)應(yīng)用
上面我們分別介紹了 Flink SQL 的背景、新特性、編程模型和常用算子,這部分我們將模擬一個(gè)真實(shí)的案例為大家使用 Flink SQL 提供一個(gè)完整的 Demo。
相信這里應(yīng)該有很多 NBA 的球迷,假設(shè)我們有一份數(shù)據(jù)記錄了每個(gè)賽季的得分王的數(shù)據(jù),包括賽季、球員、出場(chǎng)、首發(fā)、時(shí)間、助攻、搶斷、蓋帽、得分等?,F(xiàn)在我們要統(tǒng)計(jì)獲得得分王榮譽(yù)最多的三名球員。
原數(shù)據(jù)存在 score.csv 文件中,如下:
17-18,詹姆斯-哈登,72,72,35.4,8.8,1.8,0.7,30.416-17,拉塞爾-威斯布魯克,81,81,34.6,10.4,1.6,0.4,31.615-16,斯蒂芬-庫(kù)里,79,79,34.2,6.7,2.1,0.2,30.114-15,拉塞爾-威斯布魯克,67,67,34.4,8.6,2.1,0.2,28.113-14,凱文-杜蘭特,81,81,38.5,5.5,1.3,0.7,3212-13,卡梅羅-安東尼,67,67,37,2.6,0.8,0.5,28.711-12,凱文-杜蘭特,66,66,38.6,3.5,1.3,1.2,2810-11,凱文-杜蘭特,78,78,38.9,2.7,1.1,1,27.709-10,凱文-杜蘭特,82,82,39.5,2.8,1.4,1,30.108-09,德維恩-韋德,79,79,38.6,7.5,2.2,1.3,30.207-08,勒布朗-詹姆斯,75,74,40.4,7.2,1.8,1.1,3006-07,科比-布萊恩特,77,77,40.8,5.4,1.4,0.5,31.605-06,科比-布萊恩特,80,80,41,4.5,1.8,0.4,35.404-05,阿倫-艾弗森,75,75,42.3,7.9,2.4,0.1,30.703-04,特雷西·麥克格雷迪,67,67,39.9,5.5,1.4,0.6,2802-03,特雷西·麥克格雷迪,75,74,39.4,5.5,1.7,0.8,32.101-02,阿倫-艾弗森,60,59,43.7,5.5,2.8,0.2,31.400-01,阿倫-艾弗森,71,71,42,4.6,2.5,0.3,31.199-00,沙奎爾-奧尼爾,79,79,40,3.8,0.5,3,29.798-99,阿倫-艾弗森,48,48,41.5,4.6,2.3,0.1,26.897-98,邁克爾-喬丹,82,82,38.8,3.5,1.7,0.5,28.796-97,邁克爾-喬丹,82,82,37.9,4.3,1.7,0.5,29.695-96,邁克爾-喬丹,82,82,37.7,4.3,2.2,0.5,30.494-95,沙奎爾-奧尼爾,79,79,37,2.7,0.9,2.4,29.393-94,大衛(wèi)-羅賓遜,80,80,40.5,4.8,1.7,3.3,29.892-93,邁克爾-喬丹,78,78,39.3,5.5,2.8,0.8,32.691-92,邁克爾-喬丹,80,80,38.8,6.1,2.3,0.9,30.190-91,邁克爾-喬丹,82,82,37,5.5,2.7,1,31.589-90,邁克爾-喬丹,82,82,39,6.3,2.8,0.7,33.688-89,邁克爾-喬丹,81,81,40.2,8,2.9,0.8,32.587-88,邁克爾-喬丹,82,82,40.4,5.9,3.2,1.6,3586-87,邁克爾-喬丹,82,82,40,4.6,2.9,1.5,37.185-86,多米尼克-威爾金斯,78,78,39.1,2.6,1.8,0.6,30.384-85,伯納德-金,55,55,37.5,3.7,1.3,0.3,32.983-84,阿德里安-丹特利,79,79,37.8,3.9,0.8,0.1,30.682-83,阿歷克斯-英格利什,82,82,36.4,4.8,1.4,1.5,28.481-82,喬治-格文,79,79,35.7,2.4,1,0.6,32.3
首先我們需要?jiǎng)?chuàng)建一個(gè)工程,并且在 Maven 中有如下依賴:
第一步,創(chuàng)建上下文環(huán)境:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
第二步,讀取 score.csv 并且作為 source 輸入:
DataSet
第三步,將 source 數(shù)據(jù)注冊(cè)成表:
Table topScore = tableEnv.fromDataSet(topInput);tableEnv.registerTable("score", topScore);
第四步,核心處理邏輯 SQL 的編寫(xiě):
Table queryResult = tableEnv.sqlQuery("select player, count(season) as num FROM score GROUP BY player ORDER BY num desc LIMIT 3");
第五步,輸出結(jié)果:
DataSet
我們直接運(yùn)行整個(gè)程序,觀察輸出結(jié)果:
...16:28:06,162 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.16:28:06,162 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.16:28:06,164 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_2.16:28:06,166 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.16:28:06,166 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.16:28:06,169 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.16:28:06,177 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.16:28:06,187 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache16:28:06,187 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache16:28:06,188 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:5170316:28:06,188 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.邁克爾-喬丹:10凱文-杜蘭特:4阿倫-艾弗森:4
我們看到控制臺(tái)已經(jīng)輸出結(jié)果了:
完整的代碼如下:
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.BatchTableEnvironment;public class TableSQL { public static void main(String[] args) throws Exception{ //1. 獲取上下文環(huán)境 table的環(huán)境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env); //2. 讀取score.csv DataSet
當(dāng)然我們也可以自定義一個(gè) Sink,將結(jié)果輸出到一個(gè)文件中,例如:
TableSink sink = new CsvTableSink("/home/result.csv", ","); String[] fieldNames = {"name", "num"}; TypeInformation[] fieldTypes = {Types.STRING, Types.INT}; tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink); sqlQuery.insertInto("result"); env.execute();
然后我們運(yùn)行程序,可以看到 /home 目錄下生成的 result.csv,查看結(jié)果:
邁克爾-喬丹,10凱文-杜蘭特,4阿倫-艾弗森,4
七、總結(jié)
本篇向大家介紹了 Flink SQL 產(chǎn)生的背景,F(xiàn)link SQL 大部分核心功能,并且分別介紹了 Flink SQL 的編程模型和常用算子及內(nèi)置函數(shù)。最后以一個(gè)完整的示例展示了如何編寫(xiě) Flink SQL 程序。Flink SQL 的簡(jiǎn)便易用極大地降低了 Flink 編程的門(mén)檻,是我們必需掌握的使用 Flink 解決流式計(jì)算問(wèn)題最鋒利的武器!
-
編程
+關(guān)注
關(guān)注
88文章
3595瀏覽量
93600 -
SQL
+關(guān)注
關(guān)注
1文章
760瀏覽量
44078 -
應(yīng)用程序
+關(guān)注
關(guān)注
37文章
3243瀏覽量
57603
原文標(biāo)題:Flink最鋒利的武器:Flink SQL入門(mén)和實(shí)戰(zhàn) | 附完整實(shí)現(xiàn)代碼
文章出處:【微信號(hào):rgznai100,微信公眾號(hào):rgznai100】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論