阿里二面:RabbitMQ中如何保证消息不被重复消费?
mhr18 2024-12-04 13:59 21 浏览 0 评论
想象一下,你正在构建一个超级重要的电商系统,就像一个庞大而繁忙的商业帝国。在这个系统中,消息的传递就像是帝国的信使,负责在各个部门之间传递重要的指令和信息。但是,如果这些消息像调皮的小精灵一样,时不时地重复出现,那可就会引发一场大混乱啦!比如说,用户可能会收到重复的订单通知,商家可能会重复处理同一个订单,这不仅会让用户感到困惑和不满,还可能给系统带来严重的问题。所以呀,在 RabbitMQ 中,如何保证消息不被重复消费,就成了我们必须要攻克的一个难题!我将带你深入RabbitMQ的腹地,揭秘那些让消息消费稳如泰山的独门秘籍,让你的系统从此告别“重复消费”的噩梦!
在RabbitMQ的世界里,保证消息不被重复消费,其实是一场与时间和逻辑的精彩博弈。关键在于,我们要确保每个消息只被处理一次,即便在系统故障、重启或网络波动等情况下也能如此。那么,如何实现这一宏伟目标呢?
一、消息唯一标识
首先,我们要为每一条消息赋予一个独一无二的标识,就像给每个小精灵贴上专属的魔法标签。在发送消息的时候,把这个魔法标签一并发送出去。当消费者接收到消息后,先检查一下这个标签是否已经见过了。如果见过,那就说明这个小精灵已经完成任务啦,直接忽略它;如果没见过,那就让它开始工作,并把这个魔法标签记录下来,以便下次再见到时能够识别出来。
实现步骤:
- 在发送消息之前,生成一个唯一的消息标识,比如使用 UUID 算法。
- 将消息标识和消息内容一起发送到 RabbitMQ 中。
- 消费者接收到消息后,从消息中提取出消息标识,进行重复判断。
import java.util.UUID;
// 生成唯一消息标识的方法
public class MessageUtils {
public static String generateUniqueMessageId() {
// 使用 UUID 生成唯一标识
return UUID.randomUUID().toString();
}
}
举个例子,在我们的电商魔法王国中,当用户下单成功后,系统会这样发送消息:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 生成唯一消息标识
String messageId = MessageUtils.generateUniqueMessageId();
// 要发送的消息内容
String message = "订单信息:商品 A,数量 1";
// 将消息标识和消息内容一起发送到 RabbitMQ 中
channel.basicPublish("exchange_name", "routing_key", null, (messageId + "," + message).getBytes());
System.out.println("消息已发送:" + message);
// 关闭信道和连接
channel.close();
connection.close();
}
}
消费者接收到消息后,从消息中提取出消息标识,进行重复判断:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashSet;
import java.util.Set;
public class Consumer {
private static final Set<String> processedMessageIds = new HashSet<>();
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("queue_name", true, false, false, null);
// 设置消息接收回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
// 从消息中提取消息标识
String messageId = message.split(",")[0];
// 检查消息标识是否已处理过
if (!processedMessageIds.contains(messageId)) {
System.out.println("接收到新消息:" + message);
// 处理消息逻辑...
// 将消息标识添加到已处理集合中
processedMessageIds.add(messageId);
} else {
System.out.println("重复消息,已忽略:" + message);
}
};
// 开始接收消息
channel.basicConsume("queue_name", true, deliverCallback, consumerTag -> { });
}
}
二、数据库去重表
另一种方法是利用数据库来实现消息的去重。我们可以在数据库中创建一个专门的去重表,用来记录已经处理过的消息标识。当消费者接收到消息后,先在去重表中查询这个消息标识是否存在。如果存在,说明这条消息已经处理过了,直接忽略;如果不存在,就正常处理消息,并在去重表中插入这条消息的标识。
比如说,我们可以在数据库中创建一个名为 “message_processed” 的表,包含两个字段:“message_id” 和 “processed_time”。当消费者接收到消息后,先查询 “message_processed” 表中是否存在相同的 “message_id”。如果存在,说明消息已经处理过了;如果不存在,就处理消息,并在 “message_processed” 表中插入一条新记录,记录消息标识和处理时间。
实现步骤:
- 创建一个数据库表,用于存储已处理消息的标识。
- 消费者在处理消息之前,先查询数据库表中是否存在该消息标识。
- 如果不存在,处理消息,并将消息标识插入到数据库表中。
三、Redis 去重
除了数据库,我们还可以借助 Redis 来实现消息的去重。Redis 是一个非常高效的内存数据库,它的操作速度非常快,可以满足高并发场景下的需求。我们可以使用 Redis 的集合(Set)数据结构来存储已经处理过的消息标识。当消费者接收到消息后,先在 Redis 中查询这个消息标识是否存在于集合中。如果存在,说明消息已经处理过了,直接忽略;如果不存在,就正常处理消息,并将消息标识添加到 Redis 集合中。
比如说,在我们的电商系统中,当用户支付成功后,系统会发送一条支付成功的消息到 RabbitMQ 中。消费者在接收到消息后,先从 Redis 中获取一个名为 “processed_messages” 的集合,然后检查集合中是否包含该消息的标识。如果包含,说明消息已经处理过了;如果不包含,就处理消息,并将消息标识添加到 “processed_messages” 集合中。
实现步骤:
- 连接到 Redis 服务器。
- 在 Redis 中创建一个集合,用于存储已处理消息的标识。
- 消费者在处理消息之前,先查询 Redis 集合中是否存在该消息标识。
- 如果不存在,处理消息,并将消息标识添加到 Redis 集合中。
四、死信队列与重试机制
在 RabbitMQ 的魔法世界里,还有一个非常强大的工具,那就是死信队列(Dead Letter Queue,简称 DLQ)和重试机制。当消息在处理过程中遇到问题,比如消费者处理消息时抛出了异常,或者消息在队列中等待了太长时间而没有被消费,这些消息就可以被转移到死信队列中。然后,我们可以针对死信队列中的消息进行重试或者其他特殊处理。
实现步骤:
- 创建普通队列和死信交换器、死信队列:首先,我们需要创建一个普通的队列,用于接收和处理消息。同时,还需要创建一个死信交换器和一个死信队列。死信交换器用于将死信路由到死信队列中。
channel.queueDeclare("normal_queue", true, false, false, null);
channel.exchangeDeclare("dlx_exchange", "direct");
channel.queueDeclare("dlq_queue", true, false, false, null);
channel.queueBind("dlq_queue", "dlx_exchange", "dlq_routing_key");
- 设置普通队列的死信参数:在创建普通队列时,我们需要设置一些死信参数,告诉 RabbitMQ 当消息满足什么条件时,将其转移到死信队列中。这些参数包括死信交换器的名称、死信路由键以及消息的过期时间等。
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key", "dlq_routing_key");
args.put("x-message-ttl", 60000); // 设置消息过期时间为 60 秒(可根据实际情况调整)
channel.queueDeclare("normal_queue", true, false, false, args);
- 消费者处理消息并进行重试:当消费者从普通队列中获取到消息并进行处理时,如果处理过程中出现异常,我们可以根据实际情况决定是否进行重试。如果需要重试,我们可以将消息重新发送回原来的队列,或者发送到一个专门用于重试的队列中。在重新发送消息之前,我们可以根据需要修改消息的一些属性,比如重试次数等。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
// 处理消息逻辑,可能会抛出异常
System.out.println("正在处理消息:" + message);
// 模拟处理消息时出现异常
throw new Exception("处理消息失败");
} catch (Exception e) {
System.out.println("处理消息失败,将进行重试");
// 获取消息的重试次数
int retryCount = getRetryCount(delivery);
if (retryCount < 3) { // 假设最大重试次数为 3 次
// 增加重试次数
retryCount++;
// 将重试次数设置到消息头部
AMQP.BasicProperties properties = delivery.getProperties();
properties = properties.builder().headers(putRetryCount(retryCount)).build();
// 将消息重新发送回队列
channel.basicPublish("", "normal_queue", properties, delivery.getBody());
} else {
// 超过最大重试次数,将消息发送到死信队列
System.out.println("超过最大重试次数,将消息发送到死信队列");
channel.basicPublish("dlx_exchange", "dlq_routing_key", null, delivery.getBody());
}
}
};
- 处理死信队列中的消息:我们可以创建一个专门的消费者来处理死信队列中的消息。这个消费者可以根据具体的业务需求,对死信队列中的消息进行相应的处理,比如记录日志、发送告警通知等。
DeliverCallback dlqDeliverCallback = (consumerTag, delivery) -> {
String deadLetterMessage = new String(delivery.getBody(), "UTF-8");
System.out.println("从死信队列中获取到消息:" + deadLetterMessage);
// 处理死信队列中的消息逻辑...
};
channel.basicConsume("dlq_queue", true, dlqDeliverCallback, consumerTag -> { });
五、ACK 机制
ACK(Acknowledgment)机制就像是消费者给 RabbitMQ 的一个反馈信号,告诉它消息是否已经被成功处理。当消费者成功处理完一条消息后,它会向 RabbitMQ 发送一个 ACK 确认消息。RabbitMQ 收到 ACK 后,就会知道这条消息已经被处理了,然后将其从队列中移除。如果消费者在处理消息的过程中出现了问题,或者没有在规定的时间内发送 ACK,RabbitMQ 会认为这条消息没有被成功处理,从而将其重新发送给其他消费者进行处理。
实现步骤:
- 消费者在订阅队列时,设置 autoAck 参数为 false,表示需要手动发送 ACK。
channel.basicConsume("queue_name", false, deliverCallback, consumerTag -> { });
- 在消息处理逻辑中,当消息处理成功后,调用 channel.basicAck 方法发送 ACK 确认消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
// 处理消息逻辑
System.out.println("正在处理消息:" + message);
// 处理成功后发送 ACK 确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 处理消息失败时的逻辑
System.out.println("处理消息失败");
}
};
六、RabbitMQ 事务
此外,RabbitMQ 还提供了事务机制,就像给我们的魔法操作加上了一层坚固的护盾。通过事务,我们可以确保消息的发送和相关操作要么全部成功,要么全部失败,从而避免出现消息发送成功但后续处理失败的情况。
实现步骤:
- 开启事务:在生产者代码中,使用 channel.txSelect () 方法开启事务。
- 发送消息:像平常一样发送消息到 RabbitMQ 队列中。
- 提交事务:如果消息发送和相关操作都成功,使用 channel.txCommit () 方法提交事务,此时消息才会真正被发送到队列中。
- 回滚事务:如果在发送消息或执行相关操作过程中出现错误,可以使用 channel.txRollback () 方法回滚事务,消息将不会被发送到队列中。
好啦,今天关于 RabbitMQ 中如何保证消息不被重复消费、死信队列与重试机制以及事务的实现方法就介绍到这里啦!希望这些魔法秘籍能够帮助大家在实际开发中更好地驾驭 RabbitMQ,避免消息重复消费和丢失带来的混乱,让我们的系统更加稳定和可靠。
小伙伴们,你们在使用 RabbitMQ 的奇妙旅程中还遇到过哪些有趣的挑战和问题呢?快来评论区和大家分享吧!让我们一起在 Java 的魔法世界里不断探索,共同成长!
别忘了,点赞、分享、关注,让我们一起在技术的海洋里乘风破浪,共创辉煌!你的每一次互动,都是我前进的动力!
相关推荐
- SpringBoot 各种分页查询方式详解(全网最全)
-
一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...
- 《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略
-
《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...
- LoadRunner(loadrunner录制不到脚本)
-
一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...
- Redis数据类型介绍(redis 数据类型)
-
介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...
- RMAN备份监控及优化总结(rman备份原理)
-
今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...
- 备份软件调用rman接口备份报错RMAN-06820 ORA-17629 ORA-17627
-
一、报错描述:备份归档报错无法连接主库进行归档,监听问题12541RMAN-06820:WARNING:failedtoarchivecurrentlogatprimarydatab...
- 增量备份修复物理备库gap(增量备份恢复数据库步骤)
-
适用场景:主备不同步,主库归档日志已删除且无备份.解决方案:主库增量备份修复dg备库中的gap.具体步骤:1、停止同步>alterdatabaserecovermanagedstand...
- 一分钟看懂,如何白嫖sql工具(白嫖数据库)
-
如何白嫖sql工具?1分钟看懂。今天分享一个免费的sql工具,毕竟现在比较火的NavicatDbeaverDatagrip都需要付费才能使用完整功能。幸亏今天有了这款SQLynx,它不仅支持国内外...
- 「开源资讯」数据管理与可视化分析平台,DataGear 1.6.1 发布
-
前言数据齿轮(DataGear)是一款数据库管理系统,使用Java语言开发,采用浏览器/服务器架构,以数据管理为核心功能,支持多种数据库。它的数据模型并不是原始的数据库表,而是融合了数据库表及表间关系...
- 您还在手工打造增删改查代码么,该神器带你脱离苦海
-
作为Java开发程序,日常开发中,都会使用Spring框架,完成日常的功能开发;在相关业务系统中,难免存在各种增删改查的接口需求开发。通常来说,实现增删改查有如下几个方式:纯手工打造,编写各种Cont...
- Linux基础知识(linux基础知识点及答案)
-
系统目录结构/bin:命令和应用程序。/boot:这里存放的是启动Linux时使用的一些核心文件,包括一些连接文件以及镜像文件。/dev:dev是Device(设备)的缩写,该目录...
- PL/SQL 杂谈(二)(pl/sql developer使用)
-
承接(一)部分。我们从结构和功能这两个方面展示PL/SQL的关键要素。可以看看PL/SQL的优雅的代码。写出一个好的代码,就和文科生写出一篇优秀的作文一样,那么赏心悦目。1、与SQL的集成PL/S...
- 电商ERP系统哪个好用?(电商erp哪个好一点)
-
电商ERP系统哪个好用?做电商的,谁还没被ERP折腾过?有老板说:“我们早就上了ERP,订单、库存、财务全搞定,系统用得飞起。”也有运营吐槽:“系统是上了,可库存老不准,订单漏单错单天天有,财务对账还...
- 汽车检测线系统实例,看集中控制与PLC分布控制
-
PLC可编程控制器,上个世纪70年代初,为取代早期继电器控制线路,开始采取存储指令方式,完成顺序控制而设计的。开始仅有逻辑运算、计时、计数等简单功能。随着微处理的发展,PLC可编程能力日益提高,已经能...
- 苹果五件套成公司年会奖品主角,几大小技巧教你玩转苹果新品
-
钱江晚报·小时新闻记者张云山随着春节的临近,各家大公司的年会又将陆续上演。上周,各大游戏公司的年会大奖,苹果五件套又成了标配。在上海的游戏公司中,莉莉丝奖品列表拉得相当长,从特等奖到九等奖还包含了特...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)