百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Redis 如何实现库存扣减操作和防止被超卖?

mhr18 2024-11-24 18:56 23 浏览 0 评论

电商当项目经验已经非常普遍了,不管你是包装的还是真实的,起码要能讲清楚电商中常见的问题,比如库存的操作怎么防止商品被超卖

解决方案:

  • 基于数据库单库存
  • 基于数据库多库存
  • 基于redis

基于redis实现扣减库存的具体实现

  • 初始化库存回调函数(IStockCallback)
  • 扣减库存服务(StockService)
  • 调用

在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。

解决方案

  1. 使用mysql数据库,使用一个字段来存储库存,每次扣减库存去更新这个字段。
  2. 还是使用数据库,但是将库存分层多份存到多条记录里面,扣减库存的时候路由一下,这样子增大了并发量,但是还是避免不了大量的去访问数据库来更新库存。
  3. 将库存放到redis使用redis的incrby特性来扣减库存。

分析

在上面的第一种和第二种方式都是基于数据来扣减库存。

基于数据库单库存

第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

基于数据库多库存

第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源。

基于数据库来实现扣减库存还存在的一些问题:

  • 用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec在update,这样在并发下会出现超扣的情况。如:
update number set x=x-1 where x > 0

  • MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。
  • 当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。

基于redis

针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。

基于redis实现扣减库存的具体实现

  • 我们使用redis的lua脚本来实现扣减库存
  • 由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存
  • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存

初始化库存回调函数(IStockCallback )

/**
 * 获取库存回调
 */
public interface IStockCallback {

 /**
  * 获取库存
  * @return
  */
 int getStock();
}

扣减库存服务(StockService)

/**
 * 扣库存
 *
 */
@Service
public class StockService {
    Logger logger = LoggerFactory.getLogger(StockService.class);
?
    /**
     * 不限库存
     */
    public static final long UNINITIALIZED_STOCK = -3L;
?
    /**
     * Redis 客户端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
?
    /**
     * 执行扣库存的脚本
     */
    public static final String STOCK_LUA;
?
    static {
        /**
         *
         * @desc 扣减库存Lua脚本
         * 库存(stock)-1:表示不限库存
         * 库存(stock)0:表示没有库存
         * 库存(stock)大于0:表示剩余库存
         *
         * @params 库存key
         * @return
         *   -3:库存未初始化
         *   -2:库存不足
         *   -1:不限库存
         *   大于等于0:剩余库存(扣减之后剩余的库存)
         *      redis缓存的库存(value)是-1表示不限库存,直接返回1
         */
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("    local num = tonumber(ARGV[1]);");
        sb.append("    if (stock == -1) then");
        sb.append("        return -1;");
        sb.append("    end;");
        sb.append("    if (stock >= num) then");
        sb.append("        return redis.call('incrby', KEYS[1], 0 - num);");
        sb.append("    end;");
        sb.append("    return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();
    }
?
    /**
     * @param key           库存key
     * @param expire        库存有效时间,单位秒
     * @param num           扣减数量
     * @param stockCallback 初始化库存回调函数
     * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {
        long stock = stock(key, num);
        // 初始化库存
        if (stock == UNINITIALIZED_STOCK) {
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                // 获取锁
                if (redisLock.tryLock()) {
                    // 双重验证,避免并发时重复回源到数据库
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 获取初始化库存
                        final int initStock = stockCallback.getStock();
                        // 将库存设置到redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 调一次扣库存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }
?
        }
        return stock;
    }
?
    /**
     * 加库存(还原库存)
     *
     * @param key    库存key
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, int num) {
?
        return addStock(key, null, num);
    }
?
    /**
     * 加库存
     *
     * @param key    库存key
     * @param expire 过期时间(秒)
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, Long expire, int num) {
        boolean hasKey = redisTemplate.hasKey(key);
        // 判断key是否存在,存在就直接更新
        if (hasKey) {
            return redisTemplate.opsForValue().increment(key, num);
        }
?
        Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
        RedisLock redisLock = new RedisLock(redisTemplate, key);
        try {
            if (redisLock.tryLock()) {
                // 获取到锁后再次判断一下是否有key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化库存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            redisLock.unlock();
        }
?
        return num;
    }
?
    /**
     * 获取库存
     *
     * @param key 库存key
     * @return -1:不限库存; 大于等于0:剩余库存
     */
    public int getStock(String key) {
        Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }
?
    /**
     * 扣库存
     *
     * @param key 库存key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
     */
    private Long stock(String key, int num) {
        // 脚本里的KEYS参数
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 脚本里的ARGV参数
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));
?
        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }
?
                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }
?
}

调用

@RestController
public class StockController {
?
    @Autowired
    private StockService stockService;
?
    @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object stock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
        long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
        return stock >= 0;
    }
?
    /**
     * 获取初始的库存
     *
     * @return
     */
    private int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 1000;
    }
?
    @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object getStock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
?
        return stockService.getStock(redisKey);
    }
?
    @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object addStock() {
        // 商品ID
        long commodityId = 2;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
?
        return stockService.addStock(redisKey, 2);
    }
}

相关推荐

【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库

如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...

Pure Storage推出统一数据管理云平台及新闪存阵列

PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...

对Java学习的10条建议(对java课程的建议)

不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...

SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!

官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...

JDK21有没有什么稳定、简单又强势的特性?

佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...

「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了

在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...

Java面试题及答案最全总结(2025版)

大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...

数据库日常运维工作内容(数据库日常运维 工作内容)

#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...

分布式之系统底层原理(上)(底层分布式技术)

作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...

oracle 死锁了怎么办?kill 进程 直接上干货

1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...

SpringBoot 各种分页查询方式详解(全网最全)

一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...

《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略

《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...

LoadRunner(loadrunner录制不到脚本)

一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...

Redis数据类型介绍(redis 数据类型)

介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...

RMAN备份监控及优化总结(rman备份原理)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

取消回复欢迎 发表评论: