精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

一套符合標(biāo)準(zhǔn)SQL語(yǔ)義的開(kāi)發(fā)語(yǔ)言,link的最新特性

WpOh_rgznai100 ? 來(lái)源:lq ? 2019-07-18 14:12 ? 次閱讀

一、Flink SQL 背景

Flink SQL 是 Flink 實(shí)時(shí)計(jì)算為簡(jiǎn)化計(jì)算模型,降低用戶使用實(shí)時(shí)計(jì)算門(mén)檻而設(shè)計(jì)的一套符合標(biāo)準(zhǔn) SQL 語(yǔ)義的開(kāi)發(fā)語(yǔ)言。

自 2015 年開(kāi)始,阿里巴巴開(kāi)始調(diào)研開(kāi)源流計(jì)算引擎,最終決定基于 Flink 打造新一代計(jì)算引擎,針對(duì) Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初將最終代碼開(kāi)源,也就是我們熟知的 Blink。Blink 在原來(lái)的 Flink 基礎(chǔ)上最顯著的一個(gè)貢獻(xiàn)就是 Flink SQL 的實(shí)現(xiàn)。

Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計(jì)算領(lǐng)域,比如 Storm、Spark Streaming 都會(huì)提供一些 Function 或者 Datastream API,用戶通過(guò) Java 或 Scala 寫(xiě)業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門(mén)檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。

在這個(gè)背景下,毫無(wú)疑問(wèn),SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因?yàn)槠渚哂袔讉€(gè)非常重要的特點(diǎn):

SQL 屬于設(shè)定式語(yǔ)言,用戶只要表達(dá)清楚需求即可,不需要了解具體做法;

SQL 可優(yōu)化,內(nèi)置多種查詢優(yōu)化器,這些查詢優(yōu)化器可為 SQL 翻譯出最優(yōu)執(zhí)行計(jì)劃;

SQL 易于理解,不同行業(yè)和領(lǐng)域的人都懂,學(xué)習(xí)成本較低;

SQL 非常穩(wěn)定,在數(shù)據(jù)庫(kù) 30 多年的歷史中,SQL 本身變化較少;

流與批的統(tǒng)一,F(xiàn)link 底層 Runtime 本身就是一個(gè)流與批統(tǒng)一的引擎,而 SQL 可以做到 API 層的流與批統(tǒng)一。

二、Flink 的最新特性(1.7.0 和 1.8.0 更新)

2.1 Flink 1.7.0 新特性

在 Flink 1.7.0 中,我們更接近實(shí)現(xiàn)快速數(shù)據(jù)處理和以無(wú)縫方式為 Flink 社區(qū)構(gòu)建數(shù)據(jù)密集型應(yīng)用程序的目標(biāo)。最新版本包括一些新功能和改進(jìn),例如對(duì) Scala 2.12 的支持、一次性 S3 文件接收器、復(fù)雜事件處理與流 SQL 的集成等。

Apache Flink 中對(duì) Scala 2.12 的支持(FLINK-7811)

Apache Flink 1.7.0 是第一個(gè)完全支持 Scala 2.12 的版本。這允許用戶使用較新的 Scala 版本編寫(xiě) Flink 應(yīng)用程序并利用 Scala 2.12 生態(tài)系統(tǒng)。

狀態(tài)演進(jìn)(FLINK-9376)

許多情況下,由于需求的變化,長(zhǎng)期運(yùn)行的 Flink 應(yīng)用程序需要在其生命周期內(nèi)發(fā)展。在不失去當(dāng)前應(yīng)用程序進(jìn)度狀態(tài)的情況下更改用戶狀態(tài)是應(yīng)用程序發(fā)展的關(guān)鍵要求。使用 Flink 1.7.0,社區(qū)添加了狀態(tài)演變,允許您靈活地調(diào)整長(zhǎng)時(shí)間運(yùn)行的應(yīng)用程序的用戶狀態(tài)模式,同時(shí)保持與以前保存點(diǎn)的兼容性。通過(guò)狀態(tài)演變,可以在狀態(tài)模式中添加或刪除列,以便更改應(yīng)用程序部署后應(yīng)用程序捕獲的業(yè)務(wù)功能?,F(xiàn)在,使用 Avro 生成時(shí),狀態(tài)模式演變現(xiàn)在可以立即使用作為用戶狀態(tài)的類,這意味著可以根據(jù) Avro 的規(guī)范來(lái)演變國(guó)家的架構(gòu)。雖然 Avro 類型是 Flink 1.7 中唯一支持模式演變的內(nèi)置類型,但社區(qū)仍在繼續(xù)致力于在未來(lái)的 Flink 版本中進(jìn)一步擴(kuò)展對(duì)其他類型的支持。

MATCH RECOGNIZE Streaming SQL 支持(FLINK-6935)

這是 Apache Flink 1.7.0 的一個(gè)重要補(bǔ)充,它為 Flink SQL 提供了 MATCH RECOGNIZE 標(biāo)準(zhǔn)的初始支持。此功能結(jié)合了復(fù)雜事件處理(CEP)和 SQL,可以輕松地對(duì)數(shù)據(jù)流進(jìn)行模式匹配,從而實(shí)現(xiàn)一整套新的用例。此功能目前處于測(cè)試階段,因此我們歡迎社區(qū)提供任何反饋和建議。

流式 SQL 中的時(shí)態(tài)表和時(shí)間連接(FLINK-9712)

時(shí)態(tài)表是 Apache Flink 中的一個(gè)新概念,它為表的更改歷史提供(參數(shù)化)視圖,并在特定時(shí)間點(diǎn)返回表的內(nèi)容。例如,我們可以使用具有歷史貨幣匯率的表格。隨著時(shí)間的推移,這種表格不斷增長(zhǎng)/發(fā)展,并且增加了新的更新匯率。時(shí)態(tài)表是一種視圖,可以將這些匯率的實(shí)際狀態(tài)返回到任何給定的時(shí)間點(diǎn)。使用這樣的表,可以使用正確的匯率將不同貨幣的訂單流轉(zhuǎn)換為通用貨幣。時(shí)間聯(lián)接允許使用不斷變化/更新的表來(lái)進(jìn)行內(nèi)存和計(jì)算有效的流數(shù)據(jù)連接。

Streaming SQL 的其他功能

除了上面提到的主要功能外,F(xiàn)link 的 Table&SQL API 已經(jīng)擴(kuò)展到更多用例。以下內(nèi)置函數(shù)被添加到 API:TO_BASE64、LOG2、LTRIM、REPEAT、REPLACE、COSH、SINH、TANH SQL Client 現(xiàn)在支持在環(huán)境文件和 CLI 會(huì)話中定義視圖。此外,CLI 中添加了基本的 SQL 語(yǔ)句自動(dòng)完成功能。社區(qū)添加了一個(gè) Elasticsearch 6 表接收器,允許存儲(chǔ)動(dòng)態(tài)表的更新結(jié)果。

Kafka 2.0 連接器(FLINK-10598)

Apache Flink 1.7.0 繼續(xù)添加更多連接器,使其更容易與更多外部系統(tǒng)進(jìn)行交互。在此版本中,社區(qū)添加了 Kafka 2.0 連接器,該連接器允許通過(guò)一次性保證讀取和寫(xiě)入 Kafka 2.0。

本地恢復(fù)(FLINK-9635)

Apache Flink 1.7.0 通過(guò)擴(kuò)展 Flink 的調(diào)度來(lái)完成本地恢復(fù)功能,以便在恢復(fù)時(shí)考慮以前的部署位置。如果啟用了本地恢復(fù),F(xiàn)link 將保留最新檢查點(diǎn)的本地副本任務(wù)運(yùn)行的機(jī)器。通過(guò)將任務(wù)調(diào)度到以前的位置,F(xiàn)link 將通過(guò)從本地磁盤(pán)讀取檢查點(diǎn)狀態(tài)來(lái)最小化恢復(fù)狀態(tài)的網(wǎng)絡(luò)流量。此功能大大提高了恢復(fù)速度。

2.2 Flink 1.8.0 新特性

Flink 1.8.0 引入對(duì)狀態(tài)的清理

使用 TTL(生存時(shí)間)連續(xù)增量清除舊的 Key 狀態(tài) Flink 1.8 引入了對(duì) RocksDB 狀態(tài)后端(FLINK-10471)和堆狀態(tài)后端(FLINK-10473)的舊數(shù)據(jù)的連續(xù)清理。這意味著舊的數(shù)據(jù)將(根據(jù) TTL 設(shè)置)不斷被清理掉。

新增和刪除一些 Table API

1) 引入新的 CSV 格式符(FLINK-9964)

此版本為符合 RFC4180 的 CSV 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能與 Kafka 一起使用。舊描述符 org.apache.flink.table.descriptors.OldCsv 用于文件系統(tǒng)連接器。

2) 靜態(tài)生成器方法在 TableEnvironment(FLINK-11445)上的棄用

為了將 API 與實(shí)際實(shí)現(xiàn)分開(kāi),TableEnvironment.getTableEnvironment() 不推薦使用靜態(tài)方法?,F(xiàn)在推薦使用Batch/StreamTableEnvironment.create()。

3) 表 API Maven 模塊中的更改(FLINK-11064)

之前具有 flink-table 依賴關(guān)系的用戶需要更新其依賴關(guān)系 flink-table-planner,以及正確的依賴關(guān)系 flink-table-api-*,具體取決于是使用 Java 還是 Scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。

Kafka Connector 的修改

引入可直接訪問(wèn) ConsumerRecord 的新KafkaDeserializationSchema(FLINK-8354),對(duì)于 FlinkKafkaConsumers 推出了一個(gè)新的 KafkaDeserializationSchema,可以直接訪問(wèn) KafkaConsumerRecord。

三、Flink SQL 的編程模型

Flink 的編程模型基礎(chǔ)構(gòu)建模塊是流(streams)與轉(zhuǎn)換 (transformations),每一個(gè)數(shù)據(jù)流起始于一個(gè)或多個(gè) source,并終止于一個(gè)或多個(gè) sink。

相信大家對(duì)上面的圖已經(jīng)十分熟悉了,當(dāng)然基于 Flink SQL 編寫(xiě)的 Flink 程序也離不開(kāi)讀取原始數(shù)據(jù),計(jì)算邏輯和寫(xiě)入計(jì)算結(jié)果數(shù)據(jù)三部分。

一個(gè)完整的 Flink SQL 編寫(xiě)的程序包括如下三部分:

Source Operator:Soruce operator 是對(duì)外部數(shù)據(jù)源的抽象, 目前 Apache Flink 內(nèi)置了很多常用的數(shù)據(jù)源實(shí)現(xiàn)例如 MySQL、Kafka 等;

Transformation Operators:算子操作主要完成例如查詢、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多數(shù)傳統(tǒng)數(shù)據(jù)庫(kù)支持的操作;

Sink Operator:Sink operator 是對(duì)外結(jié)果表的抽象,目前 Apache Flink 也內(nèi)置了很多常用的結(jié)果表的抽象,比如 Kafka Sink 等

我們通過(guò)用一個(gè)最經(jīng)典的 WordCount 程序作為入門(mén),看一下傳統(tǒng)的基于 DataSet/DataStream API 開(kāi)發(fā)和基于 SQL 開(kāi)發(fā)有哪些不同?

DataStream/DataSetAPI

public class WordCount { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet text = env.fromElements( "Hello", "Flink", "Hello", "Blink" ); DataSet> counts = text.flatMap(new LineSplitter()) .groupBy(0) .sum(1); counts.print(); } public static final class LineSplitter implements FlatMapFunction> { @Override public void flatMap(String value, Collector> out) { String[] tokens = value.toLowerCase().split("\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2(token, 1)); } } } }}

Flink SQL

//省略掉初始化環(huán)境等公共代碼SELECT word, COUNT(word) FROM table GROUP BY word;

我們已經(jīng)可以直觀體會(huì)到,SQL 開(kāi)發(fā)的快捷和便利性了。

四、Flink SQL 的語(yǔ)法和算子

4.1 Flink SQL 支持的語(yǔ)法

Flink SQL 核心算子的語(yǔ)義設(shè)計(jì)參考了 1992、2011 等 ANSI-SQL 標(biāo)準(zhǔn),F(xiàn)link 使用 Apache Calcite 解析 SQL ,Calcite 支持標(biāo)準(zhǔn)的 ANSI SQL。

那么 Flink 自身支持的 SQL 語(yǔ)法有哪些呢?

insert: INSERT INTO tableReference queryquery: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]orderItem: expression [ ASC | DESC ]select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* }projectItem: expression [ [ AS ] columnAlias ] | tableAlias . *tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]joinCondition: ON booleanExpression | USING '(' column [, column ]* ')'tableReference: tablePrimary [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')'values: VALUES expression [, expression ]*groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')'windowRef: windowName | windowSpecwindowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')'

上面 SQL 的語(yǔ)法支持也已經(jīng)表明了 Flink SQL 對(duì)算子的支持,接下來(lái)我們對(duì) Flink SQL 中最常見(jiàn)的算子語(yǔ)義進(jìn)行介紹。

4.2 Flink SQL 常用算子

SELECT

SELECT 用于從 DataSet/DataStream 中選擇數(shù)據(jù),用于篩選出某些列。

示例:

SELECT * FROM Table;// 取出表中的所有列SELECT name,age FROM Table;// 取出表中 name 和 age 兩列

與此同時(shí) SELECT 語(yǔ)句中可以使用函數(shù)和別名,例如我們上面提到的 WordCount 中:

SELECT word, COUNT(word) FROM table GROUP BY word;

WHERE

WHERE 用于從數(shù)據(jù)集/流中過(guò)濾數(shù)據(jù),與 SELECT 一起使用,用于根據(jù)某些條件對(duì)關(guān)系做水平分割,即選擇符合條件的記錄。

示例:

SELECT name,age FROM Table where name LIKE ‘% 小明 %’;SELECT * FROM Table WHERE age = 20;

WHERE 是從原數(shù)據(jù)中進(jìn)行過(guò)濾,那么在 WHERE 條件中,F(xiàn)link SQL 同樣支持 =、<、>、<>、>=、<=,以及 AND、OR 等表達(dá)式的組合,最終滿足過(guò)濾條件的數(shù)據(jù)會(huì)被選擇出來(lái)。并且 WHERE 可以結(jié)合 IN、NOT IN 聯(lián)合使用。舉個(gè)負(fù)責(zé)的例子:

SELECT name, ageFROM TableWHERE name IN (SELECT name FROM Table2)

DISTINCT

DISTINCT 用于從數(shù)據(jù)集/流中去重根據(jù) SELECT 的結(jié)果進(jìn)行去重。

示例:

SELECT DISTINCT name FROM Table;

對(duì)于流式查詢,計(jì)算查詢結(jié)果所需的 State 可能會(huì)無(wú)限增長(zhǎng),用戶需要自己控制查詢的狀態(tài)范圍,以防止?fàn)顟B(tài)過(guò)大。

GROUP BY

GROUP BY 是對(duì)數(shù)據(jù)進(jìn)行分組操作。例如我們需要計(jì)算成績(jī)明細(xì)表中,每個(gè)學(xué)生的總分。

SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;

UNION和UNION ALL

UNION 用于將兩個(gè)結(jié)果集合并起來(lái),要求兩個(gè)結(jié)果集字段完全一致,包括字段類型、字段順序。不同于 UNION ALL 的是,UNION 會(huì)對(duì)結(jié)果數(shù)據(jù)去重。

示例:

SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;

JOIN

JOIN 用于把來(lái)自兩個(gè)表的數(shù)據(jù)聯(lián)合起來(lái)形成結(jié)果表,F(xiàn)link 支持的 JOIN 類型包括:

JOIN - INNER JOIN

LEFT JOIN - LEFT OUTER JOIN

RIGHT JOIN - RIGHT OUTER JOIN

FULL JOIN - FULL OUTER JOIN

這里的 JOIN 的語(yǔ)義和我們?cè)陉P(guān)系型數(shù)據(jù)庫(kù)中使用的 JOIN 語(yǔ)義一致。

示例:

JOIN(將訂單表數(shù)據(jù)和商品表進(jìn)行關(guān)聯(lián))SELECT * FROM Orders INNER JOIN Product ON Orders.productId = [Product.id](http://product.id/)

LEFT JOIN 與 JOIN 的區(qū)別是當(dāng)右表沒(méi)有與左邊相 JOIN 的數(shù)據(jù)時(shí)候,右邊對(duì)應(yīng)的字段補(bǔ) NULL 輸出,RIGHT JOIN 相當(dāng)于 LEFT JOIN 左右兩個(gè)表交互一下位置。FULL JOIN 相當(dāng)于 RIGHT JOIN 和 LEFT JOIN 之后進(jìn)行 UNION ALL 操作。

示例:

SELECT *FROM Orders LEFT JOIN Product ON Orders.productId = [Product.id](http://product.id/)SELECT *FROM Orders RIGHT JOIN Product ON Orders.productId = [Product.id](http://product.id/)SELECT *FROM Orders FULL OUTER JOIN Product ON Orders.productId = [Product.id](http://product.id/)

Group Window

根據(jù)窗口數(shù)據(jù)劃分的不同,目前 Apache Flink 有如下 3 種 Bounded Window:

Tumble,滾動(dòng)窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無(wú)疊加;

Hop,滑動(dòng)窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;

Session,會(huì)話窗口,窗口數(shù)據(jù)沒(méi)有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無(wú)疊加。

Tumble Window

Tumble 滾動(dòng)窗口有固定大小,窗口數(shù)據(jù)不重疊,具體語(yǔ)義如下:

Tumble 滾動(dòng)窗口對(duì)應(yīng)的語(yǔ)法如下:

SELECT [gk], [TUMBLE_START(timeCol, size)], [TUMBLE_END(timeCol, size)], agg1(col1), ... aggn(colN)FROM Tab1GROUP BY [gk], TUMBLE(timeCol, size)

其中:

[gk] 決定了是否需要按照字段進(jìn)行聚合;

TUMBLE_START 代表窗口開(kāi)始時(shí)間;

TUMBLE_END 代表窗口結(jié)束時(shí)間;

timeCol 是流表中表示時(shí)間字段;

size 表示窗口的大小,如 秒、分鐘、小時(shí)、天。

舉個(gè)例子,假如我們要計(jì)算每個(gè)人每天的訂單量,按照 user 進(jìn)行聚合分組:

SELECT user, TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;

Hop Window

Hop 滑動(dòng)窗口和滾動(dòng)窗口類似,窗口有固定的 size,與滾動(dòng)窗口不同的是滑動(dòng)窗口可以通過(guò) slide 參數(shù)控制滑動(dòng)窗口的新建頻率。因此當(dāng) slide 值小于窗口 size 的值的時(shí)候多個(gè)滑動(dòng)窗口會(huì)重疊,具體語(yǔ)義如下:

Hop 滑動(dòng)窗口對(duì)應(yīng)語(yǔ)法如下:

SELECT [gk], [HOP_START(timeCol, slide, size)] , [HOP_END(timeCol, slide, size)], agg1(col1), ... aggN(colN) FROM Tab1GROUP BY [gk], HOP(timeCol, slide, size)

每次字段的意思和 Tumble 窗口類似:

[gk] 決定了是否需要按照字段進(jìn)行聚合;

HOP_START 表示窗口開(kāi)始時(shí)間;

HOP_END 表示窗口結(jié)束時(shí)間;

timeCol 表示流表中表示時(shí)間字段;

slide 表示每次窗口滑動(dòng)的大?。?/p>

size 表示整個(gè)窗口的大小,如 秒、分鐘、小時(shí)、天。

舉例說(shuō)明,我們要每過(guò)一小時(shí)計(jì)算一次過(guò)去 24 小時(shí)內(nèi)每個(gè)商品的銷量:

SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

Session Window

會(huì)話時(shí)間窗口沒(méi)有固定的持續(xù)時(shí)間,但它們的界限由 interval 不活動(dòng)時(shí)間定義,即如果在定義的間隙期間沒(méi)有出現(xiàn)事件,則會(huì)話窗口關(guān)閉。

Seeeion 會(huì)話窗口對(duì)應(yīng)語(yǔ)法如下:

SELECT [gk], SESSION_START(timeCol, gap) AS winStart, SESSION_END(timeCol, gap) AS winEnd, agg1(col1), ... aggn(colN)FROM Tab1GROUP BY [gk], SESSION(timeCol, gap)

[gk] 決定了是否需要按照字段進(jìn)行聚合;

SESSION_START 表示窗口開(kāi)始時(shí)間;

SESSION_END 表示窗口結(jié)束時(shí)間;

timeCol 表示流表中表示時(shí)間字段;

gap 表示窗口數(shù)據(jù)非活躍周期的時(shí)長(zhǎng)。

例如,我們需要計(jì)算每個(gè)用戶訪問(wèn)時(shí)間 12 小時(shí)內(nèi)的訂單量:

SELECT user, SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user

五、Flink SQL 的內(nèi)置函數(shù)

Flink 提供大量的內(nèi)置函數(shù)供我們直接使用,我們常用的內(nèi)置函數(shù)分類如下:

比較函數(shù)

邏輯函數(shù)

算術(shù)函數(shù)

字符串處理函數(shù)

時(shí)間函數(shù)

我們接下來(lái)對(duì)每種函數(shù)舉例進(jìn)行講解。

5.1 比較函數(shù)

比較函數(shù)

描述

value1=value2 如果 value1 等于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN
value1<>value2 如果 value1 不等于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN
value1>value2 如果 value1 大于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN
value1 < value2 如果 value1 小于 value2,則返回 TRUE ; 如果 value1 或 value2 為 NULL,則返回 UNKNOWN
value IS NULL 如果 value 為 NULL,則返回 TRUE
value IS NOT NULL 如果 value 不為 NULL,則返回 TRUE
string1 LIKE string2 如果 string1 匹配模式 string2,則返回 TRUE ; 如果 string1 或 string2為 NULL,則返回UNKNOWN
value1 IN (value2, value3…) 如果給定列表中存在 value1 (value2,value3,…),則返回 TRUE 。當(dāng)(value2,value3,…)包含 NULL,如果可以找到該數(shù)據(jù)元?jiǎng)t返回 TRUE,否則返回 UNKNOWN。如果 value1 為 NULL,則始終返回 UNKNOWN

5.2 邏輯函數(shù)

邏輯函數(shù)

描述

A OR B 如果 A 為 TRUE 或 B 為 TRUE,則返回 TRUE
A AND B 如果 A 和 B 都為 TRUE,則返回 TRUE
NOT boolean 如果 boolean 為 FALSE,則返回 TRUE,否則返回 TRUE。如果 boolean 為 TRUE,則返回 FALSE
A IS TRUE 或 FALSE 判斷 A 是否為真

5.3 算術(shù)函數(shù)

算術(shù)函數(shù)

描述

numeric1 ±*/ numeric2 分別代表兩個(gè)數(shù)值加減乘除
ABS(numeric) 返回 numeric 的絕對(duì)值
POWER(numeric1, numeric2) 返回 numeric1 上升到 numeric2 的冪

除了上述表中的函數(shù),F(xiàn)link SQL 還支持種類豐富的函數(shù)計(jì)算。

5.4 字符串處理函數(shù)

字符串函數(shù)

描述

UPPER/LOWER 以大寫(xiě) / 小寫(xiě)形式返回字符串
LTRIM(string) 返回一個(gè)字符串,從去除左空格的字符串, 類似還有 RTRIM
CONCAT(string1, string2,…) 返回連接 string1,string2,…的字符串

5.5 時(shí)間函數(shù)

時(shí)間函數(shù)

描述

DATE string 返回以“yyyy-MM-dd”形式從字符串解析的 SQL 日期
TIMESTAMP string 返回以字符串形式解析的 SQL 時(shí)間戳,格式為“yyyy-MM-dd HH:mm:ss [.SSS]”
CURRENT_DATE 返回 UTC 時(shí)區(qū)中的當(dāng)前 SQL 日期
DATE_FORMAT(timestamp, string) 返回使用指定格式字符串格式化時(shí)間戳的字符串

六、Flink SQL 實(shí)戰(zhàn)應(yīng)用

上面我們分別介紹了 Flink SQL 的背景、新特性、編程模型和常用算子,這部分我們將模擬一個(gè)真實(shí)的案例為大家使用 Flink SQL 提供一個(gè)完整的 Demo。

相信這里應(yīng)該有很多 NBA 的球迷,假設(shè)我們有一份數(shù)據(jù)記錄了每個(gè)賽季的得分王的數(shù)據(jù),包括賽季、球員、出場(chǎng)、首發(fā)、時(shí)間、助攻、搶斷、蓋帽、得分等?,F(xiàn)在我們要統(tǒng)計(jì)獲得得分王榮譽(yù)最多的三名球員。

原數(shù)據(jù)存在 score.csv 文件中,如下:

17-18,詹姆斯-哈登,72,72,35.4,8.8,1.8,0.7,30.416-17,拉塞爾-威斯布魯克,81,81,34.6,10.4,1.6,0.4,31.615-16,斯蒂芬-庫(kù)里,79,79,34.2,6.7,2.1,0.2,30.114-15,拉塞爾-威斯布魯克,67,67,34.4,8.6,2.1,0.2,28.113-14,凱文-杜蘭特,81,81,38.5,5.5,1.3,0.7,3212-13,卡梅羅-安東尼,67,67,37,2.6,0.8,0.5,28.711-12,凱文-杜蘭特,66,66,38.6,3.5,1.3,1.2,2810-11,凱文-杜蘭特,78,78,38.9,2.7,1.1,1,27.709-10,凱文-杜蘭特,82,82,39.5,2.8,1.4,1,30.108-09,德維恩-韋德,79,79,38.6,7.5,2.2,1.3,30.207-08,勒布朗-詹姆斯,75,74,40.4,7.2,1.8,1.1,3006-07,科比-布萊恩特,77,77,40.8,5.4,1.4,0.5,31.605-06,科比-布萊恩特,80,80,41,4.5,1.8,0.4,35.404-05,阿倫-艾弗森,75,75,42.3,7.9,2.4,0.1,30.703-04,特雷西·麥克格雷迪,67,67,39.9,5.5,1.4,0.6,2802-03,特雷西·麥克格雷迪,75,74,39.4,5.5,1.7,0.8,32.101-02,阿倫-艾弗森,60,59,43.7,5.5,2.8,0.2,31.400-01,阿倫-艾弗森,71,71,42,4.6,2.5,0.3,31.199-00,沙奎爾-奧尼爾,79,79,40,3.8,0.5,3,29.798-99,阿倫-艾弗森,48,48,41.5,4.6,2.3,0.1,26.897-98,邁克爾-喬丹,82,82,38.8,3.5,1.7,0.5,28.796-97,邁克爾-喬丹,82,82,37.9,4.3,1.7,0.5,29.695-96,邁克爾-喬丹,82,82,37.7,4.3,2.2,0.5,30.494-95,沙奎爾-奧尼爾,79,79,37,2.7,0.9,2.4,29.393-94,大衛(wèi)-羅賓遜,80,80,40.5,4.8,1.7,3.3,29.892-93,邁克爾-喬丹,78,78,39.3,5.5,2.8,0.8,32.691-92,邁克爾-喬丹,80,80,38.8,6.1,2.3,0.9,30.190-91,邁克爾-喬丹,82,82,37,5.5,2.7,1,31.589-90,邁克爾-喬丹,82,82,39,6.3,2.8,0.7,33.688-89,邁克爾-喬丹,81,81,40.2,8,2.9,0.8,32.587-88,邁克爾-喬丹,82,82,40.4,5.9,3.2,1.6,3586-87,邁克爾-喬丹,82,82,40,4.6,2.9,1.5,37.185-86,多米尼克-威爾金斯,78,78,39.1,2.6,1.8,0.6,30.384-85,伯納德-金,55,55,37.5,3.7,1.3,0.3,32.983-84,阿德里安-丹特利,79,79,37.8,3.9,0.8,0.1,30.682-83,阿歷克斯-英格利什,82,82,36.4,4.8,1.4,1.5,28.481-82,喬治-格文,79,79,35.7,2.4,1,0.6,32.3

首先我們需要?jiǎng)?chuàng)建一個(gè)工程,并且在 Maven 中有如下依賴:

UTF-8 1.7.1 1.7.7 1.2.17 2.11 org.apache.flink flink-core ${flink.version} org.apache.flink flink-java ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-table_2.11 1.7.1 org.apache.flink flink-streaming-scala_${scala.binary.version} 1.7.1 org.slf4j slf4j-log4j12 ${slf4j.version} log4j log4j ${log4j.version}

第一步,創(chuàng)建上下文環(huán)境:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

第二步,讀取 score.csv 并且作為 source 輸入:

DataSet input = env.readTextFile("score.csv"); DataSet topInput = input.map(new MapFunction() { @Override public PlayerData map(String s) throws Exception { String[] split = s.split(","); return new PlayerData(String.valueOf(split[0]), String.valueOf(split[1]), String.valueOf(split[2]), Integer.valueOf(split[3]), Double.valueOf(split[4]), Double.valueOf(split[5]), Double.valueOf(split[6]), Double.valueOf(split[7]), Double.valueOf(split[8]) ); } });其中的PlayerData類為自定義類:public static class PlayerData { /** * 賽季,球員,出場(chǎng),首發(fā),時(shí)間,助攻,搶斷,蓋帽,得分 */ public String season; public String player; public String play_num; public Integer first_court; public Double time; public Double assists; public Double steals; public Double blocks; public Double scores; public PlayerData() { super(); } public PlayerData(String season, String player, String play_num, Integer first_court, Double time, Double assists, Double steals, Double blocks, Double scores ) { this.season = season; this.player = player; this.play_num = play_num; this.first_court = first_court; this.time = time; this.assists = assists; this.steals = steals; this.blocks = blocks; this.scores = scores; } }

第三步,將 source 數(shù)據(jù)注冊(cè)成表:

Table topScore = tableEnv.fromDataSet(topInput);tableEnv.registerTable("score", topScore);

第四步,核心處理邏輯 SQL 的編寫(xiě):

Table queryResult = tableEnv.sqlQuery("select player, count(season) as num FROM score GROUP BY player ORDER BY num desc LIMIT 3");

第五步,輸出結(jié)果:

DataSet result = tableEnv.toDataSet(queryResult, Result.class);result.print();

我們直接運(yùn)行整個(gè)程序,觀察輸出結(jié)果:

...16:28:06,162 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.16:28:06,162 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.16:28:06,164 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_2.16:28:06,166 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.16:28:06,166 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.16:28:06,169 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.16:28:06,177 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.16:28:06,187 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache16:28:06,187 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache16:28:06,188 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:5170316:28:06,188 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.邁克爾-喬丹:10凱文-杜蘭特:4阿倫-艾弗森:4

我們看到控制臺(tái)已經(jīng)輸出結(jié)果了:

完整的代碼如下:

import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.BatchTableEnvironment;public class TableSQL { public static void main(String[] args) throws Exception{ //1. 獲取上下文環(huán)境 table的環(huán)境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env); //2. 讀取score.csv DataSet input = env.readTextFile("score.csv"); input.print(); DataSet topInput = input.map(new MapFunction() { @Override public PlayerData map(String s) throws Exception { String[] split = s.split(","); return new PlayerData(String.valueOf(split[0]), String.valueOf(split[1]), String.valueOf(split[2]), Integer.valueOf(split[3]), Double.valueOf(split[4]), Double.valueOf(split[5]), Double.valueOf(split[6]), Double.valueOf(split[7]), Double.valueOf(split[8]) ); } }); //3. 注冊(cè)成內(nèi)存表 Table topScore = tableEnv.fromDataSet(topInput); tableEnv.registerTable("score", topScore); //4. 編寫(xiě)sql 然后提交執(zhí)行 //select player, count(season) as num from score group by player order by num desc; Table queryResult = tableEnv.sqlQuery("select player, count(season) as num from score group by player order by num desc limit 3"); //5. 結(jié)果進(jìn)行打印 DataSet result = tableEnv.toDataSet(queryResult, Result.class); result.print(); } public static class PlayerData { /** * 賽季,球員,出場(chǎng),首發(fā),時(shí)間,助攻,搶斷,蓋帽,得分 */ public String season; public String player; public String play_num; public Integer first_court; public Double time; public Double assists; public Double steals; public Double blocks; public Double scores; public PlayerData() { super(); } public PlayerData(String season, String player, String play_num, Integer first_court, Double time, Double assists, Double steals, Double blocks, Double scores ) { this.season = season; this.player = player; this.play_num = play_num; this.first_court = first_court; this.time = time; this.assists = assists; this.steals = steals; this.blocks = blocks; this.scores = scores; } } public static class Result { public String player; public Long num; public Result() { super(); } public Result(String player, Long num) { this.player = player; this.num = num; } @Override public String toString() { return player + ":" + num; } }}//

當(dāng)然我們也可以自定義一個(gè) Sink,將結(jié)果輸出到一個(gè)文件中,例如:

TableSink sink = new CsvTableSink("/home/result.csv", ","); String[] fieldNames = {"name", "num"}; TypeInformation[] fieldTypes = {Types.STRING, Types.INT}; tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink); sqlQuery.insertInto("result"); env.execute();

然后我們運(yùn)行程序,可以看到 /home 目錄下生成的 result.csv,查看結(jié)果:

邁克爾-喬丹,10凱文-杜蘭特,4阿倫-艾弗森,4

七、總結(jié)

本篇向大家介紹了 Flink SQL 產(chǎn)生的背景,F(xiàn)link SQL 大部分核心功能,并且分別介紹了 Flink SQL 的編程模型和常用算子及內(nèi)置函數(shù)。最后以一個(gè)完整的示例展示了如何編寫(xiě) Flink SQL 程序。Flink SQL 的簡(jiǎn)便易用極大地降低了 Flink 編程的門(mén)檻,是我們必需掌握的使用 Flink 解決流式計(jì)算問(wèn)題最鋒利的武器!

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 編程
    +關(guān)注

    關(guān)注

    88

    文章

    3595

    瀏覽量

    93600
  • SQL
    SQL
    +關(guān)注

    關(guān)注

    1

    文章

    760

    瀏覽量

    44078
  • 應(yīng)用程序
    +關(guān)注

    關(guān)注

    37

    文章

    3243

    瀏覽量

    57603

原文標(biāo)題:Flink最鋒利的武器:Flink SQL入門(mén)和實(shí)戰(zhàn) | 附完整實(shí)現(xiàn)代碼

文章出處:【微信號(hào):rgznai100,微信公眾號(hào):rgznai100】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    一套常用的AD封裝庫(kù)

    一套常用的AD封裝庫(kù)
    發(fā)表于 01-24 15:22

    NLPIR語(yǔ)義分析是對(duì)自然語(yǔ)言處理的完美理解

    ?! ∪祟愖匀?b class='flag-5'>語(yǔ)言通常以詞為基本構(gòu)成單位,進(jìn)而構(gòu)成句子,再由句子形成篇章。篇章的語(yǔ)義由篇章中包含的所有句子的語(yǔ)義綜合而成,而句子的語(yǔ)義又由句中的詞語(yǔ)
    發(fā)表于 10-19 11:34

    一套完整的PCB布線設(shè)計(jì)

    來(lái)源:互聯(lián)網(wǎng)PCB 布線設(shè)計(jì)中,對(duì)于布通率的的提高有一套完整的方法,在此,我們?yōu)榇蠹姨峁┨岣?PCB 設(shè)計(jì)布通率以及設(shè)計(jì)效率的有效技巧,不僅能為客戶節(jié)省項(xiàng)目開(kāi)發(fā)周期,還能最大限度的保證設(shè)計(jì)成品的質(zhì)量。
    發(fā)表于 10-22 08:00

    如何制作一套波形發(fā)生系統(tǒng)?

    本設(shè)計(jì)中涉及到單片機(jī)匯編語(yǔ)言、VHDL語(yǔ)言的運(yùn)用,充分地利用了二者的優(yōu)點(diǎn),制作了一套波形發(fā)生系統(tǒng)。
    發(fā)表于 04-21 06:09

    分享一套通用的開(kāi)發(fā)環(huán)境搭建教程

    針對(duì)嵌入式的開(kāi)發(fā),不同的開(kāi)發(fā)者使用不同的操作系統(tǒng),可能是Windows 或者 Linux,隨之而來(lái)所搭建的開(kāi)發(fā)環(huán)境亦不同。所以在這里希望分享一套通用的
    發(fā)表于 11-05 06:23

    一套支持中文C語(yǔ)言編程的鴻蒙Hi3861智能硬件開(kāi)發(fā)套件

    由于目前學(xué)習(xí)和開(kāi)發(fā)開(kāi)源鴻蒙Hi3861難度比較大,整了一套支持中文C語(yǔ)言編程的鴻蒙Hi3861智能硬件開(kāi)發(fā)套件,難度直接從專業(yè)級(jí)降到入門(mén)級(jí),主要在開(kāi)源鴻蒙
    發(fā)表于 04-10 20:34

    慕尼黑大學(xué)開(kāi)發(fā)一套RFID應(yīng)用模型

    慕尼黑科技大學(xué)了開(kāi)發(fā)一套RFID應(yīng)用模型:在木材采伐機(jī)上安裝支工業(yè)訂槍,將RFID標(biāo)簽訂在切割下來(lái)的圓木的表面上。這套系統(tǒng)可以幫助木材公司追蹤木材的多重處理流程和運(yùn)往加工廠的運(yùn)送過(guò)程。 慕尼黑
    發(fā)表于 12-13 14:26 ?761次閱讀

    關(guān)系數(shù)據(jù)庫(kù)標(biāo)準(zhǔn)語(yǔ)言SQL的資料說(shuō)明

    本文檔的主要內(nèi)容詳細(xì)介紹的是關(guān)系數(shù)據(jù)庫(kù)標(biāo)準(zhǔn)語(yǔ)言SQL的資料說(shuō)明。
    發(fā)表于 03-23 16:51 ?6次下載
    關(guān)系數(shù)據(jù)庫(kù)<b class='flag-5'>標(biāo)準(zhǔn)語(yǔ)言</b><b class='flag-5'>SQL</b>的資料說(shuō)明

    如何開(kāi)發(fā)一套高精密的LCR阻抗標(biāo)準(zhǔn)的研究說(shuō)明

    針對(duì)當(dāng)前LCR阻抗標(biāo)準(zhǔn)器存在的關(guān)鍵技術(shù)問(wèn)題,通過(guò)研究,開(kāi)發(fā)一套高精密的LCR阻抗標(biāo)準(zhǔn).
    發(fā)表于 12-08 08:00 ?1次下載
    如何<b class='flag-5'>開(kāi)發(fā)</b><b class='flag-5'>一套</b>高精密的LCR阻抗<b class='flag-5'>標(biāo)準(zhǔn)</b>的研究說(shuō)明

    一套基于ARM的卷煙防偽識(shí)別系統(tǒng)設(shè)計(jì)與實(shí)現(xiàn)

    為有效打擊假冒、走私、串貨卷煙等違法行為,采用信息技術(shù),設(shè)計(jì)并開(kāi)發(fā)一套卷煙防偽識(shí)別系統(tǒng)。以卷煙32位代碼作
    的頭像 發(fā)表于 05-05 09:05 ?1777次閱讀
    <b class='flag-5'>一套</b>基于ARM的卷煙防偽識(shí)別系統(tǒng)設(shè)計(jì)與實(shí)現(xiàn)

    altiumdesigner多少錢(qián)一套

    altiumdesigner多少錢(qián)一套 altiumdesigner是很多工程師都要用到的設(shè)計(jì)工具,那么altiumdesigner多少錢(qián)一套? altiumdesigner口碑評(píng)價(jià)很好
    的頭像 發(fā)表于 04-07 11:36 ?3.3w次閱讀
    altiumdesigner多少錢(qián)<b class='flag-5'>一套</b>

    開(kāi)發(fā)支持符合AUTOSAR標(biāo)準(zhǔn)的軟件組件的建模特定領(lǐng)域的語(yǔ)言

    開(kāi)發(fā)支持符合AUTOSAR標(biāo)準(zhǔn)的軟件組件的建模特定領(lǐng)域的語(yǔ)言。支持建模軟件組件的應(yīng)用是基于TextX python模塊和內(nèi)部開(kāi)發(fā)的建模框架。
    的頭像 發(fā)表于 10-08 11:19 ?1060次閱讀

    文掌握MyBatis的動(dòng)態(tài)SQL使用與原理

    摘要:使用動(dòng)態(tài) SQL 并非件易事,但借助可用于任何 SQL 映射語(yǔ)句中的強(qiáng)大的動(dòng)態(tài) SQL 語(yǔ)言,MyBatis 顯著地提升了這
    的頭像 發(fā)表于 01-06 11:27 ?951次閱讀

    如何用java語(yǔ)言開(kāi)發(fā)一套數(shù)字化產(chǎn)科系統(tǒng)? 數(shù)字化產(chǎn)科管理平臺(tái)源碼

    如何用java語(yǔ)言開(kāi)發(fā)一套數(shù)字化產(chǎn)科系統(tǒng) 數(shù)字化產(chǎn)科管理平臺(tái)源碼
    的頭像 發(fā)表于 07-06 09:38 ?979次閱讀
    如何用java<b class='flag-5'>語(yǔ)言</b><b class='flag-5'>開(kāi)發(fā)</b><b class='flag-5'>一套</b>數(shù)字化產(chǎn)科系統(tǒng)? 數(shù)字化產(chǎn)科管理平臺(tái)源碼

    Vector推出一套基于Visual Studio Code的免費(fèi)插件

    編寫(xiě)工具(如Vector的CAPL Browser)雖然功能強(qiáng)大,但是結(jié)合Visual Studio Code更能夠滿足多語(yǔ)言編程和集成現(xiàn)代開(kāi)發(fā)工具的需求。 因此,Vector推出一套基于Visual
    的頭像 發(fā)表于 11-24 14:15 ?117次閱讀
    Vector推出<b class='flag-5'>一套</b>基于Visual Studio Code的免費(fèi)插件