百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

RabbitMQ 补偿机制、消息幂等性解决方案

mhr18 2024-11-12 11:15 14 浏览 0 评论

1. 场景

先看这么几个面试题:

  1. 如何保证消息的可靠性投递?即如何确定消息是否发送成功?
  2. 如果失败如何处理(补偿机制)?
  3. 如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?

2.消息的可靠性投递

消息确认

消息确认包括主要 生产者发送确认消费者接受确认 ,因为发送消息的过程中我们是无法确认消息是否能路由等,一旦消息丢失我们就无法处理,所以需要确认消息,避免消息丢失。

2.1 生产者确认

我们知道生产者与消费者是完全隔离的,不做任何配置的情况下,生产者是不知道消息是否真正到达 RabbitMQ,也就是说消息发布操作不返回任何消息给生产者。

那么怎么保证我们消息发布的 可靠性投递 ?有以下几种常用机制。

由于之前的文章对上面都有过介绍,所以这里不一一介绍,而一般采用的方式就是 发布者确认模式(生产者确认模式)

原理:生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),由这个 id 在生产者和 RabbitMQ 之间进行消息的确认。

这里的 唯一 ID 能够唯一标识消息,在消息不可达的时候触发回调时可以获取该值,进行对应的错误处理,建立对应的消息补偿机制。(记住这个唯一 ID,且是全局唯一,分布式系统中可采用雪花算法等方式)

confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息决定下一步的处理。

注:这里描述的场景都是那样 正确路由到队列中 的,也就是不考虑失败通知(ReturnCallback)的情况。

由于现在大家开发基本都是通过 Spring Boot 的方式进行开发,所以,这里直接提供其基本配置类参考,如下:

/**
 * @description : 消息生产者
 */
@Component
@Slf4j
public class RabbitmqProducer {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendMessage(Map<String, Object> headers, Object message, String messageId, String exchangeName, String key) {
        // 自定义消息头
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        // 创建消息
        Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders);
        /* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器
         * 如果发送时候指定的交换器不存在 ack 就是 false 代表消息不可达
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);
            if (!ack) {
                System.out.println("进行对应的消息补偿机制");
            }
        });
        // 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值,进行对应的错误处理
        CorrelationData correlationData = new CorrelationData(messageId);
        rabbitTemplate.convertAndSend(exchangeName, key, msg, correlationData);
    }
}

复制代码

2.2 消费者确认

说了生产者如何保证消息的确认,而对于消费者来说,同样需要确认。

前面也说了目前编码都是基于 Spring 的,那么对于消费者来说,同样也有一个接收消息的配置类,如下:

/**
 * @description : 消息消费者
 */
@Component
@Slf4j
public class RabbitmqConsumer {


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE),
            exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE),
            key = RabbitInfo.ROUTING_KEY)
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageHeaders headers = message.getHeaders();
        // 获取消息头信息和消息体
        log.info("msgInfo:{} ; payload:{} ", headers.get("msgInfo"), message.getPayload());
    }
}

复制代码

对于上面接收消息的配置并没有做任何配置,当我们发送消息的时候,消费者接收消息并进行对应的逻辑处理,并且 Spring 的处理是自动 ack 的 ,但其实它也有配置,如下:

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
    # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        # 为了保证信息能够被正确消费,建议签收模式设置为手工签收,并在代码中实现手工签收
        acknowledge-mode: manual
        # 侦听器调用者线程的最小数量
        concurrency: 10
        # 侦听器调用者线程的最大数量
        max-concurrency: 50

复制代码

也就是上面的 acknowledge-mode ,他有三个值,如下:

  1. 当为 NONE 的时候,即默认值,即 autoAck=true ,消费者接收消息后自动确认,此时,MQ 队列中的消息会移除;
  2. 当为 MANUAL 的时候,需要我们手动确认,即 channel.basicAck ,如下:
  3. 当为 AUTO 的时候,经测试发现和 NONE 没什么不同 。

此时,有人就会说那么如果在默认情况下(自动确认),我们的业务代码抛出异常了怎么办?

Spring 的做法是会抛出异常,并且消息不会被 ack,如下面这种情况:

1/0 肯定会抛出异常,此时会一直打印日志,如下:

再去看 MQ 客户端,状态如下:

此时,可能会造成重复消费,怎么理解?

假如我的业务代码没有事务,或者在参数的传递过程中某个方法没有事务的控制,当异常业务代码之前入库了,那么这条消息实际上是没有被确认的,还在队列中,因此,当下次程序启动,则会再次消费这条消息,尽管业务代码出现了异常。

自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端业务代码抛出异常,也就是消费端没有处理成功这条消息,那么就 相当于丢失了消息

如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务代码会进行回滚,这也同样造成了实际意义的消息丢失。

3. 补偿机制

在回到前面的问题,如何确定消息是否发送成功? 生产者确认机制 确实能帮我们解决这个问题,但如果生产者就是接收不到 ack 这个指令怎么办,比如消费者处理时间太长或者网络超时,等等情况,导致生产者一直接收不到这个 ack ,此时怎么办?

生产者与消费者之间应该约定一个超时时间,比如 5 分钟,对于超出这个时间没有得到响应的消息,可以设置一个定时重发的补偿机制:通过消息落库 + 定时任务来实现。

怎么做?这里讲讲思路,如下:

  1. 发送消息之前,先把消息入库,我这里的表设计如下:
   CREATE TABLE `t_cap_published_message` (
     `id` varchar(40) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '标识。',
     `version` varchar(20) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '版本',
     `exchange` varchar(200) COLLATE utf8mb4_bin DEFAULT '' COMMENT '交换机。',
     `topic` varchar(200) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '话题。',
     `content` longtext COLLATE utf8mb4_bin NOT NULL COMMENT '消息内容。',
     `retries` int(11) NOT NULL COMMENT '重试次数,一般为 3 次。',
     `expiry` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '过期时间。',
     `status` varchar(40) COLLATE utf8mb4_bin NOT NULL COMMENT '状态,成功则消息ack成功,其他状态都要重试。',
     `created_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间。',
     `last_modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间,可以用作数据版本。',
     PRIMARY KEY (`id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='发布的消息。';

复制代码

  1. 入库之后在发送消息;
  2. 如果在规定时间不能 ack 或者 ack=false ,即 confirmCallback 回调的 ack=false ,则按照定时规则重新发送消息;
  3. 然后对于发布成功的消息,如果业务操作完成,实际上它的作用已经发挥完成,一段时间对数据库做清理即可,根据业务的具体情况。

4. 消息幂等性

首先我们要知道什么是幂等性,比如一个转账系统,A 要转给 B 100 元,当 A 发出消息后,B 接收成功,然后给 MQ 确认的时候出现网络波动,MQ 并没有接收到 ack 确认,那 MQ 为了保证消息被消费,就会继续给消费者投递之前的消息,如果再重复投递 5 次,则 B 在处理 5 次,加上之前的一次,B 的余额增加了 600 元,很明显是不合理的。

所以幂等性简单来说就是: 重复调用多次产生的业务结果与调用一次产生的业务结果相同 ;

为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ 服务端是没有这种控制的,因为它不知道你是不是就要把一条消息发送两次,所以只能在消费端控制。

回到前面生产者确认模式中讲到了一个 全局唯一 ID ,我们可以通过他来保证消息的幂等性,如下:

  1. 消费者获取到消息后先根据这个全局 唯一 ID 去查询 redis/db 是否存在该消息;
  2. 如果不存在,则正常消费,消费完毕后写入 redis/db;
  3. 如果存在,则证明消息被消费过,直接丢弃,不做处理。


原文 https://xie.infoq.cn/article/abe7691508f57c91906d9a160

相关推荐

Spring Boot3 连接 Redis 竟有这么多实用方式

各位互联网大厂的后端开发精英们,在日常开发中,想必大家都面临过系统性能优化的挑战。当系统数据量逐渐增大、并发请求不断增多时,如何提升系统的响应速度和稳定性,成为了我们必须攻克的难题。而Redis,这...

隧道 ssh -L 命令总结 和 windows端口转发配置

摘要:隧道ssh-L命令总结和windows端口转发配置关键词:隧道、ssh-L、端口转发、网络映射整体说明最近在项目中,因为内网的安全密级比较高,只能有一台机器连接内网数据库,推送...

火爆BOOS直聘的13个大厂Java社招面经(5年经验)助你狂拿offer

火爆BOOS直聘的13个大厂Java社招面经(5年经验)助你狂拿offer综上所述,面试遇到的所有问题,整理成了一份文档,希望大家能够喜欢!!Java面试题分享(Java中高级核心知识全面解析)一、J...

「第五期」游服务器一二三面 秋招 米哈游

一面下午2点,35分钟golang内存模型golang并发模型golanggc原理过程channel用途,原理redis数据结构,底层实现跳跃表查询插入复杂度进程,线程,协程kill原理除了kil...

RMQ——支持合并和优先级的消息队列

业务背景在一个项目中需要实现一个功能,商品价格发生变化时将商品价格打印在商品主图上面,那么需要在价格发生变动的时候触发合成一张带价格的图片,每一次触发合图时计算价格都是获取当前最新的价格。上游价格变化...

Redis 中的 zset 为什么要用跳跃表,而不是B+ Tree 呢?

Redis中的有序集合使用的是一种叫做跳跃表(SkipList)的数据结构来实现,而不是使用B+Tree。本文将介绍为什么Redis中使用跳跃表来实现有序集合,而不是B+Tree,并且探讨跳跃表...

一文让你彻底搞懂 WebSocket 的原理

作者:木木匠转发链接:https://juejin.im/post/5c693a4f51882561fb1db0ff一、概述上一篇文章《图文深入http三次握手核心问题【思维导图】》我们分析了简单的一...

Redis与Java整合的最佳实践

Redis与Java整合的最佳实践在这个数字化时代,数据处理速度决定了企业的竞争力。Redis作为一款高性能的内存数据库,以其卓越的速度和丰富的数据结构,成为Java开发者的重要伙伴。本文将带你深入了...

Docker与Redis:轻松部署和管理你的Redis实例

在高速发展的云计算时代,应用程序的部署和管理变得越来越复杂。面对各种操作系统、依赖库和环境差异,开发者常常陷入“在我机器上能跑”的泥潭。然而,容器化技术的兴起,尤其是Docker的普及,彻底改变了这一...

Java开发中的缓存策略:让程序飞得更快

Java开发中的缓存策略:让程序飞得更快缓存是什么?首先,让我们来聊聊什么是缓存。简单来说,缓存是一种存储机制,它将数据保存在更快速的存储介质中,以便后续使用时能够更快地访问。比如,当你打开一个网页时...

国庆临近,字节后端开发3+4面,终于拿到秋招第一个offer

字节跳动,先面了data部门,3面技术面之后hr说需要实习转正,拒绝,之后另一个部门捞起,四面技术面,已oc分享面经,希望对大家有所帮助,秋招顺利在文末分享了我为金九银十准备的备战资源库,包含了源码笔...

“快”就一个字!Redis凭什么能让你的APP快到飞起?

咱们今天就来聊一个字——“快”!在这个信息爆炸、耐心越来越稀缺的时代,谁不希望自己手机里的APP点一下“嗖”就打开,刷一下“唰”就更新?谁要是敢让咱用户盯着个小圈圈干等,那简直就是在“劝退”!而说到让...

双十一秒杀,为何总能抢到?Redis功不可没!

一年一度的双十一“剁手节”,那场面,简直比春运抢票还刺激!零点的钟声一敲响,亿万个手指头在屏幕上疯狂戳戳戳,眼睛瞪得像铜铃,就为了抢到那个心心念念的半价商品、限量版宝贝。你有没有发现一个奇怪的现象?明...

后端开发必看!为什么说Redis是天然的幂等性?

你在做后端开发的时候,有没有遇到过这样的困扰:高并发场景下,同一个操作重复执行多次,导致数据混乱、业务逻辑出错?别担心,很多同行都踩过这个坑。某电商平台就曾因订单创建接口在高并发时不具备幂等性,用户多...

开发一个app需要哪些技术和工具

APP开发需要一系列技术和工具的支持,以下是对这些技术的清晰归纳和分点表示:一、前端开发技术HTML用于构建页面结构。CSS用于样式设计和布局。JavaScript用于页面交互和逻辑处理。React...

取消回复欢迎 发表评论: