摘要:?RocksDB版本:v5.13.4 1. 概述 得益于LSM-Tree結(jié)構(gòu),RocksDB所有的寫入并非是update in-place,所以他支持起來事務(wù)的難度也相對(duì)較小,主要原理就是利用WriteBatch將事務(wù)所有寫操作在內(nèi)存緩存打包,然后在commit時(shí)一次性將WriteBatch寫入,保證了原子,另外通過Sequence和Key鎖來解決沖突實(shí)現(xiàn)隔離。
RocksDB版本:v5.13.4
1. 概述
得益于LSM-Tree結(jié)構(gòu),RocksDB所有的寫入并非是update in-place,所以他支持起來事務(wù)的難度也相對(duì)較小,主要原理就是利用WriteBatch將事務(wù)所有寫操作在內(nèi)存緩存打包,然后在commit時(shí)一次性將WriteBatch寫入,保證了原子,另外通過Sequence和Key鎖來解決沖突實(shí)現(xiàn)隔離。
RocksDB的Transaction分為兩類:Pessimistic和Optimistic,類似悲觀鎖和樂觀鎖的區(qū)別,PessimisticTransaction的沖突檢測(cè)和加鎖是在事務(wù)中每次寫操作之前做的(commit后釋放),如果失敗則該操作失敗;OptimisticTransaction不加鎖,沖突檢測(cè)是在commit階段做的,commit時(shí)發(fā)現(xiàn)沖突則失敗。
具體使用時(shí)需要結(jié)合實(shí)際場(chǎng)景來選擇,如果并發(fā)事務(wù)寫入操作的Key重疊度不高,那么用Optimistic更合適一些(省掉Pessimistic中額外的鎖操作)
2. 用法
介紹實(shí)現(xiàn)原理前,先來看一下用法:
【1. 基本用法】
Options?options; TransactionDBOptions?txn_db_options; options.create_if_missing?=?true; TransactionDB*?txn_db;//?打開DB(默認(rèn)Pessimistic)Status?s?=?TransactionDB::Open(options,?txn_db_options,? kDBPath,?&txn_db); assert(s.ok());//?創(chuàng)建一個(gè)事務(wù)Transaction*?txn?=?txn_db->BeginTransaction(write_options); assert(txn);//?事務(wù)txn讀取一個(gè)keys?=?txn->Get(read_options,?"abc",?&value); assert(s.IsNotFound());//?事務(wù)txn寫一個(gè)keys?=?txn->Put("abc",?"def"); assert(s.ok());//?通過TransactionDB::Get在事務(wù)外讀取一個(gè)keys?=?txn_db->Get(read_options,?"abc",?&value);//?通過 TrasactionDB::Put在事務(wù)外寫一個(gè)key//?這里并不會(huì)有影響,因?yàn)閷懙牟皇?abc",不沖突//?如果是"abc"的話//?則Put會(huì)一直卡住直到 超時(shí)或等待事務(wù)Commit(本例中會(huì)超時(shí))s?=?txn_db->Put(write_options,?"xyz",?"zzz"); s?=?txn->Commit(); assert(s.ok());//?析構(gòu)事務(wù)delete?txn;delete?txn_db;
通過BeginTransaction打開一個(gè)事務(wù),然后調(diào)用Put、Get等接口進(jìn)行事務(wù)操作,最后調(diào)用Commit進(jìn)行提交。
【2. 回滾】
...//?事務(wù)txn寫入abcs?=?txn->Put("abc",?"def"); assert(s.ok());//?設(shè)置回滾點(diǎn)txn->SetSavePoint();//?事務(wù)txn寫入cbas?=?txn->Put("cba",?"fed"); assert(s.ok());//?回滾至回滾點(diǎn)s?=?txn->RollbackToSavePoint();//?提交,此時(shí)事務(wù)中不包含對(duì)cba的寫入s?=?txn->Commit(); assert(s.ok()); ...
【3. GetForUpdate】
...//?事務(wù)txn讀取abc并獨(dú)占該key,確保不被外部事務(wù)再修改s?=?txn->GetForUpdate(read_options,?“abc”,?&value); assert(s.ok());//?通過TransactionDB::Put接口在事務(wù)外寫abc//?不會(huì)成功s?=?txn_db->Put(write_options,?“abc”,?“value0”); s?=?txn->Commit(); assert(s.ok()); ...
有時(shí)候在事務(wù)中需要對(duì)某一個(gè)key進(jìn)行先讀后寫,此時(shí)則不能在寫時(shí)才進(jìn)行該key的獨(dú)占及沖突檢測(cè)操作,所以使用GetForUpdate接口讀取該key并進(jìn)行獨(dú)占
【4. SetSnapshot】
txn?=?txn_db->BeginTransaction(write_options);//?設(shè)置事務(wù)txn使用的snapshot為當(dāng)前全局Sequence?Numbertxn-> SetSnapshot();//?使用TransactionDB::Put接口在事務(wù)外部寫abc//?此時(shí)全局Sequence?Number會(huì)加1db-> Put(write_options,?“key1”,?“value0”); assert(s.ok());//?事務(wù)txn寫入abcs?=?txn->Put(“abc”,?“value1”); s?=?txn->Commit();//?這里會(huì)失敗,因?yàn)樵谑聞?wù)設(shè)置了snapshot之后,事務(wù)后來寫的key//?在事務(wù)外部有過其他寫操作, 所以這里不會(huì)成功//?Pessimistic會(huì)在Put時(shí)失敗,Optimistic會(huì)在Commit時(shí)失敗
前面說過,TransactionDB在事務(wù)中需要寫入某個(gè)key時(shí)才對(duì)其進(jìn)行獨(dú)占或沖突檢測(cè),有時(shí)希望在事務(wù)一開始就對(duì)其之后所有要寫入的所有key進(jìn)行獨(dú)占,此時(shí)可以通過SetSnapshot來實(shí)現(xiàn),設(shè)置了Snapshot后,外部一旦對(duì)事務(wù)中將要進(jìn)行寫操作key做過修改,則該事務(wù)最終會(huì)失敗(失敗點(diǎn)取決于是Pessimistic還是Optimistic,Pessimistic因?yàn)樵赑ut時(shí)就進(jìn)行沖突檢測(cè),所以Put時(shí)就失敗,而Optimistic則會(huì)在Commit是檢測(cè)到?jīng)_突,失敗)
3. 實(shí)現(xiàn)
3.1 WriteBatch & WriteBatchWithIndex
WriteBatch就不展開說了,事務(wù)會(huì)將所有的寫操作追加進(jìn)同一個(gè)WriteBatch,直到Commit時(shí)才向DB原子寫入。
WriteBatchWithIndex在WriteBatch之外,額外搞一個(gè)Skiplist來記錄每一個(gè)操作在WriteBatch中的offset等信息。在事務(wù)沒有commit之前,數(shù)據(jù)還不在Memtable中,而是存在WriteBatch里,如果有需要,這時(shí)候可以通過WriteBatchWithIndex來拿到自己剛剛寫入的但還沒有提交的數(shù)據(jù)。
事務(wù)的SetSavePoint和RollbackToSavePoint也是通過WriteBatch來實(shí)現(xiàn)的,SetSavePoint記錄當(dāng)前WriteBatch的大小及統(tǒng)計(jì)信息,若干操作之后,若想回滾,則只需要將WriteBatch truncate到之前記錄的大小并恢復(fù)統(tǒng)計(jì)信息即可。
3.2 PessimisticTransaction
PessimisticTransactionDB通過TransactionLockMgr進(jìn)行行鎖管理。事務(wù)中的每次寫入操作之前都需要TryLock進(jìn)Key鎖的獨(dú)占及沖突檢測(cè),以Put為例:
Status?TransactionBaseImpl::Put(ColumnFamilyHandle*?column_family,???????????????????????????????? const?Slice&?key,?const?Slice&?value)?{??//?調(diào)用TryLock搶鎖及沖突檢測(cè) ??Status?s?= ??????TryLock(column_family,?key,?false?/*?read_only?*/,?true?/*?exclusive?*/);??if?(s.ok())?{ ????s?=?GetBatchForWrite()->Put(column_family,?key,?value);????if?(s.ok())?{ ??????num_puts_++; ????} ??}??return?s; }
可以看到Put接口定義在TransactionBase中,無論P(yáng)essimistic還是Optimistic的Put都是這段邏輯,二者的區(qū)別是在對(duì)TryLock的重載。先看Pessimistic的,TransactionBaseImpl::TryLock通過TransactionBaseImpl::TryLock -> PessimisticTransaction::TryLock -> PessimisticTransactionDB::TryLock -> TransactionLockMgr::TryLock一路調(diào)用到TransactionLockMgr的TryLock,在里面完成對(duì)key加鎖,加鎖成功便實(shí)現(xiàn)了對(duì)key的獨(dú)占,此時(shí)直到事務(wù)commit之前,其他事務(wù)是無法修改這個(gè)key的。
鎖是加成功了,但這也只能說明從此刻起到事務(wù)結(jié)束前這個(gè)key不會(huì)再被外部修改,但如果事務(wù)在最開始執(zhí)行SetSnapshot設(shè)置了快照,如果在打快照和Put之間的過程中外部對(duì)相同key進(jìn)行了修改(并commit),此時(shí)已經(jīng)打破了snapshot的保證,所以事務(wù)之后的Put也不能成功,這個(gè)沖突檢測(cè)也是在PessimisticTransaction::TryLock中做的,如下:
Status?PessimisticTransaction::TryLock(ColumnFamilyHandle*?column_family,?????? const?Slice&?key,?bool?read_only,???????????????????????????????????????bool?exclusive,?bool?skip_validate)?{ ??...??//?加鎖 ??if?(!previously_locked?||?lock_upgrade)?{ ????s?=?txn_db_impl_->TryLock(this,?cfh_id,?key_str,?exclusive); ??} ??SetSnapshotIfNeeded(); ??...?? ????//?使用事務(wù)一開始拿到的snapshot的sequence1與這個(gè)key在DB中最新 ????//?的sequence2進(jìn)行比較,如果sequence2?>?sequence1則代表在snapshot ????//?之后,外部有對(duì)key進(jìn)行過寫入,有沖突! ????s?=?ValidateSnapshot(column_family,?key,?&tracked_at_seq);??????if?(!s.ok())?{????????//?檢測(cè)到?jīng)_突,解鎖 ????????//?Failed?to?validate?key ????????if?(!previously_locked)?{??????????//?Unlock?key?we?just?locked ??????????if?(lock_upgrade)?{ ????????????s?=?txn_db_impl_->TryLock(this,?cfh_id,?key_str,???????????????????????false?/*?exclusive?*/); ????????????assert(s.ok()); ??????????}?else?{ ????????????txn_db_impl_->UnLock(this,?cfh_id,?key.ToString()); ??????????} ????????} ??????}?? ??if?(s.ok())?{????//?如果加鎖及沖突檢測(cè)通過,記錄這個(gè)key以便事務(wù)結(jié)束時(shí)釋放掉鎖 ????//?We?must?track?all?the?locked?keys?so?that?we?can?unlock?them?later.?If ????//?the?key?is?already?locked,?this?func?will?update?some?stats?on?the ????//?tracked?key.?It?could?also?update?the?tracked_at_seq?if?it?is?lower?than ????//?the?existing?trackey?seq. ????TrackKey(cfh_id,?key_str,?tracked_at_seq,?read_only,?exclusive); ??} }
其中ValidateSnapshot就是進(jìn)行沖突檢測(cè),通過將事務(wù)設(shè)置的snapshot與key最新的sequence進(jìn)行比較,如果小于key最新的sequence,則代表設(shè)置snapshot后,外部事務(wù)修改過這個(gè)key,有沖突!獲取key最新的sequence也是簡(jiǎn)單粗暴,遍歷memtable,immutable memtable,memtable list history及SST文件來拿。總結(jié)如下圖:
GetForUpdate的邏輯和Put差不多,無非就是以Get之名行Put之事(加鎖及沖突檢測(cè)),如下圖:
接著介紹下TransactionLockMgr,如下圖:
最外層先是一個(gè)std::unordered_map,將每個(gè)ColumnFamily映射到一個(gè)LockMap,每個(gè)LockMap默認(rèn)有16個(gè)LockMapStripe,然后每個(gè)LockMapStripe里包含一個(gè)std::unordered_map keys,這就是存放每個(gè)key對(duì)應(yīng)的鎖信息的。所以每次加鎖過程大致如下:
首先通過ThreadLocal拿到lock_maps指針
通過column family ID 拿到對(duì)應(yīng)的LockMap
對(duì)key hash映射到某個(gè)LockMapStripe,對(duì)該LockMapStripe加鎖(同一LockMapStripe下的所有key會(huì)搶同一把鎖,粒度略大)
操作LockMapStripe里的std::unordered_map完成加鎖
3.3 OptimisticTransaction
OptimisticTransactionDB不使用鎖進(jìn)行key的獨(dú)占,只在commit是進(jìn)行沖突檢測(cè)。所以O(shè)ptimisticTransaction::TryLock如下:
Status?OptimisticTransaction::TryLock(ColumnFamilyHandle*?column_family,?????????????????????????????????????? const?Slice&?key,?bool?read_only,?????????????????????????????????????? bool?exclusive,?bool?untracked)?{??if?(untracked)?{????return?Status::OK(); ??}??uint32_t?cfh_id?=?GetColumnFamilyID(column_family); ??SetSnapshotIfNeeded();??//?如果設(shè)置了之前事務(wù)snapshot,這里使用它作為key的seq ??//?如果沒有設(shè)置snapshot,則以當(dāng)前全局的sequence作為key的seq ??SequenceNumber?seq;??if?(snapshot_)?{ ????seq?=?snapshot_->GetSequenceNumber(); ??}?else?{ ????seq?=?db_->GetLatestSequenceNumber(); ??}??std::string?key_str?=?key.ToString();??//?記錄這個(gè)key及其對(duì)應(yīng)的seq,后期在commit時(shí)通過使用這個(gè)seq和 ??//?key當(dāng)前的最新sequence比較來做沖突檢測(cè) ??TrackKey(cfh_id,?key_str,?seq,?read_only,?exclusive);??//?Always?return?OK.?Confilct? ??checking?will?happen?at?commit?time. ??return?Status::OK(); }
這里TryLock實(shí)際上就是給key標(biāo)記一個(gè)sequence并記錄,用作commit時(shí)的沖突檢測(cè),commit實(shí)現(xiàn)如下:
Status?OptimisticTransaction::Commit()?{??//?Set?up?callback?which?will?call?CheckTransactionForConflicts()?to ??//?check?whether?this?transaction?is?safe?to?be?committed. ??OptimisticTransactionCallback?callback(this); ??DBImpl*?db_impl?=?static_cast_with_check(db_->GetRootDB());??//?調(diào)用WriteWithCallback進(jìn)行沖突檢測(cè), ??如果沒有沖突就寫入DB ??Status?s?=?db_impl->WriteWithCallback( ??????write_options_,?GetWriteBatch()->GetWriteBatch(),?&callback);??if?(s.ok())?{ ????Clear(); ??}??return?s; }
沖突檢測(cè)的實(shí)現(xiàn)在OptimisticTransactionCallback里,和設(shè)置了snapshot的PessimisticTransaction一樣,最終還是會(huì)調(diào)用TransactionUtil::CheckKeysForConflicts來檢測(cè),也就是比較sequence。整體如下圖:
3.4 兩階段提交(Two Phase Commit)
在分布式場(chǎng)景下使用PessimisticTransaction時(shí),我們可能需要使用兩階段提交(2PC)來確保一個(gè)事務(wù)在多個(gè)節(jié)點(diǎn)上執(zhí)行成功,所以PessimisticTransaction也支持2PC。具體做法也不難,就是將之前commit拆分為prepare和commit,prepare階段進(jìn)行WAL的寫入,commit階段進(jìn)行Memtable的寫入(寫入后其他事務(wù)方可見),所以現(xiàn)在一個(gè)事務(wù)的操作流程如下:
BeginTransaction GetForUpdate Put ... Prepare Commit
使用2PC,我們首先要通過SetName為一個(gè)事務(wù)設(shè)置唯一的標(biāo)識(shí)并注冊(cè)到全局映射表里,這里記錄著所有未完成的2PC事務(wù),當(dāng)Commit后再從映射表里刪除。
接下來具體2PC實(shí)現(xiàn)無非就是在WriteBatch上做文章,通過特殊的標(biāo)記來控制寫WAL和Memtable,簡(jiǎn)單說一下:
正常的WriteBatch結(jié)構(gòu)如下:
Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);
2PC一開始的WriteBatch如下:
Sequence(0);NumRecords(0);Noop;
先使用一個(gè)Noop占位,至于為什么,后面再說。緊接著就是一些操作,操作后,WriteBatch如下:
Sequence(0);NumRecords(3);Noop;Put(a,1);Merge(a,1);Delete(a);
然后執(zhí)行Prepare,寫WAL,在寫WAL之前,先會(huì)隊(duì)WriteBatch做一些改動(dòng),插入Prepare和EndPrepare記錄,如下:
Sequence(0);NumRecords(3);Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid)
可以看到這里將之前的Noop占位換成Prepare,然后在結(jié)尾插入EndPrepare(xid),構(gòu)造好WriteBatch后就直接調(diào)用WriteImpl寫WAL了。注意,此時(shí)往WAL里寫的這條日志的sequence雖然比VersionSet的last_sequence大,但寫入WAL之后并不會(huì)調(diào)用SetLastSequence來更新VersionSet的last_sequence,它只有在最后寫入Memtable之后才更新,具體做法就是給VersionSet除了last_sequence_之外,再加一個(gè)last_allocated_sequence_,初始相等,寫WAL是加后者,后者對(duì)外不可見,commit后再加前者。所以一旦PessimisticTransactionDB使用了2PC,就要求所有都是2PC,不然last_sequence_可能會(huì)錯(cuò)亂(更正:如果使用two_write_queues_,不管是Prepare -> Commit還是直接Commit,sequence的增長(zhǎng)都是以last_allocated_sequence_為準(zhǔn),最后用它來調(diào)整last_sequence_;如果不使用two_write_queues_則直接以last_sequence_為準(zhǔn),總之不會(huì)出現(xiàn)sequence混錯(cuò),所以可以Prepare -> Commit和Commit混用)。
WAL寫完之后,即使沒有commit就宕機(jī)也沒事,重啟后Recovery會(huì)將事務(wù)從WAL恢復(fù)記錄到全局recovered_transaction中,等待Commit
最后就是Commit,Commit階段會(huì)使用一個(gè)新的CommitTime WriteBatch,和之前的WriteBatch合并整理后最終使用CommitTime WriteBatch寫Memtable
整理后的CommitTime WriteBatch如下:
Sequence(0);NumRecords(3);Commit(xid);Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid);
將CommitTime WriteBatch的WALTerminalPoint設(shè)置到Commit(xid)處,告訴Writer寫WAL時(shí)寫到這里就可以停了,其實(shí)就是只將Commit記錄寫進(jìn)WAL(因?yàn)槠浜蟮挠涗浽赑repare階段就已經(jīng)寫到WAL了);
在最后就是MemTableInserter遍歷這個(gè)CommitTime WriteBatch向memtable寫入,具體就不說了。寫入成功后,更新VersionSet的last_sequence_,至此,事務(wù)成功提交。
4. WritePrepared & WriteUnprepared
我們可以看到無論是Pessimistic還是Optimistic,都有一個(gè)共同缺點(diǎn),那就是在事務(wù)最終Commit之前,所以數(shù)據(jù)都是緩存在內(nèi)存(WriteBatch)里,對(duì)于很大的事務(wù)來說,這非常耗費(fèi)內(nèi)存并且將所有實(shí)際寫入壓力都扔給Commit階段來搞,性能有瓶頸,所以RocksDB正在支持WritePolicy為WritePrepared和WriteUnprepared的PessimisticTransaction,主要思想就是將對(duì)Memtable的寫入提前,
如果放到Prepare階段那就是WritePrepared
如果再往前,每次操作直接寫Memtable那就是WriteUnprepared
可以看到WriteUnprepared無論內(nèi)存占用還是寫入壓力點(diǎn)的分散都做的最好,WritePrepared稍遜。
支持這倆新的WritePolicy的難點(diǎn)在于如何保證寫入到Memtable但還未Commit的數(shù)據(jù)不被其他事物看到,這里就需要在Sequence上大做文章了,目前Rocksdb支持了WritePrepare、而WriteUnprepared還未支持,期待后續(xù)...
5. 隔離級(jí)別
看了前面的介紹,這里就不用展開說了
TransactionDB支持ReadCommitted和RepeatableReads級(jí)別的隔離
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載
評(píng)論
查看更多