精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

基于Sentinel自研組件的系統限流、降級、負載保護最佳實踐探索

京東云 ? 來源:jf_75140285 ? 作者:jf_75140285 ? 2024-09-25 11:19 ? 次閱讀

一、Sentinel簡介

Sentinel 以流量為切入點,從流量控制熔斷降級系統負載保護等多個維度保護服務的穩定性。

Sentinel 具有以下特征:

?豐富的應用場景:Sentinel 承接了阿里巴巴近 10 年的雙十一大促流量的核心場景,例如秒殺(即突發流量控制在系統容量可以承受的范圍)、消息削峰填谷、集群流量控制、實時熔斷下游不可用應用等。

?完備的實時監控:Sentinel 同時提供實時的監控功能。您可以在控制臺中看到接入應用的單臺機器秒級數據,甚至 500 臺以下規模的集群的匯總運行情況。

?廣泛的開源生態:Sentinel 提供開箱即用的與其它開源框架/庫的整合模塊,例如與 Spring Cloud、Apache Dubbo、gRPC、Quarkus 的整合。您只需要引入相應的依賴并進行簡單的配置即可快速地接入 Sentinel。同時 Sentinel 提供 Java/Go/C++ 等多語言的原生實現。

?完善的 SPI 擴展機制:Sentinel 提供簡單易用、完善的 SPI 擴展接口。您可以通過實現擴展接口來快速地定制邏輯。例如定制規則管理、適配動態數據源等。

wKgZombzgRGALNbkAAAzsBYDrHM39.webp

有關Sentinel的詳細介紹以及和Hystrix的區別可以自行網上檢索,推薦一篇文章:https://mp.weixin.qq.com/s/Q7Xv8cypQFrrOQhbd9BOXw

本次主要使用了Sentinel的降級、限流、系統負載保護功能

二、Sentinel關鍵技術源碼解析

wKgaombzgRKAIZK8AABZyKrIB_c14.webp

無論是限流、降級、負載等控制手段,大致流程如下:

?StatisticSlot 則用于記錄、統計不同維度的 runtime 指標監控信息

?責任鏈依次觸發后續 slot 的 entry 方法,如 SystemSlot、FlowSlot、DegradeSlot 等的規則校驗;

?當后續的 slot 通過,沒有拋出 BlockException 異常,說明該資源被成功調用,則增加執行線程數和通過的請求數等信息。

關于數據統計,主要會牽扯到 ArrayMetric、BucketLeapArray、MetricBucket、WindowWrap 等類。

項目結構

wKgZombzgRKALhSoAAAvgD8b-ZI56.webp

以下主要分析core包里的內容

2.1注解入口

wKgaombzgROAL5a3AAAs0tNz23k95.webp

2.1.1 Entry、Context、Node

SphU門面類的方法出參都是Entry,Entry可以理解為每次進入資源的一個憑證,如果調用SphO.entry()或者SphU.entry()能獲取Entry對象,代表獲取了憑證,沒有被限流,否則拋出一個BlockException。

Entry中持有本次對資源調用的相關信息:

?createTime:創建該Entry的時間戳。

?curNode:Entry當前是在哪個節點。

?orginNode:Entry的調用源節點。

?resourceWrapper:Entry關聯的資源信息。

?

wKgZombzgROAbMCbAABk9EVQ72k83.webp



Entry是一個抽象類,CtEntry是Entry的實現,CtEntry持有Context和調用鏈的信息

Context的源碼注釋如下,

This class holds metadata of current invocation

Node的源碼注釋

Holds real-time statistics for resources

Node中保存了對資源的實時數據的統計,Sentinel中的限流或者降級等功能就是通過Node中的數據進行判斷的。Node是一個接口,里面定義了各種操作request、exception、rt、qps、thread的方法。

wKgaombzgRSASnnRAAAYlC9u3iA54.webp

在細看Node實現時,不難發現LongAddr的使用,關于LongAddr和DoubleAddr都是java8 java.util.concurrent.atomic里的內容,感興趣的小伙伴可以再深入研究一下,這兩個是高并發下計數功能非常優秀的數據結構,實際應用場景里需要計數時可以考慮使用。
關于Node的介紹后續還會深入,此處大致先提一下這個概念。

2.2 初始化

wKgZombzgRSAQmvcAABtPD1xi6A07.webp

2.2.1 Context初始化

在初始化slot責任鏈部分前,還執行了context的初始化,里面涉及幾個重要概念,需要解釋一下:

wKgaombzgRSAEGvUAABJKhCBpiI15.webp

可以發現在Context初始化的過程中,會把EntranceNode加入到Root子節點中(實際Root本身是一個特殊的EntranceNode),并把EntranceNode放到contextNameNodeMap中。

之前簡單提到過Node,是用來統計數據用的,不同Node功能如下:

?Node:用于完成數據統計的接口

?StatisticNode:統計節點,是Node接口的實現類,用于完成數據統計

?EntranceNode:入口節點,一個Context會有一個入口節點,用于統計當前Context的總體流量數據

?DefaultNode:默認節點,用于統計一個資源在當前Context中的流量數據

?ClusterNode:集群節點,用于統計一個資源在所有Context中的總體流量數據

wKgZombzgRWANjQ2AAA9_HkVGyk79.webp

protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();
        if (context == null) {
            Map localCacheNameMap = contextNameNodeMap;
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                Constants.ROOT.addChild(node);

                                Map newMap = new HashMap(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }

        return context;
    }

2.2.2 通過SpiLoader默認初始化8個slot

wKgaombzgRWAFQ8_AAA6Rj9j4tY53.webpwKgZombzgRaANK94AABSYJ9BpwY35.webpwKgaombzgReAP7nBAABZrA4Bskc57.webpwKgZombzgRiAFgliAABJEvL7C2A31.webpwKgaombzgRiATBAZAAAhYjwq5vg48.webp

每個slot的主要職責如下:

?NodeSelectorSlot 負責收集資源的路徑,并將這些資源的調用路徑,以樹狀結構存儲起來,用于根據調用路徑來限流降級

?ClusterBuilderSlot 則用于存儲資源的統計信息以及調用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級的依據

?StatisticSlot 則用于記錄、統計不同緯度的 runtime 指標監控信息

?FlowSlot 則用于根據預設的限流規則以及前面 slot 統計的狀態,來進行流量控制

?AuthoritySlot 則根據配置的黑白名單和調用來源信息,來做黑白名單控制

?DegradeSlot 則通過統計信息以及預設的規則,來做熔斷降級

?SystemSlot 則通過系統的狀態,例如 集群QPS、線程數、RT、負載 等,來控制總的入口流量

2.3 StatisticSlot

2.3.1 Node

深入看一下Node,因為統計信息都在里面,后面不論是限流、熔斷、負載保護等都是結合規則+統計信息判斷是否要執行

wKgZombzgR6AS5jlAAAYogpVfdk95.webp

從Node的源碼注釋看,它會持有資源維度的實時統計數據,以下是接口里的方法定義,可以看到totalRequest、totalPass、totalSuccess、blockRequest、totalException、passQps等很多request、qps、thread的相關方法:

/**
 * Holds real-time statistics for resources.
 *
 * @author qinan.qn
 * @author leyou
 * @author Eric Zhao
 */
public interface Node extends OccupySupport, DebugSupport {
    long totalRequest();
    long totalPass();
    long totalSuccess();
    long blockRequest();
    long totalException();
    double passQps();
    double blockQps();
    double totalQps();
    double successQps();
    ……
}

2.3.2 StatisticNode

我們先從最基礎的StatisticNode開始看,源碼給出的定位是:

The statistic node keep three kinds of real-time statistics metrics:
metrics in second level ({@code rollingCounterInSecond})
metrics in minute level ({@code rollingCounterInMinute})
thread count

StatisticNode只有四個屬性,除了之前提到過的LongAddr類型的curThreadNum外,還有兩個屬性是Metric對象,通過入參已經屬性命名可以看出,一個用于秒級,一個用于分鐘級統計。接下來我們就要看看Metric

// StatisticNode持有兩個Metric,一個秒級一個分鐘級,由入參可知,秒級統計劃分了兩個時間窗口,窗口程度是500ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

// 分鐘級統計劃分了60個時間窗口,窗口長度是1000ms
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

/**
 * The counter for thread count.
 */
private LongAdder curThreadNum = new LongAdder();

/**
 * The last timestamp when metrics were fetched.
 */
private long lastFetchTime = -1;

ArrayMetric只有一個屬性LeapArray,其余都是用于統計的方法,LeapArray是sentinel中統計最基本的數據結構,這里有必要詳細看一下,總體就是根據timeMillis去獲取一個bucket,分為:沒有創建、有直接返回、被廢棄后的reset三種場景。

//以分鐘級的統計屬性為例,看一下時間窗口初始化過程
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);


public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
        // windowLengthInMs = 60*1000 / 60 = 1000 滑動窗口時間長度,可見sentinel默認將單位時間分為了60個滑動窗口進行數據統計
        this.windowLengthInMs = intervalInMs / sampleCount;
        // 60*1000
        this.intervalInMs = intervalInMs;
        // 60
        this.intervalInSecond = intervalInMs / 1000.0;
        // 60
        this.sampleCount = sampleCount;
        // 數組長度60
        this.array = new AtomicReferenceArray(sampleCount);
    }

/**
     * Get bucket item at provided timestamp.
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 根據當前時間戳算一個數組索引
        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        // timeMillis % 1000
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket.
         */
        while (true) {
            WindowWrap old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                // newEmptyBucket 方法重寫,秒級和分鐘級統計對象實現不同
                WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
// 持有一個時間窗口對象的數據,會根據當前時間戳除以時間窗口長度然后散列到數組中
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

WindowWrap持有了windowLengthInMs, windowStart和LeapArray(分鐘統計實現是BucketLeapArray,秒級統計實現是OccupiableBucketLeapArray),對于分鐘級別的統計,MetricBucket維護了一個longAddr數組和一個配置的minRT

/**
 * The fundamental data structure for metric statistics in a time span.
 *
 * @author jialiang.linjl
 * @author Eric Zhao
 * @see LeapArray
 */
public class BucketLeapArray extends LeapArray {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap resetWindowTo(WindowWrap w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}
wKgaombzgSCAYGG_AABzoOt499Y88.webp

對于秒級統計,QPS=20場景下,如何準確統計的問題,此處用到了另外一個LeapArry實現FutureBucketLeapArray,至于秒級統計如何保證沒有統計誤差,讀者可以再研究一下FutureBucketLeapArray的上下文就好。

wKgZombzgSGAU7VSAABTSteXAvw46.webp

2.4 FlowSlot

2.4.1 常見限流算法

介紹sentinel限流實現前,先介紹一下常見限流算法,基本分為三種:計數器、漏斗、令牌桶。

計數器算法

顧名思義,計數器算法就是統計某個時間段內的請求,每單位時間加1,然后與配置的限流值(最大QPS)進行比較,如果超出則觸發限流。但是這種算法不能做到“平滑限流”,以1s為單位時間,100QPS為限流值為例,如下圖,會出現某時段超出限流值的情況

wKgaombzgSGAWjwTAAAp9pHc3f818.webp

因此在單純計數器算法上,又出現了滑動窗口計數器算法,我們將統計時間細分,比如將1s統計時長分為5個時間窗口,通過滾動統計所有時間窗口的QPS作為系統實際的QPS的方式,就能解決上述臨界統計問題,后續我們看sentinel源碼時也能看到類似操作。

wKgZombzgSKAW0KoAAAogmJaPUo10.webp

漏斗算法

wKgaombzgSKALhyFAAAgXgW9Rdw45.webp

不論流量有多大都會先到漏桶中,然后以均勻的速度流出。如何在代碼中實現這個勻速呢?比如我們想讓勻速為100q/s,那么我們可以得到每流出一個流量需要消耗10ms,類似一個隊列,每隔10ms從隊列頭部取出流量進行放行,而我們的隊列也就是漏桶,當流量大于隊列的長度的時候,我們就可以拒絕超出的部分。

漏斗算法同樣的也有一定的缺點:無法應對突發流量。比如一瞬間來了100個請求,在漏桶算法中只能一個一個的過去,當最后一個請求流出的時候時間已經過了一秒了,所以漏斗算法比較適合請求到達比較均勻,需要嚴格控制請求速率的場景。

令牌桶算法

令牌桶算法和漏斗算法比較類似,區別是令牌桶存放的是令牌數量不是請求數量,令牌桶可以根據自身需求多樣性得管理令牌的生產和消耗,可以解決突發流量的問題。

2.4.2 單機限流模式

接下來我們看一下Sentinel中的限流實現,相比上述基本限流算法,Sentinel限流的第一個特性就是引入“資源”的概念,可以細粒度多樣性的支持特定資源、關聯資源、指定鏈路的限流。

wKgZombzgSOAYFyrAABktp8pMbw95.webp

FlowSlot的主要邏輯都在FlowRuleChecker里,介紹之前,我們先看一下Sentinel關于規則的模型描述,下圖分別是限流、訪問控制規則、系統保護規則(Linux負載)、降級規則

wKgaombzgSOACzi2AAAn8sSas2Y78.webp

    /**
     * 流量控制兩種模式 
     *   0: thread count(當調用該api的線程數達到閾值的時候,進行限流)
     *   1: QPS(當調用該api的QPS達到閾值的時候,進行限流)
     */
    private int grade = RuleConstant.FLOW_GRADE_QPS;

    /**
     * 流量控制閾值,值含義與grade有關
     */
    private double count;

    /**
     * 調用關系限流策略(可以支持關聯資源或指定鏈路的多樣性限流需求)
     *  直接(api 達到限流條件時,直接限流)
     *  關聯(當關聯的資源達到限流閾值時,就限流自己)
     *  鏈路(只記錄指定鏈路上的流量)
     * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);
     * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);
     * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).
     */
    private int strategy = RuleConstant.STRATEGY_DIRECT;

    /**
     * Reference resource in flow control with relevant resource or context.
     */
    private String refResource;

    /**
     * 流控效果:
     * 0. default(reject directly),直接拒絕,拋異常FlowException
     * 1. warm up, 慢啟動模式(根據coldFactor(冷加載因子,默認3)的值,從閾值/coldFactor,經過預熱時長,才達到設置的QPS閾值)
     * 2. rate limiter  排隊等待
     * 3. warm up + rate limiter
     */
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    private int warmUpPeriodSec = 10;

    /**
     * Max queueing time in rate limiter behavior.
     */
    private int maxQueueingTimeMs = 500;

    /**
    *  是否集群限流,默認為否
    */
    private boolean clusterMode;
    /**
     * Flow rule config for cluster mode.
     */
    private ClusterFlowConfig clusterConfig;

    /**
     * The traffic shaping (throttling) controller.
     */
    private TrafficShapingController controller;

接著我們繼續分析FlowRuleChecker

wKgaombzgSSAfI8jAABAeub7pc889.webpwKgZombzgSWAarFcAAA4NoiKEZs31.webp

canPassCheck第一步會好看limitApp,這個是結合訪問授權限制規則使用的,默認是所有。

wKgaombzgSWAG5V5AAAsbkbEFOk54.webp

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        // 根據策略選擇Node來進行統計(可以是本身Node、關聯的Node、指定的鏈路)
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }


static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        // limitApp是訪問控制使用的,默認是default,不限制來源
        String limitApp = rule.getLimitApp();
        // 拿到限流策略
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();
        // 基于調用來源做鑒權
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Matches limit origin, return origin statistic node.
                return context.getOriginNode();
            }
            // 
            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Return the cluster node.
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        }

        return null;
    }

static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }

        if (strategy == RuleConstant.STRATEGY_RELATE) {
            return ClusterBuilderSlot.getClusterNode(refResource);
        }

        if (strategy == RuleConstant.STRATEGY_CHAIN) {
            if (!refResource.equals(context.getName())) {
                return null;
            }
            return node;
        }
        // No node.
        return null;
    }

// 此代碼是load限流規則時根據規則初始化流量整形控制器的邏輯,rule.getRater()返回TrafficShapingController
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                // 預熱模式返回WarmUpController
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            ColdFactorProperty.coldFactor);
                // 排隊模式返回ThrottlingController
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount());
                // 預熱+排隊模式返回WarmUpRateLimiterController
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        // 默認是DefaultController
        return new DefaultController(rule.getCount(), rule.getGrade());
    }

Sentinel單機限流算法

wKgZombzgSaAKpayAAAYlKZtJTE28.webp

上面我們看到根據限流規則controlBehavior屬性(流控效果),會初始化以下實現:

?DefaultController:是一個非常典型的滑動窗口計數器算法實現,將當前統計的qps和請求進來的qps進行求和,小于限流值則通過,大于則計算一個等待時間,稍后再試

?ThrottlingController:是漏斗算法的實現,實現思路已經在源碼片段中加了備注

?WarmUpController:實現參考了Guava的帶預熱的RateLimiter,區別是Guava側重于請求間隔,類似前面提到的令牌桶,而Sentinel更關注于請求數,和令牌桶算法有點類似

?WarmUpRateLimiterController:低水位使用預熱算法,高水位使用滑動窗口計數器算法排隊。

DefaultController

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

ThrottlingController

 public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) {
        this(queueingTimeoutMs, maxCountPerStat, 1000);
    }

    public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) {
        AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive");
        AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0");
        AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0");
        this.maxQueueingTimeMs = queueingTimeoutMs;
        this.count = maxCountPerStat;
        this.statDurationMs = statDurationMs;
        // Use nanoSeconds when durationMs%count != 0 or count/durationMs> 1 (to be accurate)
        // 可見配置限流值count大于1000時useNanoSeconds會是true否則是false
        if (maxCountPerStat > 0) {
            this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0 || maxCountPerStat / statDurationMs > 1;
        } else {
            this.useNanoSeconds = false;
        }
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) {
        final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET;
        long currentTime = System.nanoTime();
        // Calculate the interval between every two requests.
        final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat);

        // Expected pass time of this request.
        long expectedTime = costTimeNs + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            final long curNanos = System.nanoTime();
            // Calculate the time to wait.
            long waitTime = costTimeNs + latestPassedTime.get() - curNanos;
            if (waitTime > maxQueueingTimeNs) {
                return false;
            }

            long oldTime = latestPassedTime.addAndGet(costTimeNs);
            waitTime = oldTime - curNanos;
            if (waitTime > maxQueueingTimeNs) {
                latestPassedTime.addAndGet(-costTimeNs);
                return false;
            }
            // in race condition waitTime may <= 0
            if (waitTime > 0) {
                sleepNanos(waitTime);
            }
            return true;
        }
    }
    
    // 漏斗算法具體實現
    private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {
        long currentTime = TimeUtil.currentTimeMillis();
        // 計算兩次請求的間隔(分為秒級和納秒級)
        long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);

        // 請求的期望的時間
        long expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // latestPassedTime是AtomicLong類型,支持volatile語義
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // 計算等待時間
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // 如果大于最大排隊時間,則觸發限流
            if (waitTime > maxQueueingTimeMs) {
                return false;
            }
            
            long oldTime = latestPassedTime.addAndGet(costTime);
            waitTime = oldTime - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                latestPassedTime.addAndGet(-costTime);
                return false;
            }
            // in race condition waitTime may <= 0
            if (waitTime > 0) {
                sleepMs(waitTime);
            }
            return true;
        }
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }
        if (useNanoSeconds) {
            return checkPassUsingNanoSeconds(acquireCount, this.count);
        } else {
            return checkPassUsingCachedMs(acquireCount, this.count);
        }
    }

    private void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
        }
    }

    private void sleepNanos(long ns) {
        LockSupport.parkNanos(ns);
    }



long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);

由上述計算兩次請求間隔的公式我們可以發現,當maxCountPerStat(規則配置的限流值QPS)超過1000后,就無法準確計算出勻速排隊模式下的請求間隔時長,因此對應前面介紹的,當規則配置限流值超過1000QPS后,會采用checkPassUsingNanoSeconds,小于1000QPS會采用checkPassUsingCachedMs,對比一下checkPassUsingNanoSeconds和checkPassUsingCachedMs,可以發現主體思路沒變,只是統計維度從毫秒換算成了納秒,因此只看checkPassUsingCachedMs實現就可以

WarmUpController

@Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // 開始計算它的斜率
        // 如果進入了警戒線,開始調整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比warning快,但是要比慢
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            return;
        }

        long oldValue = storedTokens.get();
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }

private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // 添加令牌的判斷前提條件:
        // 當令牌的消耗程度遠遠低于警戒線的時候
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

2.4.3 集群限流

passClusterCheck方法(因為clusterService找不到會降級到非集群限流)

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            // 獲取當前節點是Token Client還是Token Server
            TokenService clusterService = pickClusterService();
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            }
            long flowId = rule.getClusterConfig().getFlowId();
            // 根據獲取的flowId通過TokenService進行申請token。從上面可知,它可能是TokenClient調用的,也可能是ToeknServer調用的。分別對應的類是DefaultClusterTokenClient和DefaultTokenService
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
            // If client is absent, then fallback to local mode.
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        // Fallback to local flow control when token client or server for this rule is not available.
        // If fallback is not enabled, then directly pass.
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }

//獲取當前節點是Token Client還是Token Server。
//1) 如果當前節點的角色是Client,返回的TokenService為DefaultClusterTokenClient;
//2)如果當前節點的角色是Server,則默認返回的TokenService為DefaultTokenService。
private static TokenService pickClusterService() {
        if (ClusterStateManager.isClient()) {
            return TokenClientProvider.getClient();
        }
        if (ClusterStateManager.isServer()) {
            return EmbeddedClusterTokenServerProvider.getServer();
        }
        return null;
    }

集群限流模式

Sentinel 集群限流服務端有兩種啟動方式:

?嵌入模式(Embedded)適合應用級別的限流,部署簡單,但對應用性能有影響

?獨立模式(Alone)適合全局限流,需要獨立部署

考慮到文章篇幅,集群限流有機會再展開詳細介紹。

集群限流模式降級

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            TokenService clusterService = pickClusterService();
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            }
            long flowId = rule.getClusterConfig().getFlowId();
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
            // If client is absent, then fallback to local mode.
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        // Fallback to local flow control when token client or server for this rule is not available.
        // If fallback is not enabled, then directly pass.
        // 可以看到如果集群限流有異常,會降級到單機限流模式,如果配置不允許降級,那么直接會跳過此次校驗
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }

2.5 DegradeSlot

wKgaombzgSaAMxx9AABsXDTthU836.webp

CircuitBreaker

大神對斷路器的解釋:https://martinfowler.com/bliki/CircuitBreaker.html

首先就看到了根據資源名稱獲取斷路器列表,Sentinel的斷路器有兩個實現:RT模式使用ResponseTimeCircuitBreaker、異常模式使用ExceptionCircuitBreaker

wKgZombzgSeAeuTYAAAlav0FYr086.webp

public interface CircuitBreaker {

    /**
     * Get the associated circuit breaking rule.
     *
     * @return associated circuit breaking rule
     */
    DegradeRule getRule();

    /**
     * Acquires permission of an invocation only if it is available at the time of invoking.
     *
     * @param context context of current invocation
     * @return {@code true} if permission was acquired and {@code false} otherwise
     */
    boolean tryPass(Context context);

    /**
     * Get current state of the circuit breaker.
     *
     * @return current state of the circuit breaker
     */
    State currentState();

    /**
     * Record a completed request with the context and handle state transformation of the circuit breaker.
     * Called when a passed invocation finished.
     *
     * @param context context of current invocation
     */
    void onRequestComplete(Context context);

    /**
     * Circuit breaker state.
     */
    enum State {
        /**
         * In {@code OPEN} state, all requests will be rejected until the next recovery time point.
         */
        OPEN,
        /**
         * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
         * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
         * will re-transform to the {@code OPEN} state and wait for the next recovery time point;
         * otherwise the resource will be regarded as "recovered" and the circuit breaker
         * will cease cutting off requests and transform to {@code CLOSED} state.
         */
        HALF_OPEN,
        /**
         * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,
         * the circuit breaker will transform to {@code OPEN} state.
         */
        CLOSED
    }
}


以ExceptionCircuitBreaker為例看一下具體實現

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
    
    // 異常模式有兩種,異常率和異常數
    private final int strategy;
    // 最小請求數
    private final int minRequestAmount;
    // 閾值
    private final double threshold;
    
    // LeapArray是sentinel統計數據非常重要的一個結構,主要封裝了時間窗口相關的操作
    private final LeapArray stat;

    public ExceptionCircuitBreaker(DegradeRule rule) {
        this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
    }

    ExceptionCircuitBreaker(DegradeRule rule, LeapArray stat) {
        super(rule);
        this.strategy = rule.getGrade();
        boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
        AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.minRequestAmount = rule.getMinRequestAmount();
        this.threshold = rule.getCount();
        this.stat = stat;
    }

    @Override
    protected void resetStat() {
        // Reset current bucket (bucket count = 1).
        stat.currentWindow().value().reset();
    }

    
    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            counter.getErrorCount().add(1);
        }
        counter.getTotalCount().add(1);

        handleStateChangeWhenThresholdExceeded(error);
    }

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        
        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            if (error == null) {
                fromHalfOpenToClose();
            } else {
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        
        List counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            
 += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Use errorRatio
            curCount = errCount * 1.0d / totalCount;
        }
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }

    static class SimpleErrorCounter {
        private LongAdder errorCount;
        private LongAdder totalCount;

        public SimpleErrorCounter() {
            this.errorCount = new LongAdder();
            this.totalCount = new LongAdder();
        }

        public LongAdder getErrorCount() {
            return errorCount;
        }

        public LongAdder getTotalCount() {
            return totalCount;
        }

        public SimpleErrorCounter reset() {
            errorCount.reset();
            totalCount.reset();
            return this;
        }

        @Override
        public String toString() {
            return "SimpleErrorCounter{" +
                "errorCount=" + errorCount +
                ", totalCount=" + totalCount +
                '}';
        }
    }

    static class SimpleErrorCounterLeapArray extends LeapArray {

        public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }

        @Override
        public SimpleErrorCounter newEmptyBucket(long timeMillis) {
            return new SimpleErrorCounter();
        }

        @Override
        protected WindowWrap resetWindowTo(WindowWrap w, long startTime) {
            // Update the start time and reset value.
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }
}


2.6 SystemSlot

校驗邏輯主要集中在com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem,以下是片段,可以看到,作為負載保護規則校驗,實現了集群的QPS、線程、RT(響應時間)、系統負載的控制,除系統負載以外,其余統計都是依賴StatisticSlot實現,系統負載是通過SystemRuleManager定時調度SystemStatusListener,通過OperatingSystemMXBean去獲取

/**
     * Apply {@link SystemRule} to the resource. Only inbound traffic will be checked.
     *
     * @param resourceWrapper the resource.
     * @throws BlockException when any system rule's threshold is exceeded.
     */
    public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
        if (resourceWrapper == null) {
            return;
        }
        // Ensure the checking switch is on.
        if (!checkSystemStatus.get()) {
            return;
        }

        // for inbound traffic only
        if (resourceWrapper.getEntryType() != EntryType.IN) {
            return;
        }

        // total qps 此處是拿到某個資源在集群中的QPS總和,相關概念可以會看初始化關于Node的介紹
        double currentQps = Constants.ENTRY_NODE.passQps();
        if (currentQps + count > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // total thread 
        int currentThread = Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

        double rt = Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // load. BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }

        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }

    private static boolean checkBbr(int currentThread) {
        if (currentThread > 1 &&
            currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
            return false;
        }
        return true;
    }

    public static double getCurrentSystemAvgLoad() {
        return statusListener.getSystemAverageLoad();
    }

    public static double getCurrentCpuUsage() {
        return statusListener.getCpuUsage();
    }


public class SystemStatusListener implements Runnable {

    volatile double currentLoad = -1;
    volatile double currentCpuUsage = -1;

    volatile String reason = StringUtil.EMPTY;

    volatile long processCpuTime = 0;
    volatile long processUpTime = 0;

    public double getSystemAverageLoad() {
        return currentLoad;
    }

    public double getCpuUsage() {
        return currentCpuUsage;
    }

    @Override
    public void run() {
        try {
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            currentLoad = osBean.getSystemLoadAverage();

            /*
             * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:
             * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
             * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
             * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
             * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
             * system. If the system recent cpu usage is not available, the method returns a negative value.
             */
            double systemCpuUsage = osBean.getSystemCpuLoad();

            // calculate process cpu usage to support application running in container environment
            RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
            long newProcessCpuTime = osBean.getProcessCpuTime();
            long newProcessUpTime = runtimeBean.getUptime();
            int cpuCores = osBean.getAvailableProcessors();
            long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
                    .toMillis(newProcessCpuTime - processCpuTime);
            long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
            double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
            processCpuTime = newProcessCpuTime;
            processUpTime = newProcessUpTime;

            currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);

            if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
                writeSystemStatusLog();
            }
        } catch (Throwable e) {
            RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
        }
    }

    private void writeSystemStatusLog() {
        StringBuilder sb = new StringBuilder();
        sb.append("Load exceeds the threshold: ");
        sb.append("load:").append(String.format("%.4f", currentLoad)).append("; ");
        sb.append("cpuUsage:").append(String.format("%.4f", currentCpuUsage)).append("; ");
        sb.append("qps:").append(String.format("%.4f", Constants.ENTRY_NODE.passQps())).append("; ");
        sb.append("rt:").append(String.format("%.4f", Constants.ENTRY_NODE.avgRt())).append("; ");
        sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append("; ");
        sb.append("success:").append(String.format("%.4f", Constants.ENTRY_NODE.successQps())).append("; ");
        sb.append("minRt:").append(String.format("%.2f", Constants.ENTRY_NODE.minRt())).append("; ");
        sb.append("maxSuccess:").append(String.format("%.2f", Constants.ENTRY_NODE.maxSuccessQps())).append("; ");
        RecordLog.info(sb.toString());
    }
}

三、京東版最佳實踐

3.1 使用方式

Sentinel使用方式本身非常簡單,就是一個注解,但是要考慮規則加載和規則持久化的方式,現有的方式有:

?使用Sentinel-dashboard功能:使用面板接入需要維護一個配置規則的管理端,考慮到偏后端的系統需要額外維護一個面板成本較大,如果是像RPC框架這種本身有管理端的接入可以考慮次方案。

?中間件(如:zookepper、nacos、eureka、redis等):Sentinel源碼extension包里提供了類似的實現,如下圖

wKgaombzgSeAd_CAAAB3zrQv8S460.webp

結合京東實際,我實現了一個規則熱部署的Sentinel組件,實現方式類似zookeeper的方式,將規則記錄到ducc的一個key上,在spring容器啟動時做第一次規則加載和監聽器注冊,組件也做一了一些規則讀取,校驗、實例化不同規則對象的工作

插件使用方式:注解+配置

第一步 引入組件


    com.jd.ldop.tools
    sentinel-tools
    1.0.0-SNAPSHOT


第二步 初始化sentinelProcess

支持ducc、本地文件讀取、直接寫入三種方式規則寫入方式

目前支持限流規則、熔斷降級規則兩種模式,系統負載保護模式待開發和驗證


    
        
            
                
            
        
    

    
    
        
        
        
    

ducc上配置如下:

wKgZombzgSmAYejKAAAvYKdtjbM29.webp

第三步 定義資源和關聯類型

通過@SentinelResource可以直接在任意位置定義資源名以及對應的熔斷降級或者限流方式、回調方法等,同時也可以指定關聯類型,支持直接、關聯、指定鏈路三種

    @Override
    @SentinelResource(value = "modifyGetWaybillState", fallback = "executeDegrade")
    public ExecutionResult> execute(@NotNull Model imodel) {
        // 業務邏輯處理
    }

    public ExecutionResult> executeDegrade(@NotNull Model imodel) {
        // 降級業務邏輯處理
    }

3.2 應用場景

組件支持任意的業務降級、限流、負載保護

四、Sentinel壓測數據

4.1 壓測目標

調用量:1.2W/m

應用機器內存穩定在50%以內

機器規格: 8C16G50G磁盤*2

Sentinel降級規則:

count=350-------慢調用臨界閾值350ms

timeWindow=180------熔斷時間窗口180s

grade=0-----降級模式 慢調用

statIntervalMs=60000------統計時長1min

4.2 壓測結果

wKgaombzgSmAFCvgAAAiBNLvvAI50.webp

應用機器監控:

壓測分為了兩個階段,分別是組件開啟和組件關閉兩次,前半部分是組件開啟的情況,后半部分是組件關閉的情況

wKgZombzgSqAX1LBAAAQbo138W896.webp

wKgaombzgSqAQU6kAAAMkKQ7TOc76.webp

wKgZombzgSuAAsj2AAARNolHUbE80.webp

應用進程內存分析,和sentinel有關的前三對象是

com.alibaba.csp.sentinel.node.metric.MetricNode

wKgaombzgSuAMa1vAAALjDJiKK006.webp

wKgZombzgS2AbDiuAAALjiYNxI871.webp

wKgaombzgS6AIs_yAAAMTrYk5i042.webp

wKgZombzgS6AQHNgAAAK_vyE3qs23.webp

wKgaombzgS-AazuHAAALpvNE-x030.webp

com.alibaba.csp.sentinel.CtEntry

wKgZombzgS-ABENlAAAKjlfL14k15.webp

wKgaombzgTCAdo6PAAAKHk-dJl471.webp

com.alibaba.csp.sentinel.context.Context

wKgZombzgTCADWRoAAALRjytYPM06.webp

wKgaombzgTGAX4NjAAAKgExNa5Y40.webp

wKgZombzgTGAFxzTAAALHGORMdk29.webp

4.3 壓測結論

使Sentinel組件實現系統服務自動降級或限流,由于sentinel會按照滑動窗口周期性統計數據,因此會占用一定的機器內存,使用時應設置合理的規則,如:合理的統計時長、避免過多的Sentinel資源創建等。

總體來說,使用sentinel組件對應用cpu和內存影響不大。

審核編輯 黃宇

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 負載
    +關注

    關注

    2

    文章

    560

    瀏覽量

    34253
  • Sentinel
    +關注

    關注

    0

    文章

    10

    瀏覽量

    7143
收藏 人收藏

    評論

    相關推薦

    開源分析和落地方案—Sentinel

    作者:京東物流 劉達 一、Sentinel是什么? Sentinel是從阿里技術體系內誕生并由相關社區從微服務到云原生階段持續孵化的流量治理組件,在服務熔斷限流以及秒級/分鐘級監控方面
    的頭像 發表于 11-08 10:10 ?845次閱讀
    開源分析和落地方案—<b class='flag-5'>Sentinel</b>篇

    最佳天線實踐、布局指南以及天線調試程序

    有限的硬幣型電池)獲得的無線射程主要取決于天線的設計、塑料外殼以及良好的PCB布局。對于芯片和電源相同但布局和天線設計實踐不同的系統,它們的RF(射頻)范圍變化超過50%也是正常的。本應用筆記介紹了最佳
    發表于 05-21 08:51

    BI軟件的ETL用開源的好,還是的好?

    更快的速度解決,并對系統做進一步優化。再加上ETL全面可視化這一優勢,顯然使用具有ETL的BI軟件更劃算。
    發表于 08-27 09:44

    USB限流IC,限流開關保護芯片

    PW1503和PW1502是超低RDS(ON)開關,具有可編程電流限制的USB限流IC,以保護電源于過電流負載和正極負極短路的保護。它具有過溫保護
    發表于 04-19 14:34

    USB限流IC,限流開關保護芯片

    PW1503和PW1502是超低RDS(ON)開關,具有可編程電流限制的USB限流IC,以保護電源于過電流負載和正極負極短路的保護。它具有過溫保護
    發表于 04-22 17:50

    如何根據負載輕松而精確地進行限流

    問題:我可以根據負載輕松而精確地進行限流嗎?答案:可以使用限流IC進行限流。在一些電源管理應用中,需要精確地限制電流。無論是要保護電源(例如
    發表于 11-07 11:27

    根據負載添加限流功能的兩種方式

    問題:我可以根據負載輕松而精確地進行限流嗎?答案:可以使用限流IC進行限流。在一些電源管理應用中,需要精確地限制電流。無論是要保護電源(例如
    發表于 11-15 09:29

    阿里巴巴宣布 Sentinel 開源,進一步完善 Dubbo 生態

    務的流行,服務和服務之間的穩定性變得越來越重要。Sentinel以流量為切入點,從流量控制、熔斷降級系統負載保護等多個維度
    發表于 08-14 17:32 ?210次閱讀

    Sentinel如何通過限流實現服務的高可用性

    為切入點,從流量控制、熔斷降級系統負載保護等多個維度來幫助用戶保障服務的穩定性。從本期開始,我們將圍繞Sentinel的使用場景、技術對比
    發表于 08-20 16:19 ?308次閱讀

    Sentinel 如何通過勻速請求和冷啟動來保障服務的穩定性

    同時處理 5 條消息,其余的全部被拒絕,即使后面的時間系統資源充足多余的請求也無法被處理,因而浪費了許多空閑資源。三、Sentinel 基于 Apache RocketMQ 的最佳實踐
    發表于 08-28 14:09 ?323次閱讀

    限流降級(下) | 如何打造平臺穩定性能力(二)

    四大功能。授權:通過配置白名單和黑名單的方式分布式系統的接口和方法進行調用權限的控制;限流:對特定資源進行調用的保護,防止資源的過度使用;降級:判斷依賴的資源的響應情況,但依賴的資源響
    發表于 09-03 14:52 ?65次閱讀
    <b class='flag-5'>限流</b>和<b class='flag-5'>降級</b>(下) | 如何打造平臺穩定性能力(二)

    設計模式最佳實踐探索—策略模式

    根據不同的應用場景與意圖,設計模式主要分為創建型模式、結構型模式和行為型模式三類。本文主要探索行為型模式中的策略模式如何更好地應用于實踐中。
    的頭像 發表于 10-31 14:24 ?921次閱讀

    部署Linux的最佳實踐探索

    編者按:本文節選自節選自《基于Linux的企業自動化》第五章。“第5章,使用Ansible構建用于部署的虛擬機模板,通過構建虛擬機模板來探索部署Linux的最佳實踐,虛擬機模板將以實際操作的方式大規模部署在虛擬機管理程序上。”
    的頭像 發表于 05-16 09:35 ?546次閱讀

    限流負載開關應用中的電流限制和短路保護

    電子發燒友網站提供《限流負載開關應用中的電流限制和短路保護.pdf》資料免費下載
    發表于 09-24 10:49 ?0次下載
    <b class='flag-5'>限流</b><b class='flag-5'>負載</b>開關應用中的電流限制和短路<b class='flag-5'>保護</b>

    MES系統最佳實踐案例

    效率、降低成本、保證產品質量。 MES系統最佳實踐案例 引言 在當今競爭激烈的制造業環境中,企業必須不斷尋求創新和改進的方法來保持競爭力。MES系統作為一種關鍵的信息技術工具,已經被
    的頭像 發表于 10-27 09:33 ?585次閱讀