精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

如何用Redis實現延遲隊列呢?

jf_ro2CN3Fa ? 來源:三友的java日記 ? 2023-03-16 14:28 ? 次閱讀

背景

前段時間有個小項目需要使用延遲任務,談到延遲任務,我腦子第一時間一閃而過的就是使用消息隊列來做,比如RabbitMQ的死信隊列又或者RocketMQ的延遲隊列,但是奈何這是一個小項目,并沒有引入MQ,我也不太想因為一個延遲任務就引入MQ,增加系統復雜度,所以這個方案直接就被pass了。

雖然基于MQ這個方式走不通了,但是這個項目中使用到Redis,所以我就想是否能夠使用Redis來代替MQ實現延遲隊列的功能,于是我就查了一下有沒有現成可用的方案,別說,還真給我查到了兩種方案,并且我還仔細研究對比了這兩個方案,發現要想很好的實現延遲隊列,并不簡單。

監聽過期key

基于監聽過期key的方式來實現延遲隊列是我查到的第一個方案,為了弄懂這個方案實現的細節,我還特地去扒了扒官網,還真有所收獲

1、Redis發布訂閱模式

一談到發布訂閱模式,其實一想到的就是MQ,只不過Redis也實現了一套,并且跟MQ賊像,如圖:

f3e30a0a-c3b1-11ed-bfe3-dac502259ad0.png

圖中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。

生產者在消息發送時需要到指定發送到哪個channel上,消費者訂閱這個channel就能獲取到消息。

2、keyspace notifications

在Redis中,有很多默認的channel,只不過向這些channel發送消息的生產者不是我們寫的代碼,而是Redis本身。當消費者監聽這些channel時,就可以感知到Redis中數據的變化。

這個功能Redis官方稱為keyspace notifications,字面意思就是鍵空間通知。

這些默認的channel被分為兩類:

以__keyspace@__:為前綴,后面跟的是key的名稱,表示監聽跟這個key有關的事件。

舉個例子,現在有個消費者監聽了__keyspace@0__:sanyou這個channel,sanyou就是Redis中的一個普通key,那么當sanyou這個key被刪除或者發生了其它事件,那么消費者就會收到sanyou這個key刪除或者其它事件的消息

以__keyevent@__:為前綴,后面跟的是消息事件類型,表示監聽某個事件

同樣舉個例子,現在有個消費者監聽了__keyevent@0__:expired這個channel,代表了監聽key的過期事件。那么當某個Redis的key過期了(expired),那么消費者就能收到這個key過期的消息。如果把expired換成del,那么監聽的就是刪除事件。具體支持哪些事件,可從官網查。

上述db是指具體的數據庫,Redis不是默認分為16個庫么,序號從0-15,所以db就是0到15的數字,示例中的0就是指0對應的數據庫。

f3fa3f36-c3b1-11ed-bfe3-dac502259ad0.png

3、延遲隊列實現原理

通過對上面的兩個概念了解之后,應該就對監聽過期key的實現原理一目了然了,其實就是當這個key過期之后,Redis會發布一個key過期的事件到__keyevent@__:expired這個channel,只要我們的服務監聽這個channel,那么就能知道過期的Key,從而就算實現了延遲隊列功能。

所以這種方式實現延遲隊列就只需要兩步:

發送延遲任務,key是延遲消息本身,過期時間就是延遲時間

監聽__keyevent@__:expired這個channel,處理延遲任務

4、demo

好了,基本概念和核心原理都說完了之后,又到了show me the code環節。

好巧不巧,Spring已經實現了監聽__keyevent@*__:expired這個channel這個功能,__keyevent@*__:expired中的*代表通配符的意思,監聽所有的數據庫。

所以demo寫起來就很簡單了,只需3步即可

引入pom


org.springframework.boot
spring-boot-starter-data-redis
2.2.5.RELEASE



org.springframework.boot
spring-boot-starter-web
2.2.5.RELEASE

配置類

@Configuration
publicclassRedisConfiguration{

@Bean
publicRedisMessageListenerContainerredisMessageListenerContainer(RedisConnectionFactoryconnectionFactory){
RedisMessageListenerContainerredisMessageListenerContainer=newRedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(connectionFactory);
returnredisMessageListenerContainer;
}

@Bean
publicKeyExpirationEventMessageListenerredisKeyExpirationListener(RedisMessageListenerContainerredisMessageListenerContainer){
returnnewKeyExpirationEventMessageListener(redisMessageListenerContainer);
}

}

KeyExpirationEventMessageListener實現了對__keyevent@*__:expiredchannel的監聽

f4060884-c3b1-11ed-bfe3-dac502259ad0.png

當KeyExpirationEventMessageListener收到Redis發布的過期Key的消息的時候,會發布RedisKeyExpiredEvent事件

f4560ff0-c3b1-11ed-bfe3-dac502259ad0.png

所以我們只需要監聽RedisKeyExpiredEvent事件就可以拿到過期消息的Key,也就是延遲消息。

對RedisKeyExpiredEvent事件的監聽實現MyRedisKeyExpiredEventListener

@Component
publicclassMyRedisKeyExpiredEventListenerimplementsApplicationListener{

@Override
publicvoidonApplicationEvent(RedisKeyExpiredEventevent){
byte[]body=event.getSource();
System.out.println("獲取到延遲消息:"+newString(body));
}

}

整個工程目錄也簡單

f4a5f2f4-c3b1-11ed-bfe3-dac502259ad0.png

代碼寫好,啟動應用

之后我直接通過Redis命令設置消息,就沒通過代碼發送消息了,消息的key為sanyou,值為task,值不重要,過期時間為5s

setsanyoutask

expiresanyou5

如果上面都理論都正確,不出意外的話,5s后MyRedisKeyExpiredEventListener應該可以監聽到sanyou這個key過期的消息,也就相當于拿到了延遲任務,控制臺會打印出獲取到延遲消息:sanyou。

于是我滿懷希望,靜靜地等待了5s。。

5、4、3、2、1,時間一到,我查看控制臺,但是控制臺并沒有按照預期打印出上面那句話。

為什么會沒打印出?難道是代碼寫錯了?正當我準備檢查代碼的時候,官網的一段話道出了真實原因。

f4dd25ee-c3b1-11ed-bfe3-dac502259ad0.png

我給大家翻譯一下上面這段話講的內容。

上面這段話主要討論的是key過期事件的時效性問題,首先提到了Redis過期key的兩種清除策略,就是面試八股文常背的兩種:

惰性清除。當這個key過期之后,訪問時,這個Key才會被清除

定時清除。后臺會定期檢查一部分key,如果有key過期了,就會被清除

再后面那段話是核心,意思是說,key的過期事件發布時機并不是當這個key的過期時間到了之后就發布,而是這個key在Redis中被清理之后,也就是真正被刪除之后才會發布。

到這我終于明白了,上面的例子中即使我設置了5s的過期時間,但是當5s過去之后,只要兩種清除策略都不滿足,沒人訪問sanyou這個key,后臺的定時清理的任務也沒掃描到sanyou這個key,那么就不會發布key過期的事件,自然而然也就監聽不到了。

至于后臺的定時清理的任務什么時候能掃到,這個沒有固定時間,可能一到過期時間就被掃到,也可能等一定時間才會被掃到,這就可能會造成了客戶端從發布到監聽到的消息時間差會大于等于過期時間,從而造成一定時間消息的延遲,這就著實有點坑了。。

5、坑

除了上面測試demo的時候遇到的坑之外,在我深入研究之后,還發現了一些更離譜的坑。

丟消息太頻繁

Redis的丟消息跟MQ不一樣,因為MQ都會有消息的持久化機制,可能只有當機器宕機了,才會丟點消息,但是Redis丟消息就很離譜,比如說你的服務在重啟的時候就消息會丟消息。

Redis實現的發布訂閱模式,消息是沒有持久化機制,當消息發布到某個channel之后,如果沒有客戶端訂閱這個channel,那么這個消息就丟了,并不會像MQ一樣進行持久化,等有消費者訂閱的時候再給消費者消費。

所以說,假設服務重啟期間,某個生產者或者是Redis本身發布了一條消息到某個channel,由于服務重啟,沒有監聽這個channel,那么這個消息自然就丟了。

消息消費只有廣播模式

Redis的發布訂閱模式消息消費只有廣播模式一種。

所謂的廣播模式就是多個消費者訂閱同一個channel,那么每個消費者都能消費到發布到這個channel的所有消息。

f500e1e6-c3b1-11ed-bfe3-dac502259ad0.png

如圖,生產者發布了一條消息,內容為sanyou,那么兩個消費者都可以同時收到sanyou這條消息。

所以,如果通過監聽channel來獲取延遲任務,那么一旦服務實例有多個的話,還得保證消息不能重復處理,額外地增加了代碼開發量。

接收到所有key的某個事件

這個不屬于Redis發布訂閱模式的問題,而是Redis本身事件通知的問題。

當消費者監聽了以__keyevent@__:開頭的消息,那么會導致所有的key發生了事件都會被通知給消費者。

舉個例子,某個消費者監聽了__keyevent@*__:expired這個channel,那么只要key過期了,不管這個key是張三還會李四,消費者都能收到。

所以如果你只想消費某一類消息的key,那么還得自行加一些標記,比如消息的key加個前綴,消費的時候判斷一下帶前綴的key就是需要消費的任務。

所以,綜上能夠得出一個非常重要的結論,那就是監聽Redis過期Key這種方式實現延遲隊列,不穩定,坑賊多!

那有沒有比較靠譜的延遲隊列的實現方案呢?這就不得不提到我研究的第二種方案了。

Redisson實現延遲隊列

Redisson他是Redis的兒子(Redis son),基于Redis實現了非常多的功能,其中最常使用的就是Redis分布式鎖的實現,但是除了實現Redis分布式鎖之外,它還實現了延遲隊列的功能。

先來個demo,后面再來說說這種實現的原理。

1、demo

引入pom


org.redisson
redisson
3.13.1

封裝了一個RedissonDelayQueue類

@Component
@Slf4j
publicclassRedissonDelayQueue{

privateRedissonClientredissonClient;

privateRDelayedQueuedelayQueue;
privateRBlockingQueueblockingQueue;

@PostConstruct
publicvoidinit(){
initDelayQueue();
startDelayQueueConsumer();
}

privatevoidinitDelayQueue(){
Configconfig=newConfig();
SingleServerConfigserverConfig=config.useSingleServer();
serverConfig.setAddress("redis://localhost:6379");
redissonClient=Redisson.create(config);

blockingQueue=redissonClient.getBlockingQueue("SANYOU");
delayQueue=redissonClient.getDelayedQueue(blockingQueue);
}

privatevoidstartDelayQueueConsumer(){
newThread(()->{
while(true){
try{
Stringtask=blockingQueue.take();
log.info("接收到延遲任務:{}",task);
}catch(Exceptione){
e.printStackTrace();
}
}
},"SANYOU-Consumer").start();
}

publicvoidofferTask(Stringtask,longseconds){
log.info("添加延遲任務:{}延遲時間:{}s",task,seconds);
delayQueue.offer(task,seconds,TimeUnit.SECONDS);
}

}

這個類在創建的時候會去初始化延遲隊列,創建一個RedissonClient對象,之后通過RedissonClient對象獲取到RDelayedQueue和RBlockingQueue對象,傳入的隊列名字叫SANYOU,這個名字無所謂。

當延遲隊列創建之后,會開啟一個延遲任務的消費線程,這個線程會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務。

添加任務的時候是通過RDelayedQueue的offer方法添加的。

controller類,通過接口添加任務,延遲時間為5s

@RestController
publicclassRedissonDelayQueueController{

@Resource
privateRedissonDelayQueueredissonDelayQueue;

@GetMapping("/add")
publicvoidaddTask(@RequestParam("task")Stringtask){
redissonDelayQueue.offerTask(task,5);
}

}

啟動項目,添加任務

靜靜等待5s,成功獲取到任務。

f510d5e2-c3b1-11ed-bfe3-dac502259ad0.png

2、實現原理

如下圖就是上面demo中,一個延遲隊列會在Redis內部使用到的channel和數據類型

f51a509a-c3b1-11ed-bfe3-dac502259ad0.png

SANYOU前面的前綴都是固定的,Redisson創建的時候會拼上前綴。

