【redis实战六】Springboot+Redis实现通用消息队列stater
mhr18 2024-12-09 12:15 30 浏览 0 评论
1、消息队列选择
其实除了主流的各大消息中间件ActiveMQ, RocketMQ,RabbitMQ,Kafka之外,其实Redis也是支持消息队列功能的。
而有时候我们不需要引入消息队列中间件,跟缓存中间件Redis一起一起共用一个Redis作为消息中间件也是可以的,这样就少用了一个组件。
2、Redis能实现哪些消息模式?
- 1)、使用stream实现点对点消息模式
- 2)、使用publish/subscribe实现发布订阅模式
3、我们将如何封装发消息中间件功能在starter中
- 1)、通过配置文件配置消息队列名称和对应的消费者类列表
- 2)、读取配置文件看创建消息队列相关stream(已创建就不重复)
- 3)、根据stream和消费者类名做一一绑定,实现不同的消息队列绑定不同的消费者监听器
- 4)、实现一个通用的消息消费者抽象类,子类只需要继承该类并且注入spring容器即可实现消费者接受减小功能
- 5)、将配置类和业务处理类放入spring.factories中实现自动配置
- 6)、使用redis的key过期监听实现延时队列功能
这个过程皆基于封装好的redis-cache-spring-boot-starter,如果需要则先查看封装SpringCache为starter的模块。
4、代码实现
4.1、依赖引入
因为是基于它实现的,所以需要引入它的依赖。
<dependencies>
<!--redis cache-->
<dependency>
<groupId>com.itdl</groupId>
<artifactId>redis-cache-spring-boot-starter</artifactId>
</dependency>
</dependencies>
4.2、点对点和广播模式监听器配置
不废话,直接上代码,监听器配置类
/**
* 自定义Redis MQ配置文件
*/
@Configuration // 标识为一个配置项,注入Spring容器
@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class, AbstractStreamConsumerListener.class, AutowireCapableBeanFactory.class})
@EnableConfigurationProperties({CustomStreamMqProperties.class}) // 启动Redis配置文件
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")
@Slf4j
@SuppressWarnings("all")
public class RedisMqConfig extends AbstractRedisConfig implements ApplicationContextAware {
// 消息队列和监听器绑定的配置文件
@Autowired
private CustomStreamMqProperties streamMqProperties;
// 发布订阅模式消息监听器列表(继承BaseFanoutMessageListener注入spring容器,这里就能获取到)
@Autowired(required = false)
private List<BaseFanoutMessageListener> messageListeners;
// 点对点模式消息监听器列表(继承StreamListener注入spring容器,这里就能获取到)
@Autowired(required = false)
private List<StreamListener> streamListeners;
// spring容器上下文,用于获取Bean
private ApplicationContext applicationContext;
/**
* 配置Stream消息监听容器Options
*/
@Bean
public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions(){
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// block读取超时时间
.pollTimeout(Duration.ofSeconds(30))
// count 数量(一次只获取一条消息)
.batchSize(1)
// 序列化规则
.serializer(stringRedisSerializer)
.build();
}
/**
* 配置stream监听器接收消息
*/
@Bean
public StreamMessageListenerContainer<String,MapRecord<String,String,String>> streamMessageListenerContainer(RedisConnectionFactory factory,
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions,
RedisTemplate<String, Object> redisTemplate){
StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
streamMessageListenerContainerOptions);
final List<CustomStreamMqProperties.StreamConfig> configs = streamMqProperties.getConfigs();
// 如果配置为空或者容器中没有监听器注入,则不启动
if (CollectionUtils.isEmpty(configs) || CollectionUtils.isEmpty(streamListeners)){
return listenerContainer;
}
// 遍历队列和监听器配置列表
for (CustomStreamMqProperties.StreamConfig config : configs) {
//如果 流不存在 创建 stream 流
String streamName = config.getQueueName() + "Stream";
// 组名
String groupName = config.getQueueName() + "Group";
// 如果stream不存在,则创建
if(!redisTemplate.hasKey(streamName)){
redisTemplate.opsForStream().add(streamName, Collections.singletonMap("", ""));
log.info("======>>>初始化stream {} success", streamName);
}
//创建消费者组名不存在,则创建
try {
redisTemplate.opsForStream().createGroup(streamName, groupName);
log.info("======>>>初始化group {} success", groupName);
} catch (Exception e) {
log.warn("======>>>消费者组 {} 已存在", groupName);
}
// 获取消费者监听器的全限定类名
String messageListenerFullName = config.getMessageListenerFullName();
// 使用自定义的消息监听器和stream进行绑定。
// 遍历所有的注入容器中的stream监听器
for (StreamListener messageListener : streamListeners) {
// 如果监听器名称和配置全限定名称一样
if (messageListener.getClass().getName().equalsIgnoreCase(messageListenerFullName.trim())){
// 则将监听器和stream和消费者组进行绑定,从而接收消息
listenerContainer.receive(
Consumer.from(groupName, config.getQueueName() + "Consumer"),
StreamOffset.create(streamName, ReadOffset.lastConsumed()),
messageListener
);
log.info("add stream listener {} success", messageListener.getClass().getName());
}
}
}
// 所以的stream和监听器绑定之后,启动strean监听器
listenerContainer.start();
return listenerContainer;
}
/**
* redis监听器配置(发布订阅模式)
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
// 创建一个Redis消息监听容器
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 将连接工厂设置到容器中
container.setConnectionFactory(connectionFactory);
// 配置广播/发布订阅模式消息监听器
List<CustomStreamMqProperties.TopicConfig> topics = streamMqProperties.getTopics();
// not start
if (CollectionUtils.isEmpty(topics) || CollectionUtils.isEmpty(messageListeners)){
return container;
}
// 果配置为空或者容器中没有监听器注入,则不启动
for (CustomStreamMqProperties.TopicConfig topic : topics) {
// 获取topic名称
String topicName = topic.getTopicName().trim();
// 获取消息监听器的全限定类名
String messageListenerFullName = topic.getMessageListenerFullName();
// 使用自定义的消息监听器
for (BaseFanoutMessageListener messageListener : messageListeners) {
// 如果当前消息监听器名称就是配置的名称
if (messageListener.getClass().getName().equalsIgnoreCase(messageListenerFullName.trim())){
// 手动注册一个消息监听适配器。名称以topic + "ListenerAdapter"为适配器名称
MessageListenerAdapter adapter = (MessageListenerAdapter)registerBean(topic + "ListenerAdapter", listenerAdapter(messageListener));
log.info("register {} mapping MessageListenerAdapter", messageListener.getClass().getName());
container.addMessageListener(adapter, new PatternTopic(topicName));
}
}
}
return container;
}
// 根据见日你刚起创建一个监听器适配器
public MessageListenerAdapter listenerAdapter(MessageListener listener) {
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(listener, "onMessage");
// 配置序列化器
listenerAdapter.setSerializer(buildRedisJackson());
return listenerAdapter;
}
// 手动注册Bean,因为需要AutowireCapableBeanFactory进行注册bean, 所以需要使用@AutoConfigureBefore({AutowireCapableBeanFactory.class}) 提前加载该bean
public Object registerBean(String beanName, Object singletonObject){
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
//动态注册bean.
defaultListableBeanFactory.registerSingleton(beanName, singletonObject);
//获取动态注册的bean.
return applicationContext.getBean(beanName);
}
// 设置上下文,因为DefaultListableBeanFactory是从上下文中获取的
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
其中AbstractRedisConfig是在redis-cache-spring-boot-starter中RedisConnectionFactory等对象的创建。
@AutoConfigureBefore表示要先自动配置容器中这几个类。
点对点模式的监听器配置步骤
- 1)、读取队列和监听器全限定类名列表配置
- 2)、配置不存在或者注入spring容器的监听器列表不存在,则不绑定关系
- 3)、遍历配置文件,获取到队列名,加后缀Stream表示收发消息的stream流名称,加后缀Group表示对应的消费者组名称,加后缀Consumer表示消费者组的消费者名称。
- 4)、如果遍历到的监听器名称等于配置的全限定类名,那么就将监听器和队列进行绑定
- 5)、启动监听器
发布订阅模式的监听器配置步骤
- 1)、读取队列和监听器全限定类名列表配置
- 2)、配置不存在或者注入spring容器的监听器列表不存在,则不绑定关系
- 3)、遍历配置文件,获取到topic名,topic对应的监听器全限定类名
- 4)、如果遍历到的监听器名称等于配置的全限定类名,那么就将监听器和队列进行绑定
- 5)、绑定时需要使用AutowireCapableBeanFactory去手动注册一个MessageListenerAdapter,而监听器适配器的名称则是topic + "ListenerAdapter"
- 6)、启动监听器
4.3、为各自的监听器编写抽象父类或者父接口
点对点模式
点对点模式监听器需要实现StreamMessageListener接口,而这个接口不会被其他类实现,所有直接使用抽象类实现即可
/**
* 抽象消费者监听器实现(点对点模式)
*/
@Slf4j
@SuppressWarnings("all")
public abstract class AbstractStreamConsumerListener implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
protected RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("=====>>>消费者收消息: StreamName[{}], 消息内容: {}", message.getStream(), message.getValue());
// 获取生产者消息
Map<String, String> msgMap = message.getValue();
// 消息处理
handleMessage(message);
// 使用stream进行消息确认
StreamOperations<String, String, String> streamOperations = redisTemplate.opsForStream();
streamOperations.acknowledge(message.getStream(),
getGroupNameByStreamName(message),
message.getId());
}
// 消息处理交给子类自己处理
protected abstract void handleMessage(MapRecord<String, String, String> message);
// 根据streamName找到groupName, 因为我们前缀都一样,只是后缀不一样,替换掉即可
protected String getGroupNameByStreamName(MapRecord<String, String, String> message){
// 获取到streamName
String streamName = message.getStream();
return streamName.substring(0, streamName.lastIndexOf("Stream")) + "Group";
}
}
发布订阅模式
发布订阅模式监听器需要实现MessageListener接口,而这个接口在Key过期监听中已经被使用了,我们以它为父类注入实现类列表则会包含Key过期监听的类.
所以这里我们抽象一层接口.
/**
* 广播消息监听器
*/
@Slf4j
@SuppressWarnings("all")
public abstract class AbstractFanoutListener implements BaseFanoutMessageListener {
public abstract void doHandle(Message message);
@Override
public void onMessage(Message message, byte[] pattern) {
String key = message.toString();
log.info("=====>>>订阅到key【{}】消息了", key);
doHandle(message);
}
}
而我们的配置类中注入的也正是这个类的列表作为发布订阅模式模式的监听器列表
@Autowired(required = false)
private List<BaseFanoutMessageListener> messageListeners;
同样编写一个抽象类接收消息, 但是把消息处理交给子类去实现.
/**
* 广播消息监听器
*/
@Slf4j
@SuppressWarnings("all")
public abstract class AbstractFanoutListener implements BaseFanoutMessageListener {
public abstract void doHandle(Message message);
@Override
public void onMessage(Message message, byte[] pattern) {
String key = message.toString();
log.info("=====>>>订阅到key【{}】消息了", key);
doHandle(message);
}
}
4.4、将RedisMqConfig放入spring.factories实现自动配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itdl.mq.config.RedisMqConfig
4.5、在测试模块引入该starter并编写配置文件
<dependency>
<groupId>com.itdl</groupId>
<artifactId>redis-mq-spring-boot-starter</artifactId>
</dependency>
编写配置文件
server:
port: 8081
redis:
# Redis开关
enable: true
# Redis地址,格式ip:port,ip:port。集群使用逗号分割
host: 162.14.74.11:6379
# 密码
password:
# 数据库
database: 0
# 最大重试次数
max-redirects: 3
# 使用统一前缀管理
key-prefix: itdl
# 缓存空值开关开启
cache-null-values: true
# 缓存空值的key集合在内存中存储的最大数量
cacheNullValueKeySizeInMemory: 102400
mq:
# MQ点对点消息配置
configs:
# 队列名称
- queueName: testDemoQueue
# 队列绑定的消息监听器名称
messageListenerFullName: com.demo.consumer.TestConsumer1
# - queueName: testQueue2
# messageListenerFullName: com.demo.consumer.TestConsumer2
# - queueName: testQueue3
# messageListenerFullName: com.demo.consumer.TestConsumer3
# MQ发布订阅配置
topics:
# topic名称
- topicName: testDemoTopic
# topic绑定的消息监听器名称
messageListenerFullName: com.demo.subscriber.FanoutListener1
编写点对点模式监听器
/**
* test consumer
*/
@Component
@Slf4j
public class TestConsumer1 extends AbstractStreamConsumerListener {
@Override
protected void handleMessage(MapRecord<String, String, String> message) {
log.info("stream:{}, data:{}", message.getStream(), message.getValue());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("TestConsumer1 handle data");
}
}
可以复制一份,实现多个.
编写发布订阅模式监听器
/**
* 广播消息监听器
*/
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")
@Slf4j
@Component
@SuppressWarnings("all")
public class FanoutListener1 extends AbstractFanoutListener {
@Override
public void doHandle(Message message) {
log.info("正在处理业务...:{}", message.toString());
}
}
编写测试接口
/**
* test
*/
@RestController
public class TestController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
// 测试点对点模式
@GetMapping("/testQueue")
public String testMq() {
// 创建Stream
StreamOperations streamOperations = redisTemplate.opsForStream();
//生产者发送消息 "emailStream"-是消费者配置的Stream队列名称
Map<String, String> map = new HashMap<>();
map.put("test1", "hello world0!");
// 发送两次testStream
Map<String, String> map0 = new HashMap<>();
map0.put("test1", "hello world1!");
streamOperations.add("testDemoQueueStream", map0);
return "success";
}
// 测试发布订阅模式
@GetMapping("/testPublish")
public String testMq2() throws InterruptedException {
Map<String, String> map4 = new HashMap<>();
map4.put("test4", "hello world4!");
redisTemplate.convertAndSend("testDemoTopic", JSONUtil.toJsonStr(map4));
return "success";
}
}
启动项目(启动三个实例)
测试点对点模式
调用10次http://localhost:8001/testQueue
测试发布订阅模式
调用3次http://localhost:8001/testPublish
5、小结
我们这里封装成了redis-mq-spring-boot-starter, 实现了点对点模式和发布订阅模式.
所以在业务量不太大的情况下,我们就不用太引入其他消息中间件了,一个reids就搞定了.
相关推荐
- 保持SSH隧道活跃:一个实用的Bash监控脚本
-
引言如果您正在使用AWSDocumentDB或任何位于堡垒主机后面的云托管服务等远程资源,您可能正在使用SSH隧道来安全地访问它们。虽然设置SSH隧道很简单,但保持其活跃状态并监控其状态可能会有些棘...
- 京东大佬问我,为什么说连接池是微服务的关键,你是如何理解的?
-
京东大佬问我,为什么说连接池是微服务的关键,你是如何理解的?我应该如何理解。首先,我需要回忆一下连接池和微服务的基本概念,然后思考它们在微服务架构中的作用和重要性。连接池,数据库连接池,用来管理数据库...
- OOM 血案:5 小时绝地求生,MAT+Arthas 终极排查指南
-
一、血案现场:线上服务突然暴毙2025年4月12日凌晨3点15分,服务突发大规模OOM,三个Pod在10分钟内连续崩溃,Prometheus告警显示JVM堆内存使用率...
- 记Tomcat优化方案
-
Tomcat服务吞吐量评估方案问题:评估方案在一台8核16G的linux服务器上,使用tomcat容器部署服务。在正常情况下如何评估这个tomcat服务可处理的连接数,即服务的吞吐量,请在正常情况下考...
- Java高级面试,常见数据结构的实现原理详细说明及面试总结
-
一、List接口实现类1.ArrayList底层结构:动态数组(Object[]数组)。核心原理:o动态扩容:初始容量为10(JDK1.8),当元素超过容量时,新容量为原容量的1.5倍(old...
- SpringBoot敏感配置项加密与解密实战
-
一、为什么要加密配置?先说说SpringBoot的配置加载机制。我们知道,SpringBoot支持多种配置加载方式,优先级从高到低大概是:命令行参数环境变量application-{profile}....
- 【面试题】nacos 配置管理类型-主配置、共享配置、扩展配置
-
nacos配置管理类型-主配置、共享配置、扩展配置Nacos的配置管理支持多种类型,其中共享配置及其扩展机制(如shared-configs和extension-configs)是微服...
- Spring Boot 的 RedisAutoConfiguration 配置:自动装配到自定义扩展
-
在SpringBoot开发中,Redis作为高性能缓存和分布式数据存储方案被广泛使用。而RedisAutoConfiguration作为SpringBoot自动装配体系的重要组成部分,能...
- Docker图像处理:扩展您的优化工作流程
-
随着应用程序的增长和图像处理需求的增加,传统的优化方法遇到了扩展瓶颈。内存限制、环境不一致和处理瓶颈将图像优化从一个已解决的问题变成了生产环境的噩梦。Docker改变了游戏规则。通过容器化图像处理工作...
- 掌握 Spring 框架这 10 个扩展点,让你的能力更上一层楼
-
当我们提到Spring时,或许首先映入脑海的是IOC(控制反转)和AOP(面向切面编程)。它们可以被视为Spring的基石。正是凭借其出色的设计,Spring才能在众多优秀框架中脱颖而出...
- 简简单单在线文件浏览的功能搞起来很头疼
-
您的系统支持在线预览文件吗?一个小小的问题,背后是无数程序员的爆肝研究,有人说了,我平时打开个文件不是很容易吗?其实不然。文件格式代表着软件行业的底层、高端产出,也代表着经久不衰的使用场景,也是我国底...
- 没硬盘、网盘也能看片自由!NAS一键部署MoonTV,随时随地爽看。
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙有没有一个应用服务,能满足既没有足够预算购置硬盘,也不想依托网盘的朋友的家庭观影需求?之前我介绍过LibreTV,本篇再来看看另一个更...
- 阿里云ECS代理商:如何使用ECS部署Node.js应用?
-
Node.js作为一种高性能、事件驱动的JavaScript运行环境,广泛用于构建实时通信、微服务接口、后台管理系统等现代Web应用。而阿里云ECS服务器以高可用性、灵活配置、安全稳定等优势,为部署N...
- 阿里云数据库代理商:如何提高数据库的查询效率?
-
在现代企业应用中,数据库查询效率对整体系统性能的影响巨大。特别是随着数据量的不断增加,如何提升数据库查询的响应速度,成为了数据库优化的关键任务。阿里云提供了一系列工具和策略,帮助用户提升数据库的查询效...
- 阿里云代理商:阿里云G6ne实例如何承载1.4亿QPS?
-
一、阿里云G6ne实例概述1.1G6ne实例的背景与定位阿里云G6ne实例是基于阿里云自主研发的“飞天”架构设计的高性能云服务器实例,专为大规模、需要高IOPS和低延迟的业务场景设计。它采用了更强大的...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (83)
- php redis (97)
- redis 存储 (67)
- redis 锁 (74)
- 启动 redis (73)
- redis 时间 (60)
- redis 删除 (69)
- redis内存 (64)
- redis并发 (53)
- redis 主从 (71)
- redis同步 (53)
- redis结构 (53)
- redis 订阅 (54)
- redis 登录 (62)
- redis 面试 (58)
- redis问题 (54)
- 阿里 redis (67)
- redis的缓存 (57)
- lua redis (59)
- redis 连接池 (64)