引入 MQ
消息中間件最直接的目的:系統(tǒng)解耦以及流量控制(削峰填谷)
-
系統(tǒng)解耦: 上下游系統(tǒng)之間的通信相互依賴,利用
MQ
消息隊(duì)列可以隔離上下游環(huán)境變化帶來(lái)的不穩(wěn)定因素。 -
流量控制: 超高并發(fā)場(chǎng)景中,引入
MQ
可以實(shí)現(xiàn)流量 “削峰填谷” 的作用以及服務(wù)異步處理,不至于打崩服務(wù)。
引入 MQ
同樣帶來(lái)其他問(wèn)題:數(shù)據(jù)一致性。
在分布式系統(tǒng)中,如果兩個(gè)節(jié)點(diǎn)之間存在數(shù)據(jù)同步,就會(huì)帶來(lái)數(shù)據(jù)一致性的問(wèn)題。消息生產(chǎn)端發(fā)送消息到
MQ
再到消息消費(fèi)端需要保證消息不丟失。
所以在使用 MQ
消息隊(duì)列時(shí),需要考慮這 3 個(gè)問(wèn)題:
1、如何知道有消息丟失?
如何感知消息是否丟失了?可總結(jié)如下:
-
他人反饋: 運(yùn)營(yíng)、
PM
反饋消息丟失。 -
監(jiān)控報(bào)警: 監(jiān)控指定指標(biāo),即時(shí)報(bào)警人工調(diào)整。
Kafka
集群異常、Broker
宕機(jī)、Broker
磁盤掛載問(wèn)題、消費(fèi)者異常導(dǎo)致消息積壓等都會(huì)給用戶直接感覺(jué)是消息丟失了。
案例:輿情分析中數(shù)據(jù)采集同步
-
PM
可自己下發(fā)采集調(diào)度指令,去采集特定數(shù)據(jù)。 -
PM
可通過(guò)ES
近實(shí)時(shí)查詢對(duì)應(yīng)數(shù)據(jù),若沒(méi)相應(yīng)數(shù)據(jù)可再次下發(fā)指令。
當(dāng)感知消息丟失了,那就需要一種機(jī)制來(lái)檢查消息是否丟失。
檢索消息
運(yùn)維工具有:
-
查看
Kafka
消費(fèi)位置:
>基于SpringBoot+MyBatisPlus+Vue&Element實(shí)現(xiàn)的后臺(tái)管理系統(tǒng)+用戶小程序,支持RBAC動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
>
>*項(xiàng)目地址:
>*視頻教程:#查看某個(gè)topic的message數(shù)量
$./kafka-run-class.shkafka.tools.GetOffsetShell--broker-listlocalhost:9092--topictest_topic
>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ+Vue&Element實(shí)現(xiàn)的后臺(tái)管理系統(tǒng)+用戶小程序,支持RBAC動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
>
>*項(xiàng)目地址:
>*視頻教程:#查看consumerGroup列表
$./kafka-consumer-groups.sh--list--bootstrap-server192.168.88.108:9092
#查看offset消費(fèi)情況
$./kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--groupconsole-consumer-1152--describe
GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
console-consumer-1152test_topic0-4-consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942/127.0.0.1consumer-console-consumer-1152-1
-
利用工具:
Kafka Tools
- 其他可見化界面工具
2、哪些環(huán)節(jié)可能丟消息?
一條消息從生產(chǎn)到消費(fèi)完成經(jīng)歷 3 個(gè)環(huán)節(jié):消息生產(chǎn)者、消息中間件、消息消費(fèi)者。
哪個(gè)環(huán)節(jié)都有可能出現(xiàn)消息丟失問(wèn)題。
1)生產(chǎn)端
首先要認(rèn)識(shí)到 Kafka
生產(chǎn)端發(fā)送消息流程:
調(diào)用
send()
方法時(shí),不會(huì)立刻把消息發(fā)送出去,而是緩存起來(lái),選擇恰當(dāng)時(shí)機(jī)把緩存里的消息劃分成一批數(shù)據(jù),通過(guò)Sender
線程按批次發(fā)送給服務(wù)端Broker
。
此環(huán)節(jié)丟失消息的場(chǎng)景有: 即導(dǎo)致 Producer
消息沒(méi)有發(fā)送成功
-
網(wǎng)絡(luò)波動(dòng): 生產(chǎn)者與服務(wù)端之間的鏈路不可達(dá),發(fā)送超時(shí)。現(xiàn)象是:各端狀態(tài)正常,但消費(fèi)端就是沒(méi)有消費(fèi)消息,就像丟失消息一樣。
-
*解決措施: *重試
props.put("retries", "10");
-
不恰當(dāng)配置: 發(fā)送消息無(wú)
ack
確認(rèn); 發(fā)送消息失敗無(wú)回調(diào),無(wú)日志。producer.send(newProducerRecord<>(topic,messageKey,messageStr), newCallBack(){...});
-
*解決措施: *設(shè)置
acks=1
或者acks=all
。發(fā)送消息設(shè)置回調(diào)。
回顧下重要的參數(shù): acks
-
acks=0
:不需要等待服務(wù)器的確認(rèn). 這是retries
設(shè)置無(wú)效. 響應(yīng)里來(lái)自服務(wù)端的offset
總是-1
,producer
只管發(fā)不管發(fā)送成功與否。延遲低,容易丟失數(shù)據(jù)。 -
acks=1
:表示leader
寫入成功(但是并沒(méi)有刷新到磁盤)后即向producer
響應(yīng)。延遲中等,一旦leader
副本掛了,就會(huì)丟失數(shù)據(jù)。 -
acks=all
:等待數(shù)據(jù)完成副本的復(fù)制, 等同于-1
. 假如需要保證消息不丟失, 需要使用該設(shè)置. 同時(shí)需要設(shè)置unclean.leader.election.enable
為true
, 保證當(dāng)ISR
列表為空時(shí), 選擇其他存活的副本作為新的leader
.
2)服務(wù)端
先來(lái)了解下 Kafka Broker
寫入數(shù)據(jù)的過(guò)程:
-
Broker
接收到一批數(shù)據(jù),會(huì)先寫入內(nèi)存PageCache
(OS Cache
)中。 -
操作系統(tǒng)會(huì)隔段時(shí)間把
OS Cache
中數(shù)據(jù)進(jìn)行刷盤,這個(gè)過(guò)程會(huì)是 「異步批量刷盤」 。
這里就有個(gè)隱患,如果數(shù)據(jù)寫入 PageCache
后 Kafka Broker
宕機(jī)會(huì)怎樣?機(jī)子宕機(jī)/掉電?
-
Kafka Broker
宕機(jī): 消息不會(huì)丟失。因?yàn)閿?shù)據(jù)已經(jīng)寫入PageCache
,只等待操作系統(tǒng)刷盤即可。 -
機(jī)子宕機(jī)/掉電: 消息會(huì)丟失。因?yàn)閿?shù)據(jù)仍在內(nèi)存里,內(nèi)存
RAM
掉電后就會(huì)丟失數(shù)據(jù)。
- 解決方案 :使用帶蓄電池后備電源的緩存
cache
,防止系統(tǒng)斷電異常。
- 對(duì)比學(xué)習(xí)
MySQL
的 “雙1” 策略,基本不使用這個(gè)策略,因?yàn)?“雙1” 會(huì)導(dǎo)致頻繁的I/O
操作,也是最慢的一種。- 對(duì)比學(xué)習(xí)
Redis
的AOF
策略,默認(rèn)且推薦的策略:**Everysec
(AOF_FSYNC_EVERYSEC
) 每一秒鐘保存一次(默認(rèn)):** 。每個(gè)寫命令執(zhí)行完, 只是先把日志寫到AOF
文件的內(nèi)存緩沖區(qū), 每隔一秒把緩沖區(qū)中的內(nèi)容寫入磁盤。
拓展:Kafka
日志刷盤機(jī)制
# 推薦采用默認(rèn)值,即不配置該配置,交由操作系統(tǒng)自行決定何時(shí)落盤,以提升性能。
# 針對(duì) broker 配置:
log.flush.interval.messages=10000 # 日志落盤消息條數(shù)間隔,即每接收到一定條數(shù)消息,即進(jìn)行l(wèi)og落盤。
log.flush.interval.ms=1000 # 日志落盤時(shí)間間隔,單位ms,即每隔一定時(shí)間,即進(jìn)行l(wèi)og落盤。
# 針對(duì) topic 配置:
flush.messages.flush.ms=1000 # topic下每1s刷盤
flush.messages=1 # topic下每個(gè)消息都落盤
# 查看 Linux 后臺(tái)線程執(zhí)行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10 # 表示當(dāng)臟頁(yè)占總內(nèi)存的的百分比超過(guò)這個(gè)值時(shí),后臺(tái)線程開始刷新臟頁(yè)。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000 # 表示臟數(shù)據(jù)多久會(huì)被刷新到磁盤上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500 # 表示多久喚醒一次刷新臟頁(yè)的后臺(tái)線程(5秒)。
vm.dirtytime_expire_seconds = 43200
Broker
的可靠性需要依賴其多副本機(jī)制: 一般副本數(shù) 3 個(gè)(配置參數(shù):replication.factor=3
)
-
Leader Partition
副本:提供對(duì)外讀寫機(jī)制。 -
Follower Partition
副本:同步Leader
數(shù)據(jù)。
副本之間的數(shù)據(jù)同步也可能出現(xiàn)問(wèn)題:數(shù)據(jù)丟失問(wèn)題和數(shù)據(jù)不一致問(wèn)題。
解決方案:ISR
和 Epoch
機(jī)制
-
ISR
(In-Sync Replicas
) : 當(dāng)Le``ader
宕機(jī),可以從ISR
中選擇一個(gè)Follower
作為Leader
。 -
Epoch
機(jī)制: 解決Leader
副本高水位更新和Follower
副本高水位更新在時(shí)間上是存在錯(cuò)配問(wèn)題。Tips
:Kafka 0.11.x
版本才引入leader epoch
機(jī)制解決高水位機(jī)制弊端。
對(duì)應(yīng)需要的配置參數(shù)如下:
-
acks=-1
或者acks=all
: 必須所有副本均同步到消息,才能表明消息發(fā)送成功。 -
replication.factor >= 3
: 副本數(shù)至少有 3 個(gè)。 -
min.insync.replicas > 1
: 代表消息至少寫入 2個(gè)副本才算發(fā)送成功。前提需要acks=-1
。舉個(gè)栗子:
Leader
宕機(jī)了,至少要保證ISR
中有一個(gè)Follower
,這樣這個(gè)Follwer
被選舉為Leader
且不會(huì)丟失數(shù)據(jù)。公式:
replication.factor = min.insync.replicas + 1
-
unclean.leader.election.enable=false
: 防止不在ISR
中的Follower
被選舉為Leader
。Kafka 0.11.0.0版本開始默認(rèn)
unclean.leader.election.enable=false
3)消費(fèi)端
消費(fèi)端消息丟失場(chǎng)景有:
-
消息堆積: 幾個(gè)分區(qū)的消息都沒(méi)消費(fèi),就跟丟消息一樣。
- 解決措施: 一般問(wèn)題都出在消費(fèi)端,盡量提高客戶端的消費(fèi)速度,消費(fèi)邏輯另起線程進(jìn)行處理。
-
自動(dòng)提交: 消費(fèi)端拉下一批數(shù)據(jù),正在處理中自動(dòng)提交了
offset
,這時(shí)候消費(fèi)端宕機(jī)了; 重啟后,拉到新一批數(shù)據(jù),而上一批數(shù)據(jù)卻沒(méi)處理完。
-
解決措施: 取消自動(dòng)提交
auto.commit = false
,改為手動(dòng)ack
。
-
心跳超時(shí),引發(fā)
Rebalance
: 客戶端心跳超時(shí),觸發(fā)Rebalance
被踢出消費(fèi)組。如果只有這一個(gè)客戶端,那消息就不會(huì)被消費(fèi)了。同時(shí)避免兩次
poll
的間隔時(shí)間超過(guò)閾值:
-
max.poll.records
:降低該參數(shù)值,建議遠(yuǎn)遠(yuǎn)小于<單個(gè)線程每秒消費(fèi)的條數(shù)> * <消費(fèi)線程的個(gè)數(shù)> *
的積。 -
max.poll.interval.ms
: 該值要大于/ (<單個(gè)線程每秒消費(fèi)的條數(shù)> * <消費(fèi)線程的個(gè)數(shù)>)
的值。 -
解決措施: 客戶端版本升級(jí)至
0.10.2
以上版本。
案例:凡凡曾遇到數(shù)據(jù)同步時(shí),消息中的文本需經(jīng)過(guò) NLP
的 NER
分析,再同步到 ES
。
這個(gè)過(guò)程的主要流程是:
-
數(shù)據(jù)同步程序從
Kafka
中拉取消息。 -
數(shù)據(jù)同步程序?qū)⑾?nèi)的文本發(fā)送的
NER
進(jìn)行分析,得到特征數(shù)組。 -
數(shù)據(jù)同步程序?qū)⑾⑼浇o
ES
。
現(xiàn)象:線上數(shù)據(jù)同步程序運(yùn)行一段時(shí)間后,消息就不消費(fèi)了。
-
排查日志: 發(fā)現(xiàn)有
Rebalance
日志,懷疑是客戶端消費(fèi)太慢被踢出了消費(fèi)組。 -
本地測(cè)試: 發(fā)現(xiàn)運(yùn)行一段時(shí)間也會(huì)出現(xiàn)
Rebalance
,且NLP
的NER
服務(wù)訪問(wèn)HTTP 500
報(bào)錯(cuò)。 -
得出結(jié)論: 因
NER
服務(wù)異常,導(dǎo)致數(shù)據(jù)同步程序消費(fèi)超時(shí)。且當(dāng)時(shí)客戶端版本為v0.10.1
,Consumer
沒(méi)有獨(dú)立線程維持心跳,而是把心跳維持與poll
接口耦合在一起,從而也會(huì)造成心跳超時(shí)。
當(dāng)時(shí)解決措施是:
-
session.timeout.ms
: 設(shè)置為25s
,當(dāng)時(shí)沒(méi)有升級(jí)客戶端版本,怕帶來(lái)其他問(wèn)題。 -
熔斷機(jī)制: 增加
Hystrix
,超過(guò) 3 次服務(wù)調(diào)用異常就熔斷,保護(hù)客戶端正常消費(fèi)數(shù)據(jù)。
3、如何確保消息不丟失?
掌握這些技能:
- 熟悉消息從發(fā)送到消費(fèi)的每個(gè)階段
-
監(jiān)控報(bào)警
Kafka
集群 - 熟悉方案 “MQ 可靠消息投遞”
怎么確保消息 100% 不丟失?
到這,總結(jié)下:
- 生產(chǎn)端:
-
設(shè)置重試:
props.put("retries", "10");
-
設(shè)置
acks=all
-
設(shè)置回調(diào):
producer.send(msg, new CallBack(){...});
- Broker:
-
內(nèi)存:使用帶蓄電池后備電源的緩存
cache
。 -
Kafka
版本0.11.x
以上:支持Epoch
機(jī)制。 -
replication.factor >= 3
: 副本數(shù)至少有 3 個(gè)。 -
min.insync.replicas > 1
: 代表消息至少寫入 2個(gè)副本才算發(fā)送成功。前提需要acks=-1
。 -
unclean.leader.election.enable=false
: 防止不在ISR
中的Follower
被選舉為Leader
。
- 消費(fèi)端
-
客戶端版本升級(jí)至
0.10.2
以上版本。 -
取消自動(dòng)提交
auto.commit = false
,改為手動(dòng)ack
。 - 盡量提高客戶端的消費(fèi)速度,消費(fèi)邏輯另起線程進(jìn)行處理。
審核編輯:湯梓紅
-
消息隊(duì)列
+關(guān)注
關(guān)注
0文章
33瀏覽量
2969 -
kafka
+關(guān)注
關(guān)注
0文章
50瀏覽量
5211
原文標(biāo)題:案例 | Kafka 為什么會(huì)丟消息?
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論