redisson_delay_queue_timeout:SANYOU,sorted set數據類型,存放所有延遲任務,按照延遲任務的到期時間戳(提交任務時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲隊列中最早要被執行的任務,這個概念很重要

redisson_delay_queue:SANYOU,list數據類型,也是存放所有的任務,但是研究下來發現好像沒什么用。。

SANYOU,list數據類型,被稱為目標隊列,這個里面存放的任務都是已經到了延遲時間的,可以被消費者獲取的任務,所以上面demo中的RBlockingQueue的take方法是從這個目標隊列中獲取到任務的

redisson_delay_queue_channel:SANYOU,是一個channel,用來通知客戶端開啟一個延遲任務

有了這些概念之后,再來看看整體的運行原理圖

f534b340-c3b1-11ed-bfe3-dac502259ad0.png

生產者在提交任務的時候將任務放到redisson_delay_queue_timeout:SANYOU中,分數就是提交任務的時間戳+延遲時間,就是延遲任務的到期時間戳

客戶端會有一個延遲任務,為了區分,后面我都說是客戶端延遲任務。這個延遲任務會向Redis Server發送一段lua腳本,Redis執行lua腳本中的命令,并且是原子性的

f5718518-c3b1-11ed-bfe3-dac502259ad0.png

這段lua腳本主要干了兩件事:

將到了延遲時間的任務從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個目標隊列

獲取到redisson_delay_queue_timeout:SANYOU中目前最早到過期時間的延遲任務的到期時間戳,然后發布到redisson_delay_queue_channel:SANYOU這個channel中

當客戶端監聽到redisson_delay_queue_channel:SANYOU這個channel的消息時,會再次提交一個客戶端延遲任務,延遲時間就是消息(最早到過期時間的延遲任務的到期時間戳)- 當前時間戳,這個時間其實也就是redisson_delay_queue_channel:SANYOU中最早到過期時間的任務還剩余的延遲時間。

此處可以等待10s,好好想想。。

這樣,一旦時間來到了上面說的最早到過期時間任務的到期時間戳,redisson_delay_queue_timeout:SANYOU中上面說的最早到過期時間的任務已經到期了,客戶端的延遲任務也同時到期,于是開始執行lua腳本操作,及時將到了延遲時間的任務放到目標隊列中。然后再次發布剩余的延遲任務中最早到期的任務到期時間戳到channe中,如此循環往復,一直運行下去,保證redisson_delay_queue_timeout:SANYOU中到期的數據能及時放到目標隊列中。

所以,上述說了一大堆的主要的作用就是保證到了延遲時間的任務能夠及時被放到目標隊列。

這里再補充兩個特殊情況,圖中沒有畫出:

第一個就是如果redisson_delay_queue_timeout:SANYOU是新添加的任務(隊列之前有或者沒有任務)是隊列中最早需要被執行的,也會發布消息到channel,之后就按時上面說的流程走了。

添加任務代碼如下,也是通過lua腳本來的

f5ad6d94-c3b1-11ed-bfe3-dac502259ad0.png

第二種特殊情況就是項目啟動的時候會執行一次客戶端延遲任務。項目在重啟時,由于沒有客戶端延遲任務的執行,可能會出現redisson_delay_queue_timeout:SANYOU隊列中有到期但是沒有被放到目標隊列的可能,重啟就執行一次就是為了保證到期的數據能被及時放到目標隊列中。

3、與第一種方案比較

現在來比較一下第一種方案和Redisson的這種方案,看看有沒有第一種方案的那些坑。

第一個任務延遲的問題,Redisson方案理論上是沒有延遲的,但是當消息數量增加,消費者消費緩慢這個情況下可能會導致延遲任務消費的延遲。

第二個丟消息的問題,Redisson方案很大程度上減輕了丟消息的可能性,因為所有的任務都是存在list和sorted set兩種數據類型中,Redis有持久化機制,就算Redis宕機了,也就可能會丟一點點數據。

第三個廣播消費任務的問題,這個是不會出現的,因為每個客戶端都是從同一個目標隊列中獲取任務的。

第四個問題是Redis內部channel發布事件的問題,跟這種方案不沾邊,就更不可能存在了。

所以,通過上面的對比可以看出,Redisson這種實現方案就顯得更加的靠譜了。






審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 數據庫
    +關注

    關注

    7

    文章

    3763

    瀏覽量

    64274
  • lua腳本
    +關注

    關注

    0

    文章

    21

    瀏覽量

    7577
  • Redis
    +關注

    關注

    0

    文章

    371

    瀏覽量

    10846

原文標題:用 Redis 實現延遲隊列,我研究了兩種方案,發現并不簡單

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    延遲隊列實現方式

    延遲任務 最近有一個需求,基于消息隊列對數據消費,并根據多次消費的結果對數據進行重新組裝,如果在指定時間內,需要的數據全部到達,則進行數據組裝以及后續邏輯。簡單的說,設置一個超時時間,如果在該時間內
    的頭像 發表于 09-30 11:17 ?791次閱讀

    MySQL與Redis延遲雙刪策略

    中,并且如果數據庫中的數據發生了改變則需要同步到redis中,同步過程中需要保證 MySQL與redis數據一致性問題,在這個同步過程中出現短暫的數據延遲也是正常現象,但是最終需要保證mysql與緩存中的一致性。 //我們通常使
    的頭像 發表于 09-25 14:28 ?886次閱讀
    MySQL與<b class='flag-5'>Redis</b><b class='flag-5'>延遲</b>雙刪策略

    Redis Stream應用案例

    的基本使用介紹和設計理念可以看我之前的一篇文章(Redis Stream簡介)。Redis Stream本質上是在Redis內核上(非Redis Module)
    發表于 06-26 17:15

    FreeRtos中消息隊列API的調用該怎樣去實現

    消息隊列是什么?消息隊列有何作用?FreeRtos中消息隊列API的調用該怎樣去實現
    發表于 01-20 07:04

    如何去實現一種隊列程序的設計

    隊列的原理是什么?隊列有何作用?如何去實現一種隊列程序的設計
    發表于 02-25 07:50

    環形隊列的操作如何去實現

    環形隊列結構的定義是什么?環形隊列的操作如何去實現
    發表于 02-25 06:35

    redis工作原理

    Redis作為內存數據庫,擁有非常高的性能,單個實例的QPS能夠達到10W左右。但我們在使用Redis時,經常時不時會出現訪問延遲很大的情況,如果你不知道Redis的內部
    的頭像 發表于 09-24 15:57 ?3537次閱讀

    Redis 延時隊列,一次性搞明白

    所謂延時隊列就是延時的消息隊列,下面說一下一些業務場景 實踐場景 訂單支付失敗,每隔一段時間提醒用戶 用戶并發量的情況,可以延時2分鐘給用戶發短信 先來看看Redis實現普通的消息
    的頭像 發表于 10-30 16:34 ?2121次閱讀
    <b class='flag-5'>Redis</b> 延時<b class='flag-5'>隊列</b>,一次性搞明白

    Springboot+redis操作多種實現

    操作。Jedis客戶端實例不是線程安全的,需要通過連接池來使用Jedis。 1.2、Redisson 優點點:分布式鎖,分布式集合,可通過Redis支持延遲隊列。 1.3、
    的頭像 發表于 09-22 10:48 ?1796次閱讀
    Springboot+<b class='flag-5'>redis</b>操作多種<b class='flag-5'>實現</b>

    什么是 Redis

    其他用例中變得可行,包括發布-訂閱機制、流(streaming)和隊列。 主要來說,Redis 是一個內存數據庫,用作另一個“真實”數據
    的頭像 發表于 05-22 15:32 ?1080次閱讀
    什么是 <b class='flag-5'>Redis</b>

    何用Springboot整合Redis

    本篇文件我們來介紹如何用Springboot整合Redis。 1、Docker 安裝 Redis 1.1 下載鏡像 docker pull redis: 6 . 2 . 6 1.2 創
    的頭像 發表于 10-08 14:56 ?559次閱讀
    如<b class='flag-5'>何用</b>Springboot整合<b class='flag-5'>Redis</b>

    Java redis鎖怎么實現

    在Java中實現Redis鎖涉及到以下幾個方面:Redis的安裝配置、Redis連接池的使用、Redis數據結構的選擇、
    的頭像 發表于 12-04 10:47 ?1118次閱讀

    redis hash底層實現原理

    數據結構是如何實現?本文將詳細介紹Redis哈希底層的實現原理。 在Redis中,每個哈希都是由一個類似于字典(Dictionary)的
    的頭像 發表于 12-04 16:27 ?559次閱讀

    redis容器內怎么查看redis日志

    redis是一款流行的開源內存數據庫,常用于緩存、消息隊列、任務管理等場景。在使用redis時,了解如何查看redis日志對于排查問題、監控性能和分析應用程序行為非常重要。在本文中,我
    的頭像 發表于 12-05 10:10 ?3484次閱讀

    redis數據結構的底層實現

    Redis是一種內存鍵值數據庫,常用于緩存、消息隊列、實時數據分析等場景。它的高性能得益于其精心設計的數據結構和底層實現。本文將詳細介紹Redis常用的數據結構和它們的底層
    的頭像 發表于 12-05 10:14 ?590次閱讀