百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

基于Netty+redis实现websocket集群方案

mhr18 2024-11-13 11:07 13 浏览 0 评论

一 背景

最近公司在做标注系统,有些用户之间需要进行消息的推送,比如管理员分配了一个任务给标注员时,需要给标注员推送一条消息

之前项目中用的spring-boot自带的websocket,spring-boot-starter-websocket集成,代码臃肿,性能不是很好

这次打算用netty来实现websocket,因为应用后期可能部署集群,用redis的sub pub功能来实现集群(用ZK也可以实现)

二 实现

1 当用户连接上来时,需要保存用户与channel的映射关系

2 当A用户需要向B用户推送一条消息的时候,需要根据发布到所有节点,看看B用户在那台实例上

三 代码实现

代码结构:netty包下,如下图所示:

一 、netty服务端启动类:

package com.minivision.label.management.netty;
 
import com.minivision.label.management.util.SysThreadPool;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
 
/**
 * @program: label-management-system-backend
 * @description: netty工具类
 * @author: wangjinwei
 * @create: 2022-01-04 09:34
 **/
@Component
@Slf4j
public class WebSocketServer {
    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";
 
    /**
     * 端口号
     */
    @Value("${netty.server.port:8782}")
    private int port;
 
    private String path = "/home";
 
    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;
    private LoggingHandler LOGGING_HANDLER;
 
 
    /**
     * 启动
     *
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
 
 
        // bossGroup就是parentGroup,是负责处理TCP/IP连接的
        bossGroup = new NioEventLoopGroup();
        // workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件
        workGroup = new NioEventLoopGroup();
        //日志打印
        LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
        ServerBootstrap sb = new ServerBootstrap();
        //设置全连接队列大小
        sb.option(ChannelOption.SO_BACKLOG, 1024);
        sb.group(workGroup, bossGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(this.port)
                .childHandler(new WebSocketInitializer(path,WEBSOCKET_PROTOCOL,LOGGING_HANDLER));
 
        // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
        ChannelFuture channelFuture = sb.bind(port).sync();
        log.info("【Netty服务启动成功========端口:" + port + "】");
        log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
        // 成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
        channelFuture.channel().closeFuture().sync();
 
    }
 
    /**
     * 释放资源
     *
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }
 
    @PostConstruct()
    public void init() {
        // 需要开启一个新的线程来执行netty server 服务器
        SysThreadPool.getThread().execute(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                log.error("【Netty服务启动失败】" + e.getMessage(), e);
                Thread.currentThread().interrupt();
            }
        });
    }
}

二、handler初始化类:

package com.minivision.label.management.netty;
 
import com.minivision.label.management.netty.handler.AuthHandler;
import com.minivision.label.management.netty.handler.LoginRequestHandler;
import com.minivision.label.management.netty.handler.ServerHeartBeatHandler;
import com.minivision.label.management.netty.handler.WebSocketHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
 
import java.util.concurrent.TimeUnit;
 
/**
 * @program: label-management-system-backend
 * @description: 初始化handler
 * @author: wangjinwei
 * @create: 2022-01-04 15:50
 **/
@Slf4j
public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
 
 
    private String path;
 
    /**
     * webSocket协议名
     */
    private String protocol;
 
    private LoggingHandler loggingHandler;
 
    public WebSocketInitializer(String path, String protocol, LoggingHandler loggingHandler) {
        this.path = path;
        this.protocol = protocol;
        this.loggingHandler = loggingHandler;
    }
 
    @Override
    protected void initChannel(SocketChannel ch) {
        log.info("收到新连接:" + ch.localAddress());
        // websocket协议本身是基于Http协议的,所以需要Http解码器
        ch.pipeline().addLast("http-codec", new HttpServerCodec());
        //netty中的日志handler
        ch.pipeline().addLast(loggingHandler);
        // 以块的方式来写的处理器
        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
        //HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度,将HTTP消息的多个部分合成一条完整的HTTP消息 说明:
        // 1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合 2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求
        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192));
        /*
         * 说明:
         * 1、对应webSocket,它的数据是以帧(frame)的形式传递
         * 2、浏览器请求时 ws://localhost:58080/xxx表示请求的uri
         * 3、核心功能是将http协议升级为ws协议,保持长连接
         * 这个的作用主要是用来解决HTTP握手等问题。虽然可以自己实现,但是推荐采用这个默认的handler,它能够解决很多未知的问题。
         */
        ch.pipeline().addLast(new WebSocketServerProtocolHandler(path, protocol, true, 65536 * 10));
        // 进行设置心跳检测 0表示不监控
        ch.pipeline().addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
        //心跳事件
        ch.pipeline().addLast(new ServerHeartBeatHandler());
        // 配置通道处理 来进行业务处理
        ch.pipeline().addLast(new WebSocketHandler());
        //登录处理
        ch.pipeline().addLast(new LoginRequestHandler());
        //后面互发消息可以插拔式校验
        ch.pipeline().addLast(new AuthHandler());
    }
}

三、业务handler:

package com.minivision.label.management.netty.handler;
 
import com.minivision.label.management.netty.protocol.Packet;
import com.minivision.label.management.netty.protocol.PacketCodeC;
import com.minivision.label.management.netty.util.SessionUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
/**
 * @program: label-management-system-backend
 * @description: netty业务类
 * @author: wangjinwei
 * @create: 2022-01-04 09:34
 **/
@Component
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
 
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            log.info("channel is writable try to continue flush....");
            ctx.flush();
        }
        ctx.fireChannelWritabilityChanged();
    }
 
    /**
     * 客户端与服务器建立连接时触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("与客户端建立连接,通道开启!channelActive 被调用" + ctx.channel().id().asLongText());
    }
 
    /**
     * 客户端与服务器关闭连接时触发
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String channelLongId = ctx.channel().id().asLongText();
        log.info("channelInactive 被调用" + channelLongId);
        // 删除通道
        SessionUtil.unBindSession(ctx.channel());
    }
 
    /**
     * 服务器接收客户端的数据消息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String content = msg.text();
        log.info("服务器收到消息:{}", content);
        Packet packet = PacketCodeC.INSTANCE.getPacket(content);
        if (packet == null) {
            log.error("WebSocketHandler.channelRead0 decode error content={}", content);
            ctx.channel().close();
        } else {
 
            ctx.fireChannelRead(packet);
        }
    }
 
    /**
     * description token校验
     *
     * @param isAppClient 是否是APP连接
     * @param token       token
     * @return boolean
     * @author wangjinwei
     * @date 2019年2月18日下午7:49:32
     */
    private boolean checkToken(String token, Boolean isAppClient) {
        return true;
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("", cause);
        super.exceptionCaught(ctx, cause);
    }
}
 
 
package com.minivision.label.management.netty.handler;
 
import com.alibaba.fastjson.JSON;
import com.minivision.admin.center.open.api.TokenVerifyServiceApi;
import com.minivision.admin.center.open.api.dto.TokenVerifyReq;
import com.minivision.admin.center.open.api.dto.TokenVerifyResp;
import com.minivision.label.management.dto.LoginUser;
import com.minivision.label.management.netty.protocol.LoginRequestPacket;
import com.minivision.label.management.netty.protocol.LoginResponsePacket;
import com.minivision.label.management.netty.util.Attributes;
import com.minivision.label.management.netty.util.SessionUtil;
import com.minivision.label.management.util.SpringUtil;
import com.minivision.maiot.common.base.dto.ResultDTO;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
 
import java.util.stream.Collectors;
 
/**
 * @program: label-management-system-backend
 * @description:登陆请求处理
 * @author: wangjinwei
 * @create: 2022-01-04 09:34
 **/
@Slf4j
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) throws Exception {
        log.info("收到客户端请求-----------------");
        LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
        LoginUser loginUser = getLoginUser(loginRequestPacket.getMtk());
        if (loginUser != null) {
            ctx.channel().attr(Attributes.LOGINUSER).set(loginUser);
            SessionUtil.bindSession(loginUser, ctx.channel());
            log.info("LoginRequestHandler---userId={},userName={} : 登录成功!", loginUser.getUserId(), loginUser.getUserName());
 
            loginResponsePacket.setSuccess(true);
            loginResponsePacket.setUserId(loginUser.getUserId());
            loginResponsePacket.setUserName(loginUser.getUserName());
            //ctx.fireChannelRead(loginRequestPacket.retain()); 这样可以传递到下一个handler继续处理
        } else {
            loginResponsePacket.setSuccess(false);
            loginResponsePacket.setReason("账号密码校验失败");
            log.info("LoginRequestHandler--- {}, 登录失败!" + loginRequestPacket.getMtk());
        }
        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(loginResponsePacket)));
 
        if (loginUser == null) {
            log.info("LoginRequestHandler--- {}, 登录失败!关闭channel" + loginRequestPacket.getMtk());
            ctx.channel().close();
        }
 
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionUtil.unBindSession(ctx.channel());
    }
 
    private LoginUser getLoginUser(String mtk) {
        TokenVerifyReq tokenVerifyReq = new TokenVerifyReq();
        tokenVerifyReq.setToken(mtk);
        TokenVerifyServiceApi tokenVerifyServiceApi = SpringUtil.getBean(TokenVerifyServiceApi.class);
        ResultDTO<TokenVerifyResp> tokenVerifyResp = tokenVerifyServiceApi.verifyToken(tokenVerifyReq);
        TokenVerifyResp resData = tokenVerifyResp.getResData();
        if (tokenVerifyResp.success() && resData.getStatus() == TokenVerifyResp.ReplyStatus.OK) {
            // 将认证后的信息传到后端服务
            LoginUser loginUser = new LoginUser();
            loginUser.setUserId(resData.getUserId());
            loginUser.setUserName(resData.getUserName());
            loginUser.setRoleIds(resData.getRoles().stream().map(TokenVerifyResp.UserRoleInfo::getRoleId).collect(Collectors.toList()));
            return loginUser;
        }
        return null;
    }
}
 
 
 
package com.minivision.label.management.netty.handler;
 
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
 
/**
 * @program: label-management-system-backend
 * @description: 心跳
 * @author: wangjinwei
 * @create: 2022-01-04 16:06
 **/
@Slf4j
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //超时事件
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleEvent = (IdleStateEvent) evt;
            if (idleEvent.state() == IdleState.READER_IDLE) {
                //关闭通道连接
                log.info("ServerHeartBeatHandler.reader_idle");
            } else if (idleEvent.state() == IdleState.WRITER_IDLE) {
                //写
                log.info("ServerHeartBeatHandler.writer_idle");
            }
        }
        super.userEventTriggered(ctx, evt);
    }
}
 
 
 
 
package com.minivision.label.management.netty.handler;
 
import com.minivision.label.management.netty.util.SessionUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
 
 
/**
 * @program: label-management-system-backend
 * @description:权限可插拔
 * @author: wangjinwei
 * @create: 2021-12-28 10:52
 **/
@Slf4j
public class AuthHandler extends ChannelInboundHandlerAdapter {
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("AuthHandler.channelRead start;");
        if (!SessionUtil.hasLogin(ctx.channel())) {
            ctx.channel().close();
        } else {
            ctx.pipeline().remove(this);
            super.channelRead(ctx, msg);
        }
    }
 
 
}

四、协议接口:

package com.minivision.label.management.netty.protocol.command;
 
 
/**
 * @program: label-management-system-backend
 * @description: 消息类型
 * @author: wangjinwei
 * @create: 2021-12-28 14:07
 **/
public interface Command {
    /**
     * 登录请求
     */
    Byte LOGIN_REQUEST = 1;
    /**
     *登录响应
     */
    Byte LOGIN_RESPONSE = 2;
    /**
     * 客户端 消息请求
     */
    Byte MESSAGE_REQUEST = 3;
    /**
     * 服务端消息 响应
     */
    Byte MESSAGE_RESPONSE = 4;
 
 
}
 
package com.minivision.label.management.netty.protocol;
 
import lombok.Data;
 
import static com.minivision.label.management.netty.protocol.command.Command.LOGIN_REQUEST;
 
/**
 * @program: label-management-system-backend
 * @description: 登陆请求消息
 * @author: wangjinwei
 * @create: 2021-12-28 08:57
 **/
@Data
public class LoginRequestPacket extends Packet {
 
    private String mtk;
 
    @Override
    public Byte getCommand() {
        return LOGIN_REQUEST;
    }
}
 
 
package com.minivision.label.management.netty.protocol;
 
import lombok.Data;
 
import static com.minivision.label.management.netty.protocol.command.Command.LOGIN_RESPONSE;
 
/**
 * @program: label-management-system-backend
 * @description: 登录响应消息体
 * @author: wangjinwei
 * @create: 2021-12-28 14:12
 **/
@Data
public class LoginResponsePacket extends Packet {
 
    /**
     * 失败原因
     */
    private String reason;
    /**
     * 是否成功
     */
    private boolean success;
    /**
     * 用户id
     */
    private String userId;
    /**
     * 用户名
     */
    private String userName;
 
    @Override
    public Byte getCommand() {
 
        return LOGIN_RESPONSE;
    }
}
 
 
package com.minivision.label.management.netty.protocol;
 
import lombok.Data;
import lombok.NoArgsConstructor;
 
import static com.minivision.label.management.netty.protocol.command.Command.MESSAGE_REQUEST;
 
/**
 * @program: label-management-system-backend
 * @description: 客户端消息请求
 * @author: wangjinwei
 * @create: 2021-12-28 16:15
 **/
@Data
@NoArgsConstructor
public class MessageRequestPacket extends Packet {
 
    private String toUserId;
    private String message;
 
    public MessageRequestPacket(String toUserId, String message) {
        this.message = message;
        this.toUserId = toUserId;
    }
 
    @Override
    public Byte getCommand() {
        return MESSAGE_REQUEST;
    }
}
 
package com.minivision.label.management.netty.protocol;
 
 
import lombok.Data;
import lombok.experimental.Accessors;
 
import static com.minivision.label.management.netty.protocol.command.Command.MESSAGE_RESPONSE;
 
/**
 * @program: label-management-system-backend
 * @description: 客户端消息响应
 * @author: wangjinwei
 * @create: 2021-12-28 17:09
 **/
@Accessors(chain = true)
@Data
public class MessageResponsePacket extends Packet {
 
    private String fromUserId;
    private String fromUserName;
    private Object message;
 
    @Override
    public Byte getCommand() {
        return MESSAGE_RESPONSE;
    }
}
 
 
package com.minivision.label.management.netty.protocol;
 
/**
 * @program: label-management-system-backend
 * @description: 消息体的父类
 * @author: wangjinwei
 * @create: 2021-12-28 10:43
 **/
public abstract class Packet {
    /**
     *版本
     **/
    private Byte version = 1;
 
    /**
     * 消息类型 --登录请求,登录响应、消息请求,消息响应
     *
     * @return
     */
    public abstract Byte getCommand();
}
 
 
package com.minivision.label.management.netty.protocol;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
 
import java.util.HashMap;
import java.util.Map;
 
import static com.minivision.label.management.netty.protocol.command.Command.*;
 
/**
 * @program: label-management-system-backend
 * @description: 消息解析
 * @author: wangjinwei
 * @create: 2021-12-28 14:07
 **/
@Slf4j
public class PacketCodeC {
 
    public static final PacketCodeC INSTANCE = new PacketCodeC();
 
    /**
     * 协议版本类型
     */
    private final Map<Byte, Class<? extends Packet>> packetTypeMap;
 
    private PacketCodeC() {
        packetTypeMap = new HashMap<>();
        packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
        packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
        packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);
        packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);
 
 
    }
 
    private Class<? extends Packet> getRequestType(byte command) {
        return packetTypeMap.get(command);
    }
 
    public Packet getPacket(String content) {
        try {
            JSONObject jsonObject = JSON.parseObject(content);
            Byte command = jsonObject.getByte("command");
            if (command == null) {
                log.error("PacketCodeC.getPacket command is null content={}", content);
                return null;
            }
            Class<? extends Packet> classType = getRequestType(command);
            if (classType == null) {
                return null;
            }
            Packet packet = jsonObject.toJavaObject(classType);
            return packet;
        } catch (Exception e) {
            log.error("PacketCodeC.getPacket", e);
        }
        return null;
    }
}

五、用户session

package com.minivision.label.management.netty.util;
 
import com.minivision.label.management.dto.LoginUser;
import io.netty.util.AttributeKey;
 
 
/**
 * @program: label-management-system-backend
 * @description:
 * @author: wangjinwei
 * @create: 2021-12-27 16:34
 **/
public interface Attributes {
 
    /**
     * 登陆标志
     */
    AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
    /**
     * 登陆用户信息
     */
    AttributeKey<LoginUser> LOGINUSER = AttributeKey.newInstance("LoginUser");
}
 
package com.minivision.label.management.netty.util;
 
import com.minivision.label.management.dto.LoginUser;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import lombok.extern.slf4j.Slf4j;
 
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * @program: label-management-system-backend
 * @description: 登陆常量类
 * @author: wangjinwei
 * @create: 2021-12-28 11:09
 **/
@Slf4j
public class SessionUtil {
    /**
     * 用户channel对应关系
     */
    private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();
 
    /**
     * 群组消息
     */
    private static final Map<String, ChannelGroup> groupIdChannelGroup = new ConcurrentHashMap<>();
 
    /**
     * 绑定用户与通道
     * @param loginUser
     * @param channel
     */
    public static void bindSession(LoginUser loginUser, Channel channel) {
        userIdChannelMap.put(loginUser.getUserId(), channel);
        channel.attr(Attributes.LOGINUSER).set(loginUser);
    }
 
    /**
     * 获取用户
     * @param channel
     * @return
     */
    public static LoginUser getSession(Channel channel) {
        return channel.attr(Attributes.LOGINUSER).get();
 
    }
 
    public static boolean hasLogin(Channel channel) {
        return getSession(channel) != null;
    }
 
    public static Channel getChannel(String userId) {
        return userIdChannelMap.get(userId);
    }
 
    public static void bindChannelGroup(String groupId, ChannelGroup channelGroup) {
        groupIdChannelGroup.put(groupId, channelGroup);
    }
 
    public static ChannelGroup getChannelGroup(String groupId) {
        return groupIdChannelGroup.get(groupId);
    }
 
    /**
     * 解除绑定关系
     * @param channel
     */
    public static void unBindSession(Channel channel) {
        if (hasLogin(channel)) {
            LoginUser session = getSession(channel);
            userIdChannelMap.remove(session.getUserId());
            channel.attr(Attributes.LOGINUSER).set(null);
            log.info(session + " 退出登录!");
        }
 
    }
 
    public static Map<String, Channel> getUserIdChannelMap() {
        return userIdChannelMap;
    }
}

六、redis 实现集群:直接调用RedisClient#convertAndSend方法

package com.minivision.label.management.redis;
 
import com.minivision.label.management.service.PushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 
import static com.minivision.label.management.common.constant.LabelConstants.TASK_MSG_WS_TOPIC;
 
/**
 * @Description Redis消息监听者容器:
 * @author wangjinwei
 * @date 2021/4/29 15:02
 */
@SuppressWarnings({"all"})
@Configuration
public class RedisConfig {
 
 
    @Autowired
    private PushService pushService;
 
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     *
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
            , MessageListenerAdapter TopicAdapter, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //添加消息监听器
        container.addMessageListener(TopicAdapter, new PatternTopic(TASK_MSG_WS_TOPIC));
        return container;
    }
 
 
    /**
     * 消息监听器适配器,绑定消息处理器
     *
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter TopicAdapter() {
        return new MessageListenerAdapter(new TopicListener(pushService));
    }
 
}
 
 
package com.minivision.label.management.redis;
 
import com.alibaba.fastjson.JSON;
import com.minivision.label.management.netty.protocol.Packet;
import com.minivision.label.management.service.PushService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
 
/**
 *
 * @author wangjinwei<br>
 * @version 1.0<br>
 * @Description: 监听消息<br>
 * @CreateDate 2021/4/27 9:13
 */
@Slf4j
public class TopicListener implements MessageListener {
 
    private PushService pushService;
 
    public TopicListener(PushService pushService) {
        this.pushService = pushService;
    }
 
    @Override
    public void onMessage(Message message, byte[] bytes) {
        String msg = message.toString();
        RedisWebsocketMsg<Packet> redisWebsocketMsg = JSON.parseObject(msg, RedisWebsocketMsg.class);
        pushService.pushMsg(redisWebsocketMsg.getReceiver(), JSON.toJSONString(redisWebsocketMsg.getContent()));
    }
}
 
 
 
 
package com.minivision.label.management.redis;
 
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
 
/**
 * Description:redis客户端操作
 *
 * @author wangjinwei
 * @date 2021/4/28
 */
@Slf4j
@Component
public class RedisClient {
 
 
    @Autowired
    private StringRedisTemplate redisTemplate;
 
 
    /**
     * @return void
     * @Description:发送群消息
     * @param: topicName
     * @param: redisWebsocketMsg
     * @author wangjinwei
     * @date 2021/4/30 11:13
     */
    public void convertAndSend(String topicName, RedisWebsocketMsg redisWebsocketMsg) {
        redisTemplate.convertAndSend(topicName, JSON.toJSONString(redisWebsocketMsg));
    }
 
 
}
 
package com.minivision.label.management.redis;
 
import lombok.Data;
import lombok.experimental.Accessors;
 
import java.io.Serializable;
import java.util.List;
 
/**
 * websocket redis广播消息
 * @author wangjinwei <br>
 * @version 1.0<br>
 * @CreateDate 2021/6/23 <br>
 */
@Accessors(chain = true)
@Data
public class RedisWebsocketMsg<T> implements Serializable {
    /**
     * 序列号
     */
    private static final long serialVersionUID = -1L;
    /**
     * 消息接收者
     */
    private List<String> receiver;
    /**
     * 内容
     */
    private T content;
}

七、最终暴露给其他模块调用的接口:pushService#sendMsg接口

package com.minivision.label.management.service;
 
import com.minivision.label.management.redis.RedisWebsocketMsg;
 
import java.util.List;
 
/**
 * @program: label-management-system-backend
 * @description:
 * @author: wangjinwei
 * @create: 2021-12-27 15:28
 **/
public interface PushService {
 
    /**
     * 推送给指定通道
     * @param userId channelId
     * @param msg msg
     */
    void pushMsgToOne(String userId, String msg);
 
    /**
     * 推送给指定用户
     * @param userIds channelId
     * @param msg msg
     */
    void pushMsg(List<String> userIds, String msg);
 
    /**
     * @Description: 将消息推送给单个用户
     * @param: redisWebsocketMsg
     * @return void
     * @author wangjinwei
     * @date 2021/4/30 10:40
     */
    void sendMsg(RedisWebsocketMsg redisWebsocketMsg);
}
 
 
 
 
 
package com.minivision.label.management.service.impl;
 
import com.minivision.label.management.netty.util.SessionUtil;
import com.minivision.label.management.redis.RedisClient;
import com.minivision.label.management.redis.RedisWebsocketMsg;
import com.minivision.label.management.service.PushService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import java.util.List;
 
import static com.minivision.label.management.common.constant.LabelConstants.TASK_MSG_WS_TOPIC;
 
/**
 * ━━━━━━佛祖保佑━━━━━━
 *                  ,;,,;
 *                ,;;'(    社
 *      __      ,;;' ' \   会
 *   /'  '\'~~'~' \ /'\.)  主
 * ,;(      )    /  |.     义
 *,;' \    /-.,,(   ) \    码
 *     ) /       ) / )|    农
 *     ||        ||  \)
 *     (_\       (_\
 * ━━━━━━永无BUG━━━━━━
 * @program: label-management-system-backend
 * @description:
 * @author: wangjinwei
 * @create: 2021-12-27 15:30
 **/
@Service
@Slf4j
public class PushServiceImpl implements PushService {
 
    @Autowired
    private RedisClient redisClient;
 
    @Override
    public void pushMsgToOne(String userId, String msg) {
        Channel channel = SessionUtil.getChannel(userId);
        if (channel == null) {
            log.info("未找到通道信息,数据不推送");
            return;
        }
        if (!channel.isActive()) {
            log.info("通道已经关闭,数据不推送");
            //移除通道信息
            SessionUtil.unBindSession(channel);
        }
        long start = System.currentTimeMillis();
        log.info("PushServiceImpl.pushMsgToOne userId={}, msg={},start={}", userId, msg, start);
        channel.writeAndFlush(new TextWebSocketFrame(msg));
        log.info("PushServiceImpl.pushMsgToOne userId={}, msg={},end={}", userId, msg, System.currentTimeMillis());
    }
 
    @Override
    public void pushMsg(List<String> userIds, String msg) {
        if (CollectionUtils.isEmpty(userIds)) {
            return;
        }
        userIds.forEach(userId -> {
            pushMsgToOne(userId, msg);
        });
    }
 
    /**
     * @Description: 将消息推送给单个用户
     * @param: redisWebsocketMsg
     * @return void
     * @author wangjinwei
     * @date 2021/4/30 10:40
     */
    @Override
    @SuppressWarnings("all")
    public void sendMsg(RedisWebsocketMsg redisWebsocketMsg) {
        long start = System.currentTimeMillis();
        //给其他订阅了主题的节点发消息,因为本节点没有
        redisClient.convertAndSend(TASK_MSG_WS_TOPIC, redisWebsocketMsg);
    }
 
}

相关推荐

MySQL数据库中,数据量越来越大,有什么具体的优化方案么?

个人的观点,这种大表的优化,不一定上来就要分库分表,因为表一旦被拆分,开发、运维的复杂度会直线上升,而大多数公司和开发人员是欠缺这种能力的。所以MySQL中几百万甚至小几千万的表,先考虑做单表的优化。...

Redis的Bitmap(位图):签到打卡、用户在线状态,用它一目了然

你是不是每天打开APP,第一时间就是去“签到打卡”?或者在社交软件里,看到你的朋友头像旁边亮着“在线”的绿灯?这些看似简单的功能背后,都隐藏着一个有趣而高效的数据结构。如果让你来设计一个签到系统:用户...

想知道有多少人看了你的文章?Redis HyperLogLog几KB就搞定!

作为一名内容创作者,你每天最期待的,除了文章阅读量蹭蹭上涨,是不是还特别想知道,到底有多少个“独立用户”阅读了你的文章?这个数字,我们通常称为“UV”(UniqueVisitors),它比总阅读量更...

Redis的“HyperLogLog”:统计网站日活用户,省内存又高效的神器

你可能从未听过这个拗口的名字——“HyperLogLog”,它听起来就像是某个高深莫测的数学公式。但请相信我,理解它的核心思想并不难,而且一旦你掌握了它,你会发现它在处理大数据统计问题时,简直就是“救...

阿里云国际站:为什么我的云服务器运行缓慢?

本文由【云老大】TG@yunlaoda360撰写一、网络性能瓶颈带宽不足现象:上传/下载速度慢,远程连接卡顿。排查:通过阿里云控制台查看网络流量峰值是否接近带宽上限34。解决:升级带宽(如从1M提...

Java 近期新闻:Jakarta EE 11和Spring AI更新、WildFly 36.0 Beta、Infinispan

作者|MichaelRedlich译者|明知山策划|丁晓昀OpenJDKJEP503(移除32位x86移植版本)已从“ProposedtoTarget”状态进入到“T...

腾讯云国际站:怎样设置自动伸缩应对流量高峰?

云计算平台服务以阿里云为例:开通服务与创建伸缩组:登录阿里云控制台,找到弹性伸缩服务并开通。创建伸缩组时,选择地域与可用区,定义伸缩组内最小/最大实例数,绑定已有VPC虚拟交换机。实例模板需...

【案例分享】如何利用京东云建设高可用业务架构

本文以2022年一个实际项目为基础,来演示在京东云上构建高可用业务的整个过程。公有云及私有云客户可通过使用京东云的弹性IAAS、PAAS服务,创建高可用、高弹性、高可扩展、高安全的云上业务环境,提升业...

Spring Security在前后端分离项目中的使用

1文章导读SpringSecurity是Spring家族中的一个安全管理框架,可以和SpringBoot项目很方便的集成。SpringSecurity框架的两大核心功能:认证和授权认证:...

Redis与Java集成的最佳实践

Redis与Java集成的最佳实践在当今互联网飞速发展的时代,缓存技术的重要性毋庸置疑。Redis作为一款高性能的分布式缓存数据库,与Java语言的结合更是如虎添翼。今天,我们就来聊聊Redis与Ja...

Redis在Java项目中的应用与数据持久化

Redis在Java项目中的应用与数据持久化Redis简介:为什么我们需要它?在Java项目中,Redis就像一位不知疲倦的快跑选手,总能在关键时刻挺身而出。作为一个内存数据库,它在处理高并发请求时表...

Redis 集群最大节点个数是多少?

Redis集群最大节点个数取决于Redis的哈希槽数量,因为每个节点可以负责多个哈希槽。在Redis3.0之前,Redis集群最多支持16384个哈希槽,因此最大节点数为16384个。但是在Redi...

Java开发岗面试宝典:分布式相关问答详解

今天千锋广州Java小编就给大家分享一些就业面试宝典之分布式相关问题,一起来看看吧!1.Redis和Memcache的区别?1、存储方式Memecache把数据全部存在内存之中,断电后会挂掉,数据不...

当Redis内存不足时,除了加内存,还有哪些曲线救国的办法?

作为“速度之王”的Redis,其高性能的秘密武器之一就是将数据存储在内存中。然而,内存资源是有限且昂贵的。当你的Redis实例开始告警“内存不足”,或者写入请求被阻塞时,最直接的解决方案似乎就是“加内...

商品详情页那么多信息,Redis的“哈希”如何优雅存储?

你每天网购时,无论是打开淘宝、京东还是拼多多,看到的商品详情页都琳琅满目:商品名称、价格、库存、图片、描述、评价数量、销量。这些信息加起来,多的惊人。那么问题来了:这些海量的商品信息,程序是去哪里取出...

取消回复欢迎 发表评论: