一口气整理一波延时队列实现方案
mhr18 2024-11-20 18:39 22 浏览 0 评论
作者:kevinkrcai,腾讯IEG后台开发工程师
一、前言
前段时间参与了海外客服系统相关需求的开发工作,其中需要实现客服对用户消息回复超时的相关处理策略。比如:当客服接收到用户消息后,如果在3分钟内没有回复用户,则需要给用户推送一个问卷表单;当客服30分钟还没有回复用户消息,则需要给对应的客服发告警消息等。这类属于在当前时间点往后推迟指定时间执行的任务,可以采用延时队列实现此类功能,本文对延时队列的相关实现方案做一个简单的整理和总结。
二、相关实现
延时队列,就是具有延时功能的数据结构
2.1 基于内存数据结构实现
基于链表实现
基于链表实现的话,链表的每一个元素都是一个延迟任务,包含延时执行时间和延时任务的创建时间,旁路开启一个时间检测线程去轮询遍历链表,只要有当前时间减去延时任务的创建时间大于或等于延迟时间,任务就可以执行,执行完后从链表删除该任务,因此,新增任务的时间复杂度可以做到O(1),但是删除和检测任务是否能够执行时间复杂度则是O(N)。
上面是比较简单的实现思路,但是每次检测任务是否能够执行需要遍历整个链表,时间复杂度位O(N),如果在新增任务时,就根据执行时间的先后做排序,让延时时间最短的任务排在链表头部,那么检测任务是否能够执行和删除延时任务的时间复杂度能优化为O(1),相对前者较好,但是在新增任务时因需要做排序,所以时间复杂度会变为O(N)。
基于最小堆实现
基于排序链表的实现可以给我们提供了一种思路,将任务按照执行时间排序,所以我们可以考虑排序时间复杂度更优的数据结构:最小堆。JDK JUC包内置的延时队列DelayQueue就是基于最小堆实现。最小堆能保证每次新增任务,延时时间最近的排在堆的根结点,检测任务是否能够执行只需要检查根节点的任务是否到期了就行,检测任务时间复杂度是O(1),但是新增和删除任务,因为要做堆排序,所以时间复杂度是O(logN)
基于时间轮实现
什么是时间轮?
时间轮是一个存储定时任务的环状数组,相对上面提到的基于最小堆和链表的实现,时间轮可以做到新增,删除和检测任务的时间复杂度都是O(1)。
最简单的时间轮是单层时间轮,如下图所示,图中有8个时间格,每个时间格表示1s,那么这个时间轮就是表示周期为8s的时间轮,定时任务存放在时间轮每一个格子上,时间过去1s,则当前时间的指针则会向前移动一个单位时间。
延时任务如何存放在时间轮上:
计算某一个定时任务应该存放在时间轮哪一个下标下:
((延时任务执行时间 - 当前时间) / 时间轮单位时间 + 当前时间在时间轮的索引下标) % 时间周期
比如:当前时间00:03,现在有一个在00:04执行的任务,那么计算后得出,(1 / 1 + 3) % 8 = 4,定时任务应该存放在时间轮下标为4的格子上。同样道理,00:10执行的任务,就需要放在下标为2的格子上,可以复用之前的时间轮格子。
如果一个时间格子上刚好有多个任务,怎么执行?
如果延时执行的时间不同,但是下标计算的结果相同,放在了同一个时间格中,可以在这个任务中多一个round字段,表示要多转几圈,例如:当前时间是00:03,有两个延时任务,一个是00:10执行,一个是00:18执行,那他们都会放在下标为2的格子上,但是00:18执行的任务,round = 1,表示要多转一圈。只有round = 0的任务才是当前时间周期可以执行的。这些任务可以在对应的时间格中通过链表连接起来,每次对时间格的处理还需要遍历链表更新任务的round,超出当前时间周期的任务的round字段可以在减1,所以这块的时间复杂度是O(n)。
上面在检测任务的时候,还需要更新对应时间格链表的round字段,时间复杂度是O(n),通过层级时间轮可以将该复杂度优化为O(1),层级时间轮就是有多个时间轮,但是每个时间轮的单位时间是不一样的,如图所示:第一层时间轮的单位时间是1s,时间周期是0到8s,第二层的单位时间等于第一层的时间周期即8s,第二层的时间周期是0到64秒。这样对于超出第一层时间轮时间周期的任务就可以放在第二层。第一层转一圈,第二层转一格。
如下图所示:可以一个三层时间轮表示一天24小时,根据定时任务执行时间放置在不同的时间轮上,比如:02时00分01秒执行的任务比下面两个时间周期大,需要放在hours时间轮上,当current hour指向2,当前时间轮需要下降到下一层时间轮,直到下降到最后一层时间轮为止。
这类基于内存数据结构实现的延迟队列不支持分布式或者持久化,进程重启后数据会丢失,并且当延时任务很多时,队列中对象也会很多,不适合任务量比较大的情况,只适合不需要持久化且任务量相对较少的单机任务,并且能容忍丢失。
2.2 基于DB定时轮询实现
基于DB+定时任务轮询的实现方案通常就是通过一个线程定时去扫描数据库表,找到可以执行的任务触发执行。基于DB轮询,虽然可以解决任务持久化,但是也有以下几个缺点:
- 首先,定时轮询就会存在延迟,这个延迟的误差就在于你设定的轮询的间隔时间。比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
- 第二,扫描数据表对DB有压力,如果任务数量多,表的数据量大,定时扫表会增加DB磁盘IO压力。
- 第三,对自身服务器,定时轮询也会增加CPU的开销。
2.3 基于Redis实现
基于Redis发布订阅实现
延时消息事件通过 Redis 的pub/sub来进行分发, 设置key的过期时间是延迟时间,利用key过期事件通知来实现延迟消息。因为事件监听配置默认关闭,故开启 redis 的事件监听与发布,需修改redis.conf配置文件:
# notify-keyspace-events ""
notify-keyspace-events Ex
连接redis,订阅监听expire事件:当key过期后,就会监听到过期的key
PSUBSCRIBE __keyevent@*__:expired
如图所示:监听5秒后过期的key:
小demo:
var redisCli *redis.Client
func init() {
// 连接redis
redisCli = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
}
func SetExpireEvent() {
// 设置一个键,并且3秒钟之后过期
redisCli.Set(context.Background(), "test_expire_event_notify", "测试键值过期通知", 3*time.Second)
fmt.Println(time.Now())
}
func SubExpireEvent() {
// 订阅key过期事件
sub := redisCli.PSubscribe(context.Background(), "__keyevent@*__:expired")
for {
msg := <- sub.Channel()
fmt.Println("Channel ", msg.Channel)
fmt.Println("pattern ", msg.Pattern)
fmt.Println("pattern ", msg.Payload) // 过期的key
fmt.Println(time.Now())
}
}
func main() {
SetExpireEvent()
go SubExpireEvent()
select {}
}
这种方案缺点在于:
- 第一:redis官方文档中也明确指出,redis从未保证key过期后会立马删除key并立即发送过期通知,因为redis实现key过期的方式是定时随机扫描key,发现过期则删除或者在访问key的时候再去检查key是否过期,过期则删除,所以消息的延迟是必然存在的。
- 第二:redis的事件消息是通过Pub/Sub发出来的,Pub/Sub消息不会做持久化,也没有消息的Ack机制,那样如果消费者没有ack就挂掉或者中间断链,重启后无法再处理这个消息,导致消息丢失。
基于Redis的ZSet实现
Redis的ZSet是基于score进行排序的有序数据结构,因此可以将延迟消息的到期时间戳作为score放到zset中,定时轮询存放延时任务的Zset,取出ZSet中最近要执行的延时任务和当前时间比较,小于等于当前时间即可以执行该任务。新增和查询任务的时间复杂度都是O(logN)。
小demo:
var redisCli *redis.Client
func init() {
// 连接redis
redisCli = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
}
func addTask() {
// 添加一个三秒的定时任务
redisCli.ZAdd(context.Background(),"first-job",&redis.Z{Score:float64(time.Now().Add(3 * time.Second).Unix()),Member: 1})
fmt.Println("添加定时任务: ",time.Now())
}
func getTaskFromZset() {
for {
luaScript := `
local message = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'WITHSCORES', 'LIMIT', 0, 1);
if #message > 0 then
redis.call('ZREM', KEYS[1], message[1]);
return message[1];
else
return nil;
end
`
res, err := redis.NewScript(luaScript).Run(context.Background(), redisCli, []string{"first-job"},strconv.FormatInt(time.Now().Unix(), 10)).Result()
if err != nil {
continue
}
// 执行延迟任务
fmt.Println("res:",res)
fmt.Println("执行延迟任务:",time.Now())
}
}
func main() {
addTask()
go getTaskFromZset()
select {}
}
基于Redis ZSet实现相对简单,但也有几点需要注意:
- 第一就是延时消息处理的原子性:轮询线程从Zset获取最近要执行的任务执行并将该任务移除出Zset这几个步骤整体需要封装为一个原子操作,否则服务多实例环境下,可能被其他服务的轮询线程消费到同一任务。
- 第二,定时轮训Zset可能会出现很多空轮训消耗服务器CPU
2.4 基于MQ实现
基于Redis ZSet实现延时队列是一种理解起来比较简单直观并且可以快速落地的方案,但是需要我们自己去实现延迟消息的检测,利用消息队列对延迟消息的支持,我们可以只关心延迟消息的业务逻辑,而不需要关心延迟消息什么时候可以执行。
基于Pulsar延时消费实现
Pulsar最早是在2.4.0引入了延迟消息投递的特性,在Pulsar中使用延迟消息,可以精确指定延迟投递的时间,有deliverAfter和deliverAt两种方式。其中deliverAt可以指定具体的时间戳;deliverAfter可以指定在当前多长时间后执行。
Pulsar延迟消息投递:
// 发送消息,5分钟后执行
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
// 消息内容
Payload: []byte("hello go client, this is a message."),
DeliverAfter: 5 * time.Minute,
})
// 发送消息,在指定时间执行
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
// 消息内容
Payload: []byte("hello go client, this is a message."),
DeliverAt: time.Now().Add(time.Second * 10),
})
Pulsar延迟消息实现原理:Pulsar中的延迟消息是基于Netty中的时间轮去实现的,通过时间轮去推动时间,到达对应时间后去寻找对应的消息索引,延迟消息的索引通过优先级队列维护,优先队列根据延时时间排序,维护在Broker内存中。找到消息后在发给Consumer去消费。 Pulsar延迟消息有以下不足之处:
- 第一:延迟消息索引维护在内存中,一条延迟消息的消息索引由三个long类型的字段组成<延迟执行的时间戳以及用于定位消息的ledgerId和entryId,所以当有大量的延迟消息需要执行或者延迟消息的延迟时间越长,对内存的开销也会越大。
- 第二:如果集群出现 Broker 宕机或者 topic 的 ownership 转移,Pulsar 会重建延迟消息索引,对于大规模的延时消息,索引重建的时间也会比较长。
我们目前是为海外的超核玩家服务,用户基数比较小,所以我们也选用了腾讯云的TDMQ-Pulsar作为延时队列的实现方案。
基于RabbitMQ实现
基于RabbitMQ实现有两种方式:
- 第一种是基于RabbitMQ的死信队列实现
- 第二种是基于RabbitMQ官方的延迟插件实现
其实RabbitMQ本身是不支持延迟消息的,但是RabbitMQ支持消息和队列TTL和死信队列,因此,可以利用这两个特性实现延迟队列的效果。
什么是死信消息?
在消息生产和消费过程中,当消息出现以下情况,则会变为死信消息:
- 消息被拒绝且不再重复投递
- 消息超时未被消费,TTL过期
- 队列达到最大长度无法正常投递消息
死信消息会被投递到Dead-Letter-Exchange,然后根据绑定的规则转发到队列的死信队列上,监听这些队列就消费消息就可以实现延迟消息的效果。
但是基于这种方式实现的延迟队列有个缺陷:RabbitMQ只会检查第一个消息是否过期,比如这种情况,第一个消息设置了20s的TTL,第二个消息设置了10s的TTL,RabbitMQ只会等到第一个消息过期了,才会让接下来的第二个消息过期,因此,可能会出现延迟消息投递不按时。
第一种肯定是不能应用于生产环境的,因此RabbitMQ推出了官方的延迟插件
这个插件其实是在消息发到Exchange后,将延迟消息暂时存放在mnesia(一种分布式数据库),等消息到期后在投递到队列中给消费者去消费。
使用延时插件实现延时队列也存在一些局限:
- 延迟消息存储在 Mnesia 数据库,在集群情况下,数据只会分布在其中一个节点,如果该节点宕机,延时消息不可用,但是恢复之后消息不会丢
- 官方文档也有提到,该插件依赖 Erlang 计时器,设计上并 不适合大量的延迟消息,如:百万级以上的延时消息
- 插件一旦被禁用了,没有被消费的延时消息会丢失。
三、总结
相关推荐
- 【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库
-
如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...
- Pure Storage推出统一数据管理云平台及新闪存阵列
-
PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...
- 对Java学习的10条建议(对java课程的建议)
-
不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...
- SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!
-
官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...
- JDK21有没有什么稳定、简单又强势的特性?
-
佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...
- 「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了
-
在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...
- Java面试题及答案最全总结(2025版)
-
大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...
- 数据库日常运维工作内容(数据库日常运维 工作内容)
-
#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...
- 分布式之系统底层原理(上)(底层分布式技术)
-
作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...
- oracle 死锁了怎么办?kill 进程 直接上干货
-
1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...
- SpringBoot 各种分页查询方式详解(全网最全)
-
一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...
- 《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略
-
《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...
- LoadRunner(loadrunner录制不到脚本)
-
一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...
- Redis数据类型介绍(redis 数据类型)
-
介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...
- RMAN备份监控及优化总结(rman备份原理)
-
今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)