百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

基于redis订阅消息和websocket技术实现的消息推送功能

mhr18 2024-10-29 14:35 37 浏览 0 评论

相关依赖文件

创建Redis消息监听者容器

@Configuration //相当于xml中的beans
public class RedisConfig {
 /**
 * 需要手动注册RedisMessageListenerContainer加入IOC容器
 * @author lijt
 * @return
 */
 @Bean
 public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
 RedisMessageListenerContainer container = new RedisMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 //订阅了一个叫chat 的通道
 container.addMessageListener(new MessageListener(){
 @Override
 public void onMessage(Message message, byte[] pattern) {
 String msg = new String(message.getBody());
 System.out.println(new String(pattern) + "主题发布:" + msg);
 }
 }, new PatternTopic("TOPIC"));
 return container;
 }
}

创建Websocket配置类

@Configuration
public class WebSocketConfig {
 @Bean
 public ServerEndpointExporter serverEndpointExporter() {
 return new ServerEndpointExporter();
 }
}
 这个配置类的作用是要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。如果是使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。
	本文的例子是采用的springboot的内置tomcat容器,所以还是要创建这个配置类,作用就是注入ServerEndpointExporter。
	
	

创建消息订阅监听者类

public class SubscribeListener implements MessageListener {
 private Session session;
 public Session getSession() { return session; }
 public void setSession(Session session) {
 this.session = session;
 }
 /**
 * 接收发布者的消息
 */
 @Override
 public void onMessage(Message message, byte[] pattern) {
 String msg = new String(message.getBody());
 System.out.println(new String(pattern) + "主题发布:" + msg);
 if (null != session && session.isOpen()) {
 try {
 session.getBasicRemote().sendText(msg);
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
 }
}
 这个消息订阅监听者类持有websocket的客户端会话对象(session),当接收到订阅的消息时,通过这个会话对象(session)将消息发送到前端,从而实现消息的主动推送。
	

创建Websocket服务端类

@Component
@ServerEndpoint("/websocket/server")
public class WebSocketServer {
 /**
 * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
 */
 private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
 //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
 private static AtomicInteger onlineCount=new AtomicInteger(0);
 //concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
 //与某个客户端的连接会话,需要通过它来给客户端发送数据
 private Session session;
 private SubscribeListener subscribeListener;
 /**
 * 连接建立成功调用的方法
 * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
 */
 @OnOpen
 public void onOpen(Session session){
 this.session = session;
 webSocketSet.add(this); //加入set中
 addOnlineCount(); //在线数加1
 System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
 subscribeListener = new SubscribeListener();
 subscribeListener.setSession(session);
 //设置订阅topic
 redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("TOPIC"));
 }
 /**
 * 连接关闭调用的方法
 */
 @OnClose
 public void onClose() throws IOException {
 webSocketSet.remove(this); //从set中删除
 subOnlineCount(); //在线数减1
 redisMessageListenerContainer.removeMessageListener(subscribeListener);
 System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
 }
 /**
 * 收到客户端消息后调用的方法
 * @param message 客户端发送过来的消息
 * @param session 可选的参数
 */
 @OnMessage
 public void onMessage(String message, Session session) {
 System.out.println("来自客户端的消息:" + message);
 //群发消息
 for(WebSocketServer item: webSocketSet){
 try {
 item.sendMessage(message);
 } catch (IOException e) {
 e.printStackTrace();
 continue;
 }
 }
 }
 /**
 * 发生错误时调用
 * @param session
 * @param error
 */
 @OnError
 public void onError(Session session, Throwable error){
 System.out.println("发生错误");
 error.printStackTrace();
 }
 /**
 * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
 * @param message
 * @throws IOException
 */
 public void sendMessage(String message) throws IOException {
 this.session.getBasicRemote().sendText(message);
 }
 public int getOnlineCount() {
 return onlineCount.get();
 }
 public void addOnlineCount() {
 WebSocketServer.onlineCount.getAndIncrement();
 }
 public void subOnlineCount() {
 WebSocketServer.onlineCount.getAndDecrement();
 }
}
@ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。
 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。注意的是在客户端链接关闭的方法onClose中,一定要 删除之前的订阅监听对象,就是下面这行代码:redisMessageListenerContainer.removeMessageListener(subscribeListener);
 否则在浏览器刷一下之后,后台会报如下错误:
java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session
原因就是当链接关闭之后,session对象就没有了,而订阅者对象还是会接收消息,在用session对象发送消息时会报错。虽然代码中加了判断if (null != session && session.isOpen()) { 可以避免报错,但是为了防止内存泄漏,应该把没有用的监听者对象从容器中删除。

创建前端页面

在resource\static目录下创建html页面,命名为websocket.html。代码如下:

 <!doctype html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
 <meta charset="utf-8"></meta>
 <title>websocket</title>
</head>
<h4>
使用redis订阅消息和websocket实现消息推送
</h4>
<br/>
<h5>收到的订阅消息:</h5>
<div id="message_id"></div>
</body>
<script type="text/javascript">
 var websocket = null;
 //当前浏览前是否支持websocket
 if("WebSocket" in window){
 var url = "ws://localhost:8080/demo/websocket/server";
 websocket = new WebSocket(url);
 }else{
 alert("浏览器不支持websocket");
 }
 websocket.onopen = function(event){
 setMessage("打开连接");
 }
 websocket.onclose = function(event){
 setMessage("关闭连接");
 }
 websocket.onmessage = function(event){
 setMessage(event.data);
 }
 websocket.onerror = function(event){
 setMessage("连接异常");
 }
 //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
 window.onbeforeunload = function(){
 closeWebsocket();
 }
 //关闭websocket
 function closeWebsocket(){
 //3代表已经关闭
 if(3!=websocket.readyState){
 websocket.close();
 }else{
 alert("websocket之前已经关闭");
 }
 }
 //将消息显示在网页上
 function setMessage(message){
 document.getElementById('message_id').innerHTML += message + '<br/>';
 }
</script>
</html>

启动服务进行测试

1. 启动springboot服务,浏览器输入地址:http://localhost:8080/demo/websocket.html,此时页面显示如下:

2.打开redis客户端,在命令行输入publish TOPIC “this is test message”

浏览器页面显示如下:

说明刚刚发布的消息已经主动推送到浏览器显示了。

完整代码见: https://gitee.com/freide/springboot

相关推荐

Redis合集-使用benchmark性能测试

采用开源Redis的redis-benchmark工具进行压测,它是Redis官方的性能测试工具,可以有效地测试Redis服务的性能。本次测试使用Redis官方最新的代码进行编译,详情请参见Redis...

Java简历总被已读不回?面试挂到怀疑人生?这几点你可能真没做好

最近看了几十份简历,发现大部分人不是技术差,而是不会“卖自己”——一、简历死穴:你写的不是经验,是岗位说明书!反面教材:ד使用SpringBoot开发项目”ד负责用户模块功能实现”救命写法:...

redission YYDS(redission官网)

每天分享一个架构知识Redission是一个基于Redis的分布式Java锁框架,它提供了各种锁实现,包括可重入锁、公平锁、读写锁等。使用Redission可以方便地实现分布式锁。red...

从数据库行锁到分布式事务:电商库存防超卖的九重劫难与破局之道

2023年6月18日我们维护的电商平台在零点刚过3秒就遭遇了严重事故。监控大屏显示某爆款手机SKU_IPHONE13_PRO_MAX在库存仅剩500台时,订单系统却产生了1200笔有效订单。事故复盘发...

SpringBoot系列——实战11:接口幂等性的形而上思...

欢迎关注、点赞、收藏。幂等性不仅是一种技术需求,更是数字文明对确定性追求的体现。在充满不确定性的网络世界中,它为我们建立起可依赖的存在秩序,这或许正是技术哲学最深刻的价值所在。幂等性的本质困境在支付系...

如何优化系统架构设计缓解流量压力提升并发性能?Java实战分享

如何优化系统架构设计缓解流量压力提升并发性能?Java实战分享在高流量场景下。首先,我需要回忆一下常见的优化策略,比如负载均衡、缓存、数据库优化、微服务拆分这些。不过,可能还需要考虑用户的具体情况,比...

Java面试题: 项目开发中的有哪些成长?该如何回答

在Java面试中,当被问到“项目中的成长点”时,面试官不仅想了解你的技术能力,更希望看到你的问题解决能力、学习迭代意识以及对项目的深度思考。以下是回答的策略和示例,帮助你清晰、有说服力地展示成长点:一...

互联网大厂后端必看!Spring Boot 如何实现高并发抢券逻辑?

你有没有遇到过这样的情况?在电商大促时,系统上线了抢券活动,结果活动刚一开始,服务器就不堪重负,出现超卖、系统崩溃等问题。又或者用户疯狂点击抢券按钮,最后却被告知无券可抢,体验极差。作为互联网大厂的后...

每日一题 |10W QPS高并发限流方案设计(含真实代码)

面试场景还原面试官:“如果系统要承载10WQPS的高并发流量,你会如何设计限流方案?”你:“(稳住,我要从限流算法到分布式架构全盘分析)…”一、为什么需要限流?核心矛盾:系统资源(CPU/内存/数据...

Java面试题:服务雪崩如何解决?90%人栽了

服务雪崩是指微服务架构中,由于某个服务出现故障,导致故障在服务之间不断传递和扩散,最终造成整个系统崩溃的现象。以下是一些解决服务雪崩问题的常见方法:限流限制请求速率:通过限流算法(如令牌桶算法、漏桶算...

面试题官:高并发经验有吗,并发量多少,如何回复?

一、有实际高并发经验(建议结构)直接量化"在XX项目中,系统日活用户约XX万,核心接口峰值QPS达到XX,TPS处理能力为XX/秒。通过压力测试验证过XX并发线程下的稳定性。"技术方案...

瞬时流量高并发“保命指南”:这样做系统稳如泰山,老板跪求加薪

“系统崩了,用户骂了,年终奖飞了!”——这是多少程序员在瞬时大流量下的真实噩梦?双11秒杀、春运抢票、直播带货……每秒百万请求的冲击,你的代码扛得住吗?2025年了,为什么你的系统一遇高并发就“躺平”...

其实很多Java工程师不是能力不够,是没找到展示自己的正确姿势。

其实很多Java工程师不是能力不够,是没找到展示自己的正确姿势。比如上周有个小伙伴找我,五年经验但简历全是'参与系统设计''优化接口性能'这种空话。我就问他:你做的秒杀...

PHP技能评测(php等级考试)

公司出了一些自我评测的PHP题目,现将题目和答案记录于此,以方便记忆。1.魔术函数有哪些,分别在什么时候调用?__construct(),类的构造函数__destruct(),类的析构函数__cal...

你的简历在HR眼里是青铜还是王者?

你的简历在HR眼里是青铜还是王者?兄弟,简历投了100份没反应?面试总在第三轮被刷?别急着怀疑人生,你可能只是踩了这些"隐形求职雷"。帮3630+程序员改简历+面试指导和处理空窗期时间...

取消回复欢迎 发表评论: