延迟任务的多种实现方案(延迟机制)
mhr18 2025-05-10 23:35 4 浏览 0 评论
场景
- 订单超时自动取消:延迟任务典型的使用场景是订单超时自动取消。
功能
- 精确的时间控制:延时任务的时间控制要尽量准确。
- 可靠性:延时任务的处理要是可靠的,确保所有任务最终都能被执行。这通常要求延时任务的方案使用的中间件最好要具备任务持久化的能力,以防系统故障导致任务丢失。
方案
1. 基于消息队列
1.1 RabbitMQ TTL + 死信队列
原理
RabbitMQ 可以针对 Queue 和 Message 设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为 dead letter。 RabbitMQ 的 Queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了 dead letter,则按照这两个参数重新路由。
两个概念
- TTL:即消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL,如果对队列设置,则队列中所有的消息都具有相同的过期时间。超过了这个时间,可以认为这个消息就死了,称之为死信。
- DLX(死信Exchange):一个消息在满足以下条件会进入死信交换机:
(1)一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)TTL到期的消息。
(3)队列满了被丢弃的消息。
流程图
(1)定义一个BizQueue,用来接收死信消息,多个消费者进行业务消费。
(2)定义一个死信交换机(DLXExchange),绑定BizQueue,接收延时队列的消息,并转发给BizQueue。
(3)定义一组延时队列DelayQueue_xx,分别配置不同的TTL,用来处理固定延时5s、10s、30s等延时等级,并绑定到DLXExchange。
(4)定义DelayExchange,用来接收业务发过来的延时消息,并根据延时时间转发到不同的延时队列中。
优缺点
- 优点可以支持海量延时消息支持分布式处理。
- 缺点不灵活,只能支持固定延时等级。使用复杂,要配置一堆延时队列。
1.2 RabbitMQ 延迟队列插件(*推荐)
原理
- 延迟交换类型:该插件引入了一个新的交换类型 - x-delayed-message。这种类型的交换机类似于常规的直接(direct)、主题(topic)或扇形(fanout)交换机,但它增加了对消息延迟的支持。 参数x-delayed-type 定义了分发方式,可以是:direct、topic、fanout
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
- 消息延迟属性:当生产者发送消息到一个延迟类型的交换机时,它可以在消息的头部信息中指定一个延迟时间(通常是以毫秒为单位的)。这是通过设置一个特殊的属性 x-delay来实现的。 如果x-delay未设置,则消息没有延迟。
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
- 内部存储和调度:当延迟交换机接收到一个带有延迟的消息时,它不会立即将消息路由到绑定的队列中。相反,它会将消息存储在内部的数据结构(Mnesia 数据库,分布式数据库管理系统))中,并根据其延迟时间进行调度。
- 延迟过期:一旦消息的延迟时间到达,交换机就会将消息路由到适当的队列。从此刻起,消息就变成了可供消费者正常消费的状态。
- 时间精度:延迟的精度和实际的延迟时间可能受到系统负载和RabbitMQ服务器的性能限制。
地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
优缺点
- 优点RabbitMQ消息服务可靠性高,消息处理速度快支持大数据量支持分布式横向扩展方便。
- 缺点:最大支持延时时间约为49天,超过这个时间的消息会被立即消费;超过百万的数据量不要使用(和RabbitMQ自身有关),内存和CPU使用量急剧上升;需要确保版本兼容,不要使用过高版本的容易出现问题;
1.3 RocketMQ 定时消息(*推荐)
RocketMQ 4.x 只支持固定时长(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)的延时消息
RocketMQ 5.x 支持任意时长的的延时消息,推荐使用
原理
(1)在RocketMQ中,使用了经典的时间轮算法。通过TimerWheel来描述时间轮不同的时刻,通过TimerLog来记录不同时刻的消息。
(2)TimerWheel中的每一格代表着一个时刻,同时会有一个firstPos指向这个刻度下所有定时消息的首条TimerLog记录的地址,一个lastPos指向这个刻度下所有定时消息最后一条TimerLog的记录的地址。并且,对于所处于同一个刻度的的消息,其TimerLog会通过prevPos串联成一个链表。
(3)当需要新增一条记录的时候,例如现在我们要新增一个 “1-4”。那么就将新记录的 prevPos 指向当前的 lastPos,即 “1-3”,然后修改 lastPos 指向 “1-4”。这样就将同一个刻度上面的 TimerLog 记录全都串起来了。
优缺点
- 优点:精度高,支持任意时刻。使用门槛低,和使用普通消息一样。
- 缺点:时长的使用限制: 定时和延时消息的msg.setStartDeliverTime参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败。海量 消息场景,存储成本高: 在海量订单场景中,如果每个订单需要新增一个定时消息,且不会马上消费,额外给MQ带来很大的存储成本。如果将大量定时消息的定时时间设置为同一时刻,会造成系统压力过大,导致消息分发延迟,影响定时精度。
2. 基于Redis
2.1 Redis Key过期监听
用 Redis 的 Keyspace Notifications。中文翻译就是键空间机制,就是利用该机制可以在 key 失效之后,提供一个回调,实际上是 Redis 会给客户端发送一个消息。是需要 Redis 版本 2.8 以上。
Redis过期通知也是不可靠的,不建议使用。
2.2 定时轮询 zset
利用 Redis 的 zset。zset 是一个有序集合,每一个元素(member)都关联了一个 score,通过 score 排序来取集合中的值。
原理
- 添加延时消息
//延迟3秒
Calendar cal1 = Calendar.getInstance();
cal1.add(Calendar.SECOND, 3);
int second3later = (int) (cal1.getTimeInMillis() / 1000);
AppTest.getJedis().zadd("OrderId",second3later,"OID0000001"+i);
- 定时轮询消费消息
Jedis jedis = AppTest.getJedis();
while(true){
Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);
if(items == null || items.isEmpty()){
System.out.println("当前没有等待的任务");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
continue;
}
int score = (int) ((Tuple)items.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if(nowSecond >= score){
String orderId = ((Tuple)items.toArray()[0]).getElement();
Long num = jedis.zrem("OrderId", orderId);
if( num != null && num>0){
consumeMsg(orderId);
}
}
}
- 注意此处代码,在并发场景下,删除Key,要判断是否真正删除成功。否则会造成消息的重复消费。
Long num = jedis.zrem("OrderId", orderId);
if( num != null && num>0){
consumeMsg(orderId);
}
- 消息处理
void consumeMsg(orderId){
//具体业务逻辑
}
优缺点
- 优点
- 实现简单,redis内存操作,速度快,性能高,集群扩展方便
- 可存储大量订单数据,持久化机制使得故障时通过AOF或RDB方式恢复
- 适合对延迟精度要求不高的业务场景
- 缺点
- 轮询线程如果不带休眠或休眠时间短,可能导致空轮询,CPU飙高,带休眠时间,休眠多久不好评估,休眠时间过长可能导致延迟不准确。
- 处理消息异常时可能要实现重试机制
- 可靠性问题,比如是先删数据在处理订单还是先处理订单再删除数据,处理异常时可能会导致数据丢失。
2.3 Redisson 分布式延迟队列 RDelayedQueue (*推荐)
原理
- 数据结构
Redisson在实现延时队列的过程中用到以下数据结构:
key | value | 类型 | 用处 |
redisson_delay_queue_timeout:{target_queue} | 任务 和 到期时间 | zset | 消息延时排序功能 |
redisson_delay_queue:{target_queue} | 任务的原始顺序存储 | list | 如查询、删除指定任务 |
redisson_delay_queue_channel:{target_queue} | 最新消息的到期时间 | 发布订阅 | 通知客户端启动同步数据的任务,该任务来获取过期任务存放到target_queue中 |
target_queue | 过期的任务 | list | 获取具体的过期任务 |
- 流程图
在整个流程图,主要有三个重要角色,Redisson客户端、生产者、消费者。Redis的核心操作都是通过脚本的方式执行Redis命令,确保Redis操作的原子性。
(1)生产者
生产者发布任务时,调用Redis命令,将Task添加到
redisson_delay_queue_timeout:{target_queue}、redisson_delay_queue:{target_queue}。
同时判断
redisson_delay_queue_timeout:{target_queue}中的最新任务是不是自己添加及你去的任务,如果是,则向
redisson_delay_queue_channel:{target_queue}发布最新的 过期时间。
核心Redis操作脚本代码如下:
@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
if (delay < 0) {
throw new IllegalArgumentException("Delay can't be negative");
}
long delayInMs = timeUnit.toMillis(delay);
long timeout = System.currentTimeMillis() + delayInMs;
long randomId = ThreadLocalRandom.current().nextLong();
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
+ "redis.call('rpush', KEYS[3], value);"
// if new object added to queue head when publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
(2)Redisson客户端
无论是生产者和还是消费者,都是Redisson的客户端,客户端在启动的时候,会订阅
redisson_delay_queue_channel:{target_queue}。
通过订阅该Topic,会收到订阅成功的通知和新消息到达的通知,这两个通知中会做两个很重要的事情:定时任务和同步任务。
- 同步任务(pushTaskAsync)
- 使用Redis命令zrangebyscore,取zset的score小于当前时间的最新100条,复制给 expiredValues
- 判断expiredValues 是否有数据,有数据,说明有到期数据,则执行具体任务同步的操作,将过期的任务从 redisson_delay_queue_timeout:{target_queue} 和 redisson_delay_queue:{target_queue} 中移除,添加到 target_queue中。
- 任务同步完成后,取 redisson_delay_queue_timeout:{target_queue}中最新的Task,将该Task的过期时间发布到Topic中 redisson_delay_queue_channel:{target_queue}
@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
if (delay < 0) {
throw new IllegalArgumentException("Delay can't be negative");
}
long delayInMs = timeUnit.toMillis(delay);
long timeout = System.currentTimeMillis() + delayInMs;
long randomId = ThreadLocalRandom.current().nextLong();
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
+ "redis.call('rpush', KEYS[3], value);"
// if new object added to queue head when publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
- 定时任务(scheduleTask)
- 得到最新消息过期时间 startTime
- 计算 delay时间,delay = startTime - System.currentTimeMillis()
- 如果 delay <= 10 则执行同步任务
- 如果 delay >10 则 开启一个定时任务,定时任务时间到了之后,执行同步任务
long delay = startTime - System.currentTimeMillis();
if (delay > 10) {
Timeout timeout = connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
pushTask();
TimeoutTask currentTimeout = lastTimeout.get();
if (currentTimeout.getTask() == timeout) {
lastTimeout.compareAndSet(currentTimeout, null);
}
}
}, delay, TimeUnit.MILLISECONDS);
if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
timeout.cancel();
}
} else {
pushTask();
}
(3)消费者
消费者启动一个线程,循环从 target_queue 获取任务,如果任务队列为空,则会阻塞,知道有数据时返回 。
- 阻塞方式从 target_queue 获取任务,使用Redis命令 bpop
- 获取到任务之后,做具体的任务执行
3. 定时轮询
用分布式任务中间件开启一个调度任务,某个一定的时间段执行一次任务,从数据库中获取过期数据,然后执行具体的业务逻辑。
- 适合场景:时间精度要求不高、数据量不大
- 常用的一些分布式任务调度中间件:XXL-JobElastic-Job 优缺点
- 优点:这个方案的优点也是比较简单,实现起来比较容易
- 缺点:时间不精准、无法处理大订单量、对数据库造成压力、分库分表问题。
4. 内存队列
4.1 JDK DelayQueue
DelayQueue 是一个 BlockingQueue (无界阻塞)队列,它本质就是封装了一个 PriorityQueue (优先级队列),并加上了延时功能。 DelayQueue 就是一个使用优先队列(PriorityQueue)实现的 BlockingQueue,优先队列的比较基准值是时间。即:
DelayQueue = BlockingQueue + PriorityQueue + Delayed
使用比较简单
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
DelayedTask task1 = new DelayedTask("data 5 seconds", 5L);
DelayedTask task2 = new DelayedTask("data 1 seconds", 1L);
DelayedTask task3 = new DelayedTask("data 3 seconds", 3L);
// 2.向队列中添加延迟执行的任务
delayQueue.put(task1);
delayQueue.put(task2);
delayQueue.put(task3);
// 3.尝试执行任务
log.info("消息发送完成");
// 如果队列中没有到期的元素,take会阻塞等待
while (true) {
DelayedTask task = delayQueue.take();
log.info("获取延迟任务:task:{}", task.getData());
}
DelayQueue 实现了一个高效的本地延时队列, 但是缺点就是 不支持多节点部署,多节点部署时,不能同步消息,同步消费,也不能持久化
5.时间轮算法
5.1 Netty的HashedWheelTimer
5.2 Kafka的TimingWheel
工具
推荐
- 1.2 RabbitMQ 延迟队列插件
- 1.3 RocketMQ 定时消息
- 2.3 Redisson 分布式延迟队列 RDelayedQueue
ArchManual 分布式系统整体架构手册 概览图
高清图片在线地址:
https://www.processon.com/view/link/65b74ca5ec61176de3760fc3
相关推荐
- Docker安装详细步骤及相关环境安装配置
-
最近自己在虚拟机上搭建一个docker,将项目运行在虚拟机中。需要提前准备的工具,FinallShell(远程链接工具),VM(虚拟机-配置网络)、CentOS7(Linux操作系统-在虚拟机上安装)...
- Linux下安装常用软件都有哪些?做了一个汇总列表,你看还缺啥?
-
1.安装列表MySQL5.7.11Java1.8ApacheMaven3.6+tomcat8.5gitRedisNginxpythondocker2.安装mysql1.拷贝mysql安装文件到...
- Nginx安装和使用指南详细讲解(nginx1.20安装)
-
Nginx安装和使用指南安装1.检查并安装所需的依赖软件1).gcc:nginx编译依赖gcc环境安装命令:yuminstallgcc-c++2).pcre:(PerlCompatibleRe...
- docker之安装部署Harbor(docker安装hacs)
-
在现代软件开发和部署环境中,Harbor作为一个企业级的容器镜像仓库,提供了高效、安全的镜像管理解决方案。通过Docker部署Harbor,可以轻松构建私有镜像仓库,满足企业对镜像存储、管理和安全性...
- 成功安装 Magento2.4.3最新版教程「技术干货」
-
外贸独立站设计公司xingbell.com经过多次的反复实验,最新版的magento2.4.3在oneinstack的环境下的详细安装教程如下:一.vps系统:LinuxCentOS7.7.19...
- 【Linux】——从0到1的学习,让你熟练掌握,带你玩转Linu
-
学习Linux并掌握Java环境配置及SpringBoot项目部署是一个系统化的过程,以下是从零开始的详细指南,帮助你逐步掌握这些技能。一、Linux基础入门1.安装Linux系统选择发行版:推荐...
- cent6.5安装gitlab-ce最新版本-11.8.2并配置邮件服务
-
cent6.5安装gitlab-ce最新版本-11.8.2并配置邮件服务(yum选择的,时间不同,版本不同)如果对运维课程感兴趣,可以在b站上搜索我的账号:运维实战课程,可以关注我,学习更多免费的运...
- 时隔三月,参加2020秋招散招,终拿字节跳动后端开发意向书.
-
3个月前头条正式批笔试4道编程题只AC了2道,然后被刷了做了200多道还是太菜了,本来对字节不抱太大希望,毕竟后台竞争太大,而且字节招客户端开发比较多。后来看到有散招免笔试,抱着试一试的心态投了,然而...
- Redisson:Java程序员手中的“魔法锁”
-
Redisson:Java程序员手中的“魔法锁”在这个万物互联的时代,分布式系统已经成为主流。然而,随着系统的扩展,共享资源的争夺成为了一个棘手的问题。就比如你想在淘宝“秒杀”一款商品,却发现抢的人太...
- 【线上故障复盘】RPC 线程池被打满,1024个线程居然不够用?
-
1.故障背景昨天晚上,我刚到家里打开公司群,就看见群里有人讨论:线上环境出现大量RPC请求报错,异常原因:被线程池拒绝。虽然异常量很大,但是异常服务非核心服务,属于系统旁路,服务于数据核对任务,即使...
- 小红书取消大小周,有人不高兴了!
-
小红书宣布五一节假日之后,取消大小周,恢复为正常的双休,乍一看工作时长变少,按道理来说大家应该都会很开心,毕竟上班时间缩短了,但是还是有一些小红书的朋友高兴不起来,心情很复杂。因为没有了大小周,以前...
- 延迟任务的多种实现方案(延迟机制)
-
场景订单超时自动取消:延迟任务典型的使用场景是订单超时自动取消。功能精确的时间控制:延时任务的时间控制要尽量准确。可靠性:延时任务的处理要是可靠的,确保所有任务最终都能被执行。这通常要求延时任务的方案...
- 百度java面试真题(java面试题下载)
-
1、SpingBoot也有定时任务?是什么注解?在SpringBoot中使用定时任务主要有两种不同的方式,一个就是使用Spring中的@Scheduled注解,另一个则是使用第三方框架Q...
- 回归基础:访问 Kubernetes Pod(concurrent.futures访问数据库)
-
Kubernetes是一头巨大的野兽。在它开始有用之前,您需要了解许多概念。在这里,学习几种访问集群外pod的方法。Kubernetes是一头巨大的野兽。在它开始有用之前,您需要了解许多不同的...
- Spring 缓存神器 @Cacheable:3 分钟学会优化高频数据访问
-
在互联网应用中,高频数据查询(如商品详情、用户信息)往往成为性能瓶颈。每次请求都触发数据库查询,不仅增加服务器压力,还会导致响应延迟。Spring框架提供的@Cacheable注解,就像给方法加了一...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Docker安装详细步骤及相关环境安装配置
- Linux下安装常用软件都有哪些?做了一个汇总列表,你看还缺啥?
- Nginx安装和使用指南详细讲解(nginx1.20安装)
- docker之安装部署Harbor(docker安装hacs)
- 成功安装 Magento2.4.3最新版教程「技术干货」
- 【Linux】——从0到1的学习,让你熟练掌握,带你玩转Linu
- cent6.5安装gitlab-ce最新版本-11.8.2并配置邮件服务
- 时隔三月,参加2020秋招散招,终拿字节跳动后端开发意向书.
- Redisson:Java程序员手中的“魔法锁”
- 【线上故障复盘】RPC 线程池被打满,1024个线程居然不够用?
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)