百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

如何实现延迟队列(JDK/mysql/redis/Rabbit)

mhr18 2024-11-27 12:09 16 浏览 0 评论

何为延迟队列

队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟。延迟队列中的每个元素都指定了延迟时间,表示该元素到达指定时间之后将出队进行处理。其实从上述定义来看,与其说是延迟队列,不如说它是一个以时间为权重最小堆结构

那么延迟队列有什么用呢?我们生活中其实平时接触到很多可以使用延迟队列来解决的例子:

  • 订单超时30分钟未付款将自动关闭
  • 会议系统中,会议开始前10分钟,发送会议提醒
  • 夏天晚上时,我们经常会给空调设置指定时长的时间,到时空调自动关闭
  • 再比如微波炉、烤箱、等等

可以发现延迟队列想要实现的功能其实就是一个定时任务调度的一种。

延迟队列实现方式

延迟队列实现的方式有很多种,具体采用哪种去实现,和我们的业务背景、业务诉求都息息相关,不同的实现方式都有其适用的应用场景,我这里将延迟队列分为两类:单机延迟队列分布式延迟队列

1. 单机实现

JDK 提供了DelayedQueue可以实现延迟队列的目的。其类图如下:

可以看到DelayedQueue是一个阻塞队列,其队列中的元素必须实现Delayed接口:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

其中getDelay返回代表该元素的一个在队列中可存在的时间,通过这种方式来实现元素的延迟弹出。接下来看订单超时30秒将自动关闭的实际例子:

public class JDKDelayQueueTest {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static final DelayQueue<Order> DELAY_QUEUE = new DelayQueue<>();

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {

        EXECUTOR_SERVICE.submit(() -> {
            while (true) {
                if (!DELAY_QUEUE.isEmpty()) {
                    Order order = DELAY_QUEUE.poll();
                    if (order != null) {
                        System.out.println(order.getOrderId() + " 超时关闭与:" + FORMATTER.format(LocalDateTime.now()));
                    }

                }
                TimeUnit.MILLISECONDS.sleep(1000);
            }
        });
        EXECUTOR_SERVICE.submit(() -> {
            try {
                DELAY_QUEUE.add(new Order("黄焖鸡订单"));
                TimeUnit.SECONDS.sleep(5);
                DELAY_QUEUE.add(new Order("麻辣香锅订单"));
                TimeUnit.SECONDS.sleep(10);
                DELAY_QUEUE.add(new Order("石锅拌饭订单"));
            } catch (Exception e) {

            }

        });

    }

    public static class Order implements Delayed {

        private final LocalDateTime expireTime;
        private final String orderId;

        public Order(String orderId) {
            this.expireTime = LocalDateTime.now().plusSeconds(30);
            this.orderId = orderId;
            System.out.println(orderId + " 创建于:" + FORMATTER.format(LocalDateTime.now()));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return LocalDateTime.now().isAfter(expireTime) ? -1 : 1;
        }

        @Override
        public int compareTo(Delayed targetOrder) {
            // 谁的过期时间最早谁就排最前面
            return this.expireTime.isBefore(((Order) targetOrder).getExpireTime()) ? -1 : 1;
        }

        public String getOrderId() {
            return orderId;
        }

        public LocalDateTime getExpireTime() {
            return expireTime;
        }
    }
}

输出:

黄焖鸡订单 创建于:2021-08-21 18:26:30
麻辣香锅订单 创建于:2021-08-21 18:26:35
石锅拌饭订单 创建于:2021-08-21 18:26:45
黄焖鸡订单 超时关闭与:2021-08-21 18:27:00
麻辣香锅订单 超时关闭与:2021-08-21 18:27:05
石锅拌饭订单 超时关闭与:2021-08-21 18:27:15

DelayQueue实现方式小结

这种方式的优点就是实现简单,不复杂,但是其缺点也比较多:不具备可扩展性内存限制无持久化机制,数据容易丢失。


分布式实现

2. 数据库轮询

数据库论询的方式相对而言也比较好理解,后台启动定时任务每隔一段时间扫描指定的数据库表每一行数据,获取出到达指定延迟时间的行进行处理,所以使用该方式重要的就三个要素:

1)捞取任务
扫描数据库的后台任务,可以使用分布式任务去扫,比如A任务扫描limit 0,100满足条件的数据行,B任务扫描limit 100,200满足条件的数据行

2)执行任务
一般来说讲究分工协作,第一步中的分布式线程任务专门用来捞取任务,那么捞取到的任务可以再次扔给另外专门用户处理任务的线程中

3)数据库表设计
可以在表中增加一个字段来表示延迟时间,比如针对上面的订单超时30秒关闭,我们可以增加一个字段timeout,可以是此时间的毫秒数来记录订单的超时时间,那么此时我们的SQL就可以是:

select * from order where ${now} >= timeout limit ${start},100;

数据库轮询实现方式小结

采用这种方式可以看到首先我们需要查询数据库,那么查询数据库就意味着存在查询耗时,那么可能最终导致的就是实时性不高,但是它的优点在于天生满足任务持久化机制,不用担心延迟任务丢失。

3.通过Redis实现

Redis的五大数据类型中的zset数据类型中,包含一个称为score的属性,该数据类型中所有元素都会按照score进行排序,所以如果将score作为我们的延迟时间的时间戳,那么我们可以通过命令Zrangebyscore来获取满足条件的数据,然后交给我们的任务处理线程去处理,其实整个实现思想和数据库轮循是一样的,只不过数据存储结构由数据库转变成了redis,准确来说redis也是数据库,只不过不同的存储结构带来的影响就是适用场景的不同罢了。

那么如果通过Redis来实现延迟队列,大概会有如下几步:

1) 增加任务

zadd tasks ${过期时间戳} ${任务相关数据}

2)捞取任务

ZRANGEBYSCORE tasks -inf ${当前时间戳} WITHSCORES

捞取过期时间早于当前时间的这部分任务

3)执行任务 接下来就是执行,这个就没什么好说的了

关于redis zset数据结构以及命令可以看这里:https://www.runoob.com/redis/redis-sorted-sets.html

一些优化点

1.在添加延迟任务时,可以通过对任务id进行hash分散至多个redis key,可以避免所有任务存储在一个key中导致大key从而影响元素的添加和查找性能

2.每个key独自拥有一个线程处理

3.每个key的线程只负责拉取需要处理的数据,然后再转发至消息队列中,不做任何其他处理,可以提升处理速度,消息消费者可扩展性好,性能不够,机器来凑

redis实现方式小结

redis因为其都是内存中操作,所以查询插入速度和mysql来比都是非常快的,所以实时性会比mysql高,虽然redis也能满足任务数据的持久化,但是无法保证任务不丢失,所以这里持久性会比mysql稍弱一点


4.使用消息队列

我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列

  • RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
  • lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能.

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  1. 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  2. 消息因为设置了TTL而过期。
  3. 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。

不同实现方式的对比

实现方式

复杂度

数据量

持久化,数据丢失

扩展性

实时性

jdk DelayQueue

简单

由于程序内存限制,适用于少数据量

无持久化

mysql 轮询

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

由于查询开销,稍弱

redis zet

稍微复杂

可支持大数据量

可尽量保证持久化,不保证任务不丢失

可扩展

RabbitMQ

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

结语

除了以上实现方式,还有其他比如通过Rabbit MQTTL死信队列来实现:每一个消息带有TTL属性,该TTL即延迟任务的延迟时间,只要超过指定时间没被消费,此消息将被转至死信队列中,我们可以监听死信队列消费消息进而达到延迟任务的目的;还有时间轮转算法等,时间有限,日后再学,日后再讲。

相关推荐

B站收藏视频失效?mybili 收藏夹备份神器完整部署指南

本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙很多B站用户都有过类似经历:自己精心收藏的视频突然“消失”,点开一看不是“已被删除”,就是“因UP主设置不可见”。而B站并不会主动通知...

中间件推荐初始化配置

Redis推荐初始化配置bind0.0.0.0protected-modeyesport6379tcp-backlog511timeout300tcp-keepalive300...

Redis中缓存穿透问题与解决方法

缓存穿透问题概述在Redis作为缓存使用时,缓存穿透是常见问题。正常查询流程是先从Redis缓存获取数据,若有则直接使用;若没有则去数据库查询,查到后存入缓存。但当请求的数据在缓存和数据库中都...

后端开发必看!Redis 哨兵机制如何保障系统高可用?

你是否曾在项目中遇到过Redis主服务器突然宕机,导致整个业务系统出现数据读取异常、响应延迟甚至服务中断的情况?面对这样的突发状况,作为互联网大厂的后端开发人员,如何快速恢复服务、保障系统的高可用...

Redis合集-大Key处理建议

以下是Redis大Key问题的全流程解决方案,涵盖检测、处理、优化及预防策略,结合代码示例和最佳实践:一、大Key的定义与风险1.大Key判定标准数据类型大Key阈值风险场景S...

深入解析跳跃表:Redis里的&quot;老六&quot;数据结构,专治各种不服

大家好,我是你们的码农段子手,今天要给大家讲一个Redis世界里最会"跳科目三"的数据结构——跳跃表(SkipList)。这货表面上是个青铜,实际上是个王者,连红黑树见了都要喊声大哥。...

Redis 中 AOF 持久化技术原理全解析,看完你就懂了!

你在使用Redis的过程中,有没有担心过数据丢失的问题?尤其是在服务器突然宕机、意外断电等情况发生时,那些还没来得及持久化的数据,是不是让你夜不能寐?别担心,Redis的AOF持久化技术就是...

Redis合集-必备的几款运维工具

Redis在应用Redis时,经常会面临的运维工作,包括Redis的运行状态监控,数据迁移,主从集群、切片集群的部署和运维。接下来,从这三个方面,介绍一些工具。先来学习下监控Redis实时...

别再纠结线程池大小 + 线程数量了,没有固定公式的!

我们在百度上能很轻易地搜索到以下线程池设置大小的理论:在一台服务器上我们按照以下设置CPU密集型的程序-核心数+1I/O密集型的程序-核心数*2你不会真的按照这个理论来设置线程池的...

网络编程—IO多路复用详解

假如你想了解IO多路复用,那本文或许可以帮助你本文的最大目的就是想要把select、epoll在执行过程中干了什么叙述出来,所以具体的代码不会涉及,毕竟不同语言的接口有所区别。基础知识IO多路复用涉及...

5分钟学会C/C++多线程编程进程和线程

前言对线程有基本的理解简单的C++面向过程编程能力创造单个简单的线程。创造单个带参数的线程。如何等待线程结束。创造多个线程,并使用互斥量来防止资源抢占。会使用之后,直接跳到“汇总”,复制模板来用就行...

尽情阅读,技术进阶,详解mmap的原理

1.一句话概括mmapmmap的作用,在应用这一层,是让你把文件的某一段,当作内存一样来访问。将文件映射到物理内存,将进程虚拟空间映射到那块内存。这样,进程不仅能像访问内存一样读写文件,多个进程...

C++11多线程知识点总结

一、多线程的基本概念1、进程与线程的区别和联系进程:进程是一个动态的过程,是一个活动的实体。简单来说,一个应用程序的运行就可以被看做是一个进程;线程:是运行中的实际的任务执行者。可以说,进程中包含了多...

微服务高可用的2个关键技巧,你一定用得上

概述上一篇文章讲了一个朋友公司使用SpringCloud架构遇到问题的一个真实案例,虽然不是什么大的技术问题,但如果对一些东西理解的不深刻,还真会犯一些错误。这篇文章我们来聊聊在微服务架构中,到底如...

Java线程间如何共享与传递数据

1、背景在日常SpringBoot应用或者Java应用开发中,使用多线程编程有很多好处,比如可以同时处理多个任务,提高程序的并发性;可以充分利用计算机的多核处理器,使得程序能够更好地利用计算机的资源,...

取消回复欢迎 发表评论: