百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

可能要用心学高并发核心编程,限流原理与实战,分布式令牌桶限流

mhr18 2024-11-27 12:00 26 浏览 0 评论

实战:分布式令牌桶限流

本节介绍的分布式令牌桶限流通过Lua+Java结合完成,首先在Lua脚本中完成限流的计算,然后在Java代码中进行组织和调用。

分布式令牌桶限流Lua脚本

分布式令牌桶限流Lua脚本的核心逻辑和Java令牌桶的执行逻辑类似,只是限流计算相关的统计和时间数据存放于Redis中。

这里将限流的脚本命名为rate_limiter.lua,该脚本既使用Redis存储令牌桶信息,自身又执行于Redis中,所以笔者将该脚本放置于base-redis基础模块中,它的代码如下:

---此脚本的环境:redis内部,不是运行在Nginx内部
---方法:申请令牌
----1:failed
---1:success
---@param key:key限流关键字
---@param apply:申请的令牌数量
local function acquire(key, apply)
 local times = redis.call('TIME');
 --times[1] 秒数 --times[2] 微秒数
 local curr_mill_second = times[1] *1000000 + times[2];
 curr_mill_second = curr_mill_second / 1000;
 local cacheInfo = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
 ---局部变量:上次申请的时间
 local last_mill_second = cacheInfo[1];
 ---局部变量:之前的令牌数
 local curr_permits = tonumber(cacheInfo[2]);
 ---局部变量:桶的容量
 local max_permits = tonumber(cacheInfo[3]);
 ---局部变量:令牌的发放速率
 local rate = cacheInfo[4];
 ---局部变量:本次的令牌数
 local local_curr_permits = max_permits;
 if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then
 --计算时间段内的令牌数
 local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) *rate);
 --令牌总数
 local expect_curr_permits = reverse_permits + curr_permits;
 --可以申请的令牌总数
 local_curr_permits = math.min(expect_curr_permits, max_permits);
 else
 --第一次获取令牌
 redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
 end
 local result = -1;
 --有足够的令牌可以申请
 if (local_curr_permits - apply >= 0) then
 --保存剩余的令牌
 redis.pcall("HSET", key, "curr_permits", local_curr_permits - apply);
 --保存时间,下次令牌获取时使用
 redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
 --返回令牌获取成功
 result = 1;
 else
 --保存令牌总数
 redis.pcall("HSET", key, "curr_permits", local_curr_permits);
 --返回令牌获取失败
 result = -1;
 end
 return result
end
---方法:初始化限流器
---1 success
---@param key key
---@param max_permits 桶的容量
---@param rate 令牌的发放速率
local function init(key, max_permits, rate)
 local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
 local org_max_permits = tonumber(rate_limit_info[3])
 local org_rate = rate_limit_info[4]
 if (org_max_permits == nil) or (rate ~= org_rate or max_permits ~= org_max_permits) then
 redis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits)
 end
 return 1;
end
---方法:删除限流Key
local function delete(key)
 redis.pcall("DEL", key) return 1;
end
local key = KEYS[1]
local method = ARGV[1]
if method == 'acquire' then
 return acquire(key, ARGV[2], ARGV[3])
elseif method == 'init' then
 return init(key, ARGV[2], ARGV[3])
elseif method == 'delete' then
 return delete(key)
else
 --ignore
end

该脚本有3个方法,其中两个方法比较重要,分别说明如下:

(1)限流器初始化方法init(key,max_permits,rate),此方法在限流开始时被调用。

(2)限流检测的方法acquire(key,apply),此方法在请求到来时被调用。

Java分布式令牌桶限流

rate_limiter.lua脚本既可以在Java中调用,又可以在Nginx中调用。本小节先介绍其在Java中的使用,第10章再介绍其在Nginx中的使用。

Java分布式令牌桶限流器的实现就是通过Java代码向Redis加载rate_limiter.lua脚本,然后封装其令牌桶初始化方法init(...)和限流监测方法acquire(...),以供外部调用。它的代码如下:

package com.crazymaker.springcloud.standard.ratelimit;
...
/**
 *实现:令牌桶限流服务
 *create by尼恩 @ 疯狂创客圈
 **/
@Slf4j
public class RedisRateLimitImpl implements RateLimitService, InitializingBean
{
 /**
 *限流器的redis key前缀
 */
 private static final String RATE_LIMITER_KEY_PREFIX = "rate_limiter:";
//private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
 private RedisRateLimitProperties redisRateLimitProperties;
 private RedisTemplate redisTemplate;
 //lua脚本的实例
 private static RedisScript<Long> rateLimiterScript = null;
 //lua脚本的类路径
 private static String rateLimitLua = "script/rate_limiter.lua";
 static
 {
 //从类路径文件中加载令牌桶lua脚本
 String script = IOUtil.loadJarFile(RedisRateLimitImpl.class.getClassLoader(), rateLimitLua);
 if (StringUtils.isEmpty(script))
 {
 log.error("lua script load failed:" + rateLimitLua);
 } else
 {
 //创建Lua脚本实例
 rateLimiterScript = new DefaultRedisScript<>(script, Long.class);
 }
 }
 public RedisRateLimitImpl(
 RedisRateLimitProperties redisRateLimitProperties,
 RedisTemplate redisTemplate)
 {
 this.redisRateLimitProperties = redisRateLimitProperties;
 this.redisTemplate = redisTemplate;
 }
 private Map<String, LimiterInfo> limiterInfoMap = new HashMap<>();
 /**
 *限流器的信息
 */
 @Builder
 @Data
 public static class LimiterInfo
 {
 /**
 *限流器的key,如秒杀的id
 */ private String key;
 /**
 *限流器的类型,如seckill
 */
 private String type = "default";
 /**
 *限流器的最大桶容量
 */
 private Integer maxPermits;
 /**
 *限流器的速率
 */
 private Integer rate;
 /**
 *限流器的redis key
 */
 public String fullKey()
 {
 return RATE_LIMITER_KEY_PREFIX + type + ":" + key;
 }
 /**
 *限流器在map中的缓存key
 */
 public String cashKey()
 {
 return type + ":" + key;
 }
 }
 /**
 *限流检测:是否超过redis令牌桶限速器的限制
 *
 *@param cacheKey计数器的key
 *@return true or false
 */
 @Override
 public Boolean tryAcquire(String cacheKey)
 {
 if (cacheKey == null)
 {
 return true;
 }
 if (cacheKey.indexOf(":") <= 0)
 {
 cacheKey = "default:" + cacheKey;
 }
 LimiterInfo limiterInfo = limiterInfoMap.get(cacheKey);
 if (limiterInfo == null)
 {
 return true;
 }
 Long acquire = (Long) redisTemplate.execute(rateLimiterScript,
 ImmutableList.of(limiterInfo.fullKey()),
 "acquire",
 "1");
 if (acquire == 1)
 {
 return false;
 }
 return true;
 }
 /**
 *重载方法:限流器初始化
 *
 *@param limiterInfo限流的类型
 */
 public void initLimitKey(LimiterInfo limiterInfo)
 {
 if (null == rateLimiterScript)
 {
 return;
 }
 String maxPermits = limiterInfo.getMaxPermits().toString();
 String rate = limiterInfo.getRate().toString();
 //执行redis脚本
 Long result = (Long) redisTemplate.execute(rateLimiterScript,
 ImmutableList.of(limiterInfo.fullKey()),
 "init",
 maxPermits,
 rate); limiterInfoMap.put(limiterInfo.cashKey(), limiterInfo);
 }
 /**
 *限流器初始化
 *
 *@param type类型
 *@param key id
 *@param maxPermits上限
 *@param rate 速度
 */
 public void initLimitKey(String type, String key,
 Integer maxPermits, Integer rate)
 {
 LimiterInfo limiterInfo = LimiterInfo.builder()
 .type(type)
 .key(key)
 .maxPermits(maxPermits)
 .rate(rate)
 .build();
 initLimitKey(limiterInfo);
 }
 /**
 *获取redis lua脚本的sha1编码,并缓存到redis
 */
 public String cacheSha1()
 {
 String sha1 = rateLimiterScript.getSha1();
 redisTemplate.opsForValue().set("lua:sha1:rate_limiter", sha1);
 return sha1;
 }
}

Java分布式令牌桶限流的自验证

自验证的工作:首先初始化分布式令牌桶限流器,然后使用两条

线程不断进行限流的检测。自验证的代码如下:

package com.crazymaker.springcloud.ratelimit;
...
@Slf4j
@RunWith(SpringRunner.class)
//指定启动类
@SpringBootTest(classes = {DemoCloudApplication.class})
/**
 *redis分布式令牌桶测试类
 */
public class RedisRateLimitTest
{
 @Resource(name = "redisRateLimitImpl")
 RedisRateLimitImpl limitService;
 //线程池,用于多线程模拟测试
 private ExecutorService pool = Executors.newFixedThreadPool(10);
 @Test
 public void testRedisRateLimit()
 {
 //初始化分布式令牌桶限流器
 limitService.initLimitKey(
 "seckill", //redis key中的类型
 "10000", //redis key中的业务key,比如商品id
 2, //桶容量
 2); //每秒令牌数
 AtomicInteger count = new AtomicInteger();
 long start = System.currentTimeMillis();
 //线程数
 final int threads = 2;
 //每条线程的执行轮数
 final int turns = 20;
 //同步器
 CountDownLatch countDownLatch = new CountDownLatch(threads);
 for (int i = 0; i < threads; i++)
 {
 pool.submit(() ->
 {
 try
 {
 //每个用户访问turns次
 for (int j = 0; j < turns; j++)
 {
 boolean limited = limitService.tryAcquire
 ("seckill:10000");
 if (limited)
 {
 count.getAndIncrement();
 }
 Thread.sleep(200);
 }
 } catch (Exception e)
 { e.printStackTrace();
 }
 countDownLatch.countDown();
 });
 }
 try
 {
 countDownLatch.await();
 } catch (InterruptedException e)
 {
 e.printStackTrace();
 }
 float time = (System.currentTimeMillis() - start) / 1000F;
 //输出统计结果
 log.info("限制的次数为:" + count.get() + " 时长为:" + time);
 log.info("限制的次数为:" + count.get() +
 ",通过的次数为:" + (threads *turns - count.get()));
 log.info("限制的比例为:" +
 (float) count.get() / (float) (threads *turns));
 log.info("运行的时长为:" + time);
 try
 {
 Thread.sleep(Integer.MAX_VALUE);
 } catch (InterruptedException e)
 {
 e.printStackTrace();
 }
 }
}

两条线程各运行20次,每一次运行休眠200毫秒,总计耗时4秒,

运行40次,部分输出结果如下:

[main] INFO c.c.s.r.RedisRateLimitTest - 限制的次数为:32 时长为:4.015
[main] INFO c.c.s.r.RedisRateLimitTest - 限制的次数为:32,通过的次数为:8
[main] INFO c.c.s.r.RedisRateLimitTest - 限制的比例为:0.8
[main] INFO c.c.s.r.RedisRateLimitTest - 运行的时长为:4.015

大家可以自行调整参数,运行以上自验证程序并观察实验结果,体验一下分布式令牌桶限流的效果。

本文给大家讲解的内容是高并发核心编程,限流原理与实战,实战:分布式令牌桶限流

  1. 下篇文章给大家讲解的是高并发核心编程,Spring Cloud+Nginx秒杀实战;
  2. 觉得文章不错的朋友可以转发此文关注小编;
  3. 感谢大家的支持!

相关推荐

Spring Boot 分布式事务实现简单得超乎想象

环境:SpringBoot2.7.18+Atomikos4.x+MySQL5.71.简介关于什么是分布式事务,本文不做介绍。有需要了解的自行查找相关的资料。本篇文章将基于SpringBoot...

Qt编写可视化大屏电子看板系统15-曲线面积图

##一、前言曲线面积图其实就是在曲线图上增加了颜色填充,单纯的曲线可能就只有线条以及数据点,面积图则需要从坐标轴的左下角和右下角联合曲线形成完整的封闭区域路径,然后对这个路径进行颜色填充,为了更美观...

Doris大数据AI可视化管理工具SelectDB Studio重磅发布!

一、初识SelectDBStudioSelectDBStudio是专为ApacheDoris湖仓一体典型场景实战及其兼容数据库量身打造的GUI工具,简化数据开发与管理。二、Select...

RAD Studio 、Delphi或C++Builder设计代码编译上线缩短开发时间

#春日生活打卡季#本月,Embarcadero宣布RADStudio12.3Athens以及Delphi12.3和C++Builder12.3,提供下载。RADStudio12.3A...

Mybatis Plus框架学习指南-第三节内容

自动填充字段基本概念MyBatis-Plus提供了一个便捷的自动填充功能,用于在插入或更新数据时自动填充某些字段,如创建时间、更新时间等。原理自动填充功能通过实现com.baomidou.myba...

「数据库」Sysbench 数据库压力测试工具

sysbench是一个开源的、模块化的、跨平台的多线程性能测试工具,可以用来进行CPU、内存、磁盘I/O、线程、数据库的性能测试。目前支持的数据库有MySQL、Oracle和PostgreSQL。以...

如何选择适合公司的ERP(选erp系统的经验之谈)

很多中小公司想搞ERP,但不得要领。上ERP的目的都是歪的,如提高效率,减少人员,堵住财务漏洞等等。真正用ERP的目的是借机提升企业管理能力,找出管理上的问题并解决,使企业管理更规范以及标准化。上ER...

Manus放开注册,但Flowith才是Agent领域真正的yyds

大家好,我是运营黑客。前天,AIAgent领域的当红炸子鸡—Manus宣布全面放开注册,终于,不需要邀请码就能体验了。于是,赶紧找了个小号去确认一下。然后,额……就被墙在了外面。官方解释:中文版...

歌浓酒庄总酿酒师:我们有最好的葡萄园和最棒的酿酒师

中新网1月23日电1月18日,张裕董事长周洪江及总经理孙健一行在澳大利亚阿德莱德,完成了歌浓酒庄股权交割签约仪式,这也意味着张裕全球布局基本成型。歌浓:澳大利亚年度最佳酒庄据悉,此次张裕收购的...

软件测试进阶之自动化测试——python+appium实例

扼要:1、了解python+appium进行APP的自动化测试实例;2、能根据实例进行实训操作;本课程主要讲述用python+appium对APP进行UI自动化测试的例子。appium支持Androi...

为什么说Python是最伟大的语言?看图就知道了

来源:麦叔编程作者:麦叔测试一下你的分析能力,直接上图,自己判断一下为什么Python是最好的语言?1.有图有真相Java之父-JamesGoshlingC++之父-BjarneStrou...

如何在Eclipse中配置Python开发环境?

Eclipse是著名的跨平台集成开发环境(IDE),最初主要用来Java语言开发。但是我们通过安装不同的插件Eclipse可以支持不同的计算机语言。比如说,我们可以通过安装PyDev插件,使Eclip...

联合国岗位上新啦(联合国的岗位)

联合国人权事务高级专员办事处PostingTitleIntern-HumanRightsDutyStationBANGKOKDeadlineOct7,2025CategoryandL...

一周安全漫谈丨工信部:拟定超1亿条一般数据泄露属后果严重情节

工信部:拟定超1亿条一般数据泄露属后果严重情节11月23日,工信部官网公布《工业和信息化领域数据安全行政处罚裁量指引(试行)(征求意见稿)》。《裁量指引》征求意见稿明确了行政处罚由违法行为发生地管辖、...

oracle列转行以及C#执行语句时报错问题

oracle列转行的关键字:UNPIVOT,经常查到的怎么样转一列,多列怎么转呢,直接上代码(sshwomeyourcode):SELECTsee_no,diag_no,diag_code,...

取消回复欢迎 发表评论: