一、應(yīng)用場景
目前系統(tǒng)中有很多需要用到延時處理的功能:支付超時取消、排隊超時、短信、微信等提醒延遲發(fā)送、token刷新、會員卡過期等等。通過延時處理,極大地節(jié)省系統(tǒng)的資源,不必輪詢數(shù)據(jù)庫處理任務(wù)。
目前大部分功能通過定時任務(wù)完成,定時任務(wù)還分使用quartz及xxljob兩種類型輪詢時間短,每秒執(zhí)行一次,對數(shù)據(jù)庫造成一定的壓力,并且會有1秒的誤差。輪詢時間久,如30分鐘一次,03:01插入一條數(shù)據(jù),正常3:31執(zhí)行過期,但是3:30執(zhí)行輪詢時,掃描330的數(shù)據(jù),是掃描不到3:31的數(shù)據(jù)的,需要4:00的時候才能掃描到,相當(dāng)于多延遲了29分鐘!
二、演示處理方式調(diào)研
1.DelayQueue
實現(xiàn)方式:
jvm提供的延遲阻塞隊列,通過優(yōu)先級隊列對不同延遲時間任務(wù)進(jìn)行排序,通過condition進(jìn)行阻塞、睡眠dealy時間 獲取延遲任務(wù)。
當(dāng)有新任務(wù)加入時,會判斷新任務(wù)是否是第一個待執(zhí)行的任務(wù),若是,會解除隊列睡眠,防止新加入的元素時需要執(zhí)行的元素而不能正常被執(zhí)行線程獲取到。
存在的問題:
單機(jī)運行,系統(tǒng)宕機(jī)后,無法進(jìn)行有效的重試
沒有執(zhí)行記錄和備份
沒有重試機(jī)制
系統(tǒng)重啟時,會將任務(wù)清空!
不能分片消費
優(yōu)勢: 實現(xiàn)簡單,無任務(wù)時阻塞,節(jié)省資源,執(zhí)行時間準(zhǔn)確
2.延遲隊列mq
實現(xiàn)方式:依賴mq,通過設(shè)置延遲消費時間,達(dá)到延遲消費功能。像rabbitMq、jmq都可以設(shè)置延遲消費時間。RabbitMq通過將消息設(shè)置過期時間,放入私信隊列進(jìn)行消費實現(xiàn)。
存在的問題:時間設(shè)置不靈活,每個queue是固定的到期時間,每次新創(chuàng)建延時隊列,需要創(chuàng)建新的消息隊列
優(yōu)點:依靠jmq,可以有效的監(jiān)控、消費記錄、重試,具備多機(jī)同時消費能力,不懼怕宕機(jī)
3.定時任務(wù)
通過定時任務(wù)輪詢符合條件的數(shù)據(jù)
缺點:
必須要讀業(yè)務(wù)數(shù)據(jù)庫,對數(shù)據(jù)庫造成一定的壓力,
存在延時
一次掃描數(shù)據(jù)量過大時,占用過多的系統(tǒng)資源。
無法分片消費
優(yōu)點:
消費失敗后,下次還能繼續(xù)消費,具備重試能力,
消費能力穩(wěn)定
4.redis
任務(wù)存儲在redis中,使用redis的 zset隊列根據(jù)score進(jìn)行排序,程序通過線程不斷獲取隊列數(shù)據(jù)消費,實現(xiàn)延時隊列
優(yōu)點:
查詢redis相比較數(shù)據(jù)庫快,set隊列長度過大,會根據(jù)跳表結(jié)構(gòu)進(jìn)行查詢,效率高
redis可根據(jù)時間戳進(jìn)行排序,只需要查詢當(dāng)前時間戳內(nèi)的分?jǐn)?shù)的任務(wù)即可
無懼機(jī)器重啟
分布式消費
缺點:
受限于redis性能,并發(fā)10W
多個命令無法保證原子性,使用lua腳本會要求所有數(shù)據(jù)都在一個redis分片上。
5. 時間輪
通過時間輪實現(xiàn)的延遲任務(wù)執(zhí)行,也是基于jvm單機(jī)運行,如kafka、netty都有實現(xiàn)時間輪,redisson的看門狗也是通過netty的時間輪實現(xiàn)的。
缺點:不適合分布式服務(wù)的使用,宕機(jī)后,會丟失任務(wù)。
三、實現(xiàn)目標(biāo)
兼容目前在使用的異步事件組件,并提供更可靠,可重試、有記錄、可監(jiān)控報警、高性能的延遲組件。
消息傳輸可靠性:消息進(jìn)入到延遲隊列后,保證至少被消費一次。
Client支持豐富:支持多重語言。
高可用性:支持多實例部署。掛掉一個實例后,還有后備實例繼續(xù)提供服務(wù)。
實時性:允許存在一定的時間誤差。
支持消息刪除:業(yè)務(wù)使用方,可以隨時刪除指定消息。
支持消費查詢
支持手動重試
對當(dāng)前異步事件的執(zhí)行增加監(jiān)控
四、架構(gòu)設(shè)計
五、延遲組件實現(xiàn)方式
1.實現(xiàn)原理
目前選擇使用jimdb通過zset實現(xiàn)延時功能,將任務(wù)id和對應(yīng)的執(zhí)行時間作為score存在在zset隊列中,默認(rèn)會按照score排序,每次取0-當(dāng)前時間內(nèi)的score的任務(wù)id,
發(fā)送延遲任務(wù)時,會根據(jù)時間戳+機(jī)器ip+queueName+sequence 生成唯一的id,構(gòu)造消息體,加密后放入zset隊列中。
通過搬運線程,將達(dá)到執(zhí)行時間的任務(wù)移動到發(fā)布隊列中,等待消費者獲取。
監(jiān)控方通過集成ump
消費記錄通過redis備份+數(shù)據(jù)庫持久化完成。
通過緩存實現(xiàn)的方式,只是實現(xiàn)的一種,可以通過參數(shù)控制使用哪一種實現(xiàn)方式,并可通過spi自由擴(kuò)展。
2.消息結(jié)構(gòu)
每個Job必須包含以下幾個屬性:
Topic:Job類型,即QueueName
Id:Job的唯一標(biāo)識。用來檢索和刪除指定的Job信息。
Delay:Job需要延遲的時間。單位:秒。(服務(wù)端會將其轉(zhuǎn)換為絕對時間)
Body:Job的內(nèi)容,供消費者做具體的業(yè)務(wù)處理,以json格式存儲。
traceId:發(fā)送線程的traceId,待后續(xù)pfinder支持設(shè)置traceId后,可與發(fā)送線程公用同一個traceiD,便于日志追蹤
具體結(jié)構(gòu)如下圖表示:
TTR的設(shè)計目的是為了保證消息傳輸?shù)目煽啃浴?/p>
3.數(shù)據(jù)流轉(zhuǎn)及流程圖
基于redis-disruptor方式進(jìn)行發(fā)布、消費,可以作為消息來進(jìn)行使用,消費者采用原有異步事件的disruptor無鎖隊列消費,不同應(yīng)用、不同queue之間無鎖
1)支持應(yīng)用只發(fā)布,不消費,達(dá)到消息隊列的功能。
2)支持分桶,針對大key問題,若事件多,可以設(shè)置延遲隊列和任務(wù)隊列桶的數(shù)量,減小因大key造成的redis阻塞問題。
3)通過ducc配置,進(jìn)行性能的擴(kuò)展,目前只支持開啟消費和關(guān)閉消費。
4)支持設(shè)置超時時間配置,防止消費線程執(zhí)行過久
?瓶頸:消費速度慢,生產(chǎn)速度過快,會導(dǎo)致ringbuffer隊列占滿,當(dāng)前應(yīng)用既是生產(chǎn)者也是消費者時,生產(chǎn)者會休眠,性能取決于消費速度,可通過水平擴(kuò)展機(jī)器,直接提升性能。監(jiān)控redis隊列的長度,若不斷增長,可考慮增加消費者,直接提高性能。
可能出現(xiàn)的情況:因一個應(yīng)用公用一個disruptor,擁有64個消費者線程,如果某一個事件消費過慢,導(dǎo)致64個線程都在消費這個事件,會導(dǎo)致其他事件無消費線程消費,生產(chǎn)者線程也被阻塞,導(dǎo)致所有事件的消費都被阻塞。
后期觀察是否有這個性能瓶頸,可給每一個queue一個消費者線程池。
六、demo示例
增加配置文件
判斷是否開啟jd.event.enable:true
配置com.jd.car senna-event 1.0-SNAPSHOT
jd: senna: event: enable: true queue: retryEventQueue: bucketNum: 1 handleBean: retryHandle消費代碼:
package com.jd.car.senna.admin.event; import com.jd.car.senna.event.EventHandler; import com.jd.car.senna.event.annotation.SennaEvent; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * @author zhangluyao * @description * @create 2022-02-21-9:54 下午 */ @Slf4j @Component("retryHandle") public class RetryQueueEvent extends EventHandler { @Override protected void onHandle(String key, String eventType) { log.info("Handler開始消費:{}", key); } @Override protected void onDelayHandle(String key, String eventType) { log.info("delayHandler開始消費:{}", key); } }
注解形式:
package com.jd.car.senna.admin.event; import com.jd.car.senna.event.EventHandler; import com.jd.car.senna.event.annotation.SennaEvent; import lombok.extern.slf4j.Slf4j; /** * @author zhangluyao * @description * @create 2022-02-21-9:54 下午 */ @Slf4j @SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true) public class TestQueueEvent extends EventHandler { @Override protected void onHandle(String key, String eventType) { log.info("Handler開始消費:{}", key); } @Override protected void onDelayHandle(String key, String eventType) { log.info("delayHandler開始消費:{}", key); } }
發(fā)送代碼:
package com.jd.car.senna.admin.controller; import com.jd.car.senna.event.queue.IEventQueue; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.concurrent.CompletableFuture; /** * @author zly */ @RestController @Slf4j public class DemoController { @Lazy @Resource(name = "testQueue") private IEventQueue eventQueue; @ResponseBody @GetMapping("/api/v1/demo") public String demo() { log.info("發(fā)送無延遲消息"); eventQueue.push("no delay 5000 millseconds message 3"); return "ok"; } @ResponseBody @GetMapping("/api/v1/demo1") public String demo1() { log.info("發(fā)送延遲5秒消息"); eventQueue.push(" delay 5000 millseconds message,name",1000*5L); return "ok"; } @ResponseBody @GetMapping("/api/v1/demo2") public String demo2() { log.info("發(fā)送延遲到2022-04-02 00:00:00執(zhí)行的消息"); eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000)); return "ok"; } }
審核編輯:劉清
-
看門狗
+關(guān)注
關(guān)注
10文章
559瀏覽量
70746 -
SPI接口
+關(guān)注
關(guān)注
0文章
258瀏覽量
34342 -
JVM
+關(guān)注
關(guān)注
0文章
157瀏覽量
12209 -
Redis
+關(guān)注
關(guān)注
0文章
371瀏覽量
10846
原文標(biāo)題:一種異步延遲隊列的實現(xiàn)方式
文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論