精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

扒一扒RocketMQ中會導致消息重復消息的原因

jf_ro2CN3Fa ? 來源:三友的java日記 ? 2023-05-06 12:25 ? 次閱讀

消息發送異常時重復發送

首先,我們來瞅瞅RocketMQ發送消息和消費消息的基本原理。

5704e1c2-eb16-11ed-90ce-dac502259ad0.png

如圖,簡單說一下上圖中的概念:

Broker,就是RocketMQ的服務端,如上圖就有兩個服務實例

Topic就是一類消息集合的名字

Queue就是Topic的對應的隊列,消息都存在Queue上,每個Topic都會有自己的幾個Queue

所以,整個消息發送和消費過程大致如下:

生產者在發送消息之前根據負載均衡策略(默認是輪詢)選擇一個Queue,然后跟這個Queue所在的機器建立連接,把消息發送到這個Queue上

消費者只要消費這個Queue,那么就能消費到消息

在正常情況下,生產者的確是按照這個方式來發送消息的

但是當出現了異常時,這種異常包括消息發送超時、響應超時等等,RocketMQ為了保證消息成功發送,會進行消息發送的重試操作,默認情況下會最多會重試兩次

571c1aa4-eb16-11ed-90ce-dac502259ad0.png

重試操作比較簡單,就是選擇另一臺機器的Queue來發送。

雖然重試操作可以很大程度保證消息能夠發送成功,但是同時也會帶來消息重復發送的問題。

舉個例子,假設生產者向A機器發送消息,發生了異常,響應超時了,但是就一定代表消息沒發成功么?

不一定,有可能會出現服務端的確接受到并處理了消息,但是由于網絡波動等等,導致生產者接收不到服務端響應的情況,此時消息處理成功了,但是生成者還是以為發生了異常

此時如果發生重試操作,那么勢必會導致消息被發送了兩次甚至更多次,導致服務端存了多條相同的消息,那么就一定會導致消費者重復消費消息

消費消息拋出異常

在RocketMQ的并發消費消息的模式下,需要用戶實現MessageListenerConcurrently接口來處理消息

57c9e24c-eb16-11ed-90ce-dac502259ad0.png

當消費者獲取到消息之后會調用MessageListenerConcurrently的實現,傳入需要消費的消息集合 msgs,這里提到的msgs很重要

57eecf1c-eb16-11ed-90ce-dac502259ad0.png

如上代碼,當消息消費出現異常的時候,status就會為null,后面就會將status設置成為RECONSUME_LATER。

RECONSUME_LATER翻譯成功中文就是稍后重新消費的意思

所以從這可以看出,一旦拋出異常,那么消息之后就可以被重復消息。

到這其實可能有小伙伴覺得消息消費失敗重新消費很正常,保證消息盡可能消費成功。

對,這句話不錯,的確可以在一定程度上保證消費異常的消息可以消費成功。

但是坑不在這,而是前面提到的消費時傳入的整個集合 中的消息都需要被重新消費。

具體的原因我們接著往下看

當消息處理之后,不論是成功還是異常,都需要對結果進行處理,代碼如下

5809f58a-eb16-11ed-90ce-dac502259ad0.png

當處理結果為RECONSUME_LATER的時候(異常會設置為RECONSUME_LATER),此時ackIndex會設置成-1,后面循環遍歷的時候就會遍歷到所有這次消費的消息,然后調用sendMessageBack方法,sendMessageBack方式是用來實現消息重新消費的邏輯,這里就不展開說了。

所以,一旦被消費的一批消息中出現一個消費異常的情況,那么就會導致整批消息被重新消費,從而會導致在出現異常之前的成功處理的消息都會被重復消費 ,非???。

不過好在消費時傳入的消息集合中的消息數量是可以設置的,并且默認就是1

58362cea-eb16-11ed-90ce-dac502259ad0.png

也就說默認情況下那個集合中就一條消息,所以默認情況下不會出現消費成功的消息被重復消費的情況。

所以這個參數不要輕易設置,一旦設置大了,就可能導致消息被重新消費。

除了并發消費消息的模式以外,RocketMQ還支持順序消費消息的模式,也會造成重復消費,邏輯其實差不多,但是在實現消息重新消費的邏輯不一樣。

消費者提交offset失敗

首先來講一講什么是offset。

前面說過,消息在發送的時候需要指定發送到,消息最后會被放到Queue中,其實真正的消息不是在Queue中,Queue存的是每個消息的位置,但是你可以理解為Queue存的是消息。

而消息在Queue中是有序號的,這個序號就被稱為offset,從0開始,單調遞增1。

58555066-eb16-11ed-90ce-dac502259ad0.png

比如說,如上圖,消息1的offset就是0,消息2的offset就是1,依次類推。

這個offset的一個作用就是用來管理消費者的消費進度。

當消費者在成功消費消息之后,需要將所消費的消息的offset提交給RocketMQ服務端,告訴RocketMQ,這個Queue的消息我已經消費到了這個位置了。

提交offset的代碼就在上述第二節提到的處理結果的后面

58625cde-eb16-11ed-90ce-dac502259ad0.png

這樣有一個好處,那么一旦消費者重啟了或者其它啥的要從這個Queue拉取消息的時候,此時他只需要問問RocketMQ服務端上次這個Queue消息消費到哪個位置了,之后消費者只需要從這個位置開始消費消息就行了,這樣就解決了接著消費的問題。

但是RocketMQ在設計的時候,當消費完消息的時候并不是同步告訴RocketMQ服務端offset,而是定時發送。

5887c3e8-eb16-11ed-90ce-dac502259ad0.png

如圖,當消費者消費完消息的時候,會將offset保存到內存中的一個Map數據結構中,所以上面截圖的那段代碼其實是更新內存中的offset

589c957a-eb16-11ed-90ce-dac502259ad0.png

而在消費者啟動的時候會開啟一個定時任務,默認是5s一次,會通過網絡請求將內存中的每個Queue的消費進度offset發送給RocketMQ服務端。

58b27b74-eb16-11ed-90ce-dac502259ad0.png

由于是定時任務,所以就可能出現服務器一旦宕機,導致最新消費的offset沒有成功告訴RocketMQ服務端的情況

此時,消費進度offset就丟了,那么消費者重啟的時候只能從RocketMQ中獲取到上一次提交的offset,從這里開始消費,而不是最新的offset,出現明明消費到了第8個消息,RocketMQ卻告訴他只消費到了第5個消息的情況,此時必然會導致消息又出現重復消費 的情況。

服務端持久化offset失敗

上一節說到,消費者會有一個每隔5s鐘的定時任務將每個隊列的消費進度offset提交到RocketMQ服務端

當RocketMQ服務端接收到提交請求之后,會將這個消費進度offset保存到內存中

58d0a072-eb16-11ed-90ce-dac502259ad0.png

同時為了保證RocketMQ服務端重啟消費進度不會丟失,也會開啟一個定時任務,默認也是5s一次,將內存中的消費進度持久化到磁盤文件中

58f805ea-eb16-11ed-90ce-dac502259ad0.png

所以,整個消費進度offset的數據流轉過程如下

5906fe7e-eb16-11ed-90ce-dac502259ad0.png

當RocketMQ服務端重啟之后,會從磁盤中讀取文件的數據加載到內存中。

跟消費者產生的問題一樣,一旦RocketMQ發生宕機,那么offset就有可能丟失5s鐘的數據,RocketMQ服務端一旦重啟,消費者從RocketMQ服務端獲取到的消息消費進度就比實際消費的進度低,同樣也會導致消息重復消費。

主從同步offset失敗

在RocketMQ的高可用模式中,有一種名叫主從同步的模式,當主節點掛了之后,從節點可以手動升級為主節點對外提供訪問,保證高可用。

在主從同步模式下,從節點默認每隔10s會向主節點發送請求,同步一些元數據,這些元數據就包括消費進度

591ebd98-eb16-11ed-90ce-dac502259ad0.png

當從節點獲取到主節點的消費進度之后,會將主節點的消費進度設置到自己的內存中,同時也會持久化到磁盤。

所以整個消費進度offset的數據的流轉過程就會變成如下

59417978-eb16-11ed-90ce-dac502259ad0.png

同樣,由于也是定時任務,那么一旦主節點掛了,從節點就會丟10s鐘的消費進度,此時如果從節點升級為主節點對外提供訪問,就會出現跟上面提到的一樣的情況,消費者從這個新的主節點中拿到的消費進度比實際的低,自然而然就會重復消費消息。

所以,總的來說,在消費進度數據流轉的過程中,只要某個環節出現了問題,都有很有可能會導致消息重復消費。

重平衡

先來講一講什么是重平衡,其實重平衡很好理解,我說一下你就明白了。

前面說到,消費者是從隊列中獲取消息的

59727820-eb16-11ed-90ce-dac502259ad0.png

在RocketMQ中,有個消費者組的概念,一個消費者組中可以有多個消費者,不同消費者組之間消費消息是互不干擾的,所以前面提到的消費者其實都在消費組下

5988424a-eb16-11ed-90ce-dac502259ad0.png

在同一個消費者組中,消息消費有兩種模式:

集群消費模式

廣播消費模式

由于RocketMQ默認 是集群消費模式,并且絕大多數業務場景都是使用集群消費模式,所以這里就不討論廣播消費模式了。

集群消費模式 是指同一條消息只能被這個消費者組消費一次,這就叫集群消費。

并且前面提到提交消費進度給RocketMQ服務端的情況只會集群消費模式下才會有,在廣播消費模式不會提給到RocketMQ服務端,僅僅持久化到本地磁盤

同時前面說的消費者提交消費進度真正提交的是消費者組對于這個Queue的消費進度,而不是指具體的某個消費者對于Queue消費進度。

雖然說這里將前面提到的一些含義更深一步,但是并不妨礙前面的理解。

集群消費的實現就是將隊列按照一定的算法分配給消費者,默認是按照平均分配的。

5998a216-eb16-11ed-90ce-dac502259ad0.png

如圖所示,假設某個topic有4個Queue,有個消費者組訂閱了這個topic,這個消費者組有兩個消費者1和消費者2,此時每個消費者就可以被分配兩個隊列,這樣就能保證消息正常情況下只會被消費一次。如果只有一個消費者,那么這個消費者就會消費所有隊列,很好理解。

接著后面又啟動了一個消費者3,此時為了保證剛上線的消費者3能夠消費消息,就要進行重平衡 操作,重新分配每個消費者消費的隊列。

在重平衡之后就可能會出現下面這種情況

59a98f36-eb16-11ed-90ce-dac502259ad0.png

如上圖,原本被消費者2消費的Queue4被分配給消費者3,此時消費者3就能消費到消息了,這就是重平衡 。

除了新增消費者會導致重平衡之外,消費者數量減少,隊列的數量增加或者減少都會觸發重平衡。

在了解了重平衡概念之后,接下來分析一下為什么重平衡會導致消息的重復消費。

假設在進行重平衡時,還未重平衡完之前,消費者2此時還是會按照上面第二節提到的消費消息的邏輯來消費Queue4的消息

當消費者2已經重平衡完成了,發現Queue4自己已經不能消費了,那么此時就會把這個Queue4設置為dropped,就是丟棄的意思

59bb4f46-eb16-11ed-90ce-dac502259ad0.png

但是由于重平衡進行時消費者2仍然在消費Queue4的消息,但是當消費完之后,發現隊列被設置成dropped,那么此時被消費者2消費消息的offset就不會被提交 ,原因如下代碼

59e5afac-eb16-11ed-90ce-dac502259ad0.png

這段代碼前面已經出現過,一旦dropped被設置成true,這個if條件就通不過,消費進度就不會被提交。

成功消費消息了,但是卻不提交消費進度,這就非??恿恕?。

于是當消費者3開始消費Queue4的消息的時候,他就會問問RocketMQ服務端,我消費者3所在的消費者組對于Queue4這個隊列消費到哪了,我接著消費就行了。

此時由于沒有提交消費進度,RocketMQ服務端告訴消費者3的消費進度就會比實際的低,這就造成了消息重復消費的情況。

清理長時間消費的消息

在RocketMQ中有這么一個機制,會定時清理長時間正在消費的消息。

5a0db4c0-eb16-11ed-90ce-dac502259ad0.png

如圖,假設有5條消息現在正在被消費者處理,這5條消息會被存在一個集合中,并且是按照offset的大小排序,消息1的offset最小,消息5的offset最大。

RocketMQ消費者啟動時會開啟一個默認15分鐘執行一次的定時任務

5a1d61d6-eb16-11ed-90ce-dac502259ad0.png

這個定時任務會去檢查正在處理的消息的第一條消息,也就是圖中的消息1,一旦發現消息1已經處理了超過15分鐘了,那么此時就會將消息1從集合中移除,之后會隔一定時間再次消費消息1。

這也會有坑,雖然消息1從集合中被移除了,但是消息1并沒有消失,仍然被消費者繼續處理,但是消息1隔一定時間就會再次被消費,就會出現消息1被重復消費的情況。

這就是清理長時間消費的消息導致重復消費的原因。

但此時又會引出一個新的疑問,為什么要移除這個處理超過15分鐘的消息呢?

這就又跟前面提到的消費進度 提交有關!

前面說過消息被消費完成之后會提交消費進度,提交的消費進度實際會有兩種情況:

第一種 就是某個線程消費了所有的消息,當把所有的消息都消費完成之后,就會把消息從集合中全部移除,此時提交的消費進度offset就是圖中消息5的offset+1

加1的操作是為了保證如果發生重啟,那么消費者下次消費的起始位置就是消息5后面的消息,保證消息5不被重復消費

第二種 情況就不太一樣了

假設現在有兩個線程來處理這5條消息,線程1處理前2條,線程2處理后3條,如圖

5a36b1ae-eb16-11ed-90ce-dac502259ad0.png

現在線程1出現了長時間處理消息的情況。

此時線程2處理完消息之后,移除后面三條消息,準備提交offset的時候發現集合中還有元素,就是線程1正在處理的前兩條消息,此時線程2提交的offset并不是消息5對應的offset,而是消息1的offset,代碼如下

5a42f22a-eb16-11ed-90ce-dac502259ad0.png

這么做的主要原因就是保證消息1和消息2至少被消費一次。

因為一旦提交了消息5對應的offset,如果消費者重啟了,下次消費就會接著從消息5的后面開始消費,而對于消息1和消息2來說,并不知道有沒有被消費成功,就有可能出現消息丟失的情況。

所以,一旦集合中最前面的消息長時間處理,那么就會導致后面被消費的消息進度無法提交,那么重啟之后就會導致大量消息被重復消費。

為了解決這個問題,RocketMQ引入了定時清理的機制,定時清理長時間消費的消息,這樣消費進度就可以提交了。

最后

總得來說,RocketMQ中還是存在很多種導致消息重讀消費的情況,并且官方也說了,只是在大多數情況下消息不會重復

5a82f546-eb16-11ed-90ce-dac502259ad0.png

所以如果你的業務場景中需要保證消息不能重復消費,那么就需要根據業務場景合理的設計冪等技術方案。





審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 服務器
    +關注

    關注

    12

    文章

    9029

    瀏覽量

    85207
  • Queue
    +關注

    關注

    0

    文章

    16

    瀏覽量

    7254
  • null
    +關注

    關注

    0

    文章

    17

    瀏覽量

    3919

原文標題:RocketMQ源碼中,7種導致重復消費的坑!

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    與無線網絡相關的那些事

    ,體驗上網帶來的愉悅(ps:天下沒有免費的午餐,認證登陸界面的廣告以及商戶提供產品內容,也能接受這樣的廣告植入形式)在上菜時,而不是先吃而是拍照片發微博\微信到朋友圈內,這也間接的為商戶進行口碑宣傳。你與無線網絡都有哪些趣
    發表于 05-27 11:40

    我看過的那些Linux相關的書籍

    來北京工作已經個多月,大都市的生活比起讀大學要忙碌得多,尤其是出行,基本以小時為基本的計時單位。有時茫然看著窗外車水馬龍,會有些迷茫自己選擇的是對還是錯。  題外話不多說,回歸這次的主題,
    發表于 07-04 06:39

    渣機產品有哪些參數

     銑挖機履帶式挖掘裝載機(俗名:渣機)是種連續生產的高效率出礦設備,主要用于礦山巖巷、半煤巖巷掘進,也可以用于引水洞、鐵路隧道施工和國防洞窟施工中的裝載作業,渣機與我公司生產的煤礦用液壓鉆車
    發表于 09-02 08:00

    電源模塊發熱的原因

    次,我們引起電源模塊發熱的原因。電源模塊在電壓轉換過程中有能量損耗,產生熱能導致模塊發熱,降低電源的轉換效率,影響電源模塊正常工作,并
    發表于 01-03 07:38

    C語言hello world背后的內幕

    行時,它在內存中是什么樣子的?程序的執行入口為什么是 main 函數?可執行文件的內部結構是怎么樣的?閑話少說,讓我們進入正題, hello world 背后的內幕。注:本文是在 Ubuntu
    發表于 09-30 10:31

    美容儀哪個牌子好?來令人眼花繚亂的日本美容儀

    獲得了大眾的喜愛。美容儀哪個牌子好?小編給你日本的美容儀神器品牌。 我們都知道,日本是個科技大國,不管是電器類還是美容儀類,都收獲了世界大批粉絲的追捧,市面上的美容儀品牌多種多樣,價格相差也大,各大美容儀從洗臉到瘦臉
    發表于 04-16 19:50 ?9848次閱讀
    美容儀哪個牌子好?來<b class='flag-5'>扒</b><b class='flag-5'>一</b><b class='flag-5'>扒</b>令人眼花繚亂的日本美容儀

    好用的日本家用美容儀品牌,讓你享受清潔肌膚的樂趣

    了,很多人都愿意嘗試美容儀帶來的護膚體驗。美容儀真的有用嗎?它的價格相比較于去美容院會劃算很多,但是卻比般的護膚保養品昂貴,是真的物有所值還是商家的噱頭?今天就來日本好用的家用
    發表于 06-04 21:03 ?846次閱讀

    店saas系統創新性服務平臺的優勢是什么

    也在不斷升級換代,比如目前新代的店智能數字店鋪系統。 前沿科技讓店鋪數字化管理趨于精準,店數字店鋪系統除了常規的收銀支付等最基本的功能外,結合當前最前沿的人工智能、5G、大數據等技術于
    的頭像 發表于 10-15 10:14 ?2022次閱讀

    中斷為什么不能調printf?

    前面說會寫下Modbus-RTU的實現,寫了1000多字了,有興趣的稍等下哈。前面在個群里看到個朋友在個串口接收中斷里打印遇到了問
    發表于 12-04 12:21 ?0次下載
    <b class='flag-5'>扒</b><b class='flag-5'>一</b><b class='flag-5'>扒</b>中斷為什么不能調printf?

    個超棒的stm32的開源usb-can項目,canable及PCAN固件

    個超棒的stm32的開源usb-can項目,canable及PCAN固件
    發表于 12-20 18:55 ?36次下載
    <b class='flag-5'>扒</b><b class='flag-5'>一</b>個超棒的stm32的開源usb-can項目,canable及PCAN固件

    RocketMQ中各類重復消費的原理淺析

    利用消息中間件,如何保證MQ消費消息的冪等性?所謂知其然,才能知其所以然,本文將通過RocketMQ作為例子,來什么情況下會導致
    的頭像 發表于 01-08 09:29 ?1077次閱讀
    <b class='flag-5'>RocketMQ</b>中各類<b class='flag-5'>重復</b>消費的原理淺析

    雕銑機、雕刻機和加工中心之間的區別

    中心、雕銑機、雕刻機,之間有什么區別?相信這句話很多剛剛加入這個圈的朋友都會問,然后在買機械設備的時候不太懂,不知道怎么區分,到底應該買什么樣的設備,才能達到自己的需求,今天小編就為大家他們三者之間的區別。
    的頭像 發表于 01-15 09:48 ?705次閱讀

    晶振頻率漂移的原因

    晶振頻率漂移的原因? 晶振頻率漂移是指晶振器輸出頻率在長時間使用中逐漸偏離其標稱頻率的現象。晶振頻率漂移是種晶振器的固有性能,其
    的頭像 發表于 01-26 14:20 ?938次閱讀

    折疊屏手機背后的“黑科技”

    折疊屏手機似乎正成為各大品牌下步推新的產品,那么今天,就讓我們這里面到底有什么“黑科技”。
    的頭像 發表于 02-26 10:34 ?1229次閱讀
    <b class='flag-5'>扒</b><b class='flag-5'>一</b><b class='flag-5'>扒</b>折疊屏手機背后的“黑科技”

    渣機遠程監控運維管理系統解決方案

    渣機主要由機械手與輸送機相結合,將自動渣和自動輸送功能合二為,替代了傳統的人工和間歇式機械作業,大大提高了工作效率,降低了勞動強度,并顯著提升了作業安全性,在礦山、道路、隧道、水利等工程領域
    的頭像 發表于 09-05 17:10 ?168次閱讀