1 前言
之前旁邊的小伙伴問我熱點數(shù)據(jù)相關問題,在給他粗略的講解一波redis數(shù)據(jù)傾斜的案例之后,自己也順道回顧了一些關于熱點數(shù)據(jù)處理的方法論,同時也想起去年所學習JD開源項目hotkey——專門用來解決熱點數(shù)據(jù)問題的框架。在這里結(jié)合兩者所關聯(lián)到的知識點,通過幾個小圖和部分粗略的講解,來讓大家了解相關方法論以及hotkey的源碼解析。
2 Redis數(shù)據(jù)傾斜
2.1 定義與危害
先說說數(shù)據(jù)傾斜的定義,借用百度詞條的解釋:
對于集群系統(tǒng),一般緩存是分布式的,即不同節(jié)點負責一定范圍的緩存數(shù)據(jù)。我們把緩存數(shù)據(jù)分散度不夠,導致大量的緩存數(shù)據(jù)集中到了一臺或者幾臺服務節(jié)點上,稱為數(shù)據(jù)傾斜。一般來說數(shù)據(jù)傾斜是由于負載均衡實施的效果不好引起的。
從上面的定義中可以得知,數(shù)據(jù)傾斜的原因一般是因為LB的效果不好,導致部分節(jié)點數(shù)據(jù)量非常集中。
那這又會有什么危害呢?
如果發(fā)生了數(shù)據(jù)傾斜,那么保存了大量數(shù)據(jù),或者是保存了熱點數(shù)據(jù)的實例的處理壓力就會增大,速度變慢,甚至還可能會引起這個實例的內(nèi)存資源耗盡,從而崩潰。這是我們在應用切片集群時要避免的。
2.2 數(shù)據(jù)傾斜的分類
2.2.1 數(shù)據(jù)量傾斜(寫入傾斜)
1.圖示
如圖,在某些情況下,實例上的數(shù)據(jù)分布不均衡,某個實例上的數(shù)據(jù)特別多。
2.bigkey導致傾斜
某個實例上正好保存了 bigkey。bigkey 的 value 值很大(String 類型),或者是 bigkey 保存了大量集合元素(集合類型),會導致這個實例的數(shù)據(jù)量增加,內(nèi)存資源消耗也相應增加。
應對方法
在業(yè)務層生成數(shù)據(jù)時,要盡量避免把過多的數(shù)據(jù)保存在同一個鍵值對中。
如果 bigkey 正好是集合類型,還有一個方法,就是把 bigkey 拆分成很多個小的集合類型數(shù)據(jù),分散保存在不同的實例上。
3.Slot分配不均導致傾斜
先簡單的介紹一下slot的概念,slot其實全名是Hash Slot(哈希槽),在Redis Cluster切片集群中一共有16384 個 Slot,這些哈希槽類似于數(shù)據(jù)分區(qū),每個鍵值對都會根據(jù)它的 key,被映射到一個哈希槽中。Redis Cluster 方案采用哈希槽來處理數(shù)據(jù)和實例之間的映射關系。
一張圖來解釋,數(shù)據(jù)、哈希槽、實例這三者的映射分布情況。
這里的CRC16(city)%16384可以簡單的理解為將key1根據(jù)CRC16算法取hash值然后對slot個數(shù)取模,得到的就是slot位置為14484,他所對應的實例節(jié)點是第三個。
運維在構(gòu)建切片集群時候,需要手動分配哈希槽,并且把16384 個槽都分配完,否則 Redis 集群無法正常工作。由于是手動分配,則可能會導致部分實例所分配的slot過多,導致數(shù)據(jù)傾斜。
應對方法
使用CLUSTER SLOTS 命令來查看slot分配情況,使用CLUSTER SETSLOT,CLUSTER GETKEYSINSLOT,MIGRATE這三個命令來進行slot數(shù)據(jù)的遷移,具體內(nèi)容不再這里細說,感興趣的同學可以自行學習一下。
4.Hash Tag導致傾斜
Hash Tag 定義 :指當一個key包含 {} 的時候,就不對整個key做hash,而僅對 {} 包括的字符串做hash。
假設hash算法為sha1。對user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1)。
Hash Tag 優(yōu)勢 :如果不同 key 的 Hash Tag 內(nèi)容都是一樣的,那么,這些 key 對應的數(shù)據(jù)會被映射到同一個 Slot 中,同時會被分配到同一個實例上。
Hash Tag 劣勢 :如果不合理使用,會導致大量的數(shù)據(jù)可能被集中到一個實例上發(fā)生數(shù)據(jù)傾斜,集群中的負載不均衡。
2.2.2 數(shù)據(jù)訪問傾斜(讀取傾斜-熱key問題)
一般來說數(shù)據(jù)訪問傾斜就是熱key問題導致的,如何處理redis熱key問題也是面試中常會問到的。所以了解相關概念及方法論也是不可或缺的一環(huán)。
1.圖示
如圖,雖然每個集群實例上的數(shù)據(jù)量相差不大,但是某個實例上的數(shù)據(jù)是熱點數(shù)據(jù),被訪問得非常頻繁。
但是為啥會有熱點數(shù)據(jù)的產(chǎn)生呢?
2.產(chǎn)生熱key的原因及危害
1)用戶消費的數(shù)據(jù)遠大于生產(chǎn)的數(shù)據(jù)(熱賣商品、熱點新聞、熱點評論、明星直播)。
在日常工作生活中一些突發(fā)的的事件,例如:雙十一期間某些熱門商品的降價促銷,當這其中的某一件商品被數(shù)萬次點擊瀏覽或者購買時,會形成一個較大的需求量,這種情況下就會造成熱點問題。
同理,被大量刊發(fā)、瀏覽的熱點新聞、熱點評論、明星直播等,這些典型的讀多寫少的場景也會產(chǎn)生熱點問題。
2)請求分片集中,超過單 Server 的性能極限。
在服務端讀數(shù)據(jù)進行訪問時,往往會對數(shù)據(jù)進行分片切分,此過程中會在某一主機 Server 上對相應的 Key 進行訪問,當訪問超過 Server 極限時,就會導致熱點 Key 問題的產(chǎn)生。
如果熱點過于集中,熱點 Key 的緩存過多,超過目前的緩存容量時,就會導致緩存分片服務被打垮現(xiàn)象的產(chǎn)生。當緩存服務崩潰后,此時再有請求產(chǎn)生,會緩存到后臺 DB 上,由于DB 本身性能較弱,在面臨大請求時很容易發(fā)生請求穿透現(xiàn)象,會進一步導致雪崩現(xiàn)象,嚴重影響設備的性能。
3.常用的熱key問題解決辦法:
解決方案一: 備份熱key
可以把熱點數(shù)據(jù)復制多份,在每一個數(shù)據(jù)副本的 key 中增加一個隨機后綴,讓它和其它副本數(shù)據(jù)不會被映射到同一個 Slot 中。
這里相當于把一份數(shù)據(jù)復制到其他實例上,這樣在訪問的時候也增加隨機前綴,將對一個實例的訪問壓力,均攤到其他實例上
例如:
我們在放入緩存時就將對應業(yè)務的緩存key拆分成多個不同的key。如下圖所示,我們首先在更新緩存的一側(cè),將key拆成N份,比如一個key名字叫做”good_100”,那我們就可以把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增時都需要去改動這N個key,這一步就是拆key。
對于service端來講,我們就需要想辦法盡量將自己訪問的流量足夠的均勻。
如何給自己即將訪問的熱key上加入后綴?幾種辦法,根據(jù)本機的ip或mac地址做hash,之后的值與拆key的數(shù)量做取余,最終決定拼接成什么樣的key后綴,從而打到哪臺機器上;服務啟動時的一個隨機數(shù)對拆key的數(shù)量做取余。
偽代碼如下:
const M = N * 2
//生成隨機數(shù)
random = GenRandom(0, M)
//構(gòu)造備份新key
bakHotKey = hotKey + “_” + random
data = redis.GET(bakHotKey)
if data == NULL {
data = GetFromDB()
redis.SET(bakHotKey, expireTime + GenRandom(0,5))
}
解決方案二: 本地緩存+動態(tài)計算自動發(fā)現(xiàn)熱點緩存 基本流程圖
該方案通過主動發(fā)現(xiàn)熱點并對其進行存儲來解決熱點 Key 的問題。首先 Client 也會訪問 SLB,并且通過 SLB 將各種請求分發(fā)至 Proxy 中,Proxy 會按照基于路由的方式將請求轉(zhuǎn)發(fā)至后端的 Redis 中。 在熱點 key 的解決上是采用在服務端增加緩存的方式進行。具體來說就是在 Proxy 上增加本地緩存,本地緩存采用 LRU 算法來緩存熱點數(shù)據(jù),后端節(jié)點增加熱點數(shù)據(jù)計算模塊來返回熱點數(shù)據(jù)。
Proxy 架構(gòu)的主要有以下優(yōu)點:
Proxy 本地緩存熱點,讀能力可水平擴展
DB 節(jié)點定時計算熱點數(shù)據(jù)集合
DB 反饋 Proxy 熱點數(shù)據(jù)
對客戶端完全透明,不需做任何兼容
熱點數(shù)據(jù)的發(fā)現(xiàn)與存儲
對于熱點數(shù)據(jù)的發(fā)現(xiàn),首先會在一個周期內(nèi)對 Key 進行請求統(tǒng)計,在達到請求量級后會對熱點 Key 進行熱點定位,并將所有的熱點 Key 放入一個小的 LRU 鏈表內(nèi),在通過 Proxy 請求進行訪問時,若 Redis 發(fā)現(xiàn)待訪點是一個熱點,就會進入一個反饋階段,同時對該數(shù)據(jù)進行標記。 可以使用一個etcd或者zk集群來存儲反饋的熱點數(shù)據(jù),然后本地所有節(jié)點監(jiān)聽該熱點數(shù)據(jù),進而加載到本地JVM緩存中。
熱點數(shù)據(jù)的獲取
在熱點 Key 的處理上主要分為寫入跟讀取兩種形式,在數(shù)據(jù)寫入過程當 SLB 收到數(shù)據(jù) K1 并將其通過某一個 Proxy 寫入一個 Redis,完成數(shù)據(jù)的寫入。 假若經(jīng)過后端熱點模塊計算發(fā)現(xiàn) K1 成為熱點 key 后, Proxy 會將該熱點進行緩存,當下次客戶端再進行訪問 K1 時,可以不經(jīng) Redis。 最后由于 proxy 是可以水平擴充的,因此可以任意增強熱點數(shù)據(jù)的訪問能力。
最佳成熟方案: JD開源hotKey 這是目前較為成熟的自動探測熱key、分布式一致性緩存解決方案。原理就是在client端做洞察,然后上報對應hotkey,server端檢測到后,將對應hotkey下發(fā)到對應服務端做本地緩存,并且能保證本地緩存和遠程緩存的一致性。
在這里咱們就不細談了,這篇文章的第三部分:JD開源hotkey源碼解析里面會帶領大家了解其整體原理。
3 JD開源hotkey—自動探測熱key、分布式一致性緩存解決方案
3.1 解決痛點
從上面可知,熱點key問題在并發(fā)量比較高的系統(tǒng)中(特別是做秒殺活動)出現(xiàn)的頻率會比較高,對系統(tǒng)帶來的危害也很大。 那么針對此,hotkey誕生的目的是什么?需要解決的痛點是什么?以及它的實現(xiàn)原理。
在這里引用項目上的一段話來概述: 對任意突發(fā)性的無法預先感知的熱點數(shù)據(jù),包括并不限于熱點數(shù)據(jù)(如突發(fā)大量請求同一個商品)、熱用戶(如惡意爬蟲刷子)、熱接口(突發(fā)海量請求同一個接口)等,進行毫秒級精準探測到。然后對這些熱數(shù)據(jù)、熱用戶等,推送到所有服務端JVM內(nèi)存中,以大幅減輕對后端數(shù)據(jù)存儲層的沖擊,并可以由使用者決定如何分配、使用這些熱key(譬如對熱商品做本地緩存、對熱用戶進行拒絕訪問、對熱接口進行熔斷或返回默認值)。這些熱數(shù)據(jù)在整個服務端集群內(nèi)保持一致性,并且業(yè)務隔離。
核心功能:熱數(shù)據(jù)探測并推送至集群各個服務器
3.2 集成方式
集成方式在這里就不詳述了,感興趣的同學可以自行搜索。
3.3 源碼解析
3.3.1 架構(gòu)簡介
1.全景圖一覽
流程介紹:
客戶端通過引用hotkey的client包,在啟動的時候上報自己的信息給worker,同時和worker之間建立長連接。定時拉取配置中心上面的規(guī)則信息和worker集群信息。
客戶端調(diào)用hotkey的ishot()的方法來首先匹配規(guī)則,然后統(tǒng)計是不是熱key。
通過定時任務把熱key數(shù)據(jù)上傳到worker節(jié)點。
worker集群在收取到所有關于這個key的數(shù)據(jù)以后(因為通過hash來決定key 上傳到哪個worker的,所以同一個key只會在同一個worker節(jié)點上),在和定義的規(guī)則進行匹配后判斷是不是熱key,如果是則推送給客戶端,完成本地緩存。
2.角色構(gòu)成
這里直接借用作者的描述: 1)etcd集群 etcd作為一個高性能的配置中心,可以以極小的資源占用,提供高效的監(jiān)聽訂閱服務。主要用于存放規(guī)則配置,各worker的ip地址,以及探測出的熱key、手工添加的熱key等。
2)client端jar包 就是在服務中添加的引用jar,引入后,就可以以便捷的方式去判斷某key是否熱key。同時,該jar完成了key上報、監(jiān)聽etcd里的rule變化、worker信息變化、熱key變化,對熱key進行本地caffeine緩存等。
3) worker端集群 worker端是一個獨立部署的Java程序,啟動后會連接etcd,并定期上報自己的ip信息,供client端獲取地址并進行長連接。之后,主要就是對各個client發(fā)來的待測key進行累加計算,當達到etcd里設定的rule閾值后,將熱key推送到各個client。
4) dashboard控制臺 控制臺是一個帶可視化界面的Java程序,也是連接到etcd,之后在控制臺設置各個APP的key規(guī)則,譬如2秒20次算熱。然后當worker探測出來熱key后,會將key發(fā)往etcd,dashboard也會監(jiān)聽熱key信息,進行入庫保存記錄。同時,dashboard也可以手工添加、刪除熱key,供各個client端監(jiān)聽。
3.hotkey工程結(jié)構(gòu)
3.3.2 client端
主要從下面三個方面來解析源碼:
4.客戶端啟動器
1)啟動方式
@PostConstruct
public void init() {
ClientStarter.Builder builder = new ClientStarter.Builder();
ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build();
starter.startPipeline();
}
appName:是這個應用的名稱,一般為${spring.application.name}的值,后續(xù)所有的配置都以此為開頭 etcd:是etcd集群的地址,用逗號分隔,配置中心。 還可以看到ClientStarter實現(xiàn)了建造者模式,使代碼更為簡介。
2)核心入口 com.jd.platform.hotkey.client.ClientStarter#startPipeline
/**
* 啟動監(jiān)聽etcd
*/
public void startPipeline() {
JdLogger.info(getClass(), "etcdServer:" + etcdServer);
//設置caffeine的最大容量
Context.CAFFEINE_SIZE = caffeineSize;
//設置etcd地址
EtcdConfigFactory.buildConfigCenter(etcdServer);
//開始定時推送
PushSchedulerStarter.startPusher(pushPeriod);
PushSchedulerStarter.startCountPusher(10);
//開啟worker重連器
WorkerRetryConnector.retryConnectWorkers();
registEventBus();
EtcdStarter starter = new EtcdStarter();
//與etcd相關的監(jiān)聽都開啟
starter.start();
}
該方法主要有五個功能:
① 設置本地緩存(caffeine)的最大值,并創(chuàng)建etcd實例
//設置caffeine的最大容量
Context.CAFFEINE_SIZE = caffeineSize;
//設置etcd地址
EtcdConfigFactory.buildConfigCenter(etcdServer);
caffeineSize是本地緩存的最大值,在啟動的時候可以設置,不設置默認為200000。 etcdServer是上面說的etcd集群地址。
Context可以理解為一個配置類,里面就包含兩個字段:
public class Context {
public static String APP_NAME;
public static int CAFFEINE_SIZE;
}
EtcdConfigFactory是ectd配置中心的工廠類
public class EtcdConfigFactory {
private static IConfigCenter configCenter;
private EtcdConfigFactory() {}
public static IConfigCenter configCenter() {
return configCenter;
}
public static void buildConfigCenter(String etcdServer) {
//連接多個時,逗號分隔
configCenter = JdEtcdBuilder.build(etcdServer);
}
}
通過其configCenter()方法獲取創(chuàng)建etcd實例對象,IConfigCenter接口封裝了etcd實例對象的行為(包括基本的crud、監(jiān)控、續(xù)約等)
② 創(chuàng)建并啟動定時任務:PushSchedulerStarter
//開始定時推送
PushSchedulerStarter.startPusher(pushPeriod);//每0.5秒推送一次待測key
PushSchedulerStarter.startCountPusher(10);//每10秒推送一次數(shù)量統(tǒng)計,不可配置
pushPeriod是推送的間隔時間,可以再啟動的時候設置,最小為0.05s,推送越快,探測的越密集,會越快探測出來,但對client資源消耗相應增大
PushSchedulerStarter類
/**
* 每0.5秒推送一次待測key
*/
public static void startPusher(Long period) {
if (period == null || period <= 0) {
period = 500L;
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-pusher-service-executor", true));
scheduledExecutorService.scheduleAtFixedRate(() -> {
//熱key的收集器
IKeyCollector
//這里相當于每0.5秒,通過netty來給worker來推送收集到的熱key的信息,主要是一些熱key的元數(shù)據(jù)信息(熱key來源的app和key的類型和是否是刪除事件,還有該熱key的上報次數(shù))
//這里面還有就是該熱key在每次上報的時候都會生成一個全局的唯一id,還有該熱key每次上報的創(chuàng)建時間是在netty發(fā)送的時候來生成,同一批次的熱key時間是相同的
List
if(CollectionUtil.isNotEmpty(hotKeyModels)){
//積攢了半秒的key集合,按照hash分發(fā)到不同的worker
KeyHandlerFactory.getPusher().send(Context.APP_NAME, hotKeyModels);
collectHK.finishOnce();
}
},0, period, TimeUnit.MILLISECONDS);
}
/**
* 每10秒推送一次數(shù)量統(tǒng)計
*/
public static void startCountPusher(Integer period) {
if (period == null || period <= 0) {
period = 10;
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-count-pusher-service-executor", true));
scheduledExecutorService.scheduleAtFixedRate(() -> {
IKeyCollector
List
if(CollectionUtil.isNotEmpty(keyCountModels)){
//積攢了10秒的數(shù)量,按照hash分發(fā)到不同的worker
KeyHandlerFactory.getPusher().sendCount(Context.APP_NAME, keyCountModels);
collectHK.finishOnce();
}
},0, period, TimeUnit.SECONDS);
}
從上面兩個方法可知,都是通過定時線程池來實現(xiàn)定時任務的,都是守護線程。
咱們重點關注一下KeyHandlerFactory類,它是client端設計的一個比較巧妙的地方,從類名上直譯為key處理工廠。具體的實例對象是DefaultKeyHandler:
public class DefaultKeyHandler {
//推送HotKeyMsg消息到Netty的推送者
private IKeyPusher iKeyPusher = new NettyKeyPusher();
//待測key的收集器,這里面包含兩個map,key主要是熱key的名字,value主要是熱key的元數(shù)據(jù)信息(比如:熱key來源的app和key的類型和是否是刪除事件)
private IKeyCollector
//數(shù)量收集器,這里面包含兩個map,這里面key是相應的規(guī)則,HitCount里面是這個規(guī)則的總訪問次數(shù)和熱后訪問次數(shù)
private IKeyCollector
public IKeyPusher keyPusher() {
return iKeyPusher;
}
public IKeyCollector
return iKeyCollector;
}
public IKeyCollector
return iKeyCounter;
}
}
這里面有三個成員對象,分別是封裝推送消息到netty的NettyKeyPusher、待測key收集器TurnKeyCollector、數(shù)量收集器TurnCountCollector,其中后兩者都實現(xiàn)了接口IKeyCollector,能對hotkey的處理起到有效的聚合,充分體現(xiàn)了代碼的高內(nèi)聚。 先來看看封裝推送消息到netty的NettyKeyPusher:
/**
* 將msg推送到netty的pusher
* @author wuweifeng wrote on 2020-01-06
* @version 1.0
*/
public class NettyKeyPusher implements IKeyPusher {
@Override
public void send(String appName, List
//積攢了半秒的key集合,按照hash分發(fā)到不同的worker
long now = System.currentTimeMillis();
Map
for(HotKeyModel model : list) {
model.setCreateTime(now);
Channel channel = WorkerInfoHolder.chooseChannel(model.getKey());
if (channel == null) {
continue;
}
List
newList.add(model);
}
for (Channel channel : map.keySet()) {
try {
List
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_NEW_KEY, Context.APP_NAME);
hotKeyMsg.setHotKeyModels(batch);
channel.writeAndFlush(hotKeyMsg).sync();
} catch (Exception e) {
try {
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());
} catch (Exception ex) {
JdLogger.error(getClass(),"flush error");
}
}
}
}
@Override
public void sendCount(String appName, List
//積攢了10秒的數(shù)量,按照hash分發(fā)到不同的worker
long now = System.currentTimeMillis();
Map
for(KeyCountModel model : list) {
model.setCreateTime(now);
Channel channel = WorkerInfoHolder.chooseChannel(model.getRuleKey());
if (channel == null) {
continue;
}
List
newList.add(model);
}
for (Channel channel : map.keySet()) {
try {
List
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_HIT_COUNT, Context.APP_NAME);
hotKeyMsg.setKeyCountModels(batch);
channel.writeAndFlush(hotKeyMsg).sync();
} catch (Exception e) {
try {
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());
} catch (Exception ex) {
JdLogger.error(getClass(),"flush error");
}
}
}
}
}
send(String appName, Listlist) 主要是將TurnKeyCollector收集的待測key通過netty推送給worker,HotKeyModel對象主要是一些熱key的元數(shù)據(jù)信息(熱key來源的app和key的類型和是否是刪除事件,還有該熱key的上報次數(shù)) sendCount(String appName, Listlist) 主要是將TurnCountCollector收集的規(guī)則所對應的key通過netty推送給worker,KeyCountModel對象主要是一些key所對應的規(guī)則信息以及訪問次數(shù)等 WorkerInfoHolder.chooseChannel(model.getRuleKey()) 根據(jù)hash算法獲取key對應的服務器,分發(fā)到對應服務器相應的Channel 連接,所以服務端可以水平無限擴容,毫無壓力問題。
再來分析一下key收集器:TurnKeyCollector與TurnCountCollector: 實現(xiàn)IKeyCollector接口:
/**
* 對hotkey進行聚合
* @author wuweifeng wrote on 2020-01-06
* @version 1.0
*/
public interface IKeyCollector
/**
* 鎖定后的返回值
*/
List
/**
* 輸入的參數(shù)
*/
void collect(T t);
void finishOnce();
}
lockAndGetResult() 主要是獲取返回collect方法收集的信息,并將本地暫存的信息清空,方便下個統(tǒng)計周期積攢數(shù)據(jù)。 collect(T t) 顧名思義他是收集api調(diào)用的時候,將收集的到key信息放到本地存儲。 finishOnce() 該方法目前實現(xiàn)都是空,無需關注。
待測key收集器:TurnKeyCollector
public class TurnKeyCollector implements IKeyCollector
//這map里面的key主要是熱key的名字,value主要是熱key的元數(shù)據(jù)信息(比如:熱key來源的app和key的類型和是否是刪除事件)
private ConcurrentHashMap
private ConcurrentHashMap
private AtomicLong atomicLong = new AtomicLong(0);
@Override
public List
//自增后,對應的map就會停止被寫入,等待被讀取
atomicLong.addAndGet(1);
List
//可以觀察這里與collect方法里面的相同位置,會發(fā)現(xiàn)一個是操作map0一個是操作map1,這樣保證在讀map的時候,不會阻塞寫map,
//兩個map同時提供輪流提供讀寫能力,設計的很巧妙,值得學習
if (atomicLong.get() % 2 == 0) {
list = get(map1);
map1.clear();
} else {
list = get(map0);
map0.clear();
}
return list;
}
private List
return CollectionUtil.list(false, map.values());
}
@Override
public void collect(HotKeyModel hotKeyModel) {
String key = hotKeyModel.getKey();
if (StrUtil.isEmpty(key)) {
return;
}
if (atomicLong.get() % 2 == 0) {
//不存在時返回null并將key-value放入,已有相同key時,返回該key對應的value,并且不覆蓋
HotKeyModel model = map0.putIfAbsent(key, hotKeyModel);
if (model != null) {
//增加該hotMey上報的次數(shù)
model.add(hotKeyModel.getCount());
}
} else {
HotKeyModel model = map1.putIfAbsent(key, hotKeyModel);
if (model != null) {
model.add(hotKeyModel.getCount());
}
}
}
@Override
public void finishOnce() {}
}
可以看到該類中有兩個ConcurrentHashMap和一個AtomicLong,通過對AtomicLong來自增,然后對2取模,來分別控制兩個map的讀寫能力,保證每個map都能做讀寫,并且同一個map不能同時讀寫,這樣可以避免并發(fā)集合讀寫不阻塞,這一塊無鎖化的設計還是非常巧妙的,極大的提高了收集的吞吐量。 key數(shù)量收集器:TurnCountCollector 這里的設計與TurnKeyCollector大同小異,咱們就不細談了。值得一提的是它里面有個并行處理的機制,當收集的數(shù)量超過DATA_CONVERT_SWITCH_THRESHOLD=5000的閾值時,lockAndGetResult處理是使用java Stream并行流處理,提升處理的效率。
③ 開啟worker重連器
//開啟worker重連器
WorkerRetryConnector.retryConnectWorkers();
public class WorkerRetryConnector {
/**
* 定時去重連沒連上的workers
*/
public static void retryConnectWorkers() {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("worker-retry-connector-service-executor", true));
//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續(xù)拉取
scheduledExecutorService.scheduleAtFixedRate(WorkerRetryConnector::reConnectWorkers, 30, 30, TimeUnit.SECONDS);
}
private static void reConnectWorkers() {
List
if (nonList.size() == 0) {
return;
}
JdLogger.info(WorkerRetryConnector.class, "trying to reConnect to these workers :" + nonList);
NettyClient.getInstance().connect(nonList);//這里會觸發(fā)netty連接方法channelActive
}
}
也是通過定時線程來執(zhí)行,默認時間間隔是30s,不可設置。 通過WorkerInfoHolder來控制client的worker連接信息,連接信息是個List,用的CopyOnWriteArrayList,畢竟是一個讀多寫少的場景,類似與元數(shù)據(jù)信息。
/**
* 保存worker的ip地址和Channel的映射關系,這是有序的。每次client發(fā)送消息時,都會根據(jù)該map的size進行hash
* 如key-1就發(fā)送到workerHolder的第1個Channel去,key-2就發(fā)到第2個Channel去
*/
private static final List
④ 注冊EventBus事件訂閱者
private void registEventBus() {
//netty連接器會關注WorkerInfoChangeEvent事件
EventBusCenter.register(new WorkerChangeSubscriber());
//熱key探測回調(diào)關注熱key事件
EventBusCenter.register(new ReceiveNewKeySubscribe());
//Rule的變化的事件
EventBusCenter.register(new KeyRuleHolder());
}
使用guava的EventBus事件消息總線,利用發(fā)布/訂閱者模式來對項目進行解耦。它可以利用很少的代碼,來實現(xiàn)多組件間通信。 基本原理圖如下:
監(jiān)聽worker信息變動:WorkerChangeSubscriber
/**
* 監(jiān)聽worker信息變動
*/
@Subscribe
public void connectAll(WorkerInfoChangeEvent event) {
List
if (addresses == null) {
addresses = new ArrayList<>();
}
WorkerInfoHolder.mergeAndConnectNew(addresses);
}
/**
* 當client與worker的連接斷開后,刪除
*/
@Subscribe
public void channelInactive(ChannelInactiveEvent inactiveEvent) {
//獲取斷線的channel
Channel channel = inactiveEvent.getChannel();
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
String address = socketAddress.getHostName() + ":" + socketAddress.getPort();
JdLogger.warn(getClass(), "this channel is inactive : " + socketAddress + " trying to remove this connection");
WorkerInfoHolder.dealChannelInactive(address);
}
監(jiān)聽熱key回調(diào)事件:ReceiveNewKeySubscribe
private ReceiveNewKeyListener receiveNewKeyListener = new DefaultNewKeyListener();
@Subscribe
public void newKeyComing(ReceiveNewKeyEvent event) {
HotKeyModel hotKeyModel = event.getModel();
if (hotKeyModel == null) {
return;
}
//收到新key推送
if (receiveNewKeyListener != null) {
receiveNewKeyListener.newKey(hotKeyModel);
}
}
該方法會收到新的熱key訂閱事件之后,會將其加入到KeyHandlerFactory的收集器里面處理。
核心處理邏輯
@Override
public void newKey(HotKeyModel hotKeyModel) {
long now = System.currentTimeMillis();
//如果key到達時已經(jīng)過去1秒了,記錄一下。手工刪除key時,沒有CreateTime
if (hotKeyModel.getCreateTime() != 0 && Math.abs(now - hotKeyModel.getCreateTime()) > 1000) {
JdLogger.warn(getClass(), "the key comes too late : " + hotKeyModel.getKey() + " now " +
+now + " keyCreateAt " + hotKeyModel.getCreateTime());
}
if (hotKeyModel.isRemove()) {
//如果是刪除事件,就直接刪除
deleteKey(hotKeyModel.getKey());
return;
}
//已經(jīng)是熱key了,又推過來同樣的熱key,做個日志記錄,并刷新一下
if (JdHotKeyStore.isHot(hotKeyModel.getKey())) {
JdLogger.warn(getClass(), "receive repeat hot key :" + hotKeyModel.getKey() + " at " + now);
}
addKey(hotKeyModel.getKey());
}
private void deleteKey(String key) {
CacheFactory.getNonNullCache(key).delete(key);
}
private void addKey(String key) {
ValueModel valueModel = ValueModel.defaultValue(key);
if (valueModel == null) {
//不符合任何規(guī)則
deleteKey(key);
return;
}
//如果原來該key已經(jīng)存在了,那么value就被重置,過期時間也會被重置。如果原來不存在,就新增的熱key
JdHotKeyStore.setValueDirectly(key, valueModel);
}
如果該HotKeyModel里面是刪除事件,則獲取RULE_CACHE_MAP里面該key超時時間對應的caffeine,然后從中刪除該key緩存,然后返回(這里相當于刪除了本地緩存)。
如果不是刪除事件,則在RULE_CACHE_MAP對應的caffeine緩存中添加該key的緩存。
這里有個注意點,如果不為刪除事件,調(diào)用addKey()方法在caffeine增加緩存的時候,value是一個魔術值0x12fcf76,這個值只代表加了這個緩存,但是這個緩存在查詢的時候相當于為null。
監(jiān)聽Rule的變化事件:KeyRuleHolder
可以看到里面有兩個成員屬性:RULE_CACHE_MAP,KEY_RULES
/**
* 保存超時時間和caffeine的映射,key是超時時間,value是caffeine[(String,Object)]
*/
private static final ConcurrentHashMap
/**
* 這里KEY_RULES是保存etcd里面該appName所對應的所有rule
*/
private static final List
ConcurrentHashMapRULE_CACHE_MAP:
保存超時時間和caffeine的映射,key是超時時間,value是caffeine[(String,Object)]。
巧妙的設計:這里將key的過期時間作為分桶策略,這樣同一個過期時間的key就會在一個桶(caffeine)里面,這里面每一個caffeine都是client的本地緩存,也就是說hotKey的本地緩存的KV實際上是存儲在這里面的。
ListKEY_RULES:
這里KEY_RULES是保存etcd里面該appName所對應的所有rule。
具體監(jiān)聽KeyRuleInfoChangeEvent事件方法:
@Subscribe
public void ruleChange(KeyRuleInfoChangeEvent event) {
JdLogger.info(getClass(), "new rules info is :" + event.getKeyRules());
List
if (ruleList == null) {
return;
}
putRules(ruleList);
}
核心處理邏輯
/**
* 所有的規(guī)則,如果規(guī)則的超時時間變化了,會重建caffeine
*/
public static void putRules(List
synchronized (KEY_RULES) {
//如果規(guī)則為空,清空規(guī)則表
if (CollectionUtil.isEmpty(keyRules)) {
KEY_RULES.clear();
RULE_CACHE_MAP.clear();
return;
}
KEY_RULES.clear();
KEY_RULES.addAll(keyRules);
Set
for (Integer duration : RULE_CACHE_MAP.keySet()) {
//先清除掉那些在RULE_CACHE_MAP里存的,但是rule里已沒有的
if (!durationSet.contains(duration)) {
RULE_CACHE_MAP.remove(duration);
}
}
//遍歷所有的規(guī)則
for (KeyRule keyRule : keyRules) {
int duration = keyRule.getDuration();
//這里如果RULE_CACHE_MAP里面沒有超時時間為duration的value,則新建一個放入到RULE_CACHE_MAP里面
//比如RULE_CACHE_MAP本來就是空的,則在這里來構(gòu)建RULE_CACHE_MAP的映射關系
//TODO 如果keyRules里面包含相同duration的keyRule,則也只會建一個key為duration,value為caffeine,其中caffeine是(string,object)
if (RULE_CACHE_MAP.get(duration) == null) {
LocalCache cache = CacheFactory.build(duration);
RULE_CACHE_MAP.put(duration, cache);
}
}
}
}
使用synchronized關鍵字來保證線程安全;
如果規(guī)則為空,清空規(guī)則表(RULE_CACHE_MAP、KEY_RULES);
使用傳遞進來的keyRules來覆蓋KEY_RULES;
清除掉RULE_CACHE_MAP里面在keyRules沒有的映射關系;
遍歷所有的keyRules,如果RULE_CACHE_MAP里面沒有相關的超時時間key,則在里面賦值;
⑤ 啟動EtcdStarter(etcd連接管理器)
EtcdStarter starter = new EtcdStarter();
//與etcd相關的監(jiān)聽都開啟
starter.start();
public void start() {
fetchWorkerInfo();
fetchRule();
startWatchRule();
//監(jiān)聽熱key事件,只監(jiān)聽手工添加、刪除的key
startWatchHotKey();
}
fetchWorkerInfo() 從etcd里面拉取worker集群地址信息allAddress,并更新WorkerInfoHolder里面的WORKER_HOLDER
/**
* 每隔30秒拉取worker信息
*/
private void fetchWorkerInfo() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續(xù)拉取
scheduledExecutorService.scheduleAtFixedRate(() -> {
JdLogger.info(getClass(), "trying to connect to etcd and fetch worker info");
fetch();
}, 0, 30, TimeUnit.SECONDS);
}
使用定時線程池來執(zhí)行,單線程。
定時從etcd里面獲取,地址/jd/workers/+$appName或default,時間間隔不可設置,默認30秒,這里面存儲的是worker地址的ip+port。
發(fā)布WorkerInfoChangeEvent事件。
備注:地址有$appName或default,在worker里面配置,如果把worker放到某個appName下,則該worker只會參與該app的計算。
fetchRule() 定時線程來執(zhí)行,單線程,時間間隔不可設置,默認是5秒,當拉取規(guī)則配置和手動配置的hotKey成功后,該線程被終止(也就是說只會成功執(zhí)行一次),執(zhí)行失敗繼續(xù)執(zhí)行
private void fetchRule() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續(xù)拉取
scheduledExecutorService.scheduleAtFixedRate(() -> {
JdLogger.info(getClass(), "trying to connect to etcd and fetch rule info");
boolean success = fetchRuleFromEtcd();
if (success) {
//拉取已存在的熱key
fetchExistHotKey();
//這里如果拉取規(guī)則和拉取手動配置的hotKey成功之后,則該定時執(zhí)行線程停止
scheduledExecutorService.shutdown();
}
}, 0, 5, TimeUnit.SECONDS);
}
fetchRuleFromEtcd()
從etcd里面獲取該appName配置的rule規(guī)則,地址/jd/rules/+$appName。
如果查出來規(guī)則rules為空,會通過發(fā)布KeyRuleInfoChangeEvent事件來清空本地的rule配置緩存和所有的規(guī)則key緩存。
發(fā)布KeyRuleInfoChangeEvent事件。
fetchExistHotKey()
從etcd里面獲取該appName手動配置的熱key,地址/jd/hotkeys/+$appName。
發(fā)布ReceiveNewKeyEvent事件,并且內(nèi)容HotKeyModel不是刪除事件。
startWatchRule()
/**
* 異步監(jiān)聽rule規(guī)則變化
*/
private void startWatchRule() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
JdLogger.info(getClass(), "--- begin watch rule change ----");
try {
IConfigCenter configCenter = EtcdConfigFactory.configCenter();
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.rulePath + Context.APP_NAME);
//如果有新事件,即rule的變更,就重新拉取所有的信息
while (watchIterator.hasNext()) {
//這句必須寫,next會讓他卡住,除非真的有新rule變更
WatchUpdate watchUpdate = watchIterator.next();
List
JdLogger.info(getClass(), "rules info changed. begin to fetch new infos. rule change is " + eventList);
//全量拉取rule信息
fetchRuleFromEtcd();
}
} catch (Exception e) {
JdLogger.error(getClass(), "watch err");
}
});
}
異步監(jiān)聽rule規(guī)則變化,使用etcd監(jiān)聽地址為/jd/rules/+$appName的節(jié)點變化。
使用線程池,單線程,異步監(jiān)聽rule規(guī)則變化,如果有事件變化,則調(diào)用fetchRuleFromEtcd()方法。
startWatchHotKey() 異步開始監(jiān)聽熱key變化信息,使用etcd監(jiān)聽地址前綴為/jd/hotkeys/+$appName
/**
* 異步開始監(jiān)聽熱key變化信息,該目錄里只有手工添加的key信息
*/
private void startWatchHotKey() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
JdLogger.info(getClass(), "--- begin watch hotKey change ----");
IConfigCenter configCenter = EtcdConfigFactory.configCenter();
try {
KvClient.WatchIterator watchIterator = configCenter.watchPrefix(ConfigConstant.hotKeyPath + Context.APP_NAME);
//如果有新事件,即新key產(chǎn)生或刪除
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List
KeyValue keyValue = eventList.get(0).getKv();
Event.EventType eventType = eventList.get(0).getType();
try {
//從這個地方可以看出,etcd給的返回是節(jié)點的全路徑,而我們需要的key要去掉前綴
String key = keyValue.getKey().toStringUtf8().replace(ConfigConstant.hotKeyPath + Context.APP_NAME + "/", "");
//如果是刪除key,就立刻刪除
if (Event.EventType.DELETE == eventType) {
HotKeyModel model = new HotKeyModel();
model.setRemove(true);
model.setKey(key);
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
} else {
HotKeyModel model = new HotKeyModel();
model.setRemove(false);
String value = keyValue.getValue().toStringUtf8();
//新增熱key
JdLogger.info(getClass(), "etcd receive new key : " + key + " --value:" + value);
//如果這是一個刪除指令,就什么也不干
//TODO 這里有個疑問,監(jiān)聽到worker自動探測發(fā)出的惰性刪除指令,這里之間跳過了,但是本地緩存沒有更新吧?
//TODO 所以我猜測在客戶端使用判斷緩存是否存在的api里面,應該會判斷相關緩存的value值是否為"#[DELETE]#"刪除標記
//解疑:這里確實只監(jiān)聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的
if (Constant.DEFAULT_DELETE_VALUE.equals(value)) {
continue;
}
//手工創(chuàng)建的value是時間戳
model.setCreateTime(Long.valueOf(keyValue.getValue().toStringUtf8()));
model.setKey(key);
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
}
} catch (Exception e) {
JdLogger.error(getClass(), "new key err :" + keyValue);
}
}
} catch (Exception e) {
JdLogger.error(getClass(), "watch err");
}
});
}
使用線程池,單線程,異步監(jiān)聽熱key變化
使用etcd監(jiān)聽前綴地址的當前節(jié)點以及子節(jié)點的所有變化值
刪除節(jié)點動作
發(fā)布ReceiveNewKeyEvent事件,并且內(nèi)容HotKeyModel是刪除事件
新增or更新節(jié)點動作
事件變化的value值為刪除標記#[DELETE]#
如果是刪除標記的話,代表是worker自動探測或者client需要刪除的指令。
如果是刪除標記則什么也不做,直接跳過(這里從HotKeyPusher#push方法可以看到,做刪除事件的操作時候,他會給/jd/hotkeys/+$appName的節(jié)點里面增加一個值為刪除標記的節(jié)點,然后再刪除相同路徑的節(jié)點,這樣就可以觸發(fā)上面的刪除節(jié)點事件,所以這里判斷如果是刪除標記直接跳過)。
不為刪除標記
發(fā)布ReceiveNewKeyEvent事件,事件內(nèi)容HotKeyModel里面的createTime是kv對應的時間戳
疑問: 這里代碼注釋里面說只監(jiān)聽手工添加或者刪除的hotKey,難道說/jd/hotkeys/+$appName地址只是手工配置的地址嗎? 解疑: 這里確實只監(jiān)聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的
5.API解析
1)流程圖示 ① 查詢流程
② 刪除流程:
從上面的流程圖中,大家應該知道該熱點key在代碼中是如何扭轉(zhuǎn)的,這里再給大家講解一下核心API的源碼解析,限于篇幅的原因,咱們不一個個貼相關源碼了,只是單純的告訴你它的內(nèi)部邏輯是怎么樣的。 2)核心類:JdHotKeyStore
JdHotKeyStore是封裝client調(diào)用的api核心類,包含上面10個公共方法,咱們重點解析其中6個重要方法: ① isHotKey(String key) 判斷是否在規(guī)則內(nèi),如果不在返回false 判斷是否是熱key,如果不是或者是且過期時間在2s內(nèi),則給TurnKeyCollector#collect收集 最后給TurnCountCollector#collect做統(tǒng)計收集 ② get(String key) 從本地caffeine取值 如果取到的value是個魔術值,只代表加入到caffeine緩存里面了,查詢的話為null ③ smartSet(String key, Object value) 判斷是否是熱key,這里不管它在不在規(guī)則內(nèi),如果是熱key,則給value賦值,如果不為熱key什么也不做 ④ forceSet(String key, Object value) 強制給value賦值 如果該key不在規(guī)則配置內(nèi),則傳遞的value不生效,本地緩存的賦值value會被變?yōu)閚ull ⑤ getValue(String key, KeyType keyType) 獲取value,如果value不存在則調(diào)用HotKeyPusher#push方法發(fā)往netty 如果沒有為該key配置規(guī)則,就不用上報key,直接返回null 如果取到的value是個魔術值,只代表加入到caffeine緩存里面了,查詢的話為null ⑥ remove(String key) 刪除某key(本地的caffeine緩存),會通知整個集群刪除(通過etcd來通知集群刪除) 3)client上傳熱key入口調(diào)用類:HotKeyPusher 核心方法:
public static void push(String key, KeyType keyType, int count, boolean remove) {
if (count <= 0) {
count = 1;
}
if (keyType == null) {
keyType = KeyType.REDIS_KEY;
}
if (key == null) {
return;
}
//這里之所以用LongAdder是為了保證多線程計數(shù)的線程安全性,雖然這里是在方法內(nèi)調(diào)用的,但是在TurnKeyCollector的兩個map里面,
//存儲了HotKeyModel的實例對象,這樣在多個線程同時修改count的計數(shù)屬性時,會存在線程安全計數(shù)不準確問題
LongAdder adderCnt = new LongAdder();
adderCnt.add(count);
HotKeyModel hotKeyModel = new HotKeyModel();
hotKeyModel.setAppName(Context.APP_NAME);
hotKeyModel.setKeyType(keyType);
hotKeyModel.setCount(adderCnt);
hotKeyModel.setRemove(remove);
hotKeyModel.setKey(key);
if (remove) {
//如果是刪除key,就直接發(fā)到etcd去,不用做聚合。但是有點問題現(xiàn)在,這個刪除只能刪手工添加的key,不能刪worker探測出來的
//因為各個client都在監(jiān)聽手工添加的那個path,沒監(jiān)聽自動探測的path。所以如果手工的那個path下,沒有該key,那么是刪除不了的。
//刪不了,就達不到集群監(jiān)聽刪除事件的效果,怎么辦呢?可以通過新增的方式,新增一個熱key,然后刪除它
//TODO 這里為啥不直接刪除該節(jié)點,難道worker自動探測處理的hotKey不會往該節(jié)點增加新增事件嗎?
//釋疑:worker根據(jù)探測配置的規(guī)則,當判斷出某個key為hotKey后,確實不會往keyPath里面加入節(jié)點,他只是單純的往本地緩存里面加入一個空值,代表是熱點key
EtcdConfigFactory.configCenter().putAndGrant(HotKeyPathTool.keyPath(hotKeyModel), Constant.DEFAULT_DELETE_VALUE, 1);
EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyPath(hotKeyModel));//TODO 這里很巧妙待補充描述
//也刪worker探測的目錄
EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyRecordPath(hotKeyModel));
} else {
//如果key是規(guī)則內(nèi)的要被探測的key,就積累等待傳送
if (KeyRuleHolder.isKeyInRule(key)) {
//積攢起來,等待每半秒發(fā)送一次
KeyHandlerFactory.getCollector().collect(hotKeyModel);
}
}
}
從上面的源碼中可知:
這里之所以用LongAdder是為了保證多線程計數(shù)的線程安全性,雖然這里是在方法內(nèi)調(diào)用的,但是在TurnKeyCollector的兩個map里面,存儲了HotKeyModel的實例對象,這樣在多個線程同時修改count的計數(shù)屬性時,會存在線程安全計數(shù)不準確問題。
如果是remove刪除類型,在刪除手動配置的熱key配置路徑的同時,還會刪除dashboard展示熱key的配置路徑。
只有在規(guī)則配置的key,才會被積攢探測發(fā)送到worker內(nèi)進行計算。
6.通訊機制(與worker交互)
1)NettyClient:netty連接器
public class NettyClient {
private static final NettyClient nettyClient = new NettyClient();
private Bootstrap bootstrap;
public static NettyClient getInstance() {
return nettyClient;
}
private NettyClient() {
if (bootstrap == null) {
bootstrap = initBootstrap();
}
}
private Bootstrap initBootstrap() {
//少線程
EventLoopGroup group = new NioEventLoopGroup(2);
Bootstrap bootstrap = new Bootstrap();
NettyClientHandler nettyClientHandler = new NettyClientHandler();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer
@Override
protected void initChannel(SocketChannel ch) {
ByteBuf delimiter = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(Constant.MAX_LENGTH, delimiter))//這里就是定義TCP多個包之間的分隔符,為了更好的做拆包
.addLast(new MsgDecoder())
.addLast(new MsgEncoder())
//30秒沒消息時,就發(fā)心跳包過去
.addLast(new IdleStateHandler(0, 0, 30))
.addLast(nettyClientHandler);
}
});
return bootstrap;
}
}
使用Reactor線程模型,只有2個工作線程,沒有單獨設置主線程
長連接,開啟TCP_NODELAY
netty的分隔符”$()$”,類似TCP報文分段的標準,方便拆包
Protobuf序列化與反序列化
30s沒有消息發(fā)給對端的時候,發(fā)送一個心跳包判活
工作線程處理器NettyClientHandler
JDhotkey的tcp協(xié)議設計就是收發(fā)字符串,每個tcp消息包使用特殊字符$()$來分割 優(yōu)點:這樣實現(xiàn)非常簡單。 獲得消息包后進行json或者protobuf反序列化。 缺點:是需要,從字節(jié)流-》反序列化成字符串-》反序列化成消息對象,兩層序列化損耗了一部分性能。 protobuf還好序列化很快,但是json序列化的速度只有幾十萬每秒,會損耗一部分性能。 2)NettyClientHandler:工作線程處理器
@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
//這里表示如果讀寫都掛了
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
//向服務端發(fā)送消息
ctx.writeAndFlush(new HotKeyMsg(MessageType.PING, Context.APP_NAME));
}
}
super.userEventTriggered(ctx, evt);
}
//在Channel注冊EventLoop、綁定SocketAddress和連接ChannelFuture的時候都有可能會觸發(fā)ChannelInboundHandler的channelActive方法的調(diào)用
//類似TCP三次握手成功之后觸發(fā)
@Override
public void channelActive(ChannelHandlerContext ctx) {
JdLogger.info(getClass(), "channelActive:" + ctx.name());
ctx.writeAndFlush(new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME));
}
//類似TCP四次揮手之后,等待2MSL時間之后觸發(fā)(大概180s),比如channel通道關閉會觸發(fā)(channel.close())
//客戶端channel主動關閉連接時,會向服務端發(fā)送一個寫請求,然后服務端channel所在的selector會監(jiān)聽到一個OP_READ事件,然后
//執(zhí)行數(shù)據(jù)讀取操作,而讀取時發(fā)現(xiàn)客戶端channel已經(jīng)關閉了,則讀取數(shù)據(jù)字節(jié)個數(shù)返回-1,然后執(zhí)行close操作,關閉該channel對應的底層socket,
//并在pipeline中,從head開始,往下將InboundHandler,并觸發(fā)handler的channelInactive和channelUnregistered方法的執(zhí)行,以及移除pipeline中的handlers一系列操作。
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
//斷線了,可能只是client和server斷了,但都和etcd沒斷。也可能是client自己斷網(wǎng)了,也可能是server斷了
//發(fā)布斷線事件。后續(xù)10秒后進行重連,根據(jù)etcd里的worker信息來決定是否重連,如果etcd里沒了,就不重連。如果etcd里有,就重連
notifyWorkerChange(ctx.channel());
}
private void notifyWorkerChange(Channel channel) {
EventBusCenter.getInstance().post(new ChannelInactiveEvent(channel));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HotKeyMsg msg) {
if (MessageType.PONG == msg.getMessageType()) {
JdLogger.info(getClass(), "heart beat");
return;
}
if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {
JdLogger.info(getClass(), "receive new key : " + msg);
if (CollectionUtil.isEmpty(msg.getHotKeyModels())) {
return;
}
for (HotKeyModel model : msg.getHotKeyModels()) {
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
}
}
}
}
userEventTriggered
收到對端發(fā)來的心跳包,返回new HotKeyMsg(MessageType.PING, Context.APP_NAME)
channelActive
在Channel注冊EventLoop、綁定SocketAddress和連接ChannelFuture的時候都有可能會觸發(fā)ChannelInboundHandler的channelActive方法的調(diào)用
類似TCP三次握手成功之后觸發(fā),給對端發(fā)送new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME)
channelInactive
類似TCP四次揮手之后,等待2MSL時間之后觸發(fā)(大概180s),比如channel通道關閉會觸發(fā)(channel.close())該方法,發(fā)布ChannelInactiveEvent事件,來10s后重連
channelRead0
接收PONG消息類型時,打個日志返回
接收RESPONSE_NEW_KEY消息類型時,發(fā)布ReceiveNewKeyEvent事件
3.3.3 worker端
1.入口啟動加載:7個@PostConstruct
1)worker端對etcd相關的處理:EtcdStarter ① 第一個@PostConstruct:watchLog()
@PostConstruct
public void watchLog() {
AsyncPool.asyncDo(() -> {
try {
//取etcd的是否開啟日志配置,地址/jd/logOn
String loggerOn = configCenter.get(ConfigConstant.logToggle);
LOGGER_ON = "true".equals(loggerOn) || "1".equals(loggerOn);
} catch (StatusRuntimeException ex) {
logger.error(ETCD_DOWN);
}
//監(jiān)聽etcd地址/jd/logOn是否開啟日志配置,并實時更改開關
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.logToggle);
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List
KeyValue keyValue = eventList.get(0).getKv();
logger.info("log toggle changed : " + keyValue);
String value = keyValue.getValue().toStringUtf8();
LOGGER_ON = "true".equals(value) || "1".equals(value);
}
});
}
放到線程池里面異步執(zhí)行
取etcd的是否開啟日志配置,地址/jd/logOn,默認true
監(jiān)聽etcd地址/jd/logOn是否開啟日志配置,并實時更改開關
由于有etcd的監(jiān)聽,所以會一直執(zhí)行,而不是執(zhí)行一次結(jié)束
② 第二個@PostConstruct:watch()
/**
* 啟動回調(diào)監(jiān)聽器,監(jiān)聽rule變化
*/
@PostConstruct
public void watch() {
AsyncPool.asyncDo(() -> {
KvClient.WatchIterator watchIterator;
if (isForSingle()) {
watchIterator = configCenter.watch(ConfigConstant.rulePath + workerPath);
} else {
watchIterator = configCenter.watchPrefix(ConfigConstant.rulePath);
}
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List
KeyValue keyValue = eventList.get(0).getKv();
logger.info("rule changed : " + keyValue);
try {
ruleChange(keyValue);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
* rule發(fā)生變化時,更新緩存的rule
*/
private synchronized void ruleChange(KeyValue keyValue) {
String appName = keyValue.getKey().toStringUtf8().replace(ConfigConstant.rulePath, "");
if (StrUtil.isEmpty(appName)) {
return;
}
String ruleJson = keyValue.getValue().toStringUtf8();
List
KeyRuleHolder.put(appName, keyRules);
}
通過etcd.workerPath配置,來判斷該worker是否為某個app單獨服務的,默認為”default”,如果是默認值,代表該worker參與在etcd上所有app client的計算,否則只為某個app來服務計算 使用etcd來監(jiān)聽rule規(guī)則變化,如果是共享的worker,監(jiān)聽地址前綴為”/jd/rules/“,如果為某個app獨享,監(jiān)聽地址為”/jd/rules/“+$etcd.workerPath 如果規(guī)則變化,則修改對應app在本地存儲的rule緩存,同時清理該app在本地存儲的KV緩存
KeyRuleHolder:rule緩存本地存儲
Map,>
相對于client的KeyRuleHolder的區(qū)別:worker是存儲所有app規(guī)則,每個app對應一個規(guī)則桶,所以用map
CaffeineCacheHolder:key緩存本地存儲
Map,>
相對于client的caffeine,第一是worker沒有做緩存接口比如LocalCache,第二是client的map的kv分別是超時時間、以及相同超時時間所對應key的緩存桶
放到線程池里面異步執(zhí)行,由于有etcd的監(jiān)聽,所以會一直執(zhí)行,而不是執(zhí)行一次結(jié)束
③ 第三個@PostConstruct:watchWhiteList()
/**
* 啟動回調(diào)監(jiān)聽器,監(jiān)聽白名單變化,只監(jiān)聽自己所在的app,白名單key不參與熱key計算,直接忽略
*/
@PostConstruct
public void watchWhiteList() {
AsyncPool.asyncDo(() -> {
//從etcd配置中獲取所有白名單
fetchWhite();
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.whiteListPath + workerPath);
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
logger.info("whiteList changed ");
try {
fetchWhite();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
拉取并監(jiān)聽etcd白名單key配置,地址為/jd/whiteList/+$etcd.workerPath
在白名單的key,不參與熱key計算,直接忽略
放到線程池里面異步執(zhí)行,由于有etcd的監(jiān)聽,所以會一直執(zhí)行,而不是執(zhí)行一次結(jié)束 ④ 第四個@PostConstruct:makeSureSelfOn()
/**
* 每隔一會去check一下,自己還在不在etcd里
*/
@PostConstruct
public void makeSureSelfOn() {
//開啟上傳worker信息
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
if (canUpload) {
uploadSelfInfo();
}
} catch (Exception e) {
//do nothing
}
}, 0, 5, TimeUnit.SECONDS);
}
在線程池里面異步執(zhí)行,定時執(zhí)行,時間間隔為5s
將本機woker的hostName,ip+port以kv的形式定時上報給etcd,地址為/jd/workers/+$etcd.workPath+”/“+$hostName,續(xù)期時間為8s
有一個canUpload的開關來控制worker是否向etcd來定時續(xù)期,如果這個開關關閉了,代表worker不向etcd來續(xù)期,這樣當上面地址的kv到期之后,etcd會刪除該節(jié)點,這樣client循環(huán)判斷worker信息變化了
2)將熱key推送到dashboard供入庫:DashboardPusher ① 第五個@PostConstruct:uploadToDashboard()
@Component
public class DashboardPusher implements IPusher {
/**
* 熱key集中營
*/
private static LinkedBlockingQueue
@PostConstruct
public void uploadToDashboard() {
AsyncPool.asyncDo(() -> {
while (true) {
try {
//要么key達到1千個,要么達到1秒,就匯總上報給etcd一次
List
Queues.drain(hotKeyStoreQueue, tempModels, 1000, 1, TimeUnit.SECONDS);
if (CollectionUtil.isEmpty(tempModels)) {
continue;
}
//將熱key推到dashboard
DashboardHolder.flushToDashboard(FastJsonUtils.convertObjectToJSON(tempModels));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
當熱key的數(shù)量達到1000或者每隔1s,把熱key的數(shù)據(jù)通過與dashboard的netty通道來發(fā)送給dashboard,數(shù)據(jù)類型為REQUEST_HOT_KEY
LinkedBlockingQueue
hotKeyStoreQueue:worker計算的給dashboard熱key的集中營,所有給dashboard推送熱key存儲在里面 3)推送到各客戶端服務器:AppServerPusher ① 第六個@PostConstruct:batchPushToClient()
public class AppServerPusher implements IPusher {
/**
* 熱key集中營
*/
private static LinkedBlockingQueue
/**
* 和dashboard那邊的推送主要區(qū)別在于,給app推送每10ms一次,dashboard那邊1s一次
*/
@PostConstruct
public void batchPushToClient() {
AsyncPool.asyncDo(() -> {
while (true) {
try {
List
//每10ms推送一次
Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);
if (CollectionUtil.isEmpty(tempModels)) {
continue;
}
Map
//拆分出每個app的熱key集合,按app分堆
for (HotKeyModel hotKeyModel : tempModels) {
List
oneAppModels.add(hotKeyModel);
}
//遍歷所有app,進行推送
for (AppInfo appInfo : ClientInfoHolder.apps) {
List
if (CollectionUtil.isEmpty(list)) {
continue;
}
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.RESPONSE_NEW_KEY);
hotKeyMsg.setHotKeyModels(list);
//整個app全部發(fā)送
appInfo.groupPush(hotKeyMsg);
}
//推送完,及時清理不使用內(nèi)存
allAppHotKeyModels = null;
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
會按照key的appName來進行分組,然后通過對應app的channelGroup來推送
當熱key的數(shù)量達到10或者每隔10ms,把熱key的數(shù)據(jù)通過與app的netty通道來發(fā)送給app,數(shù)據(jù)類型為RESPONSE_NEW_KEY
LinkedBlockingQueue
hotKeyStoreQueue:worker計算的給client熱key的集中營,所有給client推送熱key存儲在里面 4)client實例節(jié)點處理:NodesServerStarter ① 第七個@PostConstruct:start()
public class NodesServerStarter {
@Value("${netty.port}")
private int port;
private Logger logger = LoggerFactory.getLogger(getClass());
@Resource
private IClientChangeListener iClientChangeListener;
@Resource
private List
@PostConstruct
public void start() {
AsyncPool.asyncDo(() -> {
logger.info("netty server is starting");
NodesServer nodesServer = new NodesServer();
nodesServer.setClientChangeListener(iClientChangeListener);
nodesServer.setMessageFilters(messageFilters);
try {
nodesServer.startNettyServer(port);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
線程池里面異步執(zhí)行,啟動client端的nettyServer
iClientChangeListener和messageFilters這兩個依賴最終會被傳遞到netty消息處理器里面,iClientChangeListener會作為channel下線處理來刪除ClientInfoHolder下線或者超時的通道,messageFilters會作為netty收到事件消息的處理過濾器(責任鏈模式) ② 依賴的bean:IClientChangeListener iClientChangeListener
public interface IClientChangeListener {
/**
* 發(fā)現(xiàn)新連接
*/
void newClient(String appName, String channelId, ChannelHandlerContext ctx);
/**
* 客戶端掉線
*/
void loseClient(ChannelHandlerContext ctx);
}
對客戶端的管理,新來(newClient)(會觸發(fā)netty的連接方法channelActive)、斷線(loseClient)(會觸發(fā)netty的斷連方法channelInactive())的管理 client的連接信息主要是在ClientInfoHolder里面
List
apps,這里面的AppInfo主要是appName和對應的channelGroup
對apps的add和remove主要是通過新來(newClient)、斷線(loseClient) ③ 依賴的bean:List
messageFilters
/**
* 對netty來的消息,進行過濾處理
* @author wuweifeng wrote on 2019-12-11
* @version 1.0
*/
public interface INettyMsgFilter {
boolean chain(HotKeyMsg message, ChannelHandlerContext ctx);
}
對client發(fā)給worker的netty消息,進行過濾處理,共有四個實現(xiàn)類,也就是說底下四個過濾器都是收到client發(fā)送的netty消息來做處理 ④ 各個消息處理的類型:MessageType
APP_NAME((byte) 1),
REQUEST_NEW_KEY((byte) 2),
RESPONSE_NEW_KEY((byte) 3),
REQUEST_HIT_COUNT((byte) 7), //命中率
REQUEST_HOT_KEY((byte) 8), //熱key,worker->dashboard
PING((byte) 4), PONG((byte) 5),
EMPTY((byte) 6);
順序1:HeartBeatFilter
當消息類型為PING,則給對應的client示例返回PONG
順序2:AppNameFilter
當消息類型為APP_NAME,代表client與worker建立連接成功,然后調(diào)用iClientChangeListener的newClient方法增加apps元數(shù)據(jù)信息
順序3:HotKeyFilter
處理接收消息類型為REQUEST_NEW_KEY
先給HotKeyFilter.totalReceiveKeyCount原子類增1,該原子類代表worker實例接收到的key的總數(shù)
publishMsg方法,將消息通過自建的生產(chǎn)者消費者模型(KeyProducer,KeyConsumer),來把消息給發(fā)到生產(chǎn)者中分發(fā)消費
接收到的消息HotKeyMsg里面List
首先判斷HotKeyModel里面的key是否在白名單內(nèi),如果在則跳過,否則將HotKeyModel通過KeyProducer發(fā)送
順序4:KeyCounterFilter
處理接收類型為REQUEST_HIT_COUNT
這個過濾器是專門給dashboard來匯算key的,所以這個appName直接設置為該worker配置的appName
該過濾器的數(shù)據(jù)來源都是client的NettyKeyPusher#sendCount(String appName, List
list),這里面的數(shù)據(jù)都是默認積攢10s的,這個10s是可以配置的,這一點在client里面有講
將構(gòu)造的new KeyCountItem(appName, models.get(0).getCreateTime(), models)放到阻塞隊列LinkedBlockingQueue
COUNTER_QUEUE中,然后讓CounterConsumer來消費處理,消費邏輯是單線程的
CounterConsumer:熱key統(tǒng)計消費者
放在公共線程池中,來單線程執(zhí)行
從阻塞隊列COUNTER_QUEUE里面取數(shù)據(jù),然后將里面的key的統(tǒng)計數(shù)據(jù)發(fā)布到etcd的/jd/keyHitCount/+ appName + “/“ + IpUtils.getIp() + “-“ + System.currentTimeMillis()里面,該路徑是worker服務的client集群或者default,用來存放客戶端hotKey訪問次數(shù)和總訪問次數(shù)的path,然后讓dashboard來訂閱統(tǒng)計展示
2.三個定時任務:3個@Scheduled
1)定時任務1:EtcdStarter#pullRules()
/**
* 每隔1分鐘拉取一次,所有的app的rule
*/
@Scheduled(fixedRate = 60000)
public void pullRules() {
try {
if (isForSingle()) {
String value = configCenter.get(ConfigConstant.rulePath + workerPath);
if (!StrUtil.isEmpty(value)) {
List
KeyRuleHolder.put(workerPath, keyRules);
}
} else {
List
for (KeyValue keyValue : keyValues) {
ruleChange(keyValue);
}
}
} catch (StatusRuntimeException ex) {
logger.error(ETCD_DOWN);
}
}
每隔1分鐘拉取一次etcd地址為/jd/rules/的規(guī)則變化,如果worker所服務的app或者default的rule有變化,則更新規(guī)則的緩存,并清空該appName所對應的本地key緩存 2)定時任務2:EtcdStarter#uploadClientCount()
/**
* 每隔10秒上傳一下client的數(shù)量到etcd中
*/
@Scheduled(fixedRate = 10000)
public void uploadClientCount() {
try {
String ip = IpUtils.getIp();
for (AppInfo appInfo : ClientInfoHolder.apps) {
String appName = appInfo.getAppName();
int count = appInfo.size();
//即便是full gc也不能超過3秒,因為這里給的過期時間是13s,由于該定時任務每隔10s執(zhí)行一次,如果full gc或者說上報給etcd的時間超過3s,
//則在dashboard查詢不到client的數(shù)量
configCenter.putAndGrant(ConfigConstant.clientCountPath + appName + "/" + ip, count + "", 13);
}
configCenter.putAndGrant(ConfigConstant.caffeineSizePath + ip, FastJsonUtils.convertObjectToJSON(CaffeineCacheHolder.getSize()), 13);
//上報每秒QPS(接收key數(shù)量、處理key數(shù)量)
String totalCount = FastJsonUtils.convertObjectToJSON(new TotalCount(HotKeyFilter.totalReceiveKeyCount.get(), totalDealCount.longValue()));
configCenter.putAndGrant(ConfigConstant.totalReceiveKeyCount + ip, totalCount, 13);
logger.info(totalCount + " expireCount:" + expireTotalCount + " offerCount:" + totalOfferCount);
//如果是穩(wěn)定一直有key發(fā)送的應用,建議開啟該監(jiān)控,以避免可能發(fā)生的網(wǎng)絡故障
if (openMonitor) {
checkReceiveKeyCount();
}
// configCenter.putAndGrant(ConfigConstant.bufferPoolPath + ip, MemoryTool.getBufferPool() + "", 10);
} catch (Exception ex) {
logger.error(ETCD_DOWN);
}
}
每個10s將worker計算存儲的client信息上報給etcd,來方便dashboard來查詢展示,比如/jd/count/對應client數(shù)量,/jd/caffeineSize/對應caffeine緩存的大小,/jd/totalKeyCount/對應該worker接收的key總量和處理的key總量
可以從代碼中看到,上面所有etcd的節(jié)點租期時間都是13s,而該定時任務是每10s執(zhí)行一次,意味著如果full gc或者說上報給etcd的時間超過3s,則在dashboard查詢不到client的相關匯算信息
長時間不收到key,判斷網(wǎng)絡狀態(tài)不好,斷開worker給etcd地址為/jd/workers/+$workerPath節(jié)點的續(xù)租,因為client會循環(huán)判斷該地址的節(jié)點是否變化,使得client重新連接worker或者斷開失聯(lián)的worker 3)定時任務3:EtcdStarter#fetchDashboardIp()
/**
* 每隔30秒去獲取一下dashboard的地址
*/
@Scheduled(fixedRate = 30000)
public void fetchDashboardIp() {
try {
//獲取DashboardIp
List
//是空,給個警告
if (CollectionUtil.isEmpty(keyValues)) {
logger.warn("very important warn !!! Dashboard ip is null!!!");
return;
}
String dashboardIp = keyValues.get(0).getValue().toStringUtf8();
NettyClient.getInstance().connect(dashboardIp);
} catch (Exception e) {
e.printStackTrace();
}
}
每隔30s拉取一次etcd前綴為/jd/dashboard/的dashboard連接ip的值,并且判斷DashboardHolder.hasConnected里面是否為未連接狀態(tài),如果是則重新連接worker與dashboard的netty通道
3.自建的生產(chǎn)者消費者模型(KeyProducer,KeyConsumer)
一般生產(chǎn)者消費者模型包含三大元素:生產(chǎn)者、消費者、消息存儲隊列 這里消息存儲隊列是DispatcherConfig里面的QUEUE,使用LinkedBlockingQueue,默認大小為200W 1)KeyProducer
@Component
public class KeyProducer {
public void push(HotKeyModel model, long now) {
if (model == null || model.getKey() == null) {
return;
}
//5秒前的過時消息就不處理了
if (now - model.getCreateTime() > InitConstant.timeOut) {
expireTotalCount.increment();
return;
}
try {
QUEUE.put(model);
totalOfferCount.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
判斷接收到的HotKeyModel是否超出”netty.timeOut”配置的時間,如果是將expireTotalCount紀錄過期總數(shù)給自增,然后返回 2)KeyConsumer
public class KeyConsumer {
private IKeyListener iKeyListener;
public void setKeyListener(IKeyListener iKeyListener) {
this.iKeyListener = iKeyListener;
}
public void beginConsume() {
while (true) {
try {
//從這里可以看出,這里的生產(chǎn)者消費者模型,本質(zhì)上還是拉模式,之所以不使用EventBus,是因為需要隊列來做緩沖
HotKeyModel model = QUEUE.take();
if (model.isRemove()) {
iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
} else {
iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
}
//處理完畢,將數(shù)量加1
totalDealCount.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Override
public void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
//cache里的key,appName+keyType+key
String key = buildKey(hotKeyModel);
hotCache.invalidate(key);
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);
//推送所有client刪除
hotKeyModel.setCreateTime(SystemClock.now());
logger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());
for (IPusher pusher : iPushers) {
//這里可以看到,刪除熱key的netty消息只給client端發(fā)了過去,沒有給dashboard發(fā)過去(DashboardPusher里面的remove是個空方法)
pusher.remove(hotKeyModel);
}
}
@Override
public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
//cache里的key
String key = buildKey(hotKeyModel);
//判斷是不是剛熱不久
//hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內(nèi)不重復處理相同的hotKey。
//畢竟hotKey都是瞬時流量,可以避免在這5s內(nèi)重復推送給client和dashboard,避免無效的網(wǎng)絡開銷
Object o = hotCache.getIfPresent(key);
if (o != null) {
return;
}
//********** watch here ************//
//該方法會被InitConstant.threadCount個線程同時調(diào)用,存在多線程問題
//下面的那句addCount是加了鎖的,代表給Key累加數(shù)量時是原子性的,不會發(fā)生多加、少加的情況,到了設定的閾值一定會hot
//譬如閾值是2,如果多個線程累加,在沒hot前,hot的狀態(tài)肯定是對的,譬如thread1 加1,thread2加1,那么thread2會hot返回true,開啟推送
//但是極端情況下,譬如閾值是10,當前是9,thread1走到這里時,加1,返回true,thread2也走到這里,加1,此時是11,返回true,問題來了
//該key會走下面的else兩次,也就是2次推送。
//所以出現(xiàn)問題的原因是hotCache.getIfPresent(key)這一句在并發(fā)情況下,沒return掉,放了兩個key+1到addCount這一步時,會有問題
//測試代碼在TestBlockQueue類,直接運行可以看到會同時hot
//那么該問題用解決嗎,NO,不需要解決,1 首先要發(fā)生的條件極其苛刻,很難觸發(fā),以京東這樣高的并發(fā)量,線上我也沒見過觸發(fā)連續(xù)2次推送同一個key的
//2 即便觸發(fā)了,后果也是可以接受的,2次推送而已,毫無影響,客戶端無感知。但是如果非要解決,就要對slidingWindow實例加鎖了,必然有一些開銷
//所以只要保證key數(shù)量不多計算就可以,少計算了沒事。因為熱key必然頻率高,漏計幾次沒事。但非熱key,多計算了,被干成了熱key就不對了
SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);//從這里可知,每個app的每個key都會對應一個滑動窗口
//看看hot沒
boolean hot = slidingWindow.addCount(hotKeyModel.getCount());
if (!hot) {
//如果沒hot,重新put,cache會自動刷新過期時間
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).put(key, slidingWindow);
} else {
//這里之所以放入的value為1,是因為hotCache是用來專門存儲剛生成的hotKey
//hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內(nèi)不重復處理相同的hotKey。
//畢竟hotKey都是瞬時流量,可以避免在這5s內(nèi)重復推送給client和dashboard,避免無效的網(wǎng)絡開銷
hotCache.put(key, 1);
//刪掉該key
//這個key從實際上是專門針對slidingWindow的key,他的組合邏輯是appName+keyType+key,而不是給client和dashboard推送的hotKey
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);
//開啟推送
hotKeyModel.setCreateTime(SystemClock.now());
//當開關打開時,打印日志。大促時關閉日志,就不打印了
if (EtcdStarter.LOGGER_ON) {
logger.info(NEW_KEY_EVENT + hotKeyModel.getKey());
}
//分別推送到各client和etcd
for (IPusher pusher : iPushers) {
pusher.push(hotKeyModel);
}
}
}
“thread.count”配置即為消費者個數(shù),多個消費者共同消費一個QUEUE隊列 生產(chǎn)者消費者模型,本質(zhì)上還是拉模式,之所以不使用EventBus,是因為需要隊列來做緩沖 根據(jù)HotKeyModel里面是否是刪除消息類型
刪除CaffeineCacheHolder里面對應newkey的滑動窗口緩存。
向該hotKeyModel對應的app的client推送netty消息,表示新產(chǎn)生hotKey,使得client本地緩存,但是推送的netty消息只代表為熱key,client本地緩存不會存儲key對應的value值,需要調(diào)用JdHotKeyStore里面的api來給本地緩存的value賦值
向dashboard推送hotKeyModel,表示新產(chǎn)生hotKey
刪除消息類型
根據(jù)HotKeyModel里面的appName+keyType+key的名字,來構(gòu)建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動時間窗對應
刪除hotCache里面newkey的緩存,放入的緩存kv分別是newKey和1,hotCache作用是用來存儲該生成的熱key,hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內(nèi)不重復處理相同的hotKey。畢竟hotKey都是瞬時流量,可以避免在這5s內(nèi)重復推送給client和dashboard,避免無效的網(wǎng)絡開銷
刪除CaffeineCacheHolder里面對應appName的caffeine里面的newKey,這里面存儲的是slidingWindow滑動窗口
推送給該HotKeyModel對應的所有client實例,用來讓client刪除該HotKeyModel
非刪除消息類型
根據(jù)HotKeyModel里面的appName+keyType+key的名字,來構(gòu)建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動時間窗對應
通過hotCache來判斷該newkey是否剛熱不久,如果是則返回
根據(jù)滑動時間窗口來計算判斷該key是否為hotKey(這里可以學習一下滑動時間窗口的設計),并返回或者生成該newKey對應的滑動窗口
如果沒有達到熱key的標準
通過CaffeineCacheHolder重新put,cache會自動刷新過期時間
如果達到了熱key標準
向hotCache里面增加newkey對應的緩存,value為1表示剛為熱key。
3)計算熱key滑動窗口的設計 限于篇幅的原因,這里就不細談了,直接貼出項目作者對其寫的說明文章:Java簡單實現(xiàn)滑動窗口
3.3.4 dashboard端
這個沒啥可說的了,就是連接etcd、mysql,增刪改查,不過京東的前端框架很方便,直接返回list就可以成列表。
4 總結(jié)
文章第二部分為大家講解了redis數(shù)據(jù)傾斜的原因以及應對方案,并對熱點問題進行了深入,從發(fā)現(xiàn)熱key到解決熱key的兩個關鍵問題的總結(jié)。 文章第三部分是熱key問題解決方案——JD開源hotkey的源碼解析,分別從client端、worker端、dashboard端來進行全方位講解,包括其設計、使用及相關原理。 希望通過這篇文章,能夠使大家不僅學習到相關方法論,也能明白其方法論具體的落地方案,一起學習,一起成長。
審核編輯:湯梓紅
-
開源
+關注
關注
3文章
3252瀏覽量
42407 -
Redis
+關注
關注
0文章
371瀏覽量
10846
原文標題:Redis數(shù)據(jù)傾斜與JD開源hotkey源碼分析揭秘
文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關推薦
評論