漏桶限流器原理及算法实现
1970 words - 10 mins对于大规模的分布式系统,面对不断波动的网络流量,有可能因瞬时巨额流量导致系统挂掉或雪崩。为保证分布式系统高可用,限流是一项必备手段。通常限流方式有限制总并发数、限制瞬时并发数和限制时间窗口内的平均速率等等。限流是依靠限流算法来实现,主要有固定时间窗口、滑动时间窗口、漏桶算法和令牌桶算法 4 种限流算法。本文只讨论漏桶算法原理及实现。
漏桶限流原理
假设有这样一只水桶,以固定的速度向外流出水的同时,接受外部不定速度的水流入,当水流入速度过大,桶中的水超出水桶容量时,多余的水直接溢出桶外。如下图所示(图片摘自网络)。
上述模型对应到分布式系统,水流相当于网络请求,水桶相当于服务器。网络请求到达服务器后,先将请求进行排队(流入水桶),然后以固定的速率处理请求(流出水桶)。同时,服务器能承受排队中的请求数也有最大容量,当因网络请求量过大,排队中的请求数超出最大容量时,后续到达服务器的网络请求不再处理,直接丢弃(溢出)。
漏桶限流算法可以粗略的认为就是注水漏水过程,往桶中以一定速率流出水,以任意速率流入水,当剩余水量超过桶容量则丢弃,因为桶容量是不变的,保证了整体的速率。所以漏桶算法天生不会出现临界问题。
漏桶算法实现
漏桶限流算法有很多种实现,本文基于计数原理来实现。
计数原理就是当请求到达时,基于当前时间和固定的处理速率来计算该请求应该等待多长时间(假设为
t
)才能被处理,当t
大于该请求可等待的最大时长,则直接将该请求丢弃。
现在基于计数原理进行算法建模,该模型主要有 3 个变量:
1、请求处理速率,即水桶漏出速率 leakRate
,该值代表间隔多久时间处理一个请求(注意不是处理请求的时间)。比如模型初始化时设定leakRate = 100ms
,意味着处理 1 个请求后,需要等待 100ms 才能处理下一个请求。
2、桶最大容量,即 bucketCapacity
,代表最多允许多少个请求排队,超过该值就直接返回,不用等待了。实际应用中,可以通过请求可等待的最大时长与请求处理速率估算出该值。
3、最近一个入桶的请求处理时间,设为lastTimestamp
。这里的时间是指最近一个进入桶排队的请求被处理的时间点,该请求可能已被处理,也可能未被处理,但不是指请求处理时长。该值有两个作用:
- 计算下一个请求(将)被处理的时间点;
- 结合当前时间和请求处理速率,估算当前还有多少个请求等待被处理;
那么,计数原理是如何实现漏桶限流呢?
假设有一个空的漏桶,容量大小为 bucketCapacity=5
,以 leakRate=100ms
的速率漏出(处理)请求。现在同时有 5 个请求流入桶中,我们通过计数原理,来计算每个请求预处理时间以及等待时间:
- 第 1 个请求,不用等待,直接被处理。处理时间就是当前时间 (
nowTime
),有lastTimestamp=nowTime
; - 第 2 个请求,因第 1 个请求已排队并被处理,那么需要按处理速率(
leakRate
)等待 100ms,即预处理时间就是第 1 个请求被处理时间 +leakRate
,即lastTimestamp=nowTime + 100ms
; - 第 3 个请求,因第 2 个请求仍在排队,所以应在第 2 个请求被处理后,再等待 100ms 才能被处理,相对于当前时间等待 200ms,即预处理时间就是
lastTimestamp=nowTime + 200ms
;
后续的第 4、5 个请求依次类推,可得出如下图示请求时间轴:
以上情况是假设在空桶的状态下,同时流入了 5 个请求,从第 2 个请求开始,预处理时间都在前一个请求被处理时间上 + leakRate
。但如果第 6 个请求是在第 5 个请求进入 50ms 后才进入,那么该请求的预处理时间如何计算?应先计算当前时间距离上次被处理时间的间隔(delta
),然后再跟leakRate
进行比较,得到差值,最后补全该差值。
以下为每次请求进入后,计算请求(预)处理时间的算法实现:
/// <summary>
/// Evaluation (current request) execution time
/// </summary>
/// <param name="nowTimestamp">current time</param>
/// <param name="lastTimestamp">last exection time</param>
long EvaluateExectionTimestamp(long nowTimestamp, long lastTimestamp)
{
if (nowTimestamp < lastTimestamp)
{
// It means that there are already requests in the queue,
// so the time for new requests to be processed after they come in and queue up is after the _leakRate
lastTimestamp += _leakRate;
}
else
{
// Indicates that the bucket is empty, maybe the initial state, or maybe all requests have been processed.
//How long to wait for the time to wait for the request to be processed
long offset = 0;
//Represents how long the current time has passed since the last time the request was processed
long delta = nowTimestamp - lastTimestamp;
if (delta < _leakRate)
{
//Indicates that it is not yet time to process the next request, you need to wait for the offset
offset = _leakRate - delta;
}
//If delta >= _leakRate, indicates that the current time has exceeded the _leakRate time since the last time the request was processed,
//the offset should be 0, and the new request should be processed immediately.
//Update the time when the request should be processed
lastTimestamp = nowTimestamp + offset;
}
return lastTimestamp;
}
如何计算桶满了?
根据当前时间nowTime
,和桶中排队的最后一个请求被处理时间lastTimestamp
之间的差值wait = lastTimestamp - nowTime
,再除以请求处理速率leakRate
,计算出正在等待被处理的请求数量qty = wait / leakRate
,最后与bucketCapacity
进行比较即可。
接下来定义一个Limit
函数,用于计算流入的请求被处理前要等待的时长,以及桶是否满了。
public int Limit(out bool limitExhausted)
{
//Lock here to ensure that each request is processed in order
_semaphore.Wait();
try
{
//current time in milliseconds
long nowTimestamp = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
var executionTimestamp = EvaluateExectionTimestamp(nowTimestamp, _lastTimestamp);
//It returns limitExhausted(true) if the the request overflows the bucket's capacity.
limitExhausted = false;
// Calculate if the bucket's capacity is full
long wait = executionTimestamp - nowTimestamp;
if (wait >= _bucketCapacity * _leakRate)
{
//Because the request is to be discarded here, all must keep the state before
//the new request is queued
limitExhausted = true;
}
_lastTimestamp = executionTimestamp;
return (int)wait;
}
finally
{
_semaphore.Release();
}
}
针对该函数使用,调用者须遵循以下两点说明:
- 每个接收到的请求被处理前,均要调用该函数,每个请求调用一次就相当于该请求流入桶中;
- 该函数返回值代表要等待多长时间才能处理请求,调用者通常需要 Sleep 该时长才能执行处理,从而控制了请求“流出”速度。
可参考以下调用示例代码:
int wait = _leakyBucketLimiter.Limit(out bool isFull);
if(isFull)
{
//Because the bucket is full, the request is discarded.
AbandonRequest();
}
if(wait > 0)
{
Thread.Sleep(wait);
}
// process request
ProcessRequest();
结尾
漏桶限流的核心思想就是按固定的速率处理请求,不支持突增流量,因为即使有再多的请求流量,也是按固定的速率被处理。算法实现本质,是按固定的处理速率计算该请求预处理时间以及需要等待的时长。
漏桶限流器算法实现完整代码地址:前往 github。
参考资料: