百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Redis + MQ:高并发秒杀的技术方案与实现

mhr18 2025-07-23 16:30 4 浏览 0 评论

大家好,我是一安~

前言

在电商秒杀场景中,瞬间爆发的海量请求往往成为系统的 生死考验 。当并发量达到数万甚至数十万 QPS 时,传统数据库单表架构难以支撑,而 Redis 与消息队列( MQ )的组合凭借其高性能与可靠性,成为应对高并发秒杀的 黄金方案

方案总览

用户请求 → 前端生成Token → Redis执行Lua脚本(预扣减+防重+流水)→ 发送RocketMQ事务消息 → 
[本地事务校验Redis结果] → MQ消息确认(COMMIT/ROLLBACK)→ 消费者消费消息 → MySQL扣减库存+记录订单

秒杀系统的核心诉求是抗并发、防超卖、保一致。 Redis+MQ 方案通过 “前端拦截 - 中间缓冲 - 后端落地” 的三层架构实现这一目标:

流程拆解(示例代码)

Redis 库存预扣减

预扣减流程
开始

├─ 生成Token(前端)

├─ 前端携带Token请求秒杀

├─ 执行Lua脚本
│ │
│ ├─ 检查Token是否存在(Hash结构)
│ │ ├─ 存在 → 返回“重复提交”
│ │ └─ 不存在 → 继续
│ │
│ ├─ 获取Redis库存(String结构)
│ │ ├─ 库存不足 → 返回“库存不足”
│ │ └─ 库存充足 → 继续
│ │
│ ├─ 扣减Redis库存并更新
│ │
│ └─ 记录流水到Hash结构

├─ 返回扣减结果(成功/失败)

结束
Lua 脚本
-- 启用Redis命令复制,确保脚本在集群环境中正确同步
redis.replicate_commands()

-- 1. 防重提交校验:通过用户ID+Token判断是否重复提交
-- KEYS[2]为用户ID(uid),ARGV[2]为本次请求的Token
if redis.call('hexists', KEYS[2], ARGV[2]) == 1 then
return redis.error_reply('repeat submit') -- 重复提交,返回错误
end

-- 2. 库存充足性校验
local product_id = KEYS[1] -- 商品ID
local stock = redis.call('get', KEYS[1]) -- 获取当前库存
if not stock then -- 库存不存在(如商品未上架)
return redis.error_reply('product not found')
end
if tonumber(stock) < tonumber(ARGV[1]) then -- 库存不足
return redis.error_reply('stock is not enough')
end

-- 3. 执行库存扣减
local remaining_stock = tonumber(stock) - tonumber(ARGV[1])
redis.call('set', KEYS[1], tostring(remaining_stock)) -- 更新库存

-- 4. 记录交易流水(用于后续一致性校验)
local time = redis.call('time') -- 获取当前时间(秒+微秒)
local currentTimeMillis = (time[1] * 1000) + math.floor(time[2] / 1000) -- 转换为毫秒时间戳
-- 存储流水到Hash结构:用户ID → Token → 流水详情
redis.call('hset', KEYS[2], ARGV[2],
cjson.encode({
action = '扣减库存',
product = product_id,
from = stock, -- 扣减前库存
to = remaining_stock, -- 扣减后库存
change = ARGV[1], -- 扣减数量
token = ARGV[2],
timestamp = currentTimeMillis
})
)

return remaining_stock -- 返回扣减后库存
Java 调用 Lua
@Service
public class SeckillService {

@Autowired
private StringRedisTemplate redisTemplate;

// 加载Lua脚本
private DefaultRedisScript stockScript;

@PostConstruct
public void init() {
stockScript = new DefaultRedisScript<>();
stockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("seckill.lua")));
stockScript.setResultType(Long.class);
}

/**
* 执行Redis库存预扣减
* @param productId 商品ID
* @param uid 用户ID
* @param quantity 购买数量
* @param token 防重Token
* @return 扣减后库存(-1表示失败)
*/
public Long preDeductStock(String productId, String uid, Integer quantity, String token) {
try {
// 执行Lua脚本:KEYS = [商品ID, 用户ID],ARGV = [数量, Token]
return redisTemplate.execute(
stockScript,
Arrays.asList(productId, uid),
quantity.toString(),
token
);
} catch (Exception e) {
log.error("Redis预扣减失败", e);
return -1L;
}
}
}

MySQL 库存扣减

扣减流程图

开始

├─ 发送半消息到RocketMQ

├─ 执行本地事务
│ │
│ ├─ 检查Redis流水是否存在
│ │ ├─ 存在 → 提交消息(COMMIT)
│ │ └─ 不存在 → 回滚消息(ROLLBACK)
│ │
│ └─ 未知状态 → 等待回查

├─ RocketMQ回查机制
│ ├─ 有流水 → 提交消息
│ └─ 无流水 → 回滚消息

├─ 消息被消费
│ │
│ ├─ 查询数据库当前版本号(乐观锁)
│ │
│ ├─ 执行库存扣减(WHERE version = 当前版本)
│ │ ├─ 扣减成功 → 记录数据库流水
│ │ └─ 扣减失败 → 抛出异常(触发重试)
│ │
├─ 结束
发送半消息

系统首先向 RocketMQ 发送一条 半消息 Half Message )。此时消息处于不可消费状态,需等待生产者确认本地事务执行结果后,才会被消费者处理。

// 发送半消息
public void sendHalfMessage(String productId, String uid, String token, Integer quantity) {
// 构建消息
Message message = new Message(
"seckill_topic", // 主题
"stock_deduct", // 标签
JSON.toJSONString(new SeckillMessage(productId, uid, token, quantity)).getBytes()
);
// 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"seckill_producer_group", // 生产者组
message,
null // 本地事务参数(可传递上下文)
);
log.info("半消息发送结果:{}", result.getSendStatus());
}
本地事务校验

本地事务的核心是判断 Redis 预扣减是否成功:

@Component
public class SeckillTransactionListener implements TransactionListener {

@Autowired
private StringRedisTemplate redisTemplate;

// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class);
// 检查Redis中是否存在对应流水(验证预扣减成功)
Boolean flag = redisTemplate.opsForHash().hasKey(
message.getUid(), // Hash key:用户ID
message.getToken() // Hash field:Token
);
return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
} catch (Exception e) {
return RocketMQLocalTransactionState.UNKNOWN; // 未知状态,触发回查
}
}

// 消息回查(解决超时未确认问题)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
SeckillMessage message = JSON.parseObject(new String(msg.getBody()), SeckillMessage.class);
// 回查逻辑:再次检查流水是否存在
Boolean flag = redisTemplate.opsForHash().hasKey(message.getUid(), message.getToken());
return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}
消费消息并扣减 MySQL 库存

消费者监听消息,执行数据库扣减(需保证 幂等性 ): 消费者接收到可消费的消息后,执行 MySQL 库存扣减操作,并同步记录数据库中的交易流水。为确保消费成功,需利用 MQ 的重试机制:若消费失败(如数据库暂时不可用), MQ 会自动重试,直至消费成功或达到最大重试次数(此时需人工介入处理)。

@Component
@RocketMQMessageListener(
topic = "seckill_topic",
consumerGroup = "seckill_consumer_group",
messageModel = MessageModel.CLUSTERING
)
public class SeckillConsumer implements RocketMQListener {

@Autowired
private JdbcTemplate jdbcTemplate;

@Override
public void onMessage(MessageExt message) {
SeckillMessage msg = JSON.parseObject(new String(message.getBody()), SeckillMessage.class);
String productId = msg.getProductId();
int quantity = msg.getQuantity();

// 数据库扣减(使用乐观锁防超卖)
String sql = "UPDATE product_stock " +
"SET stock = stock - ?, version = version + 1 " +
"WHERE product_id = ? AND stock >= ? AND version = ?";

// 1. 查询当前版本号
Integer version = jdbcTemplate.queryForObject(
"SELECT version FROM product_stock WHERE product_id = ?",
Integer.class,
productId
);

// 2. 执行扣减(乐观锁保证原子性)
int rows = jdbcTemplate.update(sql, quantity, productId, quantity, version);
if (rows > 0) {
// 扣减成功:记录数据库流水
jdbcTemplate.update(
"INSERT INTO stock_flow (product_id, quantity, op_type, create_time) " +
"VALUES (?, ?, 'SECKILL', NOW())",
productId, quantity
);
// 确认消费成功(返回ACK)
} else {
// 扣减失败:触发重试(MQ默认重试机制)
throw new RuntimeException("数据库扣减失败,触发重试");
}
}
}

一致性保障

为防止 Redis MySQL 数据不一致(如 Redis 扣减成功但 MySQL 扣减失败),需定期对账:

@Scheduled(cron = "0 0 */1 * * ?")  // 每小时执行一次
public void reconcileStock() {
// 1. 扫描Redis中未同步到MySQL的流水
Set uids = redisTemplate.keys("uid:*"); // 假设用户ID前缀为uid:
for (String uid : uids) {
Map tokenMap = redisTemplate.opsForHash().entries(uid);
for (Map.Entry entry : tokenMap.entrySet()) {
String token = (String) entry.getKey();
String flowJson = (String) entry.getValue();
SeckillFlow flow = JSON.parseObject(flowJson, SeckillFlow.class);

// 2. 检查MySQL是否有对应订单
Integer count = jdbcTemplate.queryForObject(
"SELECT COUNT(1) FROM orders WHERE product_id = ? AND uid = ? AND token = ?",
Integer.class,
flow.getProduct(), flow.getUid(), token
);

if (count == 0) {
// 3. 未找到订单 → 人工介入或自动回滚Redis库存
log.warn("发现不一致:Redis有流水但MySQL无订单,product={}, uid={}", flow.getProduct(), uid);
// redisTemplate.opsForValue().increment(flow.getProduct(), Integer.parseInt(flow.getChange()));
}
}
}
}

系统可通过定时任务对比 Redis 流水、 MySQL 库存流水与订单表数据:若 Redis 流水存在但订单表无对应记录,说明订单生成失败,需人工介入补单或回滚 Redis 库存,避免 少卖 ;若订单表有记录但 MySQL 库存未扣减,则需触发库存补扣,避免 多卖

总结

Redis + MQ 方案通过 预扣减 + 事务消息 + 对账 三重机制,完美解决了高并发秒杀的核心痛点:

基于 FileAlterationListener 实现文件变化监听
MyBatis 批量更新实现技术解析
Java 文件处理:MultipartFile 与 File 互转
Spring Boot + MeiliSearch 快速整合指南
聚簇与非聚簇索引、回表及索引下推原理

相关推荐

Redis教程——数据类型(字符串、列表)

上篇文章我们学习了Redis教程——Redis入门,这篇文章我们学习Redis教程——数据类型(字符串、列表)。Redis数据类型有:字符串、列表、哈希表、集合、有序集合、地理空间、基数统计、位图、位...

说说Redis的数据类型(redis数据类型详解)

一句话总结Redis核心数据类型包括:String:存储文本、数字或二进制数据。List:双向链表,支持队列和栈操作。Hash:字段-值映射,适合存储对象。Set:无序唯一集合,支持交并差运算。Sor...

Redis主从复制(Redis主从复制复制文件)

介绍Redis有两种不同的持久化方式,Redis服务器通过持久化,把Redis内存中持久化到硬盘当中,当Redis宕机时,我们重启Redis服务器时,可以由RDB文件或AOF文件恢复内存中的数据。不过...

深入解析 Redis 集群的主从复制实现方式

在互联网大厂的后端开发领域,Redis作为一款高性能的内存数据库,被广泛应用于缓存、消息队列等场景。而Redis集群中的主从复制机制,更是保障数据安全、实现读写分离以及提升系统性能的关键所在。今...

Redis + MQ:高并发秒杀的技术方案与实现

大家好,我是一安~前言在电商秒杀场景中,瞬间爆发的海量请求往往成为系统的生死考验。当并发量达到数万甚至数十万QPS时,传统数据库单表架构难以支撑,而Redis与消息队...

Redis面试题2025(redis面试题及答案2024)

Redis基础什么是Redis?它的主要特点是什么?Redis和Memcached有什么区别?Redis支持哪些数据类型?Redis的字符串类型最大能存储多少数据?Redis的列表类型和集合类型有什么...

Redis学习笔记:过期键管理与EXPIRE命令详解(第七章)

在Redis中,过期键(ExpireKey)机制是实现缓存自动失效、临时数据管理的核心功能。EXPIRE命令作为设置键过期时间的基础工具,其工作原理与使用细节直接影响系统的内存效率和数据一致性。本章...

Redis传送术:几分钟内将生产数据迁移到本地

在生产环境中使用Redis就像一把双刃剑。它快速、强大,存储了大量实时数据——但当你想要在本地调试问题或使用真实数据进行测试时,事情就变得棘手了。我们要做什么?我们想要从生产环境Redis实例中导出键...

使用redis bitmap计算日活跃用户数

Metrics(指标)在允许延迟的情况下,通常通过job任务定时执行(如按小时、每天等频率),而基于Redis的Bitmap使我们能够实时完成此类计算,且极其节省空间。以亿级用户计算“日活跃用户...

大部分.NET开发者都不知道的Redis性能优化神技!

你还在为Redis存储空间不够而发愁吗?还在为Json数据太大导致网络传输缓慢而头疼吗?今天我要告诉你一个让Redis性能飙升300%的秘密武器!这个技巧简单到让你怀疑人生,但效果却强大到让你的老板对...

Redis学习笔记:内存优化实战指南(第六章)

Redis作为内存数据库,内存使用效率直接影响系统性能与成本。对于处理大规模数据的场景,合理的内存优化能显著降低资源消耗,提升服务稳定性。本章将基于Redis的内存管理特性,详解实用的优化技巧与最佳实...

大数据-47 Redis 内存控制、Key 过期与数据...

点一下关注吧!!!非常感谢!!持续更新!!!AI篇持续更新中!(长期更新)AI炼丹日志-30-新发布【1T万亿】参数量大模型!Kimi-K2开源大模型解读与实践,持续打造实用AI工具指南!...

Redis学习笔记:内存优化进阶与实战技巧(第六章·续)

上一节我们介绍了Redis内存优化的基础策略,本节将深入更多实战技巧,包括数据结构的精细化选择、过期键的内存回收机制,以及大规模场景下的内存管理方案,帮助你在高并发场景下进一步提升内存利用率。七、数据...

低配服务器(2核3G)宝塔面板的Redis优化指南:512MB内存高效运行

在2核3G内存的低配服务器上部署Redis服务时,资源分配不当极易导致服务器崩溃。本文针对宝塔面板环境(PHP8.2+MariaDB10.6+Nginx),提供经过实战验证的Redis优化...

Redis:为什么您应该多缓存少查询(为什么使用redis做缓存而不是其他的消息队列入kafka)

还在一次又一次地调用相同的API吗?这不仅效率低下——而且成本高昂。性能缓慢、成本更高,用户体验更差。让我们停止这种做法——从这篇文章开始。:D首先您需要了解Redis,简单来说,它是一个超快速的内存...

取消回复欢迎 发表评论: