PZY

Enjoy software architecture and programming

29 Sep 2022

漏桶限流器原理及算法实现

对于大规模的分布式系统,面对不断波动的网络流量,有可能因瞬时巨额流量导致系统挂掉或雪崩。为保证分布式系统高可用,限流是一项必备手段。通常限流方式有限制总并发数、限制瞬时并发数和限制时间窗口内的平均速率等等。限流是依靠限流算法来实现,主要有固定时间窗口、滑动时间窗口、漏桶算法和令牌桶算法 4 种限流算法。本文只讨论漏桶算法原理及实现。

漏桶限流原理

假设有这样一只水桶,以固定的速度向外流出水的同时,接受外部不定速度的水流入,当水流入速度过大,桶中的水超出水桶容量时,多余的水直接溢出桶外。如下图所示(图片摘自网络)。

漏桶限流图示

上述模型对应到分布式系统,水流相当于网络请求,水桶相当于服务器。网络请求到达服务器后,先将请求进行排队(流入水桶),然后以固定的速率处理请求(流出水桶)。同时,服务器能承受排队中的请求数也有最大容量,当因网络请求量过大,排队中的请求数超出最大容量时,后续到达服务器的网络请求不再处理,直接丢弃(溢出)。

从漏桶限流原理,可得知其特点:从出口处限制请求速率(固定),精准过滤突发请求,请求曲线始终平滑,没有任何突发流量。

漏桶算法实现

漏桶限流算法有很多种实现,本文基于计数原理来实现。

计数原理就是当请求到达时,基于当前时间和固定的处理速率来计算该请求应该等待多长时间(假设为 t)才能被处理,当 t 大于该请求可等待的最大时长,则直接将该请求丢弃。

现在基于计数原理进行算法建模,该模型主要有 3 个变量:

1、请求处理速率,即水桶漏出速率 leakRate,该值代表间隔多久时间处理一个请求(注意不是处理请求的时间)。比如模型初始化时设定leakRate = 100ms,意味着处理 1 个请求后,需要等待 100ms 才能处理下一个请求。

2、桶最大容量,即 bucketCapacity,代表最多允许多少个请求排队,超过该值就直接返回,不用等待了。实际应用中,可以通过请求可等待的最大时长与请求处理速率估算出该值。

3、最近一个入桶的请求处理时间,设为lastTimestamp。这里的时间是指最近一个进入桶排队的请求被处理的时间点,该请求可能已被处理,也可能未被处理,但不是指请求处理时长。该值有两个作用:

  • 计算下一个请求(将)被处理的时间点;
  • 结合当前时间和请求处理速率,估算当前还有多少个请求等待被处理;

那么,计数原理是如何实现漏桶限流呢?

假设有一个空的漏桶,容量大小为 bucketCapacity=5,以 leakRate=100ms 的速率漏出(处理)请求。现在同时有 5 个请求流入桶中,我们通过计数原理,来计算每个请求预处理时间以及等待时间:

  • 第 1 个请求,不用等待,直接被处理。处理时间就是当前时间 (nowTime),有lastTimestamp=nowTime
  • 第 2 个请求,因第 1 个请求已排队并被处理,那么需要按处理速率(leakRate)等待 100ms,即预处理时间就是第 1 个请求被处理时间 + leakRate ,即 lastTimestamp=nowTime + 200ms
  • 第 3 个请求,因第 2 个请求仍在排队,所以应在第 2 个请求被处理后,再等待 100ms 才能被处理,相对于当前时间等待 200ms,即预处理时间就是lastTimestamp=nowTime + 300ms

后续的第 4、5 个请求依次类推,可得出如下图示请求时间轴:

漏桶限流请求时间轴图示

以上情况是假设在空桶的状态下,同时流入了 5 个请求,从第 2 个请求开始,预处理时间都在前一个请求被处理时间上 + leakRate。但如果第 6 个请求是在第 5 个请求进入 50ms 后才进入,那么该请求的预处理时间如何计算?应先计算当前时间距离上次被处理时间的间隔(delta),然后再跟leakRate进行比较,得到差值,最后补全该差值。

以下为每次请求进入后,计算请求(预)处理时间的算法实现:

/// <summary>
/// Evaluation (current request) execution time
/// </summary>
/// <param name="nowTimestamp">current time</param>
/// <param name="lastTimestamp">last exection time</param>
long EvaluateExectionTimestamp(long nowTimestamp, long lastTimestamp)
{
    if (nowTimestamp < lastTimestamp)
    {
        // It means that there are already requests in the queue,
        // so the time for new requests to be processed after they come in and queue up is after the _leakRate
        lastTimestamp += _leakRate;
    }
    else
    {
        // Indicates that the bucket is empty, maybe the initial state, or maybe all requests have been processed.
        //How long to wait for the time to wait for the request to be processed
        long offset = 0;

        //Represents how long the current time has passed since the last time the request was processed
        long delta = nowTimestamp - lastTimestamp;

        if (delta < _leakRate)
        {
            //Indicates that it is not yet time to process the next request, you need to wait for the offset
            offset = _leakRate - delta;
        }
        //If delta >= _leakRate, indicates that the current time has exceeded the _leakRate time since the last time the request was processed,
        //the offset should be 0, and the new request should be processed immediately.

        //Update the time when the request should be processed
        lastTimestamp = nowTimestamp + offset;
    }

    return lastTimestamp;
}

如何计算桶满了?

根据当前时间nowTime,和桶中排队的最后一个请求被处理时间lastTimestamp之间的差值wait = lastTimestamp - nowTime,再除以请求处理速率leakRate,计算出正在等待被处理的请求数量qty = wait / leakRate,最后与bucketCapacity进行比较即可。

接下来定义一个Limit函数,用于计算流入的请求被处理前要等待的时长,以及桶是否满了。

public int Limit(out bool limitExhausted)
{
    //Lock here to ensure that each request is processed in order
    _semaphore.Wait();

    try
    {
        //current time in milliseconds
        long nowTimestamp = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
        var executionTimestamp = EvaluateExectionTimestamp(nowTimestamp, _lastTimestamp);
        //It returns limitExhausted(true) if the the request overflows the bucket's capacity.
        limitExhausted = false;

        // Calculate if the bucket's capacity is full
        long wait = executionTimestamp - nowTimestamp;
        if (wait >= _bucketCapacity * _leakRate)
        {
            //Because the request is to be discarded here, all must keep the state before 
            //the new request is queued
            limitExhausted = true;
        }

        _lastTimestamp = executionTimestamp;
        return (int)wait;
    }
    finally
    {
        _semaphore.Release();
    }
}

针对该函数使用,调用者须遵循以下两点说明:

  1. 每个接收到的请求被处理前,均要调用该函数,每个请求调用一次就相当于该请求流入桶中;
  2. 该函数返回值代表要等待多长时间才能处理请求,调用者通常需要 Sleep 该时长才能执行处理,从而控制了请求“流出”速度。

可参考以下调用示例代码:

int wait = _leakyBucketLimiter.Limit(out bool isFull);

if(isFull)
{
    //Because the bucket is full, the request is discarded.
    AbandonRequest();
}

if(wait > 0)
{
    Thread.Sleep(wait);
}

// process request
ProcessRequest();

结尾

漏桶限流的核心思想就是按固定的速率处理请求,不支持突增流量,因为即使有再多的请求流量,也是按固定的速率被处理。算法实现本质,是按固定的处理速率计算该请求预处理时间以及需要等待的时长。

漏桶限流器算法实现完整代码地址:前往 github

参考资料:

  1. 图解漏桶(LeakyBucket)限流器的实现原理