精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

Xline源碼解讀(二)—Lease的機制與實現

冬至子 ? 來源:達坦科技DatenLord ? 作者:達坦科技DatenLord ? 2023-10-08 14:21 ? 次閱讀

01、簡介

Xline 是一款開源的分布式 KV 存儲引擎,用于管理少量的關鍵性數據,其核心目標是實現高性能的數據訪問,以及保證跨數據中心場景下的強一致性。 Xline 對外提供了一系列兼容 etcd 的訪問接口,比如 KV、Watch、Lease 等等。本文將會著重介紹一下其中的 Lease 接口

Lease 是一種客戶端和服務端之間的租約機制。類似于我們現實生活中的租車服務,當我們需要使用一輛車時,我們可以向租車公司申請一個 lease,租車公司會給我們分配一輛車,并且保證在我們和租車公司約定的有效期內不會把這輛車分配給其他人,如果我們想要延長使用時間,我們可以向租車公司續租,如果我們不再需要使用這輛車,我們可以主動歸還并取消,或者等待 lease 過期后自動歸還。

在 Xline 中對 lease 的使用和現實生活中的租車服務很相似,客戶端可以向服務點申請一個 lease,服務端會保證在 lease 的有效期內不會刪除這個 lease,客戶端也可以通過相應的接口提前結束或者延長 lease 的時間,與現實中租車不同的是,我們可以在這個 lease 上綁定一些 key-value,這些 key-value 會隨著 lease 的過期被刪除。

根據以上介紹的 lease 的能力,我們可以在很多場景下使用 lease 來實現我們的目的,以下是幾個常見的 lease 應用場景:

  • 分布式鎖: 分布式鎖是通過多個機制一同實現的,lease 在分布式鎖中起到避免死鎖的作用。客戶端在請求分布式鎖的時候,會創建一個 lease 并不斷續租,并且寫入 key-value 并附加該 lease,這個 key-value 代表分布式鎖的占用狀態,如果占用該鎖的客戶端因故障無法主動釋放鎖,lease 機制也會保證在 lease 過期后自動刪除對應的 key-value 來釋放當前鎖。
  • 服務注冊中心: 注冊新服務時創建 lease,并寫入服務相關信息的 key-value 附加該 lease,在服務存活期間,對應服務會一直對其 lease 續租,服務故障后無法自動續租,對應 key-value 自動刪除,相應的服務就會在注冊中心中注銷。
  • 分布式系統中的授權管理: 客戶端通過申請 lease 來獲取資源的訪問權限,如果客戶端失去與服務端的連接,或者由于故障沒有及時續租,導致 lease 過期,該客戶端就會失去相應的權限

02、架構

上圖是一個 lease 實現的簡單架構圖,外部 Client 可以通過兩種方式向Xline集群發送請求,一種是直接通過 Curp 協議向集群內所有節點廣播請求,Curp 模塊達成共識后,會把這個請求應用到狀態機,也就是將其寫入存儲層;另一種發送請求的方式就是 Client 直接將請求發送到集群中一個節點的 LeaseServer,這也是與 etcd 兼容的請求方式,請求到達 LeaseServer 后,會有兩條不同的處理路徑,多數請求會通過 Server 端綁定的 Curp client 廣播給集群中所有節點,剩下的少部分請求可能只有部分節點能夠處理,這些請求就會被轉發到這些節點的 LeaseServer,然后應用到狀態機。

03、源碼分析

源碼組織

Lease 相關的源碼主要保存在以下文件中,大致分為三個部分:

  1. RPC 定義:
  • xlineapi/proto/rpc.proto:Xline 內各 Server 的 rpc 接口定義,包括 LeaseServer接口定義。
  • xlineapi/proto/lease.proto:lease 的 rpc message 定義。
  1. LeaseServer實現:
  • xline/src/server/lease_server.rs:負責提供 Lease RPC service 的具體實現,主要目的是提供 etcd 兼容接口,如果使用外部的 curp client 直接發送 propose 可以不經過此接口,但也有部分不經過共識協議的請求必須通過 LeaseServer 處理。
    LeaseStore實現:
  • xline/src/storage/lease_store/lease.rs:定義了Lease數據結構,用于保存 Lease相關的信息,比如 Lease 上綁定的所有 Key, Lease 的過期時間,Lease 的剩余 TTL 長度等。并為其實現了一些實用的方法。
  • xline/src/storage/lease_store/lease_queue.rs:定義了LeaseQueue和相關的方法,LeaseQueue 是一個由 lease id 以及 lease 過期時間組成的優先隊列,一個后臺常駐 task 會定時通過此結構獲取所有過期 lease 的 id。
  • xline/src/storage/lease_store/lease_collection.rs:定義了LeaseCollection和相關的方法,LeasCollection是 lease 核心數據結構的集合,提供 lease 機制的核心能力。結構內部主要包含三個部分,lease_map 保存所有 lease 結構;item_map 緩存 key 到 lease id 映射;expired_queue 管理 lease 過期時間,expired_queue只在 leader 節點上有意義,其它節點上為空。
  • xline/src/storage/lease_store/mod.rsLeaseStore 的定義及方法實現。負責提供 lease 的存儲層抽象,對外提供所有 lease 相關操作的存儲層接口。其內部包含 LeaseCollection 以及和 KvStore共享的一些數據結構。

Lease 的創建

想要使用 lease,首先就要創建一個 lease,創建 lease 時需要使用 LeaseServer 提供的 LeaseGrant 接口。LeaseServer 中對 LeaseGrant 的處理很簡單,就是分配一個 lease id,然后通過 propose 把請求交給共識協議處理,達成共識后,請求會在 LeaseStore 中被執行。

LeaseStore 會在 LeaseCollection 中創建并插入一個新的 Lease,其核心代碼邏輯如下:

...if is_leader {    let expiry = lease.refresh(Duration::ZERO);    let _ignore = inner.expired_queue.insert(lease_id, expiry);} else {    lease.forever();}let _ignore = inner.lease_map.insert(lease_id, lease.clone());...

需要注意的是,如果當前節點是 leader 節點的話,還需要承擔管理 lease 過期時間的任務,所以需要通過refresh 方法計算 Lease 的過期時間,并將其插入到 expired_queue 中。其他節點則不需要這一步處理,只需要將新的 Lease 插入到 lease_map 中。計算過期時間使用的 refresh 定義如下:

Lease 創建完成后,服務端會給客戶端返回一個包含 lease id 的響應。

Lease的使用

獲取到 lease id 后,客戶端就可以通過 lease id 來使用這個 lease,在 Put 一對 key value 時可以附加 lease id,這個 Put 請求被應用到狀態機時,除了直接在 KvStoreIndexDB 中寫入 key-value 以外,還會通過LeaseCollection 提供的 detach方法分離當前 key 和舊的 lease ,并通過 attach 將需要 put 的 key 附加到新的 lease id 上。

pub(crate) fn attach(&self, lease_id: i64, key: Vec< u8 >) - > Result< (), ExecuteError > {    let mut inner = self.inner.write();    let Some(lease) = inner.lease_map.get_mut(&lease_id) else {        return  Err(ExecuteError::lease_not_found(lease_id));    };    lease.insert_key(key.clone());    let _ignore = inner.item_map.insert(key, lease_id);    Ok(())}

attach 的具體實現就是通過 lease id 找到對應的 Lease,并將 key 附加到 Lease上,以及在 item_map中添加 key 到 lease id 的映射關系。detach 的實現與 attach的相反,它會移除 attach 時插入的內容。

經過以上的過程,我們已經成功將 key 和 lease id 關聯在一起,此時如果這個 Lease 被主動 revoke 或者超時,那么這個 Lease以及它關聯的所有 key,都會被刪除。

Lease 的主動刪除

刪除一個 lease 需要調用 LeaseRevoke接口,這個接口在 LeaseServer 中的處理與 LeaseGrant基本相同,都是將請求交給共識協議處理,唯一的不同是 LeaseRevoke 不需要分配 lease id。

let del_keys = match self.lease_collection.look_up(req.id) {    Some(l) = > l.keys(),    None = > return Err(ExecuteError::lease_not_found(req.id)),};if del_keys.is_empty() {    let _ignore = self.lease_collection.revoke(req.id);    return Ok(Vec::new());}// delete keys ...let _ignore = self.lease_collection.revoke(req.id);

LeaseRevoke 被執行時,首先會嘗試查找 Lease 是否有關聯的 key,如果沒有,那么就可以直接通過 LeaseCollection 上的 revoke方法將 Lease 刪除,如果有關聯的 key 的話那么就需要將關聯的所有 key 從 KvStore 中刪除,并清理 LeaseCollection中這些 key 和 lease id 的關系,然后才能從 LeaseCollectionreovke這個 Lease

Lease 的過期

Lease 過期時的處理流程如上圖所示,此處省略了共識的部分,在初始化 LeaseServer 時,會創建一個后臺常駐的 revoke_expired_leases_task,這個 task 的主體代碼如下:

loop {    // only leader will check expired lease    if lease_server.lease_storage.is_primary() {        for id in lease_server.lease_storage.find_expired_leases() {            let _handle = tokio::spawn({                let s = Arc::clone(&lease_server);                async move {                    let  request = tonic::Request::new(LeaseRevokeRequest { id });                    if let Err(e) = s.lease_revoke(request).await {                        warn!("Failed to revoke expired leases: {}", e);                    }                }            });        }    }    time::sleep(DEFAULT_LEASE_REQUEST_TIME).await;}

在負責管理 Lease 過期時間節點上,這個 task 會定時通過 find_expired_leases 獲取已經過期的所有 lease id, 然后調用 lease server 上的 lease_revoke 接口來刪除過期的 Lease,這個接口和客戶度主動刪除 Lease 時使用的是同一個接口。

find_expired_leasesLeaseCollection 上一個核心方法,具體實現如下:


pub(crate) fn find_expired_leases(&self) - > Vec< i64 > {    let mut expired_leases = vec![];    let mut inner = self.inner.write();    while let Some(expiry) = inner.expired_queue.peek() {        if *expiry <= Instant::now() {            #[allow(clippy::unwrap_used)] // queue.peek() returns Some            let id = inner.expired_queue.pop().unwrap();            if inner.lease_map.contains_key(&id) {                expired_leases.push(id);            }        } else {            break;        }    }    expired_leases}

在創建 Lease時,我們已經計算過了Lease過期的時間并將其插入了 expired_queue ,調用 find_expired_queue 時會一直嘗試從優先隊列隊頭拿出已經過期的 Lease ,直到遇到第一個不過期的 Lease 后停止嘗試,然后將拿到的所有 lease id 返回。

Lease 的續租

如果想要讓創建的 Lease 能夠持續更長時間,那就需要在客戶端和服務端之間維護一條 stream,客戶端定時向服務端發送 LeaseKeepAlive請求。和前面提到的請求不同,LeaseKeepAlive請求不需要經過共識協議,因為這個請求依賴只存在于 leader 節點上的 Lease過期時間,因此只有 leader 節點能夠處理 LeaseKeepAlive 請求,follower 節點會把請求轉發至 leader 節點上處理。具體的轉發邏輯可以參考 lease_server.rs 內的源碼。

在 leader 和 client 建立起 stream 后,每當 leader 從 stream 中收到 lease id,都會為這個 lease 續租,最終續租的邏輯是通過 LeaseCollection 提供的 renew 方法實現的。該方法定義如下:

pub(crate) fn renew(&self, lease_id: i64) - > Result< i64, ExecuteError > {    let mut inner = self.inner.write();    let (expiry, ttl) = {        let Some(lease) = inner.lease_map.get_mut(&lease_id) else {            return Err(ExecuteError::lease_not_found(lease_id));        };        if lease.expired() {            return Err(ExecuteError::lease_expired(lease_id));        }        let expiry = lease.refresh(Duration::default());        let ttl = lease.ttl().as_secs().cast();        (expiry, ttl)    };    let _ignore = inner.expired_queue.update(lease_id, expiry);    Ok(ttl)}

Renew 會先檢查對應 Lease 是否已經過期,沒有過期的話就會重新計算過期時間,然后更新它在 expired_queue 中的順序。

只要 client 和 server 之間的連接不中斷,client 就會一直通過 stream 向服務端發送 LeaseKeepAlive 請求,這個 lease 也就不會超時,前文提到的 lease 主要的應用場景中,幾乎都用到了這個特性來判斷客戶端是否在正常運行。

Lease 信息的讀取

Lease 有兩個讀取接口,一個是 LeaseTimeToLive,這個接口會讀取一個 lease 的詳細信息,包括它的過期時間,和 LeaseKeepAlive 一樣,因為過期時間只存在于 leader 節點,因此該請求需要轉發只 leader 處理;另一個讀取接口是 LeaseLeases,這個接口會列出系統中所有的 lease id,這個接口不需要 lease 過期時間的信息,因此可以直接交給共識協議處理,所以在 LeaseServer中的處理和 LeaseGrantLeaseRevoke 相似。此處不再贅述。

LeaseTimeToLiveLeaseLeases 讀取信息的能力最終由 LeaseCollection 實現,源碼如下:

pub(crate) fn look_up(&self, lease_id: i64) - > Option< Lease > {    self.inner.read().lease_map.get(&lease_id).cloned()}    pub(crate) fn leases(&self) - > Vec< Lease > {    let mut leases = self        .inner        .read()        .lease_map        .values()        .cloned()        .collect::< Vec< _ >>();    leases.sort_by_key(Lease::remaining);    leases}

04、總結

本文介紹了 Xline 下的一個重要接口 Lease,用戶可以通過 Lease 實現一組 key 的定時過期,并且能夠通過 KeepAlive 接口為 Lease 續租,服務端也能夠根據此特性探測客戶端是否在正常運作。依賴于 Lease 機制的這些特點,也誕生出了很多典型的應用場景,比如本文介紹過的分布式鎖、服務注冊中心,授權管理等等。

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • RPC
    RPC
    +關注

    關注

    0

    文章

    111

    瀏覽量

    11515
收藏 人收藏

    評論

    相關推薦

    Faster Transformer v2.1版本源碼解讀

    寫在前面 :本文將對 Faster Transformer v2.1 版本源碼進行解讀,重點介紹該版本基于 v1.0 和 v2.0 所做的優化內容,剖析源碼作者優化意圖。 1 v2.1 版本發布背景
    的頭像 發表于 09-19 11:39 ?1339次閱讀
    Faster Transformer v2.1版本<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>

    OneFlow Softmax算子源碼解讀之WarpSoftmax

    寫在前面:近來筆者偶然間接觸了一個深度學習框架 OneFlow,所以這段時間主要在閱讀 OneFlow 框架的 cuda 源碼。官方源碼基于不同場景分三種方式實現 Softmax,本文主要介紹其中一種的
    的頭像 發表于 01-08 09:24 ?761次閱讀
    OneFlow Softmax算子<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>之WarpSoftmax

    OneFlow Softmax算子源碼解讀之BlockSoftmax

    寫在前面:筆者這段時間工作太忙,身心俱疲,博客停更了一段時間,現在重新撿起來。本文主要解讀 OneFlow 框架的第種 Softmax 源碼實現細節,即 block 級別的 Soft
    的頭像 發表于 01-08 09:26 ?662次閱讀
    OneFlow Softmax算子<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>之BlockSoftmax

    聊聊Dubbo - Dubbo可擴展機制源碼解析

    很不錯呀,接下來,我們就深入Dubbo的源碼,一睹廬山真面目。在Dubbo可擴展機制實戰中,我們了解了Dubbo擴展機制的一些概念,初探了Dubbo中LoadBalance的實現,并自
    發表于 06-05 18:43

    直播系統源碼選擇開的好處是什么

    所謂的源碼開就是在已經搭建好的直播平臺框架和基礎上進行新的功能增減或者更改機制,那么我們選擇源碼次開發有什么好處呢:第一、節省時間成本一
    發表于 06-13 15:34

    闡述FreeRTOS系統中機制實現原理

    2--嵌入式操作系統FreeRTOS的原理與實現摘自::FreeRTOS是一個源碼公開的免費的嵌入式實時操作系統,通過研究其內核可以更好地理解嵌入式操作系統的實現原理.本文主要闡述FreeRTOS系統中的任務調度
    發表于 12-22 07:15

    AP側中網相關的PLMN業務源碼流程解讀

    的運營商相關信息獲取的業務,比如我們常見的手機狀態欄上的運營商名稱顯示。下面來針對 AP 側中搜網相關的 PLMN 業務解讀源碼流程。
    發表于 03-24 15:48

    風河:用“商業機制”保護開放源碼的價值

    風河:用“商業機制”保護開放源碼的價值 有人說,高速公路之所以能實現快速,最主要的功勞在于它的封閉性而不是道路筆直。用這個觀念來理解微軟的成功,是很
    發表于 11-26 09:03 ?738次閱讀

    OC的消息轉發機制的深度解讀

    相信大家對Object-C的消息傳遞機制并不陌生(如果不熟悉,我后續會再寫一篇關于消息傳遞機制的文章),今天我來講解另外一個重要的問題,就是對象在收到無法解讀的消息之后會發生什么情況。 若想令類能
    發表于 09-25 17:33 ?0次下載

    基于EAIDK的人臉算法應用-源碼解讀(2)

    上一期介紹了基于EAIDK的人臉算法應用,本期從應用角度,解讀一下該案例源碼。本期案例源碼解讀,主要從源碼目錄結構、配置文件、模型目...
    的頭像 發表于 12-10 21:14 ?856次閱讀

    openharmony源碼解讀

    如何獲取OpenHarmony源碼并說明OpenHarmony的源碼目錄結構。OpenHarmony的代碼以組件的形式開放,開發者可以通過如下其中一種方式獲取:
    的頭像 發表于 06-24 09:29 ?3780次閱讀

    Faster Transformer v1.0源碼詳解

    解讀的內容僅限 Faster Transformer v1.0 版本,更高版本的源碼將在后續文章中繼續解讀
    的頭像 發表于 09-08 10:20 ?930次閱讀
    Faster Transformer v1.0<b class='flag-5'>源碼</b>詳解

    Xline源碼解讀(一)—初識CURP協議

    Xline 是一款開源的分布式 KV 存儲引擎,其核心目的是實現高性能的跨數據中心強一致性,提供跨數據中心的meatdata 管理
    的頭像 發表于 10-08 14:17 ?833次閱讀
    <b class='flag-5'>Xline</b><b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>(一)—初識CURP協議

    Xline源碼解讀(三)—CURP Server的實現

    現在,讓我們把目光集中在 curp 共識模塊上。在 Xline 中,curp 模塊是一個獨立的 crate,其所有的源代碼都保存在 curp 目錄下,curp 目錄中有以下組織
    的頭像 發表于 10-08 14:23 ?734次閱讀
    <b class='flag-5'>Xline</b><b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>(三)—CURP Server的<b class='flag-5'>實現</b>

    分布式系統中Membership Change 源碼解讀

    由于 Xline 使用 Raft 作為后端協議,因此想要為 Xline 添加動態變更成員的能力,就需要解決 Raft 協議自身會遇到的問題。
    的頭像 發表于 03-08 14:23 ?435次閱讀
    分布式系統中Membership Change <b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>