最近不是正好在研究 canal 嘛,剛巧前兩天看了一篇關于解決緩存與數據庫一致性問題的文章,里邊提到了一種解決方案是結合 canal 來操作的,所以阿Q就想趁熱打鐵,手動來實現一下。
架構
文中提到的思想是:
- 采用先更新數據庫,后刪除緩存的方式來解決并發引發的一致性問題;
- 采用異步重試的方式來保證“更新數據庫、刪除緩存”這兩步都能執行成功;
- 可以采用訂閱變更日志的方式來清除 Redis 中的緩存;
基于這種思想,阿Q腦海中搭建了以下架構
- APP 從 Redis 中查詢信息,將數據的更新寫入 MySQL 數據庫中;
- Canal 向 MySQL 發送 dump 協議,接收 binlog 推送的數據;
- Canal 將接收到的數據投遞給 MQ 消息隊列;
- MQ 消息隊列消費消息,同時刪除 Redis 中對應數據的緩存;
環境準備
這篇文章中有 canal 的安裝教程以及對 mysql 的相關配置:canal安裝
考慮到我們服務器之前安裝過 RabbitMQ ,所以我們就用 RabbitMQ 來充當消息隊列吧。
Canal 配置
修改 conf/canal.properties
配置
# 指定模式
canal.serverMode = rabbitMQ
# 指定實例,多個實例使用逗號分隔: canal.destinations = example1,example2
canal.destinations = example
# rabbitmq 服務端 ip
rabbitmq.host = 127.0.0.1
# rabbitmq 虛擬主機
rabbitmq.virtual.host = /
# rabbitmq 交換機
rabbitmq.exchange = xxx
# rabbitmq 用戶名
rabbitmq.username = xxx
# rabbitmq 密碼
rabbitmq.password = xxx
rabbitmq.deliveryMode =
修改實例配置文件 conf/example/instance.properties
#配置 slaveId,自定義,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10
# 數據庫地址:配置自己的ip和端口
canal.instance.master.address=ip:port
# 數據庫用戶名和密碼
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
# 指定庫和表
canal.instance.filter.regex=.*\\\\..* // 這里的 .* 表示 canal.instance.master.address 下面的所有數據庫
# mq config
# rabbitmq 的 routing key
canal.mq.topic=xxx
然后重啟 canal 服務。
數據庫
建表語句
CREATE TABLE `product_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`price` decimal(10,4) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
數據初始化
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(1, '從你的全世界路過', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(2, '喬布斯傳', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(3, 'java開發', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');
實戰
項目引入的依賴比較多,為了不占用過多的篇幅,大家可以在公眾號【阿Q說代碼】后臺回復“canal”獲取項目源碼!
MySQL 和 Redis 的相關配置在此不再贅述,有不懂的可以私聊阿Q:qingqing-4132;
RabbitMQ 配置
@Configuration
public class RabbitMQConfig {
public static final String CANAL_QUEUE = "canal_queue";//隊列
public static final String DIRECT_EXCHANGE = "canal";//交換機,要與canal中配置的相同
public static final String ROUTING_KEY = "routingkey";//routing-key,要與canal中配置的相同
/**
* 定義隊列
**/
@Bean
public Queue canalQueue(){
return new Queue(CANAL_QUEUE,true);
}
/**
* 定義直連交換機
**/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 隊列和交換機綁定
**/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
}
}
商品信息入緩存
/**
* 獲取商品信息:
* 先從緩存中查,如果不存在再去數據庫中查,然后將數據保存到緩存中
* @param productInfoId
* @return
*/
@Override
public ProductInfo findProductInfo(Long productInfoId) {
//1.從緩存中獲取商品信息
Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
if(ObjectUtil.isNotEmpty(object)){
return (ProductInfo)object;
}
//2.如果緩存中不存在,從數據庫獲取信息
ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
if(productInfo != null){
//3.將商品信息緩存
redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY+productInfoId, productInfo,
REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
return productInfo;
}
return null;
}
執行方法后,查看 Redis 客戶端是否有數據存入
更新數據入MQ
/**
* 更新商品信息
* @param productInfo
* @return
*/
@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo){
productInfoService.updateById(productInfo);
return AjaxResult.success();
}
當我執行完 update 方法的時候,去RabbitMQ Management
查看,發現并沒有消息進入隊列。
問題描述
通過排查之后我在服務器中 canal 下的 /usr/local/logs/example/example.log
文件里發現了問題所在。
原因就是meta.dat
中保存的位點信息和數據庫的位點信息不一致導致 canal 抓取不到數據庫的動作。
于是我找到 canal 的 conf/example/instance.properties
實例配置文件,發現沒有將canal.instance.master.address=127.0.0.1:3306
設置成自己的數據庫地址。
解決方案
- 先停止 canal 服務的運行;
- 刪除
meta.dat
文件; - 再重啟 canal,問題解決;
再次執行 update 方法,會發現 RabbitMQ Management
中已經有我們想要的數據了。
MQ接收數據
編寫 RabbitMQ 消費代碼的邏輯
@RabbitListener(queues = "canal_queue")//監聽隊列名稱
public void getMsg(Message message, Channel channel, String msg) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消費的隊列消息來自:" + message.getMessageProperties().getConsumerQueue());
//刪除reids中對應的key
ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
log.info("庫名:"+ productInfoDetail.getDatabase());
log.info("表名: "+ productInfoDetail.getTable());
if(productInfoDetail!=null && productInfoDetail.getData()!=null){
List
當我們再次調用 update
接口時,控制臺會打印以下信息
從圖中打印的信息可以看出就是我們的庫和表以及消息隊列,Redis 客戶端中緩存的信息也被刪除了。
拓展
看到這,你肯定會問:RabbitMQ 是閱后即焚的機制,它確認消息被消費者消費后會立刻刪除,如果此時我們的業務還沒有跑完,沒來的及刪除 Redis 中的緩存就宕機了,豈不是緩存一直都得不到更新了嗎?
首先我們要明確的是 RabbitMQ 是通過消費者回執來確認消費者是否成功處理消息的,即消費者獲取消息后,應該向 RabbitMQ 發送 ACK 回執,表明自己已經處理消息了。
為了不讓上述問題出現,消費者返回 ACK 回執的時機就顯得非常重要了, 而 SpringAMQP 也為我們提供了三種可選的確認模式:
- manual:手動 ack,需要在業務代碼結束后,調用 api 發送 ack;
- auto:自動 ack ,由 spring 監測 listener 代碼是否出現異常,沒有異常則返回 ack,拋出異常則返回 nack;
- none:關閉 ack,MQ 假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除;
由此可知在 none 模式下消息投遞最不可靠,可能會丟失消息;在默認的 auto 模式下如果出現服務器宕機的情況也是會丟失消息的,本次實戰中,阿Q為了防止消息丟失采用的是 manual 這種模式,配置信息如下:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #開啟手動確認
所以在代碼中也就出現了
//用于肯定確認
channel.basicAck(deliveryTag, true);
//用于否定確認
channel.basicReject(deliveryTag ,true);
當然此種模式雖然不會丟失消息,但是會導致效率變低。
-
緩存
+關注
關注
1文章
233瀏覽量
26649 -
數據庫
+關注
關注
7文章
3765瀏覽量
64276 -
MySQL
+關注
關注
1文章
802瀏覽量
26445
發布評論請先 登錄
相關推薦
評論