基于redis订阅消息和websocket技术实现的消息推送功能
mhr18 2024-10-29 14:35 46 浏览 0 评论
相关依赖文件
创建Redis消息监听者容器
@Configuration //相当于xml中的beans public class RedisConfig { /** * 需要手动注册RedisMessageListenerContainer加入IOC容器 * @author lijt * @return */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat 的通道 container.addMessageListener(new MessageListener(){ @Override public void onMessage(Message message, byte[] pattern) { String msg = new String(message.getBody()); System.out.println(new String(pattern) + "主题发布:" + msg); } }, new PatternTopic("TOPIC")); return container; } }
创建Websocket配置类
@Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } 这个配置类的作用是要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。如果是使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。 本文的例子是采用的springboot的内置tomcat容器,所以还是要创建这个配置类,作用就是注入ServerEndpointExporter。
创建消息订阅监听者类
public class SubscribeListener implements MessageListener { private Session session; public Session getSession() { return session; } public void setSession(Session session) { this.session = session; } /** * 接收发布者的消息 */ @Override public void onMessage(Message message, byte[] pattern) { String msg = new String(message.getBody()); System.out.println(new String(pattern) + "主题发布:" + msg); if (null != session && session.isOpen()) { try { session.getBasicRemote().sendText(msg); } catch (IOException e) { e.printStackTrace(); } } } } 这个消息订阅监听者类持有websocket的客户端会话对象(session),当接收到订阅的消息时,通过这个会话对象(session)将消息发送到前端,从而实现消息的主动推送。
创建Websocket服务端类
@Component @ServerEndpoint("/websocket/server") public class WebSocketServer { /** * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例 */ private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class); //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static AtomicInteger onlineCount=new AtomicInteger(0); //concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; private SubscribeListener subscribeListener; /** * 连接建立成功调用的方法 * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 */ @OnOpen public void onOpen(Session session){ this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在线数加1 System.out.println("有新连接加入!当前在线人数为" + getOnlineCount()); subscribeListener = new SubscribeListener(); subscribeListener.setSession(session); //设置订阅topic redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("TOPIC")); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() throws IOException { webSocketSet.remove(this); //从set中删除 subOnlineCount(); //在线数减1 redisMessageListenerContainer.removeMessageListener(subscribeListener); System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * @param message 客户端发送过来的消息 * @param session 可选的参数 */ @OnMessage public void onMessage(String message, Session session) { System.out.println("来自客户端的消息:" + message); //群发消息 for(WebSocketServer item: webSocketSet){ try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); continue; } } } /** * 发生错误时调用 * @param session * @param error */ @OnError public void onError(Session session, Throwable error){ System.out.println("发生错误"); error.printStackTrace(); } /** * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。 * @param message * @throws IOException */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } public int getOnlineCount() { return onlineCount.get(); } public void addOnlineCount() { WebSocketServer.onlineCount.getAndIncrement(); } public void subOnlineCount() { WebSocketServer.onlineCount.getAndDecrement(); } } @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。注意的是在客户端链接关闭的方法onClose中,一定要 删除之前的订阅监听对象,就是下面这行代码:redisMessageListenerContainer.removeMessageListener(subscribeListener); 否则在浏览器刷一下之后,后台会报如下错误: java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session 原因就是当链接关闭之后,session对象就没有了,而订阅者对象还是会接收消息,在用session对象发送消息时会报错。虽然代码中加了判断if (null != session && session.isOpen()) { 可以避免报错,但是为了防止内存泄漏,应该把没有用的监听者对象从容器中删除。
创建前端页面
在resource\static目录下创建html页面,命名为websocket.html。代码如下:
<!doctype html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="utf-8"></meta> <title>websocket</title> </head> <h4> 使用redis订阅消息和websocket实现消息推送 </h4> <br/> <h5>收到的订阅消息:</h5> <div id="message_id"></div> </body> <script type="text/javascript"> var websocket = null; //当前浏览前是否支持websocket if("WebSocket" in window){ var url = "ws://localhost:8080/demo/websocket/server"; websocket = new WebSocket(url); }else{ alert("浏览器不支持websocket"); } websocket.onopen = function(event){ setMessage("打开连接"); } websocket.onclose = function(event){ setMessage("关闭连接"); } websocket.onmessage = function(event){ setMessage(event.data); } websocket.onerror = function(event){ setMessage("连接异常"); } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function(){ closeWebsocket(); } //关闭websocket function closeWebsocket(){ //3代表已经关闭 if(3!=websocket.readyState){ websocket.close(); }else{ alert("websocket之前已经关闭"); } } //将消息显示在网页上 function setMessage(message){ document.getElementById('message_id').innerHTML += message + '<br/>'; } </script> </html>
启动服务进行测试
1. 启动springboot服务,浏览器输入地址:http://localhost:8080/demo/websocket.html,此时页面显示如下:
2.打开redis客户端,在命令行输入publish TOPIC “this is test message”
浏览器页面显示如下:
说明刚刚发布的消息已经主动推送到浏览器显示了。
完整代码见: https://gitee.com/freide/springboot
相关推荐
- Java培训机构,你选对了吗?(java培训机构官网)
-
如今IT行业发展迅速,不仅是大学生,甚至有些在职的员工都想学习java开发,需求量的扩大,薪资必定增长,这也是更多人选择java开发的主要原因。不过对于没有基础的学员来说,java技术不是一两天就能...
- 产品经理MacBook软件清单-20个实用软件
-
三年前开始使用MacBookPro,从此再也不想用Windows电脑了,作为生产工具,MacBook可以说是非常胜任。作为产品经理,值得拥有一台MacBook。MacBook是工作平台,要发挥更大作...
- RAD Studio(Delphi) 本月隆重推出新的版本12.3
-
#在头条记录我的2025#自2024年9月,推出Delphi12.2版本后,本月隆重推出新的版本12.3,RADStudio12.3,包含了Delphi12.3和C++builder12.3最...
- 图解Java垃圾回收机制,写得非常好
-
什么是自动垃圾回收?自动垃圾回收是一种在堆内存中找出哪些对象在被使用,还有哪些对象没被使用,并且将后者删掉的机制。所谓使用中的对象(已引用对象),指的是程序中有指针指向的对象;而未使用中的对象(未引用...
- Centos7 初始化硬盘分区、挂载(针对2T以上)添加磁盘到卷
-
1、通过命令fdisk-l查看硬盘信息:#fdisk-l,发现硬盘为/dev/sdb大小4T。2、如果此硬盘以前有过分区,则先对磁盘格式化。命令:mkfs.文件系统格式-f/dev/sdb...
- 半虚拟化如何提高服务器性能(虚拟化 半虚拟化)
-
半虚拟化是一种重新编译客户机操作系统(OS)将其安装在虚拟机(VM)上的一种虚拟化类型,并在主机操作系统(OS)运行的管理程序上运行。与传统的完全虚拟化相比,半虚拟化可以减少开销,并提高系统性能。虚...
- HashMap底层实现原理以及线程安全实现
-
HashMap底层实现原理数据结构:HashMap的底层实现原理主要依赖于数组+链表+红黑树的结构。1、数组:HashMap最底层是一个数组,称为table,它存放着键值对。2、链...
- long和double类型操作的非原子性探究
-
前言“深入java虚拟机”中提到,int等不大于32位的基本类型的操作都是原子操作,但是某些jvm对long和double类型的操作并不是原子操作,这样就会造成错误数据的出现。其实这里的某些jvm是指...
- 数据库DELETE 语句,还保存原有的磁盘空间
-
MySQL和Oracle的DELETE语句与数据存储MySQL的DELETE操作当你在MySQL中执行DELETE语句时:逻辑删除:数据从表中标记为删除,不再可见于查询结果物理...
- 线程池—ThreadPoolExecutor详解(线程池实战)
-
一、ThreadPoolExecutor简介在juc-executors框架概述的章节中,我们已经简要介绍过ThreadPoolExecutor了,通过Executors工厂,用户可以创建自己需要的执...
- navicat如何使用orcale(详细步骤)
-
前言:看过我昨天文章的同鞋都知道最近接手另一个国企项目,数据库用的是orcale。实话实说,也有快三年没用过orcale数据库了。这期间问题不断,因为orcale日渐消沉,网上资料也是真真假假,难辨虚...
- 你的程序是不是慢吞吞?GraalVM来帮你飞起来性能提升秘籍大公开
-
各位IT圈内外的朋友们,大家好!我是你们的老朋友,头条上的IT技术博主。不知道你们有没有这样的经历:打开一个软件,半天没反应;点开一个网站,图片刷不出来;或者玩个游戏,卡顿得想砸电脑?是不是特别上火?...
- 大数据正当时,理解这几个术语很重要
-
目前,大数据的流行程度远超于我们的想象,无论是在云计算、物联网还是在人工智能领域都离不开大数据的支撑。那么大数据领域里有哪些基本概念或技术术语呢?今天我们就来聊聊那些避不开的大数据技术术语,梳理并...
- 秒懂列式数据库和行式数据库(列式数据库的特点)
-
行式数据库(Row-Based)数据按行存储,常见的行式数据库有Mysql,DB2,Oracle,Sql-server等;列数据库(Column-Based)数据存储方式按列存储,常见的列数据库有Hb...
- AMD发布ROCm 6.4更新:带来了多项底层改进,但仍不支持RDNA 4
-
AMD宣布,对ROCm软件栈进行了更新,推出了新的迭代版本ROCm6.4。这一新版本里,AMD带来了多项底层改进,包括更新改进了ROCm的用户空间库和AMDKFD内核驱动程序之间的兼容性,使其更容易...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)