如何实现延迟队列(JDK/mysql/redis/Rabbit)
mhr18 2024-11-27 12:09 23 浏览 0 评论
何为延迟队列
队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟。延迟队列中的每个元素都指定了延迟时间,表示该元素到达指定时间之后将出队进行处理。其实从上述定义来看,与其说是延迟队列,不如说它是一个以时间为权重的最小堆结构。
那么延迟队列有什么用呢?我们生活中其实平时接触到很多可以使用延迟队列来解决的例子:
- 订单超时30分钟未付款将自动关闭
- 会议系统中,会议开始前10分钟,发送会议提醒
- 夏天晚上时,我们经常会给空调设置指定时长的时间,到时空调自动关闭
- 再比如微波炉、烤箱、等等
可以发现延迟队列想要实现的功能其实就是一个定时任务调度的一种。
延迟队列实现方式
延迟队列实现的方式有很多种,具体采用哪种去实现,和我们的业务背景、业务诉求都息息相关,不同的实现方式都有其适用的应用场景,我这里将延迟队列分为两类:单机延迟队列和分布式延迟队列。
1. 单机实现
JDK 提供了DelayedQueue可以实现延迟队列的目的。其类图如下:
可以看到DelayedQueue是一个阻塞队列,其队列中的元素必须实现Delayed接口:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
其中getDelay返回代表该元素的一个在队列中可存在的时间,通过这种方式来实现元素的延迟弹出。接下来看订单超时30秒将自动关闭的实际例子:
public class JDKDelayQueueTest {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final DelayQueue<Order> DELAY_QUEUE = new DelayQueue<>();
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws Exception {
EXECUTOR_SERVICE.submit(() -> {
while (true) {
if (!DELAY_QUEUE.isEmpty()) {
Order order = DELAY_QUEUE.poll();
if (order != null) {
System.out.println(order.getOrderId() + " 超时关闭与:" + FORMATTER.format(LocalDateTime.now()));
}
}
TimeUnit.MILLISECONDS.sleep(1000);
}
});
EXECUTOR_SERVICE.submit(() -> {
try {
DELAY_QUEUE.add(new Order("黄焖鸡订单"));
TimeUnit.SECONDS.sleep(5);
DELAY_QUEUE.add(new Order("麻辣香锅订单"));
TimeUnit.SECONDS.sleep(10);
DELAY_QUEUE.add(new Order("石锅拌饭订单"));
} catch (Exception e) {
}
});
}
public static class Order implements Delayed {
private final LocalDateTime expireTime;
private final String orderId;
public Order(String orderId) {
this.expireTime = LocalDateTime.now().plusSeconds(30);
this.orderId = orderId;
System.out.println(orderId + " 创建于:" + FORMATTER.format(LocalDateTime.now()));
}
@Override
public long getDelay(TimeUnit unit) {
return LocalDateTime.now().isAfter(expireTime) ? -1 : 1;
}
@Override
public int compareTo(Delayed targetOrder) {
// 谁的过期时间最早谁就排最前面
return this.expireTime.isBefore(((Order) targetOrder).getExpireTime()) ? -1 : 1;
}
public String getOrderId() {
return orderId;
}
public LocalDateTime getExpireTime() {
return expireTime;
}
}
}
输出:
黄焖鸡订单 创建于:2021-08-21 18:26:30
麻辣香锅订单 创建于:2021-08-21 18:26:35
石锅拌饭订单 创建于:2021-08-21 18:26:45
黄焖鸡订单 超时关闭与:2021-08-21 18:27:00
麻辣香锅订单 超时关闭与:2021-08-21 18:27:05
石锅拌饭订单 超时关闭与:2021-08-21 18:27:15
DelayQueue实现方式小结
这种方式的优点就是实现简单,不复杂,但是其缺点也比较多:不具备可扩展性,内存限制、无持久化机制,数据容易丢失。
分布式实现
2. 数据库轮询
数据库论询的方式相对而言也比较好理解,后台启动定时任务每隔一段时间扫描指定的数据库表每一行数据,获取出到达指定延迟时间的行进行处理,所以使用该方式重要的就三个要素:
1)捞取任务
扫描数据库的后台任务,可以使用分布式任务去扫,比如A任务扫描limit 0,100满足条件的数据行,B任务扫描limit 100,200满足条件的数据行
2)执行任务
一般来说讲究分工协作,第一步中的分布式线程任务专门用来捞取任务,那么捞取到的任务可以再次扔给另外专门用户处理任务的线程中
3)数据库表设计
可以在表中增加一个字段来表示延迟时间,比如针对上面的订单超时30秒关闭,我们可以增加一个字段timeout,可以是此时间的毫秒数来记录订单的超时时间,那么此时我们的SQL就可以是:
select * from order where ${now} >= timeout limit ${start},100;
数据库轮询实现方式小结
采用这种方式可以看到首先我们需要查询数据库,那么查询数据库就意味着存在查询耗时,那么可能最终导致的就是实时性不高,但是它的优点在于天生满足任务持久化机制,不用担心延迟任务丢失。
3.通过Redis实现
Redis的五大数据类型中的zset数据类型中,包含一个称为score的属性,该数据类型中所有元素都会按照score进行排序,所以如果将score作为我们的延迟时间的时间戳,那么我们可以通过命令Zrangebyscore来获取满足条件的数据,然后交给我们的任务处理线程去处理,其实整个实现思想和数据库轮循是一样的,只不过数据存储结构由数据库转变成了redis,准确来说redis也是数据库,只不过不同的存储结构带来的影响就是适用场景的不同罢了。
那么如果通过Redis来实现延迟队列,大概会有如下几步:
1) 增加任务
zadd tasks ${过期时间戳} ${任务相关数据}
2)捞取任务
ZRANGEBYSCORE tasks -inf ${当前时间戳} WITHSCORES
捞取过期时间早于当前时间的这部分任务
3)执行任务 接下来就是执行,这个就没什么好说的了
关于redis zset数据结构以及命令可以看这里:https://www.runoob.com/redis/redis-sorted-sets.html
一些优化点
1.在添加延迟任务时,可以通过对任务id进行hash分散至多个redis key,可以避免所有任务存储在一个key中导致大key从而影响元素的添加和查找性能
2.每个key独自拥有一个线程处理
3.每个key的线程只负责拉取需要处理的数据,然后再转发至消息队列中,不做任何其他处理,可以提升处理速度,消息消费者可扩展性好,性能不够,机器来凑
redis实现方式小结
redis因为其都是内存中操作,所以查询插入速度和mysql来比都是非常快的,所以实时性会比mysql高,虽然redis也能满足任务数据的持久化,但是无法保证任务不丢失,所以这里持久性会比mysql稍弱一点
4.使用消息队列
我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列
- RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
- lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能.
Time-To-Live Extensions
RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。
Dead Letter Exchange
刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:
- 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
- 消息因为设置了TTL而过期。
- 消息进入了一条已经达到最大长度的队列。
如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。
不同实现方式的对比
实现方式 | 复杂度 | 数据量 | 持久化,数据丢失 | 扩展性 | 实时性 |
jdk DelayQueue | 简单 | 由于程序内存限制,适用于少数据量 | 无持久化 | 差 | 高 |
mysql 轮询 | 稍微复杂 | 可支持大数据量 | 可保证持久化,保证任务不丢失 | 可扩展 | 由于查询开销,稍弱 |
redis zet | 稍微复杂 | 可支持大数据量 | 可尽量保证持久化,不保证任务不丢失 | 可扩展 | 高 |
RabbitMQ | 稍微复杂 | 可支持大数据量 | 可保证持久化,保证任务不丢失 | 可扩展 | 高 |
结语
除了以上实现方式,还有其他比如通过Rabbit MQ的TTL和死信队列来实现:每一个消息带有TTL属性,该TTL即延迟任务的延迟时间,只要超过指定时间没被消费,此消息将被转至死信队列中,我们可以监听死信队列消费消息进而达到延迟任务的目的;还有时间轮转算法等,时间有限,日后再学,日后再讲。
相关推荐
- Spring Boot 分布式事务实现简单得超乎想象
-
环境:SpringBoot2.7.18+Atomikos4.x+MySQL5.71.简介关于什么是分布式事务,本文不做介绍。有需要了解的自行查找相关的资料。本篇文章将基于SpringBoot...
- Qt编写可视化大屏电子看板系统15-曲线面积图
-
##一、前言曲线面积图其实就是在曲线图上增加了颜色填充,单纯的曲线可能就只有线条以及数据点,面积图则需要从坐标轴的左下角和右下角联合曲线形成完整的封闭区域路径,然后对这个路径进行颜色填充,为了更美观...
- Doris大数据AI可视化管理工具SelectDB Studio重磅发布!
-
一、初识SelectDBStudioSelectDBStudio是专为ApacheDoris湖仓一体典型场景实战及其兼容数据库量身打造的GUI工具,简化数据开发与管理。二、Select...
- RAD Studio 、Delphi或C++Builder设计代码编译上线缩短开发时间
-
#春日生活打卡季#本月,Embarcadero宣布RADStudio12.3Athens以及Delphi12.3和C++Builder12.3,提供下载。RADStudio12.3A...
- Mybatis Plus框架学习指南-第三节内容
-
自动填充字段基本概念MyBatis-Plus提供了一个便捷的自动填充功能,用于在插入或更新数据时自动填充某些字段,如创建时间、更新时间等。原理自动填充功能通过实现com.baomidou.myba...
- 「数据库」Sysbench 数据库压力测试工具
-
sysbench是一个开源的、模块化的、跨平台的多线程性能测试工具,可以用来进行CPU、内存、磁盘I/O、线程、数据库的性能测试。目前支持的数据库有MySQL、Oracle和PostgreSQL。以...
- 如何选择适合公司的ERP(选erp系统的经验之谈)
-
很多中小公司想搞ERP,但不得要领。上ERP的目的都是歪的,如提高效率,减少人员,堵住财务漏洞等等。真正用ERP的目的是借机提升企业管理能力,找出管理上的问题并解决,使企业管理更规范以及标准化。上ER...
- Manus放开注册,但Flowith才是Agent领域真正的yyds
-
大家好,我是运营黑客。前天,AIAgent领域的当红炸子鸡—Manus宣布全面放开注册,终于,不需要邀请码就能体验了。于是,赶紧找了个小号去确认一下。然后,额……就被墙在了外面。官方解释:中文版...
- 歌浓酒庄总酿酒师:我们有最好的葡萄园和最棒的酿酒师
-
中新网1月23日电1月18日,张裕董事长周洪江及总经理孙健一行在澳大利亚阿德莱德,完成了歌浓酒庄股权交割签约仪式,这也意味着张裕全球布局基本成型。歌浓:澳大利亚年度最佳酒庄据悉,此次张裕收购的...
- 软件测试进阶之自动化测试——python+appium实例
-
扼要:1、了解python+appium进行APP的自动化测试实例;2、能根据实例进行实训操作;本课程主要讲述用python+appium对APP进行UI自动化测试的例子。appium支持Androi...
- 为什么说Python是最伟大的语言?看图就知道了
-
来源:麦叔编程作者:麦叔测试一下你的分析能力,直接上图,自己判断一下为什么Python是最好的语言?1.有图有真相Java之父-JamesGoshlingC++之父-BjarneStrou...
- 如何在Eclipse中配置Python开发环境?
-
Eclipse是著名的跨平台集成开发环境(IDE),最初主要用来Java语言开发。但是我们通过安装不同的插件Eclipse可以支持不同的计算机语言。比如说,我们可以通过安装PyDev插件,使Eclip...
- 联合国岗位上新啦(联合国的岗位)
-
联合国人权事务高级专员办事处PostingTitleIntern-HumanRightsDutyStationBANGKOKDeadlineOct7,2025CategoryandL...
- 一周安全漫谈丨工信部:拟定超1亿条一般数据泄露属后果严重情节
-
工信部:拟定超1亿条一般数据泄露属后果严重情节11月23日,工信部官网公布《工业和信息化领域数据安全行政处罚裁量指引(试行)(征求意见稿)》。《裁量指引》征求意见稿明确了行政处罚由违法行为发生地管辖、...
- oracle列转行以及C#执行语句时报错问题
-
oracle列转行的关键字:UNPIVOT,经常查到的怎么样转一列,多列怎么转呢,直接上代码(sshwomeyourcode):SELECTsee_no,diag_no,diag_code,...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)