百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

redis延迟队列,处理正常订单超时自动关闭

mhr18 2024-11-02 11:53 20 浏览 0 评论

demo代码地址

链接:https://pan.baidu.com/s/1yY_84ANvwh78gE6G0HXnBg
提取码:gzvu

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.9.1</version>
        </dependency>

1.创建redis连接 连接池

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.ClusterServersConfig;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @author : zhang sq
 * @date : 2019/8/22 11:56
 **/
@Configuration
public class RedisConfig {
 
    @Value("${redis.addresses}")
    private String[] addresses;  //地址配置在配置文件上
 
    @Bean
    public RedissonClient createRedisAPi(){
        //redis集群配置 start
        Config redissonConfig = new Config();
        //改用redisson后为了之间数据能兼容,这里修改编码为org.redisson.client.codec.StringCodec
        redissonConfig.setCodec(new org.redisson.client.codec.StringCodec());
        ClusterServersConfig clusterServersConfig = redissonConfig.useClusterServers();
        clusterServersConfig.setScanInterval(2000).addNodeAddress(addresses);
//                .addNodeAddress("redis://127.0.0.1:7000")
//                .addNodeAddress("redis://127.0.0.1:7001")
//                .addNodeAddress("redis://127.0.0.1:7002")
//                .addNodeAddress("redis://127.0.0.1:7003")
//                .addNodeAddress("redis://127.0.0.1:7004")
//                .addNodeAddress("redis://127.0.0.1:7005");
        //设置对于master节点的连接池中连接数最大为500
        clusterServersConfig.setMasterConnectionPoolSize(500);
        //设置密码
//        clusterServersConfig.setPassword("1qaz@WSX");
        //设置对于slave节点的连接池中连接数最大为500
        clusterServersConfig.setSlaveConnectionPoolSize(500);
        //如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,
        // 那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。
        clusterServersConfig.setIdleConnectionTimeout(10000);
        //同任何节点建立连接时的等待超时。时间单位是毫秒。
        clusterServersConfig.setConnectTimeout(30000);
        //等待节点回复命令的时间。该时间从命令发送成功时开始计时。
        clusterServersConfig.setTimeout(3000);
        clusterServersConfig.setPingTimeout(30000);
        //当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。
        clusterServersConfig.setReconnectionTimeout(3000);
        //redis集群配置 end
        return Redisson.create(redissonConfig);
 
        //单redis连接 start
//        SingleServerConfig singleServerConfig = redissonConfig.useSingleServer();
//        singleServerConfig.setAddress("redis://127.0.0.1:6379");
//        singleServerConfig.setPassword("zsq2170");
//          设置redis几号数据库
//        singleServerConfig.setDatabase(0);
//        singleServerConfig.setClientName("jiuyue");
//        singleServerConfig.setConnectTimeout(10000);
//        singleServerConfig.setConnectionPoolSize(300);
//        return Redisson.create(redissonConfig);
        //单redis连接 end
    }
}

2.编写测试 生产者 ,正式生产应该在提交订单的时候生产

@RestController
@RequestMapping("/test")
@Api(tags = "生产者",description = "OrderProducerController")
public class OrderProducerController {
 
    @Autowired
    private RedissonClient redisson; //注入redis连接
 
    @PostMapping("/producer")
    @ApiOperation(value = "生产")
        public void Producer(){
        RBlockingDeque<Order> blockingDeque = redisson.getBlockingDeque("jiuyang_order_message"); //获取阻塞队列<名称和消费时获取名称一致>
        RDelayedQueue<Order> delayedQueue = redisson.getDelayedQueue(blockingDeque); //加入延迟队列
        for (long i = 100; i < 120; i++) {  //循环放20个
            try {
                Order order=new Order();
                order.setId(123L);
                //orderInfo.setCreateTime(DateHelper.date2String(new Date()));
                delayedQueue.offer(order,20, TimeUnit.SECONDS);  //将对象发送到redis延迟队列 ,20 单位秒 <这里单位可以选择的>
                System.out.println("===="+i);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

3.编写消费者消费 创建配置类,启动服务时便执行消费方法

@Component
public class ApplicationRunnerImpl implements ApplicationRunner {
 
    @Autowired
    private RefundOrderServiceImpl refundOrderService;
    
    //tomcat启动执行此方法
    @Override
    public void run(ApplicationArguments args) throws Exception {
        refundOrderService.refundOrder();
    }
}

4.多线程处理业务,先创建线程池

@Component
public class MyAsyncConfigurer implements AsyncConfigurer {
 
    private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class);
 
    @Override
    @Bean
    public ThreadPoolTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //设置核心线程数,默认为1
        threadPool.setCorePoolSize(4);
        // 当核心线程都在跑任务,还有多余的任务会存到此处。
        threadPool.setQueueCapacity(30);
        //最大线程数,默认为Integer.MAX_VALUE,如果queueCapacity存满了,还有任务就会启动更多的线程,直到线程数达到maxPoolSize。如果还有任务,则根据拒绝策略进行处理。
        threadPool.setMaxPoolSize(8);
        // 设置线程活跃时间(秒)
        threadPool.setKeepAliveSeconds(120);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        threadPool.setThreadNamePrefix("MyAsync-");
        threadPool.initialize();
        return threadPool;
    }
}

5.多线程消费延迟队列任务

@Service
@Transactional
public class RefundOrderServiceImpl implements RefundOrderService {
 
    private static final Logger logger = LoggerFactory.getLogger(RefundOrderServiceImpl.class);
 
    @Autowired
    private RefundOrderMapper refundOrderMapper;
 
    @Autowired
    private RedissonClient redissonClient;  //redis
 
    @Autowired
    private ThreadPoolTaskExecutor scheduledThreadPoolExecutor; //线程
 
    //读取阻塞队列,多线程执行
    public void refundOrder() {
        RBlockingDeque<Order> blockingDeque = redissonClient.getBlockingDeque("jiuyang_order_message");
        scheduledThreadPoolExecutor.execute(new SendingTask(blockingDeque));
    }
 
 
    private class SendingTask implements Runnable {
 
        RBlockingDeque<Order> blockingDeque = null;
 
        public SendingTask(RBlockingDeque<Order> blockingDeque) {
            this.blockingDeque = blockingDeque;
        }
 
        @Override
        public void run() {
            while (true) {
                try {
                    /**
                     * blockqueue,当拿到数据时,才往下执行.没有则等待
                     */
                    Order order = blockingDeque.take();
                    scheduledThreadPoolExecutor.execute(new BusiTask(order));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
 
 
    private class BusiTask implements Runnable {
 
        Order order = null;
 
        public BusiTask(Order order) {
            this.order = order;
        }
 
        @Override
        @Transactional
        public void run() {
            try {
                if (order != null) {
                    Long orderId = order.getId();
                    String orderSn = order.getOrderSn();
                    //查询订单状态
                    Order newOrder = refundOrderMapper.selectOrderById(orderId, orderSn);
                    //订单状态:0->待付款;1->待发货;2->已发货;3->已收货待评价;4->已关闭;5->无效订单;6->已评价;8->退款中
                    Integer status = newOrder.getStatus();
                    if (status == 0) {
                        refundOrderMapper.updateStatus(orderId, orderSn, 4); //关闭订单
                        logger.info("未支付订单已关闭");
                        System.err.println("未支付订单已关闭");
                        //退还商品库存  查询订单商品表退库存
                       List<OmsOrderItem> orderItemList= refundOrderMapper.selectQuantity(orderId,orderSn);
                       if(TargetUtils.listNotNull(orderItemList)){
                           for (OmsOrderItem orderItem:orderItemList){
                               String productId = orderItem.getProductId(); //商品id
                               Integer quantity = orderItem.getProductQuantity(); //购买数量
                               //修改库存,退款库存
                               refundOrderMapper.updateProductSku(productId,quantity);
                               logger.info("未支付订单购买商品库存已退还");
                               System.err.println("未支付订单购买商品库存已退还");
                           }
                       }
                       //退还优惠券
                        Long couponId = newOrder.getCouponId(); //获取优惠券id
                        if (null != couponId && couponId > 0) { //有优惠券
                            refundOrderMapper.updateCouponId(couponId); //退还优惠券
                            logger.info("优惠券已退还");
                            System.err.println("优惠券已退还");
                        }
                        //退还礼品卡钱
                        String cardSn = newOrder.getCardSn();//卡号
                        if(StringUtils.isNotEmpty(cardSn)){
                            BigDecimal cardAmount = newOrder.getCardAmount(); //消费金额
                            //退还礼品卡消费金额
                            refundOrderMapper.updateCardOrder(cardSn,cardAmount);
                            logger.info("礼品卡消费金额已退还");
                            System.err.println("礼品卡消费金额已退还");
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
 
}

相关推荐

如何通过 Redis 日志排查连接超时问题

Redis是一种高性能的内存数据存储服务,但在高并发或误配置情况下,可能会出现连接超时问题。借助Redis日志,可以快速定位并解决连接超时的根本原因。以下是具体的排查和解决步骤:1.什么是R...

给你1亿的Redis key,如何高效统计?

前言有些小伙伴在工作中,可能遇到过这样的场景:老板突然要求统计Redis中所有key的数量,你随手执行了KEYS*命令,下一秒监控告警疯狂闪烁——整个Redis集群彻底卡死,线上服务大面积瘫痪。今天...

Redis分布式锁的安全性分析与实践指南

一、Redis分布式锁的核心原理Redis分布式锁通过SETNX(SetifNotExists)和EXPIRE(Expire)指令实现原子性操作,结合UUID生成唯一标识符,确保锁的互斥性和安全...

高可用Redis分布式锁:秒杀系统中的锁战

引言在分布式系统中,“程序猿的终极武器是并发控制”。当多个服务实例同时访问共享资源时,如何避免数据不一致和重复操作?答案是分布式锁。Redis凭借其高性能和原子性操作,成为实现分布式锁的首选方案。...

Redis分布式锁(redis分布式锁解决超卖)

场景描述简单模拟一个高并发库存扣减场景,商品库存加载到Redis缓存,如:127.0.0.1:6379>setproduct:stock:101200无锁状态操作从缓存中获取对应商品的库存...

Redis 分布式锁和 ZooKeeper分布式锁

Redis分布式锁和ZooKeeper(简称zk)分布式锁都是用来解决在分布式系统中多个节点之间竞争资源的问题。它们各自有不同的特点和适用场景。Redis分布式锁Redis实现分布式锁主要是...

Redis vs ZooKeeper锁:高并发下的生死对决,谁才是最终赢家?

在分布式系统中,锁是控制资源访问的重要机制。Redis和ZooKeeper作为两种主流的分布式锁实现方案,各有优劣。本文将从原理、性能、代码实现三个维度进行硬核对比,助你做出最佳技术选型。一、原理对比...

说说Redis的大key(redis key大小限制)

一句话总结Redis大key指存储超大值(如字符串过大、集合元素过多)的键。主要成因包括:1.设计不合理,未拆分数据结构;2.业务需求(如缓存整页数据);3.数据持续积累未清理;4.使用不当的集合类型...

PHP Laravel框架底层机制(php框架的底层原理)

当然可以,Laravel是最受欢迎的PHP框架之一,以优雅的语法和丰富的生态而闻名。尽管开发体验非常“高端”,它的底层其实是由一系列结构清晰、职责分明的组件构成的。下面我从整体架构、核心流程、...

PHP性能全面优化-值得收藏(php优化网站性能)

PHP项目卡顿频发,老技巧失灵?隐藏漏洞竟在代码循环里。上周公司服务器突然开始卡顿,测试发现用户请求响应时间翻倍。我们先按以前学的方法做了基准测试,用AB工具压测时发现2000并发就有5%错误,换成S...

PHP+UniApp:低成本打造外卖系统横扫App+小程序+H5全平台

在餐饮行业数字化转型中,外卖系统开发常面临两大痛点:高昂的开发成本(需独立开发App、小程序、H5)和多端维护的复杂性。PHP+UniApp的组合通过技术复用与跨平台能力,为中小商家和开发者提供了“降...

从需求到上线:PHP+Uniapp校园圈子系统源码的架构设计与性能优化

一、需求分析与架构设计1.核心功能需求用户体系:支持手机号/微信登录、多角色权限(学生、教师、管理员)。圈子管理:支持创建/加入兴趣圈子(如学术、电竞)、标签分类、动态发布与审核。实时互动:点赞、评...

PHP 8.0性能翻3倍?四年亲测:这些项目升了哭晕!

2020年那个感恩节,当PHP8.0带着“性能翻倍”的豪言横空出世时,无数程序员连夜备份代码准备升级。四年过去了,那些宣称“性能提升3倍”的项目,真的跑出火箭速度了吗?还记得当时铺天盖地的宣传吗?“...

我把 Mac mini 托管到机房了:一套打败云服务器的终极方案

本内容来源于@什么值得买APP,观点仅代表作者本人|作者:薯仔不爱吃薯仔我把我积灰的Macmini托管到机房了,有图有真相。虽然画质又渣又昏暗,但是!这就是实锤。作为开发者,谁不想拥有个自己的服...

从phpstudy到Docker:我用一个下午让开发效率翻倍的实战指南

一、为什么放弃phpstudy?上周三下午,我花了3小时将本地开发环境从phpstudy迁移到Docker,没想到第二天团队反馈:环境部署时间从2小时压缩到5分钟,跨设备协作bug减少70%。作为一个...

取消回复欢迎 发表评论: