SpringBoot集成阿里云RocketMQ 实现延时队列
mhr18 2024-12-09 12:15 29 浏览 0 评论
前言
消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。具体详情参考 消息队列RocketMQ版 阿里云官网介绍。
在这一篇里,我们将介绍SpringBoot如何集成阿里云版RocketMQ,通过本篇的学习,你将收获:
- 什么是延时队列
- 延时队列在设班排课领域内的使用场景
- SpringBoot整合RocketMQ
什么是延时队列
顾名思义,延迟队列是指被延迟消费的队列。首先,队列意味着内部的元素是有序的,元素出队和入队是有方向性的。 普通的队列,消息一旦入队后就会被消费者马上消费。而延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,延时队列中的元素则是希望被在指定时间取出和处理,可以说延时队列中的元素是都是带时间属性的。简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延时队列使用场景
延时队列的运用场景比较多,在设班排课领域有如下场景:
- 班级某节课到达开课时间后,不允许插班,需要变更班级状态
- 预定未来教室后,到达预定教室使用时间之前,给预订者发提醒消息
- 商品过售卖期后自动下架
大家应该都发现了这些场景都有一个共同特点,需要在指定的时间点完成某一项任务。这个时候就有人会说了,这些功能可以使用定时任务轮询数据库,固定时间周期取出所需的数据进行处理。如果数据量比较少,确实可以这样做;如果对数据实时性不是有严格的限制,那么每小时执行下定时任务出来数据,确实也是一个可行的方案。但是对于数据量比较大,并且时效性较强的场景,这种轮询方式很难在一个周期内处理完业务数据,这就会给系统和数据库带来很大压力,无法满足业务要求而且性能低下。
这时候,延时队列就可以闪亮登场了,以上场景,正是延时队列的用武之地。延迟队列的实现方式比较多,比如DelayQueue延时队列、RabbitMQ队列、Redis键通知或Redis zset等等。
阿里云版RocketMQ 支持任意时间等级延时,简单易用并省去运维工作,所以本文将介绍如何用阿里云版RocketMQ来实现延时队列。
首先需要在阿里云开通RocketMQ服务,具体请参考阿里云官方文档
下面讲解SpringBoot整合RocketMQ来实现延时队列的,话不多说,上代码
- 引入依赖包
<!-- aliyun business version rocketmq-->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>
- yml配置文件添加属性
nepenthe: #服务名,替换成自己的
rocketmq:
accessKey: #连接AccessKey id ,替换成自己的
secretKey: #连接AccessKey secret,替换成自己的
nameSrvAddr: #生产者ons接入域名,替换成自己的
delayTopic: #生产主题,替换成自己的
groupId: #生产者id(旧版本是生产者id,新版本是groupid),替换成自己的
timeoutMillis: #发送过期时间,自定义
- 封装RocketMQ配置文件
@Configuration
public class RocketMqConfig {
/**
*鉴权需要的AccessKey ID
*/
@Value("${nepenthe.rocketmq.accessKey}")
private String accessKey;
/**
*鉴权需要的AccessKey Secret
*/
@Value("${nepenthe.rocketmq.secretKey}")
private String secretKey;
/**
* 实例TCP 协议公网接入地址(实际项目,填写自己阿里云MQ的公网地址)
*/
@Value("${nepenthe.rocketmq.nameSrvAddr}")
private String nameSrvAddr;
/**
* 延时队列topic
*/
@Value("${nepenthe.rocketmq.delayTopic}")
private String delayTopic;
/**
* 延时队列group
*/
@Value("${nepenthe.rocketmq.groupId}")
private String groupId;
/**
* 设置发送超时时间
*/
@Value("${nepenthe.rocketmq.timeoutMillis}")
private String timeoutMillis;
public Properties getRocketMqProperty() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID,this.getGroupId());
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.timeoutMillis);
return properties;
}
//TODO get set方法,自己实现
}
- 配置生产者
@Component
public class RocketMqProducerInit {
@Autowired
private RocketMqConfig mqConfig;
private static Producer producer;
@PostConstruct
public void init(){
System.out.println("启动RocketMq生产者!");
producer = ONSFactory.createProducer(mqConfig.getRocketMqProperty());
// 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
producer.start();
}
/**
* 初始化生产者
* @return
*/
public Producer getProducer(){
return producer;
}
}
- 生产者发送消息的方法
@Component
public class ProducerHandler {
private Logger logger = LoggerFactory.getLogger(ProducerHandler.class);
@Autowired
private RocketMqConfig config;
@Autowired
private RocketMqProducerInit producer;
/**
* 同步发送延时消息
* @param msgTag 标签,可用于消息小分类标注,对消息进行再归类
* @param messageBody 消息body内容,
* @param msgKey 消息key值,
* @param delayTime 服务端发送消息时间
* @return success:SendResult or error:null
*/
public SendResult sendTimeMsg(String msgTag, byte[] messageBody, String msgKey, long delayTime) {
Message msg = new Message(config.getDelayTopic(),msgTag,msgKey,messageBody);
msg.setStartDeliverTime(delayTime);
return this.send(msg);
}
/**
* 消息发送
* @param msg 消息
*/
private SendResult send(Message msg) {
try {
SendResult sendResult = producer.getProducer().send(msg);
//获取发送结果,不抛异常即发送成功
if (sendResult != null) {
logger.info("发送RocketMQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, message:{}"
,msg.getTopic(),sendResult.getMessageId(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
return sendResult;
}else {
logger.error("发送RocketMQ消息失败-- Topic:{}, Key:{}, tag:{}, message:{}"
,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
return null;
}
} catch (Exception e) {
logger.error("发送RocketMQ消息失败-- Topic:{}, Key:{}, tag:{}, message:{},错误原因:{}"
,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()),e.getMessage());
return null;
}
}
- 配置消费者
@Component
public class RocketmqConsumerInit {
@Autowired
private RocketMqConfig mqConfig;
private static Consumer consumer;
@PostConstruct
public void init(){
System.out.println("启动消费者!");
consumer = ONSFactory.createConsumer(mqConfig.getRocketMqProperty());
//监听第一个topic,new对应的监听器
consumer.subscribe(mqConfig.getDelayTopic(), "*", new DelayQueueMessageListener());
// 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
consumer.start();
}
/**
* 初始化消费者
* @return
*/
public Consumer getConsumer(){
return consumer;
}
}
- 消费者消息监听
@Service
public class DelayQueueMessageListener implements MessageListener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), new String(message.getBody()));
try {
//TODO 处理业务
//消费成功,继续消费下一条消息
return Action.CommitMessage;
} catch (Exception e) {
logger.error("消费MQ消息失败! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage());
//消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
return Action.ReconsumeLater;
}
}
}
- 运行结果
@Autowired
private ProducerHandler producerHandler;
@RequestMapping("delayMsg")
public void delayMsg2(String msg, Integer delayTime) {
JSONObject body = new JSONObject();
body.put("userId", "this is userId");
body.put("notice", "同步消息");
producerHandler.sendTimeMsg("userMessage", msg.getBytes(), "messageId", System.currentTimeMillis()+delayTime);
}
- 日志如下
2021-02-25 18:05:20.584 [http-nio-8080-exec-7] 1591467 INFO c.x.n.m.mq.rocketmq.ProducerUtil - TraceId[1614247520XAkUQkT0qv6] 发送MQ消息成功 -- Topic:auto_publish_delay_queue_topic ,msgId:0A4922E703D914DAD5DC7F7A4908000D , Key:messageId, tag:userMessage, body:我是延时队列延时10秒
2021-02-25 18:05:26.389 [http-nio-8080-exec-9] 1597272 INFO c.x.n.m.mq.rocketmq.ProducerUtil - TraceId[1614247526jymlRyaX9m6] 发送MQ消息成功 -- Topic:auto_publish_delay_queue_topic ,msgId:0A4922E703D914DAD5DC7F7A5F780010 , Key:messageId, tag:userMessage, body:我是延时队列延时12秒
2021-02-25 18:05:30.620 [ConsumeMessageThread_3] 1601503 INFO c.x.n.m.m.r.DelayQueueMessageListener - 接收到MQ消息 -- Topic:auto_publish_delay_queue_topic, tag:userMessage,msgId:bf58a99274cfadeddbec720c12d8607d , Key:messageId, body:我是延时队列延时10秒
2021-02-25 18:05:38.871 [ConsumeMessageThread_4] 1609754 INFO c.x.n.m.m.r.DelayQueueMessageListener - 接收到MQ消息 -- Topic:auto_publish_delay_queue_topic, tag:userMessage,msgId:d44270736cdfa2927194650c451584c3 , Key:messageId, body:我是延时队列延时12秒
两条消息的发送时间与消费时间,符合预期。至此,阿里云版RocketMQ实现延时队列的部分就完成了。如果考虑阿里云版RocketMQ费用,也可以考虑开源版RocketMQ,默认支持18个延时等级,或RabbitMQ加延时插件。
相关推荐
- 保持SSH隧道活跃:一个实用的Bash监控脚本
-
引言如果您正在使用AWSDocumentDB或任何位于堡垒主机后面的云托管服务等远程资源,您可能正在使用SSH隧道来安全地访问它们。虽然设置SSH隧道很简单,但保持其活跃状态并监控其状态可能会有些棘...
- 京东大佬问我,为什么说连接池是微服务的关键,你是如何理解的?
-
京东大佬问我,为什么说连接池是微服务的关键,你是如何理解的?我应该如何理解。首先,我需要回忆一下连接池和微服务的基本概念,然后思考它们在微服务架构中的作用和重要性。连接池,数据库连接池,用来管理数据库...
- OOM 血案:5 小时绝地求生,MAT+Arthas 终极排查指南
-
一、血案现场:线上服务突然暴毙2025年4月12日凌晨3点15分,服务突发大规模OOM,三个Pod在10分钟内连续崩溃,Prometheus告警显示JVM堆内存使用率...
- 记Tomcat优化方案
-
Tomcat服务吞吐量评估方案问题:评估方案在一台8核16G的linux服务器上,使用tomcat容器部署服务。在正常情况下如何评估这个tomcat服务可处理的连接数,即服务的吞吐量,请在正常情况下考...
- Java高级面试,常见数据结构的实现原理详细说明及面试总结
-
一、List接口实现类1.ArrayList底层结构:动态数组(Object[]数组)。核心原理:o动态扩容:初始容量为10(JDK1.8),当元素超过容量时,新容量为原容量的1.5倍(old...
- SpringBoot敏感配置项加密与解密实战
-
一、为什么要加密配置?先说说SpringBoot的配置加载机制。我们知道,SpringBoot支持多种配置加载方式,优先级从高到低大概是:命令行参数环境变量application-{profile}....
- 【面试题】nacos 配置管理类型-主配置、共享配置、扩展配置
-
nacos配置管理类型-主配置、共享配置、扩展配置Nacos的配置管理支持多种类型,其中共享配置及其扩展机制(如shared-configs和extension-configs)是微服...
- Spring Boot 的 RedisAutoConfiguration 配置:自动装配到自定义扩展
-
在SpringBoot开发中,Redis作为高性能缓存和分布式数据存储方案被广泛使用。而RedisAutoConfiguration作为SpringBoot自动装配体系的重要组成部分,能...
- Docker图像处理:扩展您的优化工作流程
-
随着应用程序的增长和图像处理需求的增加,传统的优化方法遇到了扩展瓶颈。内存限制、环境不一致和处理瓶颈将图像优化从一个已解决的问题变成了生产环境的噩梦。Docker改变了游戏规则。通过容器化图像处理工作...
- 掌握 Spring 框架这 10 个扩展点,让你的能力更上一层楼
-
当我们提到Spring时,或许首先映入脑海的是IOC(控制反转)和AOP(面向切面编程)。它们可以被视为Spring的基石。正是凭借其出色的设计,Spring才能在众多优秀框架中脱颖而出...
- 简简单单在线文件浏览的功能搞起来很头疼
-
您的系统支持在线预览文件吗?一个小小的问题,背后是无数程序员的爆肝研究,有人说了,我平时打开个文件不是很容易吗?其实不然。文件格式代表着软件行业的底层、高端产出,也代表着经久不衰的使用场景,也是我国底...
- 没硬盘、网盘也能看片自由!NAS一键部署MoonTV,随时随地爽看。
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙有没有一个应用服务,能满足既没有足够预算购置硬盘,也不想依托网盘的朋友的家庭观影需求?之前我介绍过LibreTV,本篇再来看看另一个更...
- 阿里云ECS代理商:如何使用ECS部署Node.js应用?
-
Node.js作为一种高性能、事件驱动的JavaScript运行环境,广泛用于构建实时通信、微服务接口、后台管理系统等现代Web应用。而阿里云ECS服务器以高可用性、灵活配置、安全稳定等优势,为部署N...
- 阿里云数据库代理商:如何提高数据库的查询效率?
-
在现代企业应用中,数据库查询效率对整体系统性能的影响巨大。特别是随着数据量的不断增加,如何提升数据库查询的响应速度,成为了数据库优化的关键任务。阿里云提供了一系列工具和策略,帮助用户提升数据库的查询效...
- 阿里云代理商:阿里云G6ne实例如何承载1.4亿QPS?
-
一、阿里云G6ne实例概述1.1G6ne实例的背景与定位阿里云G6ne实例是基于阿里云自主研发的“飞天”架构设计的高性能云服务器实例,专为大规模、需要高IOPS和低延迟的业务场景设计。它采用了更强大的...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (83)
- php redis (97)
- redis 存储 (67)
- redis 锁 (74)
- 启动 redis (73)
- redis 时间 (60)
- redis 删除 (69)
- redis内存 (64)
- redis并发 (53)
- redis 主从 (71)
- redis同步 (53)
- redis结构 (53)
- redis 订阅 (54)
- redis 登录 (62)
- redis 面试 (58)
- redis问题 (54)
- 阿里 redis (67)
- redis的缓存 (57)
- lua redis (59)
- redis 连接池 (64)