在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:

  • NodeSelectorSlot :收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot :用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot :用于记录、统计不同纬度的 runtime 指标监控信息;
  • SystemSlot :通过系统的状态,例如 load1 等,来控制总的入口流量;
  • AuthoritySlot :根据配置的黑白名单和调用来源信息,来做黑白名单控制;
  • FlowSlot :用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • DegradeSlot :通过统计信息以及预设的规则,来做熔断降级;

总体的框架如下:

img

  从这个架构图可以发现,整个调用链中最核心的就是 StatisticSlot(用于记录、统计不同纬度的 runtime 指标监控信息) 以及FlowSlot(根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制).

  Chain是链条的意思,从build的方法可看出,ProcessorSlotChain是一个链表,里面添加了很多个Slot。具体的实现需要到DefaultProcessorSlotChain中去看。

着重注意两个 Slot ,就像我们使用的时候一样,我们需要配置规则,那么在Sentinel 中去校验这个规则的是 FlowSlot ,既然是一个做规则匹配的,那么进行匹配的数据是哪里来的呢? 在Sentinel中他提供了一个Slot 来统计这些数据,然后交给FlowSlot进行校验,他就是StatisticSlot。我们首先来看StatisticSlot的entry方法中的实现逻辑:代码分成了两部分,第一部分是entry方法,该方法首先会触发后续slot的entry方法,即SystemSlot、FlowSlot、DegradeSlot等的规则,如果规则不通过,就会抛出BlockException,则会在node中统计被block的数量。反之会在node中统计通过的请求数和线程数等信息。第二部分是在exit方法中,当退出该Entry入口时,会统计rt的时间,并减少线程数。

  我们可以看到 node.addPassRequest() 这段代码是在fireEntry执行之后执行的,这意味着,当前请求通过了sentinel的流控等规则,此时需要将当次请求记录下来,也就是执行 node.addPassRequest()这行代码,我们跟进去看看:首先我们知道这里的node是一个 DefaultNode 实例,在第一个NodeSelectorSlot 的entry方法中对资源进行了封装,封装成了一个DefaultNode。

  • DefaultNode:保存着某个resource在某个context中的实时指标,每个DefaultNode都指向一个ClusterNode

  • ClusterNode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个ClusterNode,不管他在哪个context中

      从代码中我们可以看到,增加指标调用 addPass 是通过一个叫 ArrayMetric 的类,现在我们在进入 ArrayMetric 中看一下

private final LeapArray data;
// SAMPLE_COUNT=2 INTERVAL=1000
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

public void addPass(int count) {
WindowWrap wrap = data.currentWindow();
wrap.value().addPass(count);
}

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class LeapArray<T> {

// 样本窗口的长度
protected int windowLengthInMs;25
// 一个时间窗的样本数量
protected int sampleCount;4
// 时间窗长度
protected int intervalInMs;100

// 采样的时间窗口数组
protected AtomicReferenceArray<WindowWrap<T>> array;

/**
* LeapArray对象
* @param windowLength 时间窗口的长度,单位:毫秒
* @param intervalInSec 统计的间隔,单位:秒
*/
public LeapArray(int windowLength, int intervalInSec) {
this.windowLength = windowLength;
// 时间窗口的采样个数,默认为2个采样窗口
this.sampleCount = intervalInSec * 1000 / windowLength;
this.intervalInMs = intervalInSec * 1000;

this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
}
}
1
2
3
4
5
6
7
8
9
public class WindowWrap<T> {
  // yang窗口的长度
private final long windowLengthInMs;
  // 时间窗口的开始时间,单位是毫秒
private long windowStart;
   //时间窗口的内容,在 WindowWrap 中是用泛型表示这个值的,但实际上就是 MetricBucket 类
private T value;
//......省略部分代码
}

image-20230329165726095.png

img

可以很清晰的看出来在 LeapArray 中创建了一个 AtomicReferenceArray 数组,用来对时间窗口中的统计值进行采样。通过采样的统计值再计算出平均值,就是我们需要的最终的实时指标的值了。可以看到我在上面的代码中通过注释,标明了默认采样的时间窗口的个数是2个,这个值是怎么得到的呢?我们回忆一下 LeapArray 对象创建,是通过在 StatisticNode 中,new了一个 ArrayMetric,然后将参数一路往上传递后创建的:

1
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);

我们跟进获取当前窗口的方法 data.currentWindow() 中:

代码很长,我们逐步将其分解,我们实际可以把他分成以下几步:

  1. 根据当前时间,算出该时间的timeId,并根据timeId算出当前窗口在采样窗口数组中的索引idx。
  2. 根据当前时间算出当前窗口的应该对应的开始时间time,以毫秒为单位。
  3. 根据索引idx,在采样窗口数组中取得一个时间窗口。
  4. 循环判断直到获取到一个当前时间窗口 old 。
    1. 如果old为空,则创建一个时间窗口,并将它插入到array的第idx个位置,array上面已经分析过了,是一个 AtomicReferenceArray。
    2. 如果当前窗口的开始时间time与old的开始时间相等,那么说明old就是当前时间窗口,直接返回old。
    3. 如果当前窗口的开始时间time大于old的开始时间,则说明old窗口已经过时了,将old的开始时间更新为最新值:time,进入下一次得循环再判断当前窗口的开始时间time与old的开始时间相等的时候返回。
    4. 如果当前窗口的开始时间time小于old的开始时间,实际上这种情况是不可能存在的,因为time是当前时间,old是过去的一个时间。

  另外timeId是会随着时间的增长而增加,当前时间每增长一个windowLength的长度,timeId就加1。但是idx不会增长,只会在0和1之间变换,因为array数组的长度是2,只有两个采样时间窗口。至于为什么默认只有两个采样窗口,个人觉得因为sentinel是比较轻量的框架。时间窗口中保存着很多统计数据,如果时间窗口过多的话,一方面会占用过多内存,另一方面时间窗口过多就意味着时间窗口的长度会变小,如果时间窗口长度变小,就会导致时间窗口过于频繁的滑动。

先来看一下其中的第一步及第二步:

1
2
3
4
5
6
7
8
9
10
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
// time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个
long timeId = timeMillis / windowLengthInMs;
// idx被分成[0,arrayLength-1]中的某一个数,作为array数组中的索引
return (int)(timeId % array.length());
}

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}

以此类推随着时间的流逝,时间窗口也在发生变化,在当前时间点中进入的请求,会被统计到当前时间对应的时间窗口中,回到addpass 方法中:获取到窗口以后会进入到 wrap.value().addPass(count); QPS的增加。而这里的 wrap.value() 得到的是之前提到的 MetricBucket ,在 Sentinel 中QPS相关数据的统计结果是维护在这个类的 LongAdder[] 中,最终由这个指标来与我们实现设置好的规则进行匹配,查看是否限流,也就是 StatisticSlot

的entry 方法中的 fireEntry(context, resourceWrapper, node, count, prioritized, args); 都要先进入到 FlowSlot的entry方法进行限流过滤: