今天來分享 RocketMQ 的定時任務。通過這些定時任務,能讓我們更加理解 RocketMQ 的消息處理機制和設計理念。
從 RocketMQ 4.9.4 的源代碼上看,RocketMQ 的定時任務有很多,今天主要講解一些核心的定時任務。
1 架構回顧
首先再來回顧一下 RocketMQ 的架構圖:
Name Server 集群部署,但是節點之間并不會同步數據,因為每個節點都會保存完整的數據。因此單個節點掛掉,并不會對集群產生影響。
Broker 可以采用主從集群部署,實現多副本存儲和高可用。每個 Broker 節點都要跟所有的 Name Server 節點建立長連接,定義注冊 Topic 路由信息和發送心跳。
Producer 和 Consumer 跟 Name Server 的任意一個節點建立長連接,定期從 Name Server 拉取 Topic 路由信息。
2 Producer 和 Consumer
2.1 獲取 NameServer 地址
Producer 和 Consumer 要跟 Name Server 建立連接,就必須首先獲取 Name Server 地址。Producer 和 Consumer 采用定時任務每兩分鐘獲取 Name Server 地址并更新本地緩存。代碼如下:
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?fetchNameServerAddr?exception",?e); ??} ?} },?1000?*?10,?1000?*?60?*?2,?TimeUnit.MILLISECONDS);
2.2 更新路由信息
Producer 和 Consumer 會定時從 Name Server 獲取定時訂閱信息,更新本地緩存,默認間隔是 30s(可以配置)。代碼如下
?
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.updateTopicRouteInfoFromNameServer(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?updateTopicRouteInfoFromNameServer?exception",?e); ??} ?} },?10,?this.clientConfig.getPollNameServerInterval(),?TimeUnit.MILLISECONDS);
?
?
2.3 向 Broker 發送心跳
Producer 和 Consumer 會從本地緩存的 Broker 列表中定時清除離線的 Broker,并且向 Broker 發送心跳,默認間隔是 30s(可以配置)。代碼如下:
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.cleanOfflineBroker(); ???MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?sendHeartbeatToAllBroker?exception",?e); ??} ?} },?1000,?this.clientConfig.getHeartbeatBrokerInterval(),?TimeUnit.MILLISECONDS);
?
?
2.4 持久化 Offset
消費者需要定時持久化 MessageQueue 的偏移量,默認每 5s 更新一次(可以配置)。
注意:集群模式需要向 Broker 發送持久化消息,因為集群模式偏移量保存在 Broker 端,而廣播模式只需要把偏移量保存在消費者本地文件。 代碼如下:
?
?
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.persistAllConsumerOffset(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?persistAllConsumerOffset?exception",?e); ??} ?} },?1000?*?10,?this.clientConfig.getPersistConsumerOffsetInterval(),?TimeUnit.MILLISECONDS);
?
?
2.5 調整核心線程數
對于消費者采用推模式的情況,消費者會根據未消費的消息數量,定期更新核心線程數,默認每 1m 一次。
注意:在 4.9.4 這個版本,更新核心線程數的代碼并沒有實現,只是預留了接口。 代碼如下:
?
?
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.adjustThreadPool(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?adjustThreadPool?exception",?e); ??} ?} },?1,?1,?TimeUnit.MINUTES);
?
?
2.6 失效過期請求
Producer 和 Consumer 會定時掃描緩存在本地的請求,如果請求開始時間加超時時間(再加 1s)小于當前時間,則這個請求過期。通過定時任務(3s 一次)讓過期請求失效,并且觸發回調函數。
?
?
//NettyRemotingClient.java this.timer.scheduleAtFixedRate(new?TimerTask()?{ ?@Override ?public?void?run()?{ ??try?{ ???NettyRemotingClient.this.scanResponseTable(); ??}?catch?(Throwable?e)?{ ???log.error("scanResponseTable?exception",?e); ??} ?} },?1000?*?3,?1000);
?
?
2.7 生產者
2.7.1 性能記錄
生產者發送消息后,會對成功失敗的狀態、花費時間進行記錄,以此來計算吞吐量 TPS,響應時間 RT,代碼如下:
?
?
//Producer.java executorService.scheduleAtFixedRate(new?TimerTask()?{ ?@Override ?public?void?run()?{ ??snapshotList.addLast(statsBenchmark.createSnapshot()); ??if?(snapshotList.size()?>?10)?{ ???snapshotList.removeFirst(); ??} ?} },?1000,?1000,?TimeUnit.MILLISECONDS); executorService.scheduleAtFixedRate(new?TimerTask()?{ ?private?void?printStats()?{ ??if?(snapshotList.size()?>=?10)?{ ???doPrintStats(snapshotList,??statsBenchmark,?false); ??} ?} ?@Override ?public?void?run()?{ ??try?{ ???this.printStats(); ??}?catch?(Exception?e)?{ ???e.printStackTrace(); ??} ?} },?10000,?10000,?TimeUnit.MILLISECONDS);
?
?
2.8 消費者
2.8.1 MessageQueue 加鎖
對于順序消息,要保證同一個 MessageQueue 只能被同一個 Consumer 消費。消費者初始化的時候,會啟動一個定時任務,定時(默認 20s,可以配置)地向 Broker 發送鎖定消息,Broker 收到請求后,就會把 MessageQueue、group 和 clientId 進行綁定,這樣其他客戶端就不能從這個 MessageQueue 拉取消息。
代碼如下:
?
?
//ConsumeMessageOrderlyService.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???ConsumeMessageOrderlyService.this.lockMQPeriodically(); ??}?catch?(Throwable?e)?{ ???log.error("scheduleAtFixedRate?lockMQPeriodically?exception",?e); ??} ?} },?1000?*?1,?ProcessQueue.REBALANCE_LOCK_INTERVAL,?TimeUnit.MILLISECONDS);
?
?
注意:Broker 的加鎖是有時效的(默認 60s,可以配置),過期后,有可能被其他 Consumer 進行消費。
2.8.2 性能快照
Consumer 每秒會記錄一次性能快照,比如消息從創建到消費花費的時間,消息從保存到消費花費的時間,接收到消息的總數量,失敗總數量。代碼如下:
?
?
//Consumer.java executorService.scheduleAtFixedRate(new?TimerTask()?{ ?@Override ?public?void?run()?{ ??snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); ??if?(snapshotList.size()?>?10)?{ ???snapshotList.removeFirst(); ??} ?} },?1000,?1000,?TimeUnit.MILLISECONDS);
?
?
上面記錄了性能快照后,Consumer 會每隔 10s 進行性能參數計算和打印。代碼如下:
?
?
//Consumer.java executorService.scheduleAtFixedRate(new?TimerTask()?{ private?void?printStats()?{ ?if?(snapshotList.size()?>=?10)?{ ??Long[]?begin?=?snapshotList.getFirst(); ??Long[]?end?=?snapshotList.getLast(); ??final?long?consumeTps?= ???(long)?(((end[1]?-?begin[1])?/?(double)?(end[0]?-?begin[0]))?*?1000L); ??final?double?averageB2CRT?=?(end[2]?-?begin[2])?/?(double)?(end[1]?-?begin[1]); ??final?double?averageS2CRT?=?(end[3]?-?begin[3])?/?(double)?(end[1]?-?begin[1]); ??final?long?failCount?=?end[4]?-?begin[4]; ??final?long?b2cMax?=?statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get(); ??final?long?s2cMax?=?statsBenchmarkConsumer.getStore2ConsumerMaxRT().get(); ??statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0); ??statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0); ??System.out.printf("Current?Time:?%s?TPS:?%d?FAIL:?%d?AVG(B2C)?RT(ms):?%7.3f?AVG(S2C)?RT(ms):?%7.3f?MAX(B2C)?RT(ms):?%d?MAX(S2C)?RT(ms):?%d%n", ????System.currentTimeMillis(),?consumeTps,?failCount,?averageB2CRT,?averageS2CRT,?b2cMax,?s2cMax ??); ?} }
?
?
通過性能參數的日志輸出,可以很方便的對 RocketMQ 的消費者進行監控。
2.8.3 清除過期消息
消費者會定期檢查本地拉取的消息列表,如果列表中的消息已經過期(默認 15 分鐘過期,可以配置),則把過期消息再次發送給 Broker,然后從本地消息列表刪除。代碼如下:
?
?
//ConsumeMessageConcurrentlyService.java this.cleanExpireMsgExecutors.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???cleanExpireMsg(); ??}?catch?(Throwable?e)?{ ???log.error("scheduleAtFixedRate?cleanExpireMsg?exception",?e); ??} ?} },?this.defaultMQPushConsumer.getConsumeTimeout(),?this.defaultMQPushConsumer.getConsumeTimeout(),?TimeUnit.MINUTES);
?
?
2.8.4 清除過期消息
消費者會每隔 30s 向 NameServer 拉取 MessageQueue 信息,然后跟本地保存的進行比較,如果不一致,則更新本地緩存信息。代碼如下:
?
?
//DefaultLitePullConsumerImpl.java scheduledExecutorService.scheduleAtFixedRate( new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???fetchTopicMessageQueuesAndCompare(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?fetchMessageQueuesAndCompare?exception",?e); ??} ?} },?1000?*?10,?this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(),?TimeUnit.MILLISECONDS);
?
?
3 Broker
3.1 狀態采樣
Broker 端會對狀態進行采用,比如一個 Topic、MessageQueue、Group 總共發送了多少條消息,Topic 總共發送的消息大小。Broker 會對這些狀態按照秒、分鐘、小時為單位進行采樣并且定時打印,這里一共有 6 個定時任務。比如下面是按照秒進行采樣的定時任務:
?
?
//StatsItemSet.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???samplingInSeconds(); ??}?catch?(Throwable?ignored)?{ ??} ?} },?0,?10,?TimeUnit.SECONDS);
?
?
3.2 記錄消息延時
Broker 讀取消息時會記錄消息從保存磁盤到被讀取的時間差并定時打印。定時任務代碼如下:
?
?
//MomentStatsItemSet.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???printAtMinutes(); ??}?catch?(Throwable?ignored)?{ ??} ?} },?Math.abs(UtilAll.computeNextMinutesTimeMillis()?-?System.currentTimeMillis()),?1000?*?60?*?5,?TimeUnit.MILLISECONDS);
?
?
3.3 持久化數據
Broker 會定時持久化消費偏移量、Topic 配置、定閱組配置等,默認 10s 一次(可以配置)。代碼如下:
?
?
//ScheduleMessageService.java this.deliverExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???if?(started.get())?{ ????ScheduleMessageService.this.persist(); ???} ??}?catch?(Throwable?e)?{ ???log.error("scheduleAtFixedRate?flush?exception",?e); ??} ?} },?10000,?this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(),?TimeUnit.MILLISECONDS);
?
?
3.4 失效過期請求
Broker 會定時掃描緩存在本地的請求,如果請求開始時間加超時時間(再加 1s)小于當前時間,則這個請求過期。通過定時任務(3s 一次)讓過期請求失效,并且觸發回調函數。
?
?
//NettyRemotingServer.java this.timer.scheduleAtFixedRate(new?TimerTask()?{ ?@Override ?public?void?run()?{ ??try?{ ???NettyRemotingServer.this.scanResponseTable(); ??}?catch?(Throwable?e)?{ ???log.error("scanResponseTable?exception",?e); ??} ?} },?1000?*?3,?1000);
?
?
3.5 過濾服務
消費者可能會向 Broker 注冊 filterClass 用來過濾消息。Broker 收到消費者注冊的 filterClass 后會用定時任務來創建 FilterServer。代碼如下:
?
?
//FilterServerManager.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???FilterServerManager.this.createFilterServer(); ??}?catch?(Exception?e)?{ ???log.error("",?e); ??} ?} },?1000?*?5,?1000?*?30,?TimeUnit.MILLISECONDS);
?
?
這樣消費者拉取消息時首先從 FilterServer 拉取消息,FilterServer 從 Broker 拉取消息后進行過濾,只把消費者感興趣的消息返回給消費者。一個 Broker 可以有多個 FilterServer。如下圖:
3.6 記錄消息總量
Broker 每天會記錄前一天收發消息的總數量,定時任務如下(period 是 1 天):
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.getBrokerStats().record(); ??}?catch?(Throwable?e)?{ ???log.error("schedule?record?error.",?e); ??} ?} },?initialDelay,?period,?TimeUnit.MILLISECONDS);
?
?
3.7 持久化 Offset
Broker 默認每隔 5s(可以配置) 會持久化一次消息的 Offset,代碼如下:
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.consumerOffsetManager.persist(); ??}?catch?(Throwable?e)?{ ???log.error("schedule?persist?consumerOffset?error.",?e); ??} ?} },?1000?*?10,?this.brokerConfig.getFlushConsumerOffsetInterval(),?TimeUnit.MILLISECONDS);
?
?
3.8 持久化過濾參數
上面提到過,消費者可能會向 Broker 注冊 filterClass,Broker 解析消費者注冊的 filterClass 后,會把解析后的 FilterData 持久化到文件,代碼如下:
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.consumerFilterManager.persist(); ??}?catch?(Throwable?e)?{ ???log.error("schedule?persist?consumer?filter?error.",?e); ??} ?} },?1000?*?10,?1000?*?10,?TimeUnit.MILLISECONDS);
?
?
3.9 Broker 自我保護
當消費者讀取消息緩慢時,Broker 為了保護自己,會把這個消費者設置為不允許讀取的狀態,這樣這個消費組就不能再拉取消息了,代碼如下:
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.protectBroker(); ??}?catch?(Throwable?e)?{ ???log.error("protectBroker?error.",?e); ??} ?} },?3,?3,?TimeUnit.MINUTES);
?
?
3.10 Broker 打印水位
Broker 會每隔 1s 打印一次水位,包括發送消息的延遲、接收消息的延遲、事務消息的延遲、查詢消息的延遲,代碼如下:
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.printWaterMark(); ??}?catch?(Throwable?e)?{ ???log.error("printWaterMark?error.",?e); ??} ?} },?10,?1,?TimeUnit.SECONDS);
?
?
3.11 Broker 打印 Offset 差
Broker 會定時打印最新的消息 Offset 和已經分發給 MessageQueue 和 Index 索引的 Offset 差距,代碼如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???log.info("dispatch?behind?commit?log?{}?bytes",?BrokerController.this.getMessageStore().dispatchBehindBytes()); ??}?catch?(Throwable?e)?{ ???log.error("schedule?dispatchBehindBytes?error.",?e); ??} ?} },?1000?*?10,?1000?*?60,?TimeUnit.MILLISECONDS);
3.12 獲取 NameServer 地址
Broker 會定期獲取 NameServer 的地址,并更新本地緩存,代碼如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); ??}?catch?(Throwable?e)?{ ???log.error("ScheduledTask?fetchNameServerAddr?exception",?e); ??} ?} },?1000?*?10,?1000?*?60?*?2,?TimeUnit.MILLISECONDS);
3.13 打印主從偏移量差距
Broker 會定時打印 master 節點和 slave 節點消息 Offset 的差距,代碼如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.printMasterAndSlaveDiff(); ??}?catch?(Throwable?e)?{ ???log.error("schedule?printMasterAndSlaveDiff?error.",?e); ??} ?} },?1000?*?10,?1000?*?60,?TimeUnit.MILLISECONDS);
3.14 向 NameServer 注冊
Broker 會定時向(默認 30s,可配置,最高不超過 60s)所有 NameServer 發送注冊消息,代碼如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.registerBrokerAll(true,?false,?brokerConfig.isForceRegister()); ??}?catch?(Throwable?e)?{ ???log.error("registerBrokerAll?Exception",?e); ??} ?} },?1000?*?10,?Math.max(10000,?Math.min(brokerConfig.getRegisterNameServerPeriod(),?60000)),?TimeUnit.MILLISECONDS);
3.15 同步 Slave
Broker 的 Master 節點會每間隔 10s 向 Slave 節點同步數據,包括 Topic 配置、消費偏移量、延遲偏移量、消費組配置,代碼如下:
//BrokerController.java slaveSyncFuture?=?this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.slaveSynchronize.syncAll(); ??} ??catch?(Throwable?e)?{ ???log.error("ScheduledTask?SlaveSynchronize?syncAll?error.",?e); ??} ?} },?1000?*?3,?1000?*?10,?TimeUnit.MILLISECONDS);
3.16 刪除過期文件
Broker 會周期性(默認 10s,可以配置)地執行刪除任務,刪除過期的 CommitLog 文件和 ConsumeQueue 文件,代碼如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??DefaultMessageStore.this.cleanFilesPeriodically(); ?} },?1000?*?60,?this.messageStoreConfig.getCleanResourceInterval(),?TimeUnit.MILLISECONDS);
3.17 文件大小檢查
Broker 會每隔 10 分鐘檢查 CommitLog 文件和 ConsumeQueue 文件,用當前文件的最?。ㄆ鹗迹?Offset 減去上一個文件最?。ㄆ鹗迹?Offset,如果不等于一個文件的大小,就說明文件存在問題。代碼如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??DefaultMessageStore.this.checkSelf(); ?} },?1,?10,?TimeUnit.MINUTES);
3.18 保存堆棧映射
Broker 會每隔 1s 記錄所有存活線程的堆棧映射信息,前提是 debugLockEnable 開關配置是打開的。代碼如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??if?(DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable())?{ ???try?{ ????if?(DefaultMessageStore.this.commitLog.getBeginTimeInLock()?!=?0)?{ ?????long?lockTime?=?System.currentTimeMillis()?-?DefaultMessageStore.this.commitLog.getBeginTimeInLock(); ?????if?(lockTime?>?1000?&&?lockTime?10000000)?{ ??????String?stack?=?UtilAll.jstack(); ??????final?String?fileName?=?System.getProperty("user.home")?+?File.separator?+?"debug/lock/stack-" ???????+?DefaultMessageStore.this.commitLog.getBeginTimeInLock()?+?"-"?+?lockTime; ??????MixAll.string2FileNotSafe(stack,?fileName); ?????} ????} ???}?catch?(Exception?e)?{ ???} ??} ?} },?1,?1,?TimeUnit.SECONDS);
3.19 檢查物理磁盤
Broker 會每隔 10s 檢查保存 CommitLog 的磁盤空間是否達到閾值,如果達到,會打印 error 級別的日志。代碼如下:
//DefaultMessageStore.java this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?public?void?run()?{ ??DefaultMessageStore.this.cleanCommitLogService.isSpaceFull(); ?} },?1000L,?10000L,?TimeUnit.MILLISECONDS);
3.20 持久化延時消息偏移量
RocketMQ 的延時消費分為 18 個級別,定義如下:
//ScheduleMessageService.java private?String?messageDelayLevel?=?"1s?5s?10s?30s?1m?2m?3m?4m?5m?6m?7m?8m?9m?10m?20m?30m?1h?2h";
RocketMQ 會為每個延時級別定義要給 ConsumeQueue,每隔 ConsumeQueue 都會有一個 Offset,通過 offsetTable(ConcurrentMap) 來記錄不同延時級別對應的 Offset。
RocketMQ 會周期性地(默認 10s,可以配置)把 offsetTable 中保存的 Offset 持久化到文件。代碼如下:
//DefaultMessageStore.java this.deliverExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???if?(started.get())?{ ????ScheduleMessageService.this.persist(); ???} ??}?catch?(Throwable?e)?{ ???log.error("scheduleAtFixedRate?flush?exception",?e); ??} ?} },?10000,?this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(),?TimeUnit.MILLISECONDS);
3.21 關閉異常連接
Broker 會定時掃描所有的長連接,主要包括生產者、消費者和 FilterServer,如果連接不活躍,則關閉該連接,并從本地連接列表中移除。代碼如下:
//ClientHousekeepingService.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???ClientHousekeepingService.this.scanExceptionChannel(); ??}?catch?(Throwable?e)?{ ???log.error("Error?occurred?when?scan?not?active?client?channels.",?e); ??} ?} },?1000?*?10,?1000?*?10,?TimeUnit.MILLISECONDS);
3.22 清理過期消息
如果 Broker 配置了允許快速失?。╞rokerFastFailureEnable),則會每隔 10ms 定時清理過期請求,包括要發送的消息、接收的消息、心跳消息、要結束的事務消息。代碼如下:
scheduledExecutorService.scheduleAtFixedRate( new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???fetchTopicMessageQueuesAndCompare(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?fetchMessageQueuesAndCompare?exception",?e); ??} ?} },?1000?*?10,?this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(),?TimeUnit.MILLISECONDS);
注意:清理消息前會判斷是否系統繁忙,如果系統繁忙,會給發送隊列中的消息直接返回系統繁忙,暫時不做過期消息清理。
4 NameServer
4.1 檢查過期 Broker
在 3.14 節中講過,Broker 會跟 NameServer 建立長連接,定時向 NameServer 發送注冊消息。NameServer 會在本地維護一個 Broker 列表,定時任務會輪詢本地保存的 Broker 列表,檢查注冊消息是否過期(超過 120s),如果注冊消息過期,則關閉長連接,從本地緩存刪除這個 Broker。代碼如下:
//NamesrvController.java this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,?5,?10,?TimeUnit.SECONDS);
4.2 打印配置
NameServer 啟動時,會加載 KV 格式的配置文件到 configTable 這個變量,NameServer 客戶端也可以發送一個 KV 配置請求給 NameServer,NameServer 收到請求后也會保存到 configTable。
NameServer 會定時打印 configTable 中的配置,代碼如下:
//NamesrvController.java this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,?1,?10,?TimeUnit.MINUTES);
5 總結
RocketMQ 的定時任務很多,這些定時任務的加入讓 RocketMQ 的設計更加完備,包括業務處理、監控日志、心跳、清理任務、關閉連接、持久化數據等。通過對定時任務的理解,能夠更深入地理解 RocketMQ 的設計理念。
編輯:黃飛
?
評論
查看更多