百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

阿里二面:RabbitMQ中如何保证消息不被重复消费?

mhr18 2024-12-04 13:59 21 浏览 0 评论

想象一下,你正在构建一个超级重要的电商系统,就像一个庞大而繁忙的商业帝国。在这个系统中,消息的传递就像是帝国的信使,负责在各个部门之间传递重要的指令和信息。但是,如果这些消息像调皮的小精灵一样,时不时地重复出现,那可就会引发一场大混乱啦!比如说,用户可能会收到重复的订单通知,商家可能会重复处理同一个订单,这不仅会让用户感到困惑和不满,还可能给系统带来严重的问题。所以呀,在 RabbitMQ 中,如何保证消息不被重复消费,就成了我们必须要攻克的一个难题!我将带你深入RabbitMQ的腹地,揭秘那些让消息消费稳如泰山的独门秘籍,让你的系统从此告别“重复消费”的噩梦!

在RabbitMQ的世界里,保证消息不被重复消费,其实是一场与时间和逻辑的精彩博弈。关键在于,我们要确保每个消息只被处理一次,即便在系统故障、重启或网络波动等情况下也能如此。那么,如何实现这一宏伟目标呢?

一、消息唯一标识

首先,我们要为每一条消息赋予一个独一无二的标识,就像给每个小精灵贴上专属的魔法标签。在发送消息的时候,把这个魔法标签一并发送出去。当消费者接收到消息后,先检查一下这个标签是否已经见过了。如果见过,那就说明这个小精灵已经完成任务啦,直接忽略它;如果没见过,那就让它开始工作,并把这个魔法标签记录下来,以便下次再见到时能够识别出来。

实现步骤:

  1. 在发送消息之前,生成一个唯一的消息标识,比如使用 UUID 算法。
  2. 将消息标识和消息内容一起发送到 RabbitMQ 中。
  3. 消费者接收到消息后,从消息中提取出消息标识,进行重复判断。
import java.util.UUID;

// 生成唯一消息标识的方法
public class MessageUtils {
    public static String generateUniqueMessageId() {
        // 使用 UUID 生成唯一标识
        return UUID.randomUUID().toString();
    }
}

举个例子,在我们的电商魔法王国中,当用户下单成功后,系统会这样发送消息:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 创建连接
        Connection connection = factory.newConnection();

        // 创建信道
        Channel channel = connection.createChannel();

        // 生成唯一消息标识
        String messageId = MessageUtils.generateUniqueMessageId();

        // 要发送的消息内容
        String message = "订单信息:商品 A,数量 1";

        // 将消息标识和消息内容一起发送到 RabbitMQ 中
        channel.basicPublish("exchange_name", "routing_key", null, (messageId + "," + message).getBytes());

        System.out.println("消息已发送:" + message);

        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}

消费者接收到消息后,从消息中提取出消息标识,进行重复判断:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashSet;
import java.util.Set;

public class Consumer {
    private static final Set<String> processedMessageIds = new HashSet<>();

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 创建连接
        Connection connection = factory.newConnection();

        // 创建信道
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare("queue_name", true, false, false, null);

        // 设置消息接收回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            // 从消息中提取消息标识
            String messageId = message.split(",")[0];

            // 检查消息标识是否已处理过
            if (!processedMessageIds.contains(messageId)) {
                System.out.println("接收到新消息:" + message);
                // 处理消息逻辑...

                // 将消息标识添加到已处理集合中
                processedMessageIds.add(messageId);
            } else {
                System.out.println("重复消息,已忽略:" + message);
            }
        };

        // 开始接收消息
        channel.basicConsume("queue_name", true, deliverCallback, consumerTag -> { });
    }
}

二、数据库去重表

另一种方法是利用数据库来实现消息的去重。我们可以在数据库中创建一个专门的去重表,用来记录已经处理过的消息标识。当消费者接收到消息后,先在去重表中查询这个消息标识是否存在。如果存在,说明这条消息已经处理过了,直接忽略;如果不存在,就正常处理消息,并在去重表中插入这条消息的标识。

比如说,我们可以在数据库中创建一个名为 “message_processed” 的表,包含两个字段:“message_id” 和 “processed_time”。当消费者接收到消息后,先查询 “message_processed” 表中是否存在相同的 “message_id”。如果存在,说明消息已经处理过了;如果不存在,就处理消息,并在 “message_processed” 表中插入一条新记录,记录消息标识和处理时间。

实现步骤:

  1. 创建一个数据库表,用于存储已处理消息的标识。
  2. 消费者在处理消息之前,先查询数据库表中是否存在该消息标识。
  3. 如果不存在,处理消息,并将消息标识插入到数据库表中。

三、Redis 去重

除了数据库,我们还可以借助 Redis 来实现消息的去重。Redis 是一个非常高效的内存数据库,它的操作速度非常快,可以满足高并发场景下的需求。我们可以使用 Redis 的集合(Set)数据结构来存储已经处理过的消息标识。当消费者接收到消息后,先在 Redis 中查询这个消息标识是否存在于集合中。如果存在,说明消息已经处理过了,直接忽略;如果不存在,就正常处理消息,并将消息标识添加到 Redis 集合中。

比如说,在我们的电商系统中,当用户支付成功后,系统会发送一条支付成功的消息到 RabbitMQ 中。消费者在接收到消息后,先从 Redis 中获取一个名为 “processed_messages” 的集合,然后检查集合中是否包含该消息的标识。如果包含,说明消息已经处理过了;如果不包含,就处理消息,并将消息标识添加到 “processed_messages” 集合中。

实现步骤:

  1. 连接到 Redis 服务器。
  2. 在 Redis 中创建一个集合,用于存储已处理消息的标识。
  3. 消费者在处理消息之前,先查询 Redis 集合中是否存在该消息标识。
  4. 如果不存在,处理消息,并将消息标识添加到 Redis 集合中。

四、死信队列与重试机制

在 RabbitMQ 的魔法世界里,还有一个非常强大的工具,那就是死信队列(Dead Letter Queue,简称 DLQ)和重试机制。当消息在处理过程中遇到问题,比如消费者处理消息时抛出了异常,或者消息在队列中等待了太长时间而没有被消费,这些消息就可以被转移到死信队列中。然后,我们可以针对死信队列中的消息进行重试或者其他特殊处理。

实现步骤:

  1. 创建普通队列和死信交换器、死信队列:首先,我们需要创建一个普通的队列,用于接收和处理消息。同时,还需要创建一个死信交换器和一个死信队列。死信交换器用于将死信路由到死信队列中。
channel.queueDeclare("normal_queue", true, false, false, null);
channel.exchangeDeclare("dlx_exchange", "direct");
channel.queueDeclare("dlq_queue", true, false, false, null);
channel.queueBind("dlq_queue", "dlx_exchange", "dlq_routing_key");
  1. 设置普通队列的死信参数:在创建普通队列时,我们需要设置一些死信参数,告诉 RabbitMQ 当消息满足什么条件时,将其转移到死信队列中。这些参数包括死信交换器的名称、死信路由键以及消息的过期时间等。
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key", "dlq_routing_key");
args.put("x-message-ttl", 60000); // 设置消息过期时间为 60 秒(可根据实际情况调整)
channel.queueDeclare("normal_queue", true, false, false, args);
  1. 消费者处理消息并进行重试:当消费者从普通队列中获取到消息并进行处理时,如果处理过程中出现异常,我们可以根据实际情况决定是否进行重试。如果需要重试,我们可以将消息重新发送回原来的队列,或者发送到一个专门用于重试的队列中。在重新发送消息之前,我们可以根据需要修改消息的一些属性,比如重试次数等。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    try {
        // 处理消息逻辑,可能会抛出异常
        System.out.println("正在处理消息:" + message);
        // 模拟处理消息时出现异常
        throw new Exception("处理消息失败");
    } catch (Exception e) {
        System.out.println("处理消息失败,将进行重试");
        // 获取消息的重试次数
        int retryCount = getRetryCount(delivery);
        if (retryCount < 3) { // 假设最大重试次数为 3 次
            // 增加重试次数
            retryCount++;
            // 将重试次数设置到消息头部
            AMQP.BasicProperties properties = delivery.getProperties();
            properties = properties.builder().headers(putRetryCount(retryCount)).build();
            // 将消息重新发送回队列
            channel.basicPublish("", "normal_queue", properties, delivery.getBody());
        } else {
            // 超过最大重试次数,将消息发送到死信队列
            System.out.println("超过最大重试次数,将消息发送到死信队列");
            channel.basicPublish("dlx_exchange", "dlq_routing_key", null, delivery.getBody());
        }
    }
};
  1. 处理死信队列中的消息:我们可以创建一个专门的消费者来处理死信队列中的消息。这个消费者可以根据具体的业务需求,对死信队列中的消息进行相应的处理,比如记录日志、发送告警通知等。
DeliverCallback dlqDeliverCallback = (consumerTag, delivery) -> {
    String deadLetterMessage = new String(delivery.getBody(), "UTF-8");
    System.out.println("从死信队列中获取到消息:" + deadLetterMessage);
    // 处理死信队列中的消息逻辑...
};
channel.basicConsume("dlq_queue", true, dlqDeliverCallback, consumerTag -> { });

五、ACK 机制

ACK(Acknowledgment)机制就像是消费者给 RabbitMQ 的一个反馈信号,告诉它消息是否已经被成功处理。当消费者成功处理完一条消息后,它会向 RabbitMQ 发送一个 ACK 确认消息。RabbitMQ 收到 ACK 后,就会知道这条消息已经被处理了,然后将其从队列中移除。如果消费者在处理消息的过程中出现了问题,或者没有在规定的时间内发送 ACK,RabbitMQ 会认为这条消息没有被成功处理,从而将其重新发送给其他消费者进行处理。

实现步骤:

  1. 消费者在订阅队列时,设置 autoAck 参数为 false,表示需要手动发送 ACK。
channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> { });
  1. 在消息处理逻辑中,当消息处理成功后,调用 channel.basicAck 方法发送 ACK 确认消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    try {
        // 处理消息逻辑
        System.out.println("正在处理消息:" + message);
        // 处理成功后发送 ACK 确认
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理消息失败时的逻辑
        System.out.println("处理消息失败");
    }
};

六、RabbitMQ 事务

此外,RabbitMQ 还提供了事务机制,就像给我们的魔法操作加上了一层坚固的护盾。通过事务,我们可以确保消息的发送和相关操作要么全部成功,要么全部失败,从而避免出现消息发送成功但后续处理失败的情况。
实现步骤:

  1. 开启事务:在生产者代码中,使用 channel.txSelect () 方法开启事务。
  2. 发送消息:像平常一样发送消息到 RabbitMQ 队列中。
  3. 提交事务:如果消息发送和相关操作都成功,使用 channel.txCommit () 方法提交事务,此时消息才会真正被发送到队列中。
  4. 回滚事务:如果在发送消息或执行相关操作过程中出现错误,可以使用 channel.txRollback () 方法回滚事务,消息将不会被发送到队列中。

好啦,今天关于 RabbitMQ 中如何保证消息不被重复消费、死信队列与重试机制以及事务的实现方法就介绍到这里啦!希望这些魔法秘籍能够帮助大家在实际开发中更好地驾驭 RabbitMQ,避免消息重复消费和丢失带来的混乱,让我们的系统更加稳定和可靠。

小伙伴们,你们在使用 RabbitMQ 的奇妙旅程中还遇到过哪些有趣的挑战和问题呢?快来评论区和大家分享吧!让我们一起在 Java 的魔法世界里不断探索,共同成长!

别忘了,点赞、分享、关注,让我们一起在技术的海洋里乘风破浪,共创辉煌!你的每一次互动,都是我前进的动力!

相关推荐

SpringBoot 各种分页查询方式详解(全网最全)

一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...

《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略

《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...

LoadRunner(loadrunner录制不到脚本)

一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...

Redis数据类型介绍(redis 数据类型)

介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...

RMAN备份监控及优化总结(rman备份原理)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

备份软件调用rman接口备份报错RMAN-06820 ORA-17629 ORA-17627

一、报错描述:备份归档报错无法连接主库进行归档,监听问题12541RMAN-06820:WARNING:failedtoarchivecurrentlogatprimarydatab...

增量备份修复物理备库gap(增量备份恢复数据库步骤)

适用场景:主备不同步,主库归档日志已删除且无备份.解决方案:主库增量备份修复dg备库中的gap.具体步骤:1、停止同步>alterdatabaserecovermanagedstand...

一分钟看懂,如何白嫖sql工具(白嫖数据库)

如何白嫖sql工具?1分钟看懂。今天分享一个免费的sql工具,毕竟现在比较火的NavicatDbeaverDatagrip都需要付费才能使用完整功能。幸亏今天有了这款SQLynx,它不仅支持国内外...

「开源资讯」数据管理与可视化分析平台,DataGear 1.6.1 发布

前言数据齿轮(DataGear)是一款数据库管理系统,使用Java语言开发,采用浏览器/服务器架构,以数据管理为核心功能,支持多种数据库。它的数据模型并不是原始的数据库表,而是融合了数据库表及表间关系...

您还在手工打造增删改查代码么,该神器带你脱离苦海

作为Java开发程序,日常开发中,都会使用Spring框架,完成日常的功能开发;在相关业务系统中,难免存在各种增删改查的接口需求开发。通常来说,实现增删改查有如下几个方式:纯手工打造,编写各种Cont...

Linux基础知识(linux基础知识点及答案)

系统目录结构/bin:命令和应用程序。/boot:这里存放的是启动Linux时使用的一些核心文件,包括一些连接文件以及镜像文件。/dev:dev是Device(设备)的缩写,该目录...

PL/SQL 杂谈(二)(pl/sql developer使用)

承接(一)部分。我们从结构和功能这两个方面展示PL/SQL的关键要素。可以看看PL/SQL的优雅的代码。写出一个好的代码,就和文科生写出一篇优秀的作文一样,那么赏心悦目。1、与SQL的集成PL/S...

电商ERP系统哪个好用?(电商erp哪个好一点)

电商ERP系统哪个好用?做电商的,谁还没被ERP折腾过?有老板说:“我们早就上了ERP,订单、库存、财务全搞定,系统用得飞起。”也有运营吐槽:“系统是上了,可库存老不准,订单漏单错单天天有,财务对账还...

汽车检测线系统实例,看集中控制与PLC分布控制

PLC可编程控制器,上个世纪70年代初,为取代早期继电器控制线路,开始采取存储指令方式,完成顺序控制而设计的。开始仅有逻辑运算、计时、计数等简单功能。随着微处理的发展,PLC可编程能力日益提高,已经能...

苹果五件套成公司年会奖品主角,几大小技巧教你玩转苹果新品

钱江晚报·小时新闻记者张云山随着春节的临近,各家大公司的年会又将陆续上演。上周,各大游戏公司的年会大奖,苹果五件套又成了标配。在上海的游戏公司中,莉莉丝奖品列表拉得相当长,从特等奖到九等奖还包含了特...

取消回复欢迎 发表评论: