目录

一、前言

二、流控架构

三、Sentinel 滑动窗口算法核心类图

四、计数原理

五、滑动窗口机制


一、前言

首先我们开门见山,什么是 Sentinel 流控?Sentinel 是面向分布式服务框架的轻量级流量控制框架,主要以流量为切入点,从流量控制,熔断降级,系统负载保护等多个维度来维护系统的稳定性。Sentinel 不仅仅可以做到流控,还可以做到其他熔断降级等方面。

那什么叫滑动窗口呢?滑动窗口(Sliding window)是一种流量控制技术。早期的网络通信中,通信双方不会考虑网络的拥挤情况直接发送数据。由于大家不知道网络拥塞状况,同时发送数据,导致中间节点阻塞掉包,谁也发不了数据,所以就有了滑动窗口机制来解决此问题。

二、流控架构

先来看下 Sentinel 的整体流控架构,有个全局的概念。

图片

Sentinel 整体工作流程采用责任链模式,功能定义 Slot,计数通过 Node,在 Slot 中通过选用不同的 Node 实现不同的流控模式。

  • Node 用于不同纬度的计数

  • Slot 实现不同的功能

  • Resource 受保护的资源

  • Rule 保护资源规则

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;

  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;

  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;

  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;

  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;

  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;

  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;

StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。

  • clusterNode:资源唯一标识的 ClusterNode 的 runtime 统计

  • origin:根据来自不同调用者的统计信息

  • defaultnode: 根据上下文条目名称和资源 ID 的 runtime 统计

  • 入口流量的统计 Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。

三、Sentinel 滑动窗口算法核心类图

图片

  • Metric

    指标收集核心接口,主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量,TPS、响应时间等数据。

  • ArrayMetric

    对外使用的类,隐藏了时间窗口的具体实现,其有一个成员变量 LeapArray。

  • LeapArray

    时间窗的口底层实现,里面有一个时间窗的数组,数组里面的元素为 WindowWrap,即时间窗口。

  • WindowWrap

    时间窗口,T 表示要统计的数据,为 MetricBucket。

  • MetricBucket

    统计量,里面包含了多个具体统计的变量,变量的"类型"由 MetrciEvent 决定。

  • MetrciEvent

    统计量类型,和 MetricBucktet 里面保存的统计变量一一对应,例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。

四、计数原理

StatisticSlot 是限流的基石,基本上后续限流所依赖的数据都在这里统计,看上面 Sentinel 滑动窗口算法核心类图其实也可以看得出来。

图片


StatisticSlot 主要就做了3件事:

  • 触发后续 slot 的 entry方法,进行规则校验

  • 校验通过则更新 node 实时指标数据

  • 校验不通过则更新 node 异常指标数据

注意:由于后续的 fireEntry 操作和更新本次统计信息是两个操作,不是原子的,会造成限流不准的小问题,比如设置的 FlowRule count 为 100,并发情况下可能稍大于 100,不过针对大部分场景来说,这点偏差是可以容忍的,毕竟我们要的是限流效果,而不是必须精确的限流操作。

4.1 更新 node 实时指标数据

我们可以看到 node.addPassRequest() 这段代码是在 fireEntry 执行之后执行的,这意味着,当前请求通过了 sentinel 的流控等规则,此时需要将当次请求记录下来,也就是执行 node.addPassRequest() 这行代码,具体的代码如下所示:

// DefaultNode
@Override
public void addPassRequest(int count) {
    super.addPassRequest(count);
    this.clusterNode.addPassRequest(count);
}

这里的 node 是一个 DefaultNode 实例,这里特别补充一个 DefaultNode 和 ClusterNode 的区别:

  • DefaultNode:保存着某个 resource 在某个 context 中的实时指标,每个 DefaultNode 都指向一个 ClusterNode。

  • ClusterNode:保存着某个 resource 在所有的 context 中实时指标的总和,同样的 resource 会共享同一个 ClusterNode,不管他在哪个 context 中。

上面代码不管是 DefaultNode 还是 ClusterNode ,走的都是 StatisticNode 对象的 addPassRequest 方法:

/**
 * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
 * by given {@code sampleCount}.
 */
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

/**
 * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
 * meaning each bucket per second, in this way we can get accurate statistics of each second.
 */
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

@Override
public void addPassRequest(int count) {
    // 对每秒指标统计
    rollingCounterInSecond.addPass(count);
    // 每分钟指标统计
    rollingCounterInMinute.addPass(count);
}

每一个通过的指标(pass)都是调用 Metric 的接口进行操作的,并且是通过 ArrayMetric 这种实现类,代码如下:

public ArrayMetric(int sampleCount, int intervalInMs) {
    this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
    if (enableOccupy) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    } else {
        this.data = new BucketLeapArray(sampleCount, intervalInMs);
    }
}

@Override
public void addPass(int count) {
    // 获取当前时间窗口
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    // 当前事件窗口计数
    wrap.value().addPass(count);
}

首先通过 currentWindow() 获取当前时间窗口,然后更新当前时间窗口对应的统计指标,以下代码重点关注几个判断逻辑:

// LeapArray
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}

public static long currentTimeMillis() {
    // 由一个tick线程每个1s更新一次,具体逻辑在TimeUtil类中。
    return INSTANCE.getTime();
}

图片

图片


首先获取系统当前系统的时间戳然后再通过时间戳去获取时间窗口,注意这里并没有直接使用System.currentTimeMillis() 获取系统时间,因为这样获取系统时间会涉及到用户态到内核态的切换,在高并发时性能损耗会变大,使用 TimeUtil 获取时间,TimeUtil 在类加载时会创建一个线程去维护时间的获取这个线程主要做两件事:检查系统空闲和在系统繁忙时维护一个时间,在系统空闲时使用System.currentTimeMillis() 获取 OS 时间,在非空闲时使用程序维护的时间,其实一些高性能中间件都会有类似的设计比如 Redis。

接下来就是用拿到时间戳去获取时间窗口,获取时间窗口的逻辑如下,注意这里看起来窗口的时间都是连续的,实际上在内存中有可能是不连续的,对于过期的窗口只有流量落在这个窗口时窗口才会更新。

/**
 * Get bucket item at provided timestamp.
 *
 * @param timeMillis a valid timestamp in milliseconds
 * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    // 计算当前时间点落在滑动窗口的下标
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket.
     */
    while (true) {
        // 获取当前时间点对应的windowWrap,array为AtomicReferenceArray。
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            // 1.为空表示当前时间窗口为初始化过,创建WindowWrap并cas设置到array中。
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            // 2.获取的时间窗口正好对应当前时间,直接返回。
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            // 3.获取的时间窗口为老的,进行reset操作复用。
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            // 4.时间回拨了,正常情况下不会走到这里。
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

获取当前时间窗口对应的 WindowWrap 之后,就可以进行更新操作了。

// wrap.value().addPass(count);
public void addPass(int n) {
    add(MetricEvent.PASS, n);
}
// MetricBucket
public MetricBucket add(MetricEvent event, long n) {
    // 对应MetricEvent枚举中值
    counters[event.ordinal()].add(n);
    return this;
}

图片

到这里为止,整个指标统计流程就完成了,下面重点看下滑动窗口机制。

五、滑动窗口机制

时间窗口是用 WindowWrap 对象表示的,其属性如下:

图片


sentinel 默认有每秒和每分钟的滑动窗口,对应的 LeapArray 类型,它们的初始化逻辑是:

图片


Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。

图片

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