一文搞懂消息推送技术选型(消息推送的几种实现方式)
mhr18 2024-10-29 14:34 25 浏览 0 评论
- Ajax短轮询
- MQ
- redis 订阅/发布
Ajax短轮询
优点:
简单高效、浏览器使用循环不断地、间隔地发送请求获取数据
缺点:
频繁创建/断开连接,每次请求都会查询一遍数据不管有无都返回,对服务器业务处理的性能有很大的需求和压力;因为请求间有间隔时间,获取的数据是伪实时的,不适应对实时性要求很高的项目。
典型运用:
扫码登录
MQ
MQ的引入虽然 会造成技术的复杂度提升,但是合理的使用会极大的提高系统的 容错能力。
- 优点:
- 一般MQ都用于 系统解耦、流量削峰、数据分发
- 缺点:
如果MQ服务挂了,导致消息发送和接收就无法使用了
复杂度提高。
MQ的对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
单机吞吐量 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 10万级,RocketMQ也是可以支撑高吞吐的一种MQ | 10万级别,这是kafka最大的优点,就是吞吐量高。 一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic数量对吞吐量的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic | topic从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 | ||
时效性 | ms级 | 微秒级,这是rabbitmq的一大特点,延迟是最低的 | ms级 | 延迟在ms级以内 |
可用性 | 高,基于主从架构实现高可用性 | 高,基于主从架构实现高可用性 | 非常高,分布式架构 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,消息可以做到0丢失 |
rabbitmq 基于Elang语言编写 虽然提供了天然的高并发能力,但是 不利于 深入了解与掌握。
MQ的引入 需要保证两点:可靠性、高可用、幂等性。
mq想如果需要保证可靠性、在某些 对于实时性要求较高的 业务中,那么需要对消息进行持久化、以及保证消息的不丢失。
可靠性
结合三点就是生产者丢失消息、mq自身丢失消息、消费者丢失消息
MQ 若要保障消息的不丢失,对于rabbitmq来讲,常用的有两种方式:
1、开启事务
// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit
但是 对于rabbitmq 来说 开启事务 造成性能上的 浪费是很大的
消息数量 | 开启事务 | 未开启事务 |
10w | 320796ms | 10246ms |
开启事务 与不开启事务 对于 性能上的开销是 320倍。因为 其被 @Transaction注解 标注过,对于每条消息都会被事务拦截器拦截处理。
2、ACK机制
关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费。
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
# 发送者开启 return 确认机制
publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
# 设置消费端手动 ack
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true
@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...", e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
rabbitq提供了两个回调方法
confirm 与return 回调
confirm 是用于生产者发送消息,保证交换机exchange能正常收到,但是无法保证 从exchange的消息 正常发送给队列去消费。
return回调是处理一些 不可正确路由的消息,如exchange 不存在,或者就是路由key 无法正确找到队列。
这两种机制 是可靠性的 重要保障,可以保证消息正常的在mq中传递。
@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/**
* confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue,如果exchange错误,就会触发confirm机制
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("rabbitmq confirm fail,cause:{}", cause);
}
}
/**
* Return 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定路由 key 路由不到,这个时候我们需要监听这种不可达的消息
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("mq消息不可达,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message.toString(), replyCode, replyText, exchange, routingKey);
String messageId = message.getMessageProperties().getMessageId();
}
}
MQ的幂等性搭建:
ACK机制能保证消息一定能被消费但是无法保证消息被消息了几次,这就需要额外编码来保证幂等性,而rabbitmq没有提供额外的幂等操作需要额外代码保证。
MQ的高可用
rabbitmq的消息是储存在一个节点中,让mq的节点崩溃后 其存储的消息就会丢失,会造成服务的不可用,如果使用缓存使用一个持久化的queue,但是在message发送并写入磁盘之间会存在一个虽然短暂的时间差。
为了避免节点失效,将mq节点进行集群处理,当一个节点失效后 就有第二个节点接替前一个节点工作。单失效的那个节点上的消息无法被找回。
镜像队列的配置:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指n明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
可以通过下面命令判断那些slaves已经完成同步
rabbitmqctl list_queues name slave_pids synchronised_slave_pids
镜像队列的原理:
redis 订阅/通知
优缺点:
redis的 订阅通知 与rabbitmq相比,其优势体现在不想要搭建复杂笨重的 MQ ,简单轻量。但是由于redis没有类似于mq的消息持久化与ACK的保证,所以redis实现的发布/订阅功能并不可靠,仅适用于实时、且可靠性不高的场景(因为redis的订阅/发布目前是发送即忘的形式,如果客户端短线即会丢失)。如一些列消息的弹窗通知、有效期等等。
实现方式之一:
redis的键空间通知
配置:
- 首先找到redis.conf配置文件,打开文件,查找notify-keyspace-events,将前面的#去掉便可。注意:这里配置的是notify-keyspace-events的Ex参数,即说明,当键过时的时候会触发通知,若是只须要哈希命令键触发通知则能够设置为notify-keyspace-events Eh。
- 重启redis-server。
- 配置完成。
redis:
localhost: localhost
port: 6379
database: 7
password:
# 过期事件订阅,接收7号数据库中所有key的过期事件
listen-pattern: __keyevent@7__:expired
@Configuration
public class RedisListenerConfiguration {
@Value("${spring.redis.listen-pattern}")
public String pattern;
@Bean
public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnection);
/**
* Topic是消息发布(Pub)者和订阅(Sub)者之间的传输中介
*/
Topic topic = new PatternTopic(this.pattern);
container.addMessageListener(new RedisMessageListener(), topic);
return container;
}
}
监听:
public class RedisMessageListener implements MessageListener {
/**
* Redis 事件监听回调
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
}
}
当向redis订阅一个 过期时间的时候,当key过期的时候 redis会发送一个通知高速服务器,key事件已过期,然后 服务器可以执行自己的相关逻辑,可以在key过期的时候 执行一系列操作
4】三种通信方式的优缺点
短轮询 | 长轮询 | WebSocket | |
浏览器支持 | 几乎所有现代浏览器 | 几乎所有现代浏览器 | IE 10+ Edge Firefox 4+ Chrome 4+ Safari 5+ Opera 11.5+ |
服务器负载 | 较少的CPU资源,较多的内存资源和带宽资源 | 与传统轮询相似,但是占用带宽较少 | 无需循环等待(长轮询),CPU和内存资源不以客户端数量衡量,而是以客户端事件数衡量。三种方式里性能最佳。 |
客户端负载 | 占用较多的内存资源与请求数。 | 与传统轮询相似。 | 同Server-Sent Event。 |
延迟 | 非实时,延迟取决于请求间隔。 | 同传统轮询。 | 实时。 |
实现复杂度 | 非常简单。 | 需要服务器配合,客户端实现非常简单。 | 需要Socket程序实现和额外端口,客户端实现简单。 |
技术方案的选型:
关于业务场景,如果并发量不大,请求频率不高的情况下 选用轮询难度实现上小很多,而且容错率更高。如果在频繁请求资源,一次请求无法返回所有数据的情况下 适合使用websocket。具体需要看业务场景决定。
MQ的典型应用:
哔哩哔哩的弹幕技术架构
- Kafka(第三方服务)
消息队列系统。Kafka 是一个分布式的基于发布/订阅的消息系统,它是支持水平扩展的。每条发布到 Kafka 集群的消息都会打上一个名为 Topic(逻辑上可以被认为是一个 queue)的类别,起到消息分布式分发的作用。 - Router
存储消息。Comet 将信息传送给 Logic 之后,Logic 会对所收到的信息进行存储,采用 register session 的方式在 Router 上进行存储。Router 里面会收录用户的注册信息,这样就可以知道用户是与哪个机器建立的连接。 - Logic
对消息进行逻辑处理。用户建立连接之后会将消息转发给 Logic ,在 Logic 上可以进行账号验证。当然,类似于 IP 过滤以及黑名单设置此类的操作也可以经由 Logic 进行。 - Comet
维护客户端长链接。在上面可以规定一些业务需求,比如可以规定用户传送的信息的内容、输送用户信息等。Comet 提供并维持服务端与客户端之间的链接,这里保证链接可用性的方法主要是发送链接协议(如 Socket 等)。 - Client
客户端。与 Comet 建立链接。 - Jop
消息分发。可以起多个 Jop 模块放到不同的机器上进行覆盖,将消息收录之后,分发到所有的 Comet 上,之后再由 Comet 转发出去。
MQ的实现延迟的方式
rabbitmq:
rabbitmq并没有提供 原生的 延迟队列的实现方式,如果要实现延迟的效果可以使用 死信队列的方式
“死信”是RabbitMQ中的一种消息机制,死信是当MQ出现以下情况的时候:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度
如何配置死信队列
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
最后总结:选用哪种推送技术需要根据具体的业务场景,一句话一切脱离业务的设计都是耍流氓,不能杀鸡用牛刀,也不能不考虑未来。
相关推荐
- Dubai's AI Boom Lures Global Tech as Emirate Reinvents Itself as Middle East's Silicon Gateway
-
AI-generatedimageAsianFin--Dubaiisrapidlytransformingitselffromadesertoilhubintoaglob...
- OpenAI Releases o3-pro, Cuts o3 Prices by 80% as Deal with Google Cloud Reported to Make for Compute Needs
-
TMTPOST--OpenAIisescalatingthepricewarinlargelanguagemodel(LLM)whileseekingpartnershi...
- 黄仁勋说AI Agent才是未来!但究竟有些啥影响?
-
,抓住风口(iOS用户请用电脑端打开小程序)本期要点:详解2025年大热点你好,我是王煜全,这里是王煜全要闻评论。最近,有个词被各个科技大佬反复提及——AIAgent,智能体。黄仁勋在CES展的发布...
- 商城微服务项目组件搭建(五)——Kafka、Tomcat等安装部署
-
1、本文属于mini商城系列文档的第0章,由于篇幅原因,这篇文章拆成了6部分,本文属于第5部分2、mini商城项目详细文档及代码见CSDN:https://blog.csdn.net/Eclipse_...
- Python+Appium环境搭建与自动化教程
-
以下是保姆级教程,手把手教你搭建Python+Appium环境并实现简单的APP自动化测试:一、环境搭建(Windows系统)1.安装Python访问Python官网下载最新版(建议...
- 零配置入门:用VSCode写Java代码的正确姿
-
一、环境准备:安装JDK,让电脑“听懂”Java目标:安装Java开发工具包(JDK),配置环境变量下载JDKJava程序需要JDK(JavaDevelopmentKit)才能运行和编译。以下是两...
- Mycat的搭建以及配置与启动(mycat2)
-
1、首先开启服务器相关端口firewall-cmd--permanent--add-port=9066/tcpfirewall-cmd--permanent--add-port=80...
- kubernetes 部署mysql应用(k8s mysql部署)
-
这边仅用于测试环境,一般生产环境mysql不建议使用容器部署。这里假设安装mysql版本为mysql8.0.33一、创建MySQL配置(ConfigMap)#mysql-config.yaml...
- Spring Data Jpa 介绍和详细入门案例搭建
-
1.SpringDataJPA的概念在介绍SpringDataJPA的时候,我们首先认识下Hibernate。Hibernate是数据访问解决技术的绝对霸主,使用O/R映射(Object-Re...
- 量子点格棋上线!“天衍”邀您执子入局
-
你是否能在策略上战胜量子智能?这不仅是一场博弈更是一次量子智力的较量——量子点格棋正式上线!试试你能否赢下这场量子智局!游戏玩法详解一笔一画间的策略博弈游戏目标:封闭格子、争夺领地点格棋的基本目标是利...
- 美国将与阿联酋合作建立海外最大的人工智能数据中心
-
当地时间5月15日,美国白宫宣布与阿联酋合作建立人工智能数据中心园区,据称这是美国以外最大的人工智能园区。阿布扎比政府支持的阿联酋公司G42及多家美国公司将在阿布扎比合作建造容量为5GW的数据中心,占...
- 盘后股价大涨近8%!甲骨文的业绩及指引超预期?
-
近期,美股的AI概念股迎来了一波上升行情,微软(MSFT.US)频创新高,英伟达(NVDA.US)、台积电(TSM.US)、博通(AVGO.US)、甲骨文(ORCL.US)等多股亦出现显著上涨。而从基...
- 甲骨文预计新财年云基础设施营收将涨超70%,盘后一度涨8% | 财报见闻
-
甲骨文(Oracle)周三盘后公布财报显示,该公司第四财季业绩超预期,虽然云基建略微逊于预期,但管理层预计2026财年云基础设施营收预计将增长超过70%,同时资本支出继上年猛增三倍后,新财年将继续增至...
- Springboot数据访问(整合MongoDB)
-
SpringBoot整合MongoDB基本概念MongoDB与我们之前熟知的关系型数据库(MySQL、Oracle)不同,MongoDB是一个文档数据库,它具有所需的可伸缩性和灵活性,以及所需的查询和...
- Linux环境下,Jmeter压力测试的搭建及报错解决方法
-
概述 Jmeter最早是为了测试Tomcat的前身JServ的执行效率而诞生的。到目前为止,它的最新版本是5.3,其测试能力也不再仅仅只局限于对于Web服务器的测试,而是涵盖了数据库、JM...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Dubai's AI Boom Lures Global Tech as Emirate Reinvents Itself as Middle East's Silicon Gateway
- OpenAI Releases o3-pro, Cuts o3 Prices by 80% as Deal with Google Cloud Reported to Make for Compute Needs
- 黄仁勋说AI Agent才是未来!但究竟有些啥影响?
- 商城微服务项目组件搭建(五)——Kafka、Tomcat等安装部署
- Python+Appium环境搭建与自动化教程
- 零配置入门:用VSCode写Java代码的正确姿
- Mycat的搭建以及配置与启动(mycat2)
- kubernetes 部署mysql应用(k8s mysql部署)
- Spring Data Jpa 介绍和详细入门案例搭建
- 量子点格棋上线!“天衍”邀您执子入局
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)