百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

常见的4种限流算法与实现(限流方法原理)

mhr18 2025-04-09 18:00 21 浏览 0 评论

限流的实现常见的限流算法

常见的限流算法

限流是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。

常见的限流算法有三种:

计数器限流(固定窗口)

「原理:」

  • 时间线划分为多个独立且固定大小窗口;
  • 落在每一个时间窗口内的请求就将计数器加1;
  • 如果计数器超过了限流阈值,则后续落在该窗口的请求都会被拒绝。但时间达到下一个时间窗口时,计数器会被重置为0。

计数器限流(固定窗口)

「案例:」

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: FixedWindow
 * @Description: 固定窗口算法
 **/
public class FixedWindow {

    /**
     * 阈值
     */
    private static Integer QPS = 2;
    
    /**
     * 时间窗口(毫秒)
     */
    private static long TIME_WINDOWS = 1000;
    
    /**
     * 计数器
     */
    private static AtomicInteger REQ_COUNT = new AtomicInteger();

    /**
     * 窗口开始时间
     */
    private static long START_TIME = System.currentTimeMillis();

    public synchronized static boolean tryAcquire() {
        //超时窗口
        if ((System.currentTimeMillis() - START_TIME) > TIME_WINDOWS) {
            REQ_COUNT.set(0);
            START_TIME = System.currentTimeMillis();
        }
        return REQ_COUNT.incrementAndGet() <= QPS;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(250);
            LocalTime now = LocalTime.now();
            if (!tryAcquire()) {
                System.out.println(now + " 被限流");
            } else {
                System.out.println(now + " 做点什么");
            }
        }
    }
}

「问题:」

虽然我们限制了 QPS 为 2,但是当遇到时间窗口的临界突变时,如 1s 中的后 500 ms 和第 2s 的前 500ms 时,虽然是加起来是 1s 时间,却可以被请求 4 次。

计数器限流(固定窗口)

滑动窗口

滑动窗口算法是对固定窗口算法的改进

滑动窗口

「原理:」

将单位时间划分为多个区间,一般都是均分为多个小的时间段;

每一个区间内都有一个计数器,有一个请求落在该区间内,则该区间内的计数器就会加一;

每过一个时间段,时间窗口就会往右滑动一格,抛弃最老的一个区间,并纳入新的一个区间;

计算整个时间窗口内的请求总数时会累加所有的时间片段内的计数器,计数总和超过了限制数量,则本窗口内所有的请求都被丢弃。

滑动窗口

上图的示例中,每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生「时间窗口的临界突变问题」

「代码案例:」

package com.example.studyproject.algorithm;

import lombok.Data;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: SlidingWindow
 * @Description: 滑动窗口
 **/
public class SlidingWindow {
    /**
     * 阈值
     */
    private int qps = 2;
    /**
     * 时间窗口总大小(毫秒)
     */
    private long windowSize = 1000;
    /**
     * 多少个子窗口
     */
    private Integer windowCount = 10;
    /**
     * 窗口列表
     */
    private WindowInfo[] windowArray = new WindowInfo[windowCount];

    public SlidingWindow(int qps) {
        this.qps = qps;
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < windowArray.length; i++) {
            windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
        }
    }

    /**
     * 1. 计算当前时间窗口
     * 2. 更新当前窗口计数 & 重置过期窗口计数
     * 3. 当前 QPS 是否超过限制
     * @return 是否被限流
     */
    public synchronized boolean tryAcquire() {
        long currentTimeMillis = System.currentTimeMillis();
        // 1. 计算当前时间窗口
        int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));
        // 2.  更新当前窗口计数 & 重置过期窗口计数
        int sum = 0;
        for (int i = 0; i < windowarray.length i windowinfo windowinfo='windowArray[i];' if currenttimemillis - windowinfo.gettime> windowSize) {
                windowInfo.getNumber().set(0);
                windowInfo.setTime(currentTimeMillis);
            }
            if (currentIndex == i && windowInfo.getNumber().get() < qps) {
                windowInfo.getNumber().incrementAndGet();
            }
            sum = sum + windowInfo.getNumber().get();
        }
        // 3. 当前 QPS 是否超过限制
        return sum <= qps;
    }

    @Data
    private class WindowInfo {
        // 窗口开始时间
        private Long time;
        // 计数器
        private AtomicInteger number;

        public WindowInfo(long time, AtomicInteger number) {
            this.time = time;
            this.number = number;
        }
        // get...set...
    }

    public static void main(String[] args) throws InterruptedException {
        int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;
        System.out.println(String.format("当前QPS限制为:%d,当前测试次数:%d,间隔:%dms,预计成功次数:%d", qps, count, sleep, success));
        success = 0;
        SlidingWindow myRateLimiter = new SlidingWindow(qps);
        for (int i = 0; i < count; i++) {
            Thread.sleep(sleep);
            if (myRateLimiter.tryAcquire()) {
                success++;
                if (success % qps == 0) {
                    System.out.println(LocalTime.now() + ": success, ");
                } else {
                    System.out.print(LocalTime.now() + ": success, ");
                }
            } else {
                System.out.println(LocalTime.now() + ": fail");
            }
        }
        System.out.println();
        System.out.println("实际测试成功次数:" + success);
    }
}

输出结果:

已连接到目标 VM, 地址: ''127.0.0.1:50101',传输: '套接字''
当前QPS限制为:2,当前测试次数:20,间隔:300ms,预计成功次数:12
14:20:38.833: success, 14:20:39.142: success, 
14:20:39.455: success, 14:20:39.766: success, 
14:20:40.077: fail
14:20:40.377: fail
14:20:40.678: success, 14:20:40.992: success, 
14:20:41.307: fail
14:20:41.621: fail
14:20:41.922: success, 14:20:42.229: success, 
14:20:42.539: fail
14:20:42.840: fail
14:20:43.140: success, 14:20:43.455: success, 
14:20:43.756: fail
14:20:44.070: fail
14:20:44.386: success, 14:20:44.687: success, 

实际测试成功次数:12
与目标 VM 断开连接, 地址为: ''127.0.0.1:50101',传输: '套接字''

进程已结束,退出代码0

漏桶算法

漏桶算法

漏桶算法思路很简单,我们把水比作是请求,漏桶比作是系统处理能力极限,水先进入到漏桶里,漏桶里的水按一定速率流出,当流出的速率小于流入的速率时,由于漏桶容量有限,后续进入的水直接溢出(拒绝请求),以此实现限流。

由介绍可以知道,漏桶模式中的消费处理总是能以恒定的速度进行,可以很好的「保护自身系统」不被突如其来的流量冲垮;但是这也是漏桶模式的缺点,假设 QPS 为 2,同时 2 个请求进来,2 个请求并不能同时进行处理响应,因为每 1s / 2= 500ms 只能处理一个请求。

「代码案例:」

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName: LeakyBucket
 * @Description: 漏桶算法
 **/
public class LeakyBucket {

    /**
     * 水桶的大小
     */
    private final int bucket;

    /**
     * qps,水露出的速度
     */
    private int qps;

    /**
     * 当前水量
     */
    private long water;

    private long timeStamp = System.currentTimeMillis();

    public LeakyBucket(int bucket, int qps) {
        this.bucket = bucket;
        this.qps = qps;
    }

    /**
     * 桶是否已经满了
     * @return true未满
     */
    public boolean tryAcquire(){
        //1.计算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp)/1000;
        water = Math.max(0,water-timeGap*qps);
        timeStamp = now;

        // 如果未满,放行
        if(water< bucket){
            water += 1;
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        ExecutorService singleThread = Executors.newSingleThreadExecutor();

        LeakyBucket rateLimiter = new LeakyBucket(20, 2);
        // 存储流量的队列
        Queue queue = new LinkedList<>();
        // 模拟请求  不确定速率注水
        singleThread.execute(() -> {
            int count = 0;
            while (true) {
                count++;
                boolean flag = rateLimiter.tryAcquire();
                if (flag) {
                    queue.offer(count);
                    System.out.println(count + "--------流量被放行--------");
                } else {
                    System.out.println(count + "流量被限制");
                }
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 模拟处理请求 固定速率漏水
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            if (!queue.isEmpty()) {
                System.out.println(queue.poll() + "被处理");
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        // 保证主线程不会退出
        while (true) {
            Thread.sleep(10000);
        }
    }

}

令牌桶算法

令牌桶算法

令牌桶算法的原理也比较简单,我们可以理解成医院的挂号看病,只有拿到号以后才可以进行诊病。

系统会维护一个令牌(token)桶,以一个恒定的速度往桶里放入令牌(token),这时如果有请求进来想要被处理,则需要先从桶里获取一个令牌(token),当桶里没有令牌(token)可取时,则该请求将被拒绝服务。令牌桶算法通过控制桶的容量、发放令牌的速率,来达到对请求的限制。

「原理:」

  • 令牌桶的实现思路类似于生产者和消费之间的关系。
  • 系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 2,每 500ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。
  • 请求执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝本次请求,由此达到限流目的。

「思考:」

  • 1s / 阈值(QPS) = 令牌添加时间间隔。
  • 桶的容量等于限流的阈值,令牌数量达到阈值时,不再添加。
  • 可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理。
  • 有启动过程,令牌桶启动时桶中无令牌,然后按照令牌添加时间间隔添加令牌,若启动时就有阈值数量的请求过来,会因为桶中没有足够的令牌而触发拒绝策略,不过如 RateLimiter 限流工具已经优化了这类问题。

「代码案例」

// 使用Google封装的令牌桶RateLimiter   

/**
 * 代码中限制 QPS 为 2,也就是每隔 500ms 生成一个令牌,但是程序每隔 250ms 获取一次令牌,所以两次获取中只有一次会成功。
 */
public static void main(String[] args) throws InterruptedException {
    RateLimiter rateLimiter = RateLimiter.create(2);

    for (int i = 0; i < 10; i++) {
        String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
        System.out.println(time + ":" + rateLimiter.tryAcquire());
        Thread.sleep(250);
    }
}

限流的实现

限流有很多种方法实现:

  • 基于Guava工具类实现限流
  • 基于AOP实现限流
  • 基于Redis实现限流(适用于分布式)
  • 使用Redisson实现限流(适用于分布式)
  • Sentinel限流(适用于分布式)
  • Nginx、Gateway限流.....(适用于分布式)

相关推荐

【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库

如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...

Pure Storage推出统一数据管理云平台及新闪存阵列

PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...

对Java学习的10条建议(对java课程的建议)

不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...

SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!

官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...

JDK21有没有什么稳定、简单又强势的特性?

佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...

「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了

在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...

Java面试题及答案最全总结(2025版)

大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...

数据库日常运维工作内容(数据库日常运维 工作内容)

#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...

分布式之系统底层原理(上)(底层分布式技术)

作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...

oracle 死锁了怎么办?kill 进程 直接上干货

1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...

SpringBoot 各种分页查询方式详解(全网最全)

一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...

《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略

《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...

LoadRunner(loadrunner录制不到脚本)

一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...

Redis数据类型介绍(redis 数据类型)

介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...

RMAN备份监控及优化总结(rman备份原理)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

取消回复欢迎 发表评论: