Java面试官:如何用Redis实现一个消息队列?直接上代码
mhr18 2024-11-10 09:48 24 浏览 0 评论
写这种单纯的源码很枯燥乏味,大家看起来估计也累,以后我这种类型的少写,大家老是让我写点深度的,我说真的很多东西我源码一贴,看都没人看。
整理了一份《Java面试核心知识点PDF》关注私信回复:555获取
Redis用作消息队列,其在spring boot中的主要表现为一个RedisTemplate.convertAndSend()方法和一个MessageListener接口。所以我们要在IOC容器中注入一个RedisTemplate和一个实现了MessageListener接口的类。话不多说,先看代码
配置RedisTemplate
配置RedisTemplate的主要目的是配置序列化方式以解决乱码问题,同时合理配置序列化方式还能降低一点性能开销。
/** * 配置RedisTemplate,解决乱码问题 */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { LOGGER.debug("redis序列化配置开始"); RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); // string序列化方式 RedisSerializer serializer = new GenericJackson2JsonRedisSerializer(); // 设置默认序列化方式 template.setDefaultSerializer(serializer); template.setKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(serializer); LOGGER.debug("redis序列化配置结束"); return template; }
代码第12行,我们配置默认的序列化方式为GenericJackson2JsonRedisSerializer
代码第13行,我们配置键的序列化方式为StringRedisSerializer
代码第14行,我们配置哈希表的值的序列化方式为GenericJackson2JsonRedisSerializer
RedisTemplate几种序列化方式的简要介绍
序列化方式 介绍 StringRedisSerializer 将对象序列化为字符串,但是经测试,无法序列化对象,一般用在key上 OxmSerializer 将对象序列化为xml性质,本质上是字符串 ByteArrayRedisSerializer 默认序列化方式,将对象序列化为二进制字节,但是需要对象实现Serializable接口 GenericFastJsonRedisSerializer json序列化,使用fastjson序列化方式序列化对象 GenericJackson2JsonRedisSerializer json序列化,使用jackson序列化方式序列化对象
redis队列监听器(消费者)
上面说了,与redis队列监听器相关的类为一个名为MessageListener的接口,下面是该接口的源码
public interface MessageListener { void onMessage(Message message, @Nullable byte[] pattern); }
可以看到,该接口仅有一个onMessage(Message message, @Nullable byte[] pattern)方法,该方法便是监听到队列中消息后的回调方法。下面解释一下这两个参数:
- message:redis消息类,该类中仅有两个方法 byte[] getBody()以二进制形式获取消息体 byte[] getChannel()以二进制形式获取消息通道
- pattern:二进制形式的消息通道,和message.getChannel()返回值相同
介绍完接口,我们来实现一个简单的redis队列监听器
@Component public class RedisListener implement MessageListener{ private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class); @Override public void onMessage(Message message,byte[] pattern){ LOGGER.debug("从消息通道={}监听到消息",new String(pattern)); LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel())); LOGGER.debug("元消息={}",new String(message.getBody())); // 新建一个用于反序列化的对象,注意这里的对象要和前面配置的一样 // 因为我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer // 所以这里的实现方式为GenericJackson2JsonRedisSerializer RedisSerializer serializer=new GenericJackson2JsonRedisSerializer(); LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody())); } }
代码很简单,就是输出参数中包含的关键信息。需要注意的是,RedisSerializer的实现要与上面配置的序列化方式一致。
队列监听器实现完以后,我们还需要将这个监听器添加到redis队列监听器容器中,代码如下:
@Bean public public RedisMessageListenerContainer container(RedisConnectionFactory factory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); container.addMessageListener(redisListener, new PatternTopic("demo-channel")); return container; }
这几行代码大概意思就是新建一个Redis消息监听器容器,然后将监听器和管道名想绑定,最后返回这个容器。
这里要注意的是,这个管道名和下面将要说的推送消息时的管道名要一致,不然监听器监听不到消息。
redis队列推送服务(生产者)
上面我们配置了RedisTemplate将要在这里使用到。
代码如下:
@Service public class Publisher{ @Autowrite private RedisTemplate redis; public void publish(Object msg){ redis.convertAndSend("demo-channel",msg); } }
关键代码为第7行,redis.convertAndSend()这个方法的作用为,向某个通道(参数1)推送一条消息(第二个参数)。
这里还是要注意上面所说的,生产者和消费者的通道名要相同。
至此,消息队列的生产者和消费者已经全部编写完成。
遇到的问题及解决办法
1、spring boot使用log4j2日志框架问题
在我添加了spring-boot-starter-log4j2依赖并在spring-boot-starter-web中排除了spring-boot-starter-logging后,运行项目,还是会提示下面的错误:
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
这个错误就是maven中有多个日志框架导致的。后来通过依赖分析,发现在spring-boot-starter-data-redis中,也依赖了spring-boot-starter-logging,解决办法也很简单,下面贴出详细代码
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>
2、redis队列监听器线程安全问题
redis队列监听器的监听机制是:使用一个线程监听队列,队列有未消费的消息则取出消息并生成一个新的线程来消费消息。如果你还记得,我开头说的是由于redis单线程特性,因此我们用它来做消息队列,但是如果监听器每次接受一个消息就生成新的线程来消费信息的话,这样就完全没有使用到redis的单线程特性,同时还会产生线程安全问题。
单一消费者(一个通道只有一个消费者)的解决办法
最简单的办法莫过于为onMessage()方法加锁,这样简单粗暴却很有用,不过这种方式无法控制队列监听的速率,且无限制的创造线程最终会导致系统资源被占光。
那如何解决这种情况呢?线程池。
在将监听器添加到容器的配置的时候,RedisMessageListenerContainer类中有一个方法setTaskExecutor(Executor taskExecutor)可以为监听容器配置线程池。配置线程池以后,所有的线程都会由该线程池产生,由此,我们可以通过调节线程池来控制队列监听的速率。
多个消费者(一个通道有多个消费者)的解决办法
单一消费者的问题相比于多个消费者来说还是较为简单,因为Java内置的锁都是只能控制自己程序的运行,不能干扰其他的程序的运行;然而现在很多时候我们都是在分布式环境下进行开发,这时处理多个消费者的情况就很有意义了。
那么这种问题如何解决呢?分布式锁。
下面来简要科普一下什么是分布式锁:
分布式锁是指在分布式环境下,同一时间只有一个客户端能够从某个共享环境中(例如redis)获取到锁,只有获取到锁的客户端才能执行程序。
然后分布式锁一般要满足:排他性(即同一时间只有一个客户端能够获取到锁)、避免死锁(即超时后自动释放)、高可用(即获取或释放锁的机制必须高可用且性能佳)
上面讲依赖的时候,我们导入了一个spring-integration-redis依赖,这个依赖里面包含了很多实用的工具类,而我们接下来要讲的分布式锁就是这个依赖下面的一个工具包RedisLockRegistry。
首先讲一下如何使用,导入了依赖以后,首先配置一个Bean
@Bean public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) { return new RedisLockRegistry(factory, "demo-lock",60); }
RedisLockRegistry的构造函数,第一个参数是redis连接池,第二个参数是锁的前缀,即取出的锁,键名为“demo-lock:KEY_NAME”,第三个参数为锁的过期时间(秒),默认为60秒,当持有锁超过该时间后自动过期。
使用锁的方法,下面是对监听器的修改
@Component public class RedisListener implement MessageListener{ @Autowrite private RedisLockRegistry redisLockRegistry; private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class); @Override public void onMessage(Message message,byte[] pattern){ Lock lock=redisLockRegistry.obtain("lock"); try{ lock.lock(); //上锁 LOGGER.debug("从消息通道={}监听到消息",new String(pattern)); LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel())); LOGGER.debug("元消息={}",new String(message.getBody())); // 新建一个用于反序列化的对象,注意这里的对象要和前面配置的一样 // 因为我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer // 所以这里的实现方式为GenericJackson2JsonRedisSerializer RedisSerializer serializer=new GenericJackson2JsonRedisSerializer(); LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody())); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); //解锁 } } }
上面代码的代码比起前面的监听器代码,只是多了一个注入的RedisLockRegistry,一个通过redisLockRegistry.obtain()方法获取锁,一个加锁一个解锁,然后这就完成了分布式锁的使用。
注意这个获取锁的方法redisLockRegistry.obtain(),其返回的是一个名为RedisLock的锁,这是一个私有内部类,它实现了Lock接口,因此我们不能从代码外部创建一个他的实例,只能通过obtian()方法来获取这个锁。
整理了一份《Java面试核心知识点视频与PDF》关注私信回复:555获取
相关推荐
- Java面试宝典之问答系列(java面试回答)
-
以下内容,由兆隆IT云学院就业部根据多年成功就业服务经验提供:1.写出从数据库表Custom中查询No、Name、Num1、Num2并将Name以姓名显示、计算出的和以总和显示的SQL。SELECT...
- ADG (Active Data Guard) 数据容灾架构下,如何配置 Druid 连接池?
-
如上图的数据容灾架构下,上层应用如果使用Druid连接池,应该如何配置,才能在数据库集群节点切换甚至主备数据中心站点切换的情况下,上层应用不需要变动(无需修改配置也无需重启);即数据库节点宕机/...
- SpringBoot多数据源dynamic-datasource快速入门
-
一、简介dynamic-datasourc是一个基于SpringBoot的快速集成多数据源的启动器,其主要特性如下:支持数据源分组,适用于多种场景纯粹多库读写分离一主多从混合模式。支持...
- SpringBoot项目快速开发框架JeecgBoot——项目简介及系统架构!
-
项目简介及系统架构JeecgBoot是一款基于SpringBoot的开发平台,它采用前后端分离架构,集成的框架有SpringBoot2.x、SpringCloud、AntDesignof...
- 常见文件系统格式有哪些(文件系统类型有哪几种)
-
PART.01常见文件系统格式有哪些常见的文件系统格式有很多,通常根据使用场景(Windows、Linux、macOS、移动设备、U盘、硬盘等)有所不同。以下是一些主流和常见的文件系统格式及其特点:一...
- Oracle MySQL Operator部署集群(oracle mysql group by)
-
以下是使用OracleMySQLOperator部署MySQL集群的完整流程及关键注意事项:一、部署前准备安装MySQLOperator通过Helm安装Operator到Ku...
- LibreOffice加入"转向Linux"运动
-
LibreOffice项目正准备削减部分Windows支持,并鼓励用户切换到Linux系统。自Oracle放弃OpenOffice后,支持和指导LibreOffice开发的文档基金会对未来有着明确的观...
- Oracle Linux 10发布:UEK 8.1、后量子加密、增强开发工具等
-
IT之家6月28日消息,科技媒体linuxiac昨日(6月27日)发布博文,报道称OracleLinux10正式发布,完全二进制兼容(binarycompatibility...
- 【mykit-data】 数据库同步工具(数据库同步工具 开源)
-
项目介绍支持插件化、可视化的数据异构中间件,支持的数据异构方式如下MySQL<——>MySQL(增量、全量)MySQL<——>Oracle(增量、全量)Oracle...
- oracle关于xml的解析(oracle读取xml节点的属性值)
-
有时需要在存储过程中处理xml,oracle提供了相应的函数来进行处理,xmltype以及相关的函数。废话少说,上代码:selectxmltype(SIConfirmOutput).extract...
- 如何利用DBSync实现数据库同步(通过dblink同步数据库)
-
DBSync是一款通用型的数据库同步软件,能侦测数据表之间的差异,能实时同步差异数据,从而使双方始终保持一致。支持各种数据库,支持异构同步、增量同步,且提供永久免费版。本文介绍其功能特点及大致用法,供...
- MYSQL存储引擎InnoDB(八十):InnoDB静态数据加密
-
InnoDB支持独立表空间、通用表空间、mysql系统表空间、重做日志和撤消日志的静态数据加密。从MySQL8.0.16开始,还支持为模式和通用表空间设置加密默认值,这允许DBA控制在这些模...
- JDK高版本特性总结与ZGC实践(jdk高版本兼容低版本吗)
-
美团信息安全技术团队核心服务升级JDK17后,性能与稳定性大幅提升,机器成本降低了10%。高版本JDK与ZGC技术令人惊艳,且JavaAISDK最低支持JDK17。本文总结了JDK17的主要...
- 4 种 MySQL 同步 ES 方案,yyds!(两个mysql数据库自动同步的方法)
-
本文会先讲述数据同步的4种方案,并给出常用数据迁移工具,干货满满!不BB,上文章目录:1.前言在实际项目开发中,我们经常将MySQL作为业务数据库,ES作为查询数据库,用来实现读写分离,...
- 计算机Java培训课程包含哪些内容?其实就这六大块
-
不知不觉秋天已至,如果你还处于就业迷茫期,不如来学习Java。对于非科班小白来说,Java培训会更适合你。提前了解下计算机Java培训课程内容,会有助于你后续学习。下面,我就从六个部分为大家详细介绍...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Java面试宝典之问答系列(java面试回答)
- ADG (Active Data Guard) 数据容灾架构下,如何配置 Druid 连接池?
- SpringBoot多数据源dynamic-datasource快速入门
- SpringBoot项目快速开发框架JeecgBoot——项目简介及系统架构!
- 常见文件系统格式有哪些(文件系统类型有哪几种)
- Oracle MySQL Operator部署集群(oracle mysql group by)
- LibreOffice加入"转向Linux"运动
- Oracle Linux 10发布:UEK 8.1、后量子加密、增强开发工具等
- 【mykit-data】 数据库同步工具(数据库同步工具 开源)
- oracle关于xml的解析(oracle读取xml节点的属性值)
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)