1 引言
隨著移動互聯網的不斷發展以及各類智能設備日益深入民眾日常生活中,人類社會產生的數據量正在以指數級快速增長,人類已經正式邁入大數據時代[1]。如今,運營商能夠獲得的用戶數據越來越豐富,通過DPI(Deep Packet Inspector,深度分組檢測)分析技術,能夠較好地識別網絡上的流量類別、應用層上的應用種類等[2]。在這個“數據為王”的時代,如何充分利用這筆重要的戰略資產已成為重中之重。
數據規模的快速增長給大數據分析處理帶來了巨大的挑戰,尤其是在通信行業,數據越發呈現出無限性、突發性和實時性等特征[3],傳統的基于MapReduce的批處理模式難以滿足數據實時性的要求,而能否在第一時間獲得數據所蘊含的信息決定了數據的價值。因此,流式處理技術成為大數據技術研究的新熱點[4]。流式處理能夠針對數據的變化進行實時處理,能夠在秒級獲得處理結果,特別適合一些對時效性要求很高的場景。
本文結合電信運營商的需求,對DPI數據進行實時的采集及處理,提出一種基于流式計算的DPI數據處理方案,能夠將獲得DPI數據實時信息的時延降低到分鐘級,甚至秒級,實現對電信用戶上網信息的實時處理、監測及分類匯總,為之后進行的大數據應用提供了良好基礎。
2 流式處理概述
傳統基于MapReduce大數據處理技術實際上是一種批處理方式,如圖1所示。批處理模式首先要完成數據的累積和存儲,然后Hadoop客戶端將數據上傳到HDFS上,最后才啟動Map/Reduce進行數據處理,處理后再寫入到HDFS。這種方式必須要所有數據都要準備好,然后統一進行集中計算和價值發現,無法滿足實時性的要求。
2015年,Nathan Marz提出了實時大數據處理框架Lambda架構[5],整合了離線計算和實時計算,能夠滿足實時系統高容錯、低時延和可擴展等要求,并且可集成Hadoop、Kafka、Storm、Spark及HBase等各類大數據組件。
一個典型的Lambda架構如圖2所示,主要使用的場景是邏輯復雜且延遲低的程序。數據會分別灌入實時系統和批處理系統,然后各自輸出自己的結果,結果會在查詢端進行合并。
圖2 Lambda架構圖
3 流式計算架構對比
流式計算對系統的容錯、時延、可擴展及可靠性能力提出了很高的要求,當前有許多流式計算框架(如Spark streaming[10]、Storm[11]、Kafka Stream[12]、Flink[13]和PipelineDB[14]等)已經廣泛應用于各行各業,并且還在不斷迭代發展,適用的場景也各不相同。
3.1 Spark streaming
Spark是由加州大學伯克利分校AMP實驗室專門為大數據處理而設計的計算框架[6]。Spark Streaming是建立在Spark上的實時計算框架,是Spark的核心組件之一,通過它內置的API和基于內存的高效引擎,用戶可以結合流處理、批處理和交互式查詢開發應用。
Spark Streaming并不像其他流式處理框架每次只處理一條記錄,而是將流數據離散化處理,每次處理一批數據(DStream),使之能夠進行秒級以下的快速批處理,執行流程如圖3所示。Spark Streaming的Receiver并行接收數據,將數據緩存至內存中,經過延遲優化后Spark引擎對短任務(幾十毫秒)進行批處理。這樣設計的好處讓Spark Streaming能夠同時處理離線處理和流處理問題。
圖3 Spark Streaming執行流程
Spark Streaming能在故障報錯下迅速恢復狀態,整合了批處理與流處理,內置豐富高級算法處理庫,發展迅速,社區活躍。毫無疑問,Spark Streaming是流式處理框架的佼佼者。缺點是由于需要累積一批小文件才處理,因此時延會稍大,是準實時系統。
3.2 Storm
Storm通常被比作“實時的Hadoop”,是Twitter開發的實時、分布式以及具備高容錯計算系統,可以簡單、可靠地處理大量數據流,用戶可以采用任意編程語言來開發應用。
在Storm中,一個用于實時計算的圖狀結構稱之為拓撲(topology),拓撲提交到集群,由集群中的主控節點分發代碼,分配任務到工作節點執行。一個拓撲中包括spout和bolt兩種角色,其中spout發送消息,負責將數據流以tuple元組的形式發送出去;而bolt則負責轉換這些數據流,在bolt中可以完成映射map、過濾filter等操作,bolt自身也可以隨機將數據發送給其他bolt。
圖4 Storm數據流動
Storm能將數據在不同的bolt中流動、移動數據,真正實現流式處理,易于擴展,靈活性強,高度專注于流式處理。Storm在事件處理與增量計算方面表現突出,能夠以實時方式根據不斷變化的參數對數據流進行處理。
3.3 Kafka Stream
Kafka Stream是Apache Kafka開源項目的一個組成部分,是一個功能強大、易于使用的庫,它使得Apache Kafka擁有流處理的能力。
Kafka Stream是輕量級的流計算類庫,除了Apache Kafka之外沒有任何外部依賴,可以在任何Java程序中使用,使用Kafka作為內部消息通訊存儲介質,因此不需要為流處理需求額外部署一個集群。
Kafka Stream入門簡單,并且不依賴其他組件,非常容易部署,支持容錯的本地狀態,延遲低,非常適合一些輕量級流處理的場景。
3.4 Flink
Flink是一個面向分布式數據流處理和批量數據處理的開源計算平臺,同時支持批處理以及流處理,主要針對流數據,將批數據視為流數據的一個極限特例。
Flink核心是一個流式的數據流執行引擎,它提供了數據分布、數據通信以及容錯機制等功能。流執行引擎之上,Flink提供了更高層次的API以便用戶使用。Flink還針對某些領域提供了領域庫,例如Flink ML、Flink的機器學習庫等。
Flink適合有極高流處理需求,并有少量批處理任務的場景。該技術可兼容原生Storm和Hadoop程序,可在YARN管理的集群上運行。目前Flink最大的局限之一是在社區活躍度方面,該項目的大規模部署尚不如其他處理框架那么常見。
3.5 PipeLineDB
PipelineDB是基于PostgreSQL的一個流式計算數據庫,效率非常高,通過SQL對數據流做操作,并把操作結果儲存起來。其基本過程是:創建PipelineDB Stream、編寫SQL、對Stream做操作、操作結果被保存到continuous view。
PipelineDB特點是可以只使用SQL進行流式處理,不需要代碼,可以高效可持續自動處理流式數據,只存儲處理后的數據,因此非常適合流式數據處理,例如網站流量統計、網頁的瀏覽統計等。
3.6 架構對比
上文提到的5種流式處理框架對比如表1所示:
表1 流式框架對比
Storm的特點是成熟,是流式處理框架實際上的標準,模型、編程難度都比較復雜,框架采用循環處理數據,對系統資源,尤其是CPU資源消耗很大,當任務空閑時,需要sleep程序,減少對資源的消耗。Spark Streaming兼顧了批處理以及流式處理,并且有Spark的強大支持,發展潛力大,但與Kafka的接口平滑性不夠。Kafka Stream是Kafka的一個開發庫,具有入門、編程、部署運維簡單的特點,并且不需要部署額外的組件,但對于多維度的統計來說,需要基于不同topic來做分區,編程模型復雜。Flink跟Spark Streaming很像,不同的是Flink把所有任務當成流來處理,在迭代計算、內存管理方面比Spark Streaming稍強,缺點是社區活躍度不高,還不夠成熟;PipelineDB是一個流式計算數據庫,能執行簡單的流式計算任務,優勢是基本不需要開發,只要熟悉SQL操作均可以輕松使用,但對于集群計算,需要商業上的支持。
4 DPI數據處理方案
基于實際任務需求以及上文流式框架的對比,由于Kafka Stream編程難度小,不需要另外安裝軟件,與Kafka等組件無縫連接,比較穩定,并且各種性能均比較優秀,因此本文選擇了Kafka Stream作為流式處理的核心組件。
4.1寬帶DPI處理
為了完成寬帶DPI數據的實時抓包、資料填補、清洗、轉換及并入庫等工作,應用了上述DPI數據處理方案。具體項目方案如圖5所示:
圖5 廣州寬帶DPI處理方案
Mina進程是一個JAVA程序,基于mina框架開發,主要接收AAA數據包,獲得用戶賬戶信息,解析計算,并持久化到redis,最后發送給抓包(Capture)程序。Capture程序由C語言編寫,使用開源pcap抓取網卡http包,解析,結合用戶帳號資料,把DPI寫入到Kafka中。Kafka stream完成DPI的實時清洗和轉換工作。
Flume[15]是Cloudera開源的分布式可靠、可用、高效的收集,聚合和移動不同數據源的海量數據系統,配置簡單,基本無需開發,資源消耗低,支持傳輸數據到HDFS,非常適合與大數據系統結合。本項目將流式處理完后的數據通過Flume從Kafka中寫入到HDFS,建立hive表,為上層應用提供數據。
Kafka Stream采用自主研發的ETL框架[16],負責數據過濾(圖片、視頻等去掉),數據處理(獲取網絡ID、字段解析等)。ETL框架采用JAVA語言開發,支持多種數據源,包括普通文本、壓縮格式及xml立體格式等。支持多種大數據計算框架,包括Map/Reduce、Spark streaming、Kafka Stream和Flume等;具有擴展方便、字段校驗、支持字段的通配符及支持維表查詢等功能。在運維方面,支持變量引用以及出錯處理等功能。
4.24GDPI實時統計
以電信4G DPI信息作為數據源,通過流式處理,完成DPI的實時統計工作,包括多粒度(5分鐘/1小時/1天)去重用戶統計、多粒度去重不同號碼頭用戶統計、多粒度流量統計及多粒度去重域名統計等。4G DPI實時統計具體項目方案如圖6所示:
圖6 4G DPI實時統計方案圖
數據源是gzip壓縮文件,因為flume原生不支持.gz或.tar.gz文件格式,所以修改了Flume底層代碼,實現對壓縮文件的處理,省去了解壓時間。Flume采集文件時以用戶手機號碼作為分區的key,將同一號碼的數據分到同一分區,便于去重。通過Kafka集群管理工具,Kafka Manager[17]可以很好地監測Kafka集群的狀態。Kafka集群生產者如圖7所示:
圖7 Kafka集群生產者
Kafka Stream消費4GDPI的數據,并行處理。在程序里設置不同的計數器,所有數據都經過這些計數器處理,為了解決去重問題,引入了布隆過濾,雖然有一定的誤判率,但是還是能比較好的完成去重,同時保證系統的性能。同樣消費者也可以通過Kafka Manager進行管理,可以直觀觀察到消費者的落后程度。
為了滿足不同的輸出要求,程序設置了三種輸出供選擇。粒度為天的數據將會寫到MySQL作為備份,針對熱點區域的監控數據將會輸出到Redis,同時,為了方便管理以及數據呈現,還采用了ELK框架(ElasticSearch+Logstash+Kibana),將所有數據傳到Kibana做前端展示。Kibana界面如圖8所示:
圖8 Kibana界面
5 實踐及分析
5.1 部署實踐
上述兩個系統均已應用在實際的生產中,均有不錯的表現,能夠滿足任務需求,并且已經穩定運行。
寬帶DPI處理項目有2臺采集機、1臺AAA服務器及5臺Kafka機器。采集機每臺每秒產生115 MB數據,兩臺1.8 G流量。采集機寫Kafka 33萬條/秒,Kafka Stream寫Kafka 22萬條/秒,清洗率(清洗工作把諸如圖片、視頻及js請求等與業務無關的DPI信息去掉)為33%。Kafka Stream落后處理穩定在500萬數據,延遲處理在15 s之內,Flume寫HDFS落后保持在100萬左右,5 s內的延遲。寬帶DPI處理項目性能如圖9所示:
圖9 寬帶DPI處理項目性能
4G DPI實時統計項目共6臺機器,1臺為Flume采集機,其余5臺部署Kafka、Kafka Stream及ELK。采集機寫Kafka一般為10萬條/秒,峰值可達到25萬條/秒。ElasticSearch集群一共8個實例,每個實例配置2 G內存。目前集群有13億條數據,占361 G空間。通過Logstash導入數據到ElasticSearch峰值可以達到8~9萬條/秒。Kafka Stream處理數據落后在10 s內,Logstash寫ElasticSearch落后在5 s內,如圖10所示。目前4G DPI實時統計項目日均處理文件超過15 000個,大小達到1.6 T,日均處理記錄數超過100億。
圖10 4G DPI實時統計項目性能
5.2 存在的問題
在4G DPI實時統計項目開發過程中,隨著項目的需求越來越多,后面增加了對域名和CGI的去重,而且同一域名或者CGI不在同一Kafka分區,導致結果有偏差。為了解決這一問題,程序設計了二次去重,第一次去重的結果把CGI或者域名作為key輸出到Kafka集群,再做了一次去重工作,導致延遲時間變大和系統維護變復雜。
由于寬帶DPI處理中不涉及去重,只是數據過濾和數據轉換,因此Kafka Stream是非常適合的。但在涉及分區和去重的4G DPI實時統計項目中,應當采用Storm作為流式處理框架。在Storm中,數據從一個bolt流到另外一個bolt,這樣數據可以在一個bolt中按手機號碼分區,在另外一個bolt中又可以按CGI或者域名分區,可以避免二次去重問題,降低編程模型復雜度。
在程序設計之初,應根據應用場景需求選擇合適的技術框架。如果項目基礎結構中涉及Spark,那Spark Streaming是不錯的選擇;如果像4G DPI實時統計項目一樣需要數據轉移或者去重,那么Storm是首選;如果是簡單的數據清洗和轉換處理,那么Kafka Stream是不錯的選擇。對于簡單小規模的實時統計,PipeLineDB足以勝任。
6 結束語
大數據流式計算和批處理適用于不同的業務場景,在對時效要求高的場景下,流式計算具有明顯的優勢。本文首先概述了流式處理以及其與批處理的區別,然后對業界流行的流式計算框架進行了對比,根據業務需求提出了以Kafka Stream為流式處理框架的DPI數據處理方案,搭配Kafka、Flume及ELK等組件,具有入門迅速、編程難度低和部署維護簡單等特點。并且將方案應用到了寬帶DPI處理項目以及4G DPI實時統計項目中,完成了任務需求,性能優異,運行穩定。
在對實際項目實踐中,隨著任務需求的增多,發現Kafka Stream在應對多維度數據去重問題時表現不力,需要引入二次過濾來解決問題。因此在項目需求階段,便要在技術框架選型時充分考慮可能出現的問題,結合技術框架適用場景,綜合考慮。
-
數據處理
+關注
關注
0文章
549瀏覽量
28453 -
DPI
+關注
關注
0文章
34瀏覽量
11483
發布評論請先 登錄
相關推薦
評論