【redis实战六】Springboot+Redis实现通用消息队列stater
mhr18 2024-12-09 12:15 20 浏览 0 评论
1、消息队列选择
其实除了主流的各大消息中间件ActiveMQ, RocketMQ,RabbitMQ,Kafka之外,其实Redis也是支持消息队列功能的。
而有时候我们不需要引入消息队列中间件,跟缓存中间件Redis一起一起共用一个Redis作为消息中间件也是可以的,这样就少用了一个组件。
2、Redis能实现哪些消息模式?
- 1)、使用stream实现点对点消息模式
- 2)、使用publish/subscribe实现发布订阅模式
3、我们将如何封装发消息中间件功能在starter中
- 1)、通过配置文件配置消息队列名称和对应的消费者类列表
- 2)、读取配置文件看创建消息队列相关stream(已创建就不重复)
- 3)、根据stream和消费者类名做一一绑定,实现不同的消息队列绑定不同的消费者监听器
- 4)、实现一个通用的消息消费者抽象类,子类只需要继承该类并且注入spring容器即可实现消费者接受减小功能
- 5)、将配置类和业务处理类放入spring.factories中实现自动配置
- 6)、使用redis的key过期监听实现延时队列功能
这个过程皆基于封装好的redis-cache-spring-boot-starter,如果需要则先查看封装SpringCache为starter的模块。
4、代码实现
4.1、依赖引入
因为是基于它实现的,所以需要引入它的依赖。
<dependencies>
<!--redis cache-->
<dependency>
<groupId>com.itdl</groupId>
<artifactId>redis-cache-spring-boot-starter</artifactId>
</dependency>
</dependencies>
4.2、点对点和广播模式监听器配置
不废话,直接上代码,监听器配置类
/**
* 自定义Redis MQ配置文件
*/
@Configuration // 标识为一个配置项,注入Spring容器
@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class, AbstractStreamConsumerListener.class, AutowireCapableBeanFactory.class})
@EnableConfigurationProperties({CustomStreamMqProperties.class}) // 启动Redis配置文件
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")
@Slf4j
@SuppressWarnings("all")
public class RedisMqConfig extends AbstractRedisConfig implements ApplicationContextAware {
// 消息队列和监听器绑定的配置文件
@Autowired
private CustomStreamMqProperties streamMqProperties;
// 发布订阅模式消息监听器列表(继承BaseFanoutMessageListener注入spring容器,这里就能获取到)
@Autowired(required = false)
private List<BaseFanoutMessageListener> messageListeners;
// 点对点模式消息监听器列表(继承StreamListener注入spring容器,这里就能获取到)
@Autowired(required = false)
private List<StreamListener> streamListeners;
// spring容器上下文,用于获取Bean
private ApplicationContext applicationContext;
/**
* 配置Stream消息监听容器Options
*/
@Bean
public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions(){
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// block读取超时时间
.pollTimeout(Duration.ofSeconds(30))
// count 数量(一次只获取一条消息)
.batchSize(1)
// 序列化规则
.serializer(stringRedisSerializer)
.build();
}
/**
* 配置stream监听器接收消息
*/
@Bean
public StreamMessageListenerContainer<String,MapRecord<String,String,String>> streamMessageListenerContainer(RedisConnectionFactory factory,
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions,
RedisTemplate<String, Object> redisTemplate){
StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
streamMessageListenerContainerOptions);
final List<CustomStreamMqProperties.StreamConfig> configs = streamMqProperties.getConfigs();
// 如果配置为空或者容器中没有监听器注入,则不启动
if (CollectionUtils.isEmpty(configs) || CollectionUtils.isEmpty(streamListeners)){
return listenerContainer;
}
// 遍历队列和监听器配置列表
for (CustomStreamMqProperties.StreamConfig config : configs) {
//如果 流不存在 创建 stream 流
String streamName = config.getQueueName() + "Stream";
// 组名
String groupName = config.getQueueName() + "Group";
// 如果stream不存在,则创建
if(!redisTemplate.hasKey(streamName)){
redisTemplate.opsForStream().add(streamName, Collections.singletonMap("", ""));
log.info("======>>>初始化stream {} success", streamName);
}
//创建消费者组名不存在,则创建
try {
redisTemplate.opsForStream().createGroup(streamName, groupName);
log.info("======>>>初始化group {} success", groupName);
} catch (Exception e) {
log.warn("======>>>消费者组 {} 已存在", groupName);
}
// 获取消费者监听器的全限定类名
String messageListenerFullName = config.getMessageListenerFullName();
// 使用自定义的消息监听器和stream进行绑定。
// 遍历所有的注入容器中的stream监听器
for (StreamListener messageListener : streamListeners) {
// 如果监听器名称和配置全限定名称一样
if (messageListener.getClass().getName().equalsIgnoreCase(messageListenerFullName.trim())){
// 则将监听器和stream和消费者组进行绑定,从而接收消息
listenerContainer.receive(
Consumer.from(groupName, config.getQueueName() + "Consumer"),
StreamOffset.create(streamName, ReadOffset.lastConsumed()),
messageListener
);
log.info("add stream listener {} success", messageListener.getClass().getName());
}
}
}
// 所以的stream和监听器绑定之后,启动strean监听器
listenerContainer.start();
return listenerContainer;
}
/**
* redis监听器配置(发布订阅模式)
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
// 创建一个Redis消息监听容器
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 将连接工厂设置到容器中
container.setConnectionFactory(connectionFactory);
// 配置广播/发布订阅模式消息监听器
List<CustomStreamMqProperties.TopicConfig> topics = streamMqProperties.getTopics();
// not start
if (CollectionUtils.isEmpty(topics) || CollectionUtils.isEmpty(messageListeners)){
return container;
}
// 果配置为空或者容器中没有监听器注入,则不启动
for (CustomStreamMqProperties.TopicConfig topic : topics) {
// 获取topic名称
String topicName = topic.getTopicName().trim();
// 获取消息监听器的全限定类名
String messageListenerFullName = topic.getMessageListenerFullName();
// 使用自定义的消息监听器
for (BaseFanoutMessageListener messageListener : messageListeners) {
// 如果当前消息监听器名称就是配置的名称
if (messageListener.getClass().getName().equalsIgnoreCase(messageListenerFullName.trim())){
// 手动注册一个消息监听适配器。名称以topic + "ListenerAdapter"为适配器名称
MessageListenerAdapter adapter = (MessageListenerAdapter)registerBean(topic + "ListenerAdapter", listenerAdapter(messageListener));
log.info("register {} mapping MessageListenerAdapter", messageListener.getClass().getName());
container.addMessageListener(adapter, new PatternTopic(topicName));
}
}
}
return container;
}
// 根据见日你刚起创建一个监听器适配器
public MessageListenerAdapter listenerAdapter(MessageListener listener) {
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(listener, "onMessage");
// 配置序列化器
listenerAdapter.setSerializer(buildRedisJackson());
return listenerAdapter;
}
// 手动注册Bean,因为需要AutowireCapableBeanFactory进行注册bean, 所以需要使用@AutoConfigureBefore({AutowireCapableBeanFactory.class}) 提前加载该bean
public Object registerBean(String beanName, Object singletonObject){
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
//动态注册bean.
defaultListableBeanFactory.registerSingleton(beanName, singletonObject);
//获取动态注册的bean.
return applicationContext.getBean(beanName);
}
// 设置上下文,因为DefaultListableBeanFactory是从上下文中获取的
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
其中AbstractRedisConfig是在redis-cache-spring-boot-starter中RedisConnectionFactory等对象的创建。
@AutoConfigureBefore表示要先自动配置容器中这几个类。
点对点模式的监听器配置步骤
- 1)、读取队列和监听器全限定类名列表配置
- 2)、配置不存在或者注入spring容器的监听器列表不存在,则不绑定关系
- 3)、遍历配置文件,获取到队列名,加后缀Stream表示收发消息的stream流名称,加后缀Group表示对应的消费者组名称,加后缀Consumer表示消费者组的消费者名称。
- 4)、如果遍历到的监听器名称等于配置的全限定类名,那么就将监听器和队列进行绑定
- 5)、启动监听器
发布订阅模式的监听器配置步骤
- 1)、读取队列和监听器全限定类名列表配置
- 2)、配置不存在或者注入spring容器的监听器列表不存在,则不绑定关系
- 3)、遍历配置文件,获取到topic名,topic对应的监听器全限定类名
- 4)、如果遍历到的监听器名称等于配置的全限定类名,那么就将监听器和队列进行绑定
- 5)、绑定时需要使用AutowireCapableBeanFactory去手动注册一个MessageListenerAdapter,而监听器适配器的名称则是topic + "ListenerAdapter"
- 6)、启动监听器
4.3、为各自的监听器编写抽象父类或者父接口
点对点模式
点对点模式监听器需要实现StreamMessageListener接口,而这个接口不会被其他类实现,所有直接使用抽象类实现即可
/**
* 抽象消费者监听器实现(点对点模式)
*/
@Slf4j
@SuppressWarnings("all")
public abstract class AbstractStreamConsumerListener implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
protected RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("=====>>>消费者收消息: StreamName[{}], 消息内容: {}", message.getStream(), message.getValue());
// 获取生产者消息
Map<String, String> msgMap = message.getValue();
// 消息处理
handleMessage(message);
// 使用stream进行消息确认
StreamOperations<String, String, String> streamOperations = redisTemplate.opsForStream();
streamOperations.acknowledge(message.getStream(),
getGroupNameByStreamName(message),
message.getId());
}
// 消息处理交给子类自己处理
protected abstract void handleMessage(MapRecord<String, String, String> message);
// 根据streamName找到groupName, 因为我们前缀都一样,只是后缀不一样,替换掉即可
protected String getGroupNameByStreamName(MapRecord<String, String, String> message){
// 获取到streamName
String streamName = message.getStream();
return streamName.substring(0, streamName.lastIndexOf("Stream")) + "Group";
}
}
发布订阅模式
发布订阅模式监听器需要实现MessageListener接口,而这个接口在Key过期监听中已经被使用了,我们以它为父类注入实现类列表则会包含Key过期监听的类.
所以这里我们抽象一层接口.
/**
* 广播消息监听器
*/
@Slf4j
@SuppressWarnings("all")
public abstract class AbstractFanoutListener implements BaseFanoutMessageListener {
public abstract void doHandle(Message message);
@Override
public void onMessage(Message message, byte[] pattern) {
String key = message.toString();
log.info("=====>>>订阅到key【{}】消息了", key);
doHandle(message);
}
}
而我们的配置类中注入的也正是这个类的列表作为发布订阅模式模式的监听器列表
@Autowired(required = false)
private List<BaseFanoutMessageListener> messageListeners;
同样编写一个抽象类接收消息, 但是把消息处理交给子类去实现.
/**
* 广播消息监听器
*/
@Slf4j
@SuppressWarnings("all")
public abstract class AbstractFanoutListener implements BaseFanoutMessageListener {
public abstract void doHandle(Message message);
@Override
public void onMessage(Message message, byte[] pattern) {
String key = message.toString();
log.info("=====>>>订阅到key【{}】消息了", key);
doHandle(message);
}
}
4.4、将RedisMqConfig放入spring.factories实现自动配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itdl.mq.config.RedisMqConfig
4.5、在测试模块引入该starter并编写配置文件
<dependency>
<groupId>com.itdl</groupId>
<artifactId>redis-mq-spring-boot-starter</artifactId>
</dependency>
编写配置文件
server:
port: 8081
redis:
# Redis开关
enable: true
# Redis地址,格式ip:port,ip:port。集群使用逗号分割
host: 162.14.74.11:6379
# 密码
password:
# 数据库
database: 0
# 最大重试次数
max-redirects: 3
# 使用统一前缀管理
key-prefix: itdl
# 缓存空值开关开启
cache-null-values: true
# 缓存空值的key集合在内存中存储的最大数量
cacheNullValueKeySizeInMemory: 102400
mq:
# MQ点对点消息配置
configs:
# 队列名称
- queueName: testDemoQueue
# 队列绑定的消息监听器名称
messageListenerFullName: com.demo.consumer.TestConsumer1
# - queueName: testQueue2
# messageListenerFullName: com.demo.consumer.TestConsumer2
# - queueName: testQueue3
# messageListenerFullName: com.demo.consumer.TestConsumer3
# MQ发布订阅配置
topics:
# topic名称
- topicName: testDemoTopic
# topic绑定的消息监听器名称
messageListenerFullName: com.demo.subscriber.FanoutListener1
编写点对点模式监听器
/**
* test consumer
*/
@Component
@Slf4j
public class TestConsumer1 extends AbstractStreamConsumerListener {
@Override
protected void handleMessage(MapRecord<String, String, String> message) {
log.info("stream:{}, data:{}", message.getStream(), message.getValue());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("TestConsumer1 handle data");
}
}
可以复制一份,实现多个.
编写发布订阅模式监听器
/**
* 广播消息监听器
*/
@ConditionalOnProperty(value = "redis.enable", havingValue = "true")
@Slf4j
@Component
@SuppressWarnings("all")
public class FanoutListener1 extends AbstractFanoutListener {
@Override
public void doHandle(Message message) {
log.info("正在处理业务...:{}", message.toString());
}
}
编写测试接口
/**
* test
*/
@RestController
public class TestController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
// 测试点对点模式
@GetMapping("/testQueue")
public String testMq() {
// 创建Stream
StreamOperations streamOperations = redisTemplate.opsForStream();
//生产者发送消息 "emailStream"-是消费者配置的Stream队列名称
Map<String, String> map = new HashMap<>();
map.put("test1", "hello world0!");
// 发送两次testStream
Map<String, String> map0 = new HashMap<>();
map0.put("test1", "hello world1!");
streamOperations.add("testDemoQueueStream", map0);
return "success";
}
// 测试发布订阅模式
@GetMapping("/testPublish")
public String testMq2() throws InterruptedException {
Map<String, String> map4 = new HashMap<>();
map4.put("test4", "hello world4!");
redisTemplate.convertAndSend("testDemoTopic", JSONUtil.toJsonStr(map4));
return "success";
}
}
启动项目(启动三个实例)
测试点对点模式
调用10次http://localhost:8001/testQueue
测试发布订阅模式
调用3次http://localhost:8001/testPublish
5、小结
我们这里封装成了redis-mq-spring-boot-starter, 实现了点对点模式和发布订阅模式.
所以在业务量不太大的情况下,我们就不用太引入其他消息中间件了,一个reids就搞定了.
相关推荐
- 一文读懂Prometheus架构监控(prometheus监控哪些指标)
-
介绍Prometheus是一个系统监控和警报工具包。它是用Go编写的,由Soundcloud构建,并于2016年作为继Kubernetes之后的第二个托管项目加入云原生计算基金会(C...
- Spring Boot 3.x 新特性详解:从基础到高级实战
-
1.SpringBoot3.x简介与核心特性1.1SpringBoot3.x新特性概览SpringBoot3.x是建立在SpringFramework6.0基础上的重大版...
- 「技术分享」猪八戒基于Quartz分布式调度平台实践
-
点击原文:【技术分享】猪八戒基于Quartz分布式调度平台实践点击关注“八戒技术团队”,阅读更多技术干货1.背景介绍1.1业务场景调度任务是我们日常开发中非常经典的一个场景,我们时常会需要用到一些不...
- 14. 常用框架与工具(使用的框架)
-
本章深入解析Go生态中的核心开发框架与工具链,结合性能调优与工程化实践,提供高效开发方案。14.1Web框架(Gin,Echo)14.1.1Gin高性能实践//中间件链优化router:=...
- SpringBoot整合MyBatis-Plus:从入门到精通
-
一、MyBatis-Plus基础介绍1.1MyBatis-Plus核心概念MyBatis-Plus(简称MP)是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,为简化开发、提...
- Seata源码—5.全局事务的创建与返回处理
-
大纲1.Seata开启分布式事务的流程总结2.Seata生成全局事务ID的雪花算法源码3.生成xid以及对全局事务会话进行持久化的源码4.全局事务会话数据持久化的实现源码5.SeataServer创...
- Java开发200+个学习知识路线-史上最全(框架篇)
-
1.Spring框架深入SpringIOC容器:BeanFactory与ApplicationContextBean生命周期:实例化、属性填充、初始化、销毁依赖注入方式:构造器注入、Setter注...
- OpenResty 入门指南:从基础到动态路由实战
-
一、引言1.1OpenResty简介OpenResty是一款基于Nginx的高性能Web平台,通过集成Lua脚本和丰富的模块,将Nginx从静态反向代理转变为可动态编程的应用平台...
- 你还在为 Spring Boot3 分布式锁实现发愁?一文教你轻松搞定!
-
作为互联网大厂后端开发人员,在项目开发过程中,你有没有遇到过这样的问题:多个服务实例同时访问共享资源,导致数据不一致、业务逻辑混乱?没错,这就是分布式环境下常见的并发问题,而分布式锁就是解决这类问题的...
- 近2万字详解JAVA NIO2文件操作,过瘾
-
原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。从classpath中读取过文件的人,都知道需要写一些读取流的方法,很是繁琐。最近使用IDEA在打出.这个符号的时候,一行代...
- 学习MVC之租房网站(十二)-缓存和静态页面
-
在上一篇<学习MVC之租房网站(十一)-定时任务和云存储>学习了Quartz的使用、发邮件,并将通过UEditor上传的图片保存到云存储。在项目的最后,再学习优化网站性能的一些技术:缓存和...
- Linux系统下运行c++程序(linux怎么运行c++文件)
-
引言为什么要在Linux下写程序?需要更多关于Linux下c++开发的资料请后台私信【架构】获取分享资料包括:C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdf...
- 2022正确的java学习顺序(文末送java福利)
-
对于刚学习java的人来说,可能最大的问题是不知道学习方向,每天学了什么第二天就忘了,而课堂的讲解也是很片面的。今天我结合我的学习路线为大家讲解下最基础的学习路线,真心希望能帮到迷茫的小伙伴。(有很多...
- 一个 3 年 Java 程序员 5 家大厂的面试总结(已拿Offer)
-
前言15年毕业到现在也近三年了,最近面试了阿里集团(菜鸟网络,蚂蚁金服),网易,滴滴,点我达,最终收到点我达,网易offer,蚂蚁金服二面挂掉,菜鸟网络一个月了还在流程中...最终有幸去了网易。但是要...
- 多商户商城系统开发全流程解析(多商户商城源码免费下载)
-
在数字化商业浪潮中,多商户商城系统成为众多企业拓展电商业务的关键选择。这类系统允许众多商家在同一平台销售商品,不仅丰富了商品种类,还为消费者带来更多样的购物体验。不过,开发一个多商户商城系统是个复杂的...
你 发表评论:
欢迎- 一周热门
-
-
Redis客户端 Jedis 与 Lettuce
-
高并发架构系列:Redis并发竞争key的解决方案详解
-
redis如何防止并发(redis如何防止高并发)
-
开源推荐:如何实现的一个高性能 Redis 服务器
-
redis安装与调优部署文档(WinServer)
-
Redis 入门 - 安装最全讲解(Windows、Linux、Docker)
-
一文带你了解 Redis 的发布与订阅的底层原理
-
Redis如何应对并发访问(redis控制并发量)
-
oracle数据库查询Sql语句是否使用索引及常见的索引失效的情况
-
Java SE Development Kit 8u441下载地址【windows版本】
-
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)