百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

redis延迟队列,处理正常订单超时自动关闭

mhr18 2024-11-02 11:53 16 浏览 0 评论

demo代码地址

链接:https://pan.baidu.com/s/1yY_84ANvwh78gE6G0HXnBg
提取码:gzvu

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.9.1</version>
        </dependency>

1.创建redis连接 连接池

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.ClusterServersConfig;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @author : zhang sq
 * @date : 2019/8/22 11:56
 **/
@Configuration
public class RedisConfig {
 
    @Value("${redis.addresses}")
    private String[] addresses;  //地址配置在配置文件上
 
    @Bean
    public RedissonClient createRedisAPi(){
        //redis集群配置 start
        Config redissonConfig = new Config();
        //改用redisson后为了之间数据能兼容,这里修改编码为org.redisson.client.codec.StringCodec
        redissonConfig.setCodec(new org.redisson.client.codec.StringCodec());
        ClusterServersConfig clusterServersConfig = redissonConfig.useClusterServers();
        clusterServersConfig.setScanInterval(2000).addNodeAddress(addresses);
//                .addNodeAddress("redis://127.0.0.1:7000")
//                .addNodeAddress("redis://127.0.0.1:7001")
//                .addNodeAddress("redis://127.0.0.1:7002")
//                .addNodeAddress("redis://127.0.0.1:7003")
//                .addNodeAddress("redis://127.0.0.1:7004")
//                .addNodeAddress("redis://127.0.0.1:7005");
        //设置对于master节点的连接池中连接数最大为500
        clusterServersConfig.setMasterConnectionPoolSize(500);
        //设置密码
//        clusterServersConfig.setPassword("1qaz@WSX");
        //设置对于slave节点的连接池中连接数最大为500
        clusterServersConfig.setSlaveConnectionPoolSize(500);
        //如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,
        // 那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。
        clusterServersConfig.setIdleConnectionTimeout(10000);
        //同任何节点建立连接时的等待超时。时间单位是毫秒。
        clusterServersConfig.setConnectTimeout(30000);
        //等待节点回复命令的时间。该时间从命令发送成功时开始计时。
        clusterServersConfig.setTimeout(3000);
        clusterServersConfig.setPingTimeout(30000);
        //当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。
        clusterServersConfig.setReconnectionTimeout(3000);
        //redis集群配置 end
        return Redisson.create(redissonConfig);
 
        //单redis连接 start
//        SingleServerConfig singleServerConfig = redissonConfig.useSingleServer();
//        singleServerConfig.setAddress("redis://127.0.0.1:6379");
//        singleServerConfig.setPassword("zsq2170");
//          设置redis几号数据库
//        singleServerConfig.setDatabase(0);
//        singleServerConfig.setClientName("jiuyue");
//        singleServerConfig.setConnectTimeout(10000);
//        singleServerConfig.setConnectionPoolSize(300);
//        return Redisson.create(redissonConfig);
        //单redis连接 end
    }
}

2.编写测试 生产者 ,正式生产应该在提交订单的时候生产

@RestController
@RequestMapping("/test")
@Api(tags = "生产者",description = "OrderProducerController")
public class OrderProducerController {
 
    @Autowired
    private RedissonClient redisson; //注入redis连接
 
    @PostMapping("/producer")
    @ApiOperation(value = "生产")
        public void Producer(){
        RBlockingDeque<Order> blockingDeque = redisson.getBlockingDeque("jiuyang_order_message"); //获取阻塞队列<名称和消费时获取名称一致>
        RDelayedQueue<Order> delayedQueue = redisson.getDelayedQueue(blockingDeque); //加入延迟队列
        for (long i = 100; i < 120; i++) {  //循环放20个
            try {
                Order order=new Order();
                order.setId(123L);
                //orderInfo.setCreateTime(DateHelper.date2String(new Date()));
                delayedQueue.offer(order,20, TimeUnit.SECONDS);  //将对象发送到redis延迟队列 ,20 单位秒 <这里单位可以选择的>
                System.out.println("===="+i);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

3.编写消费者消费 创建配置类,启动服务时便执行消费方法

@Component
public class ApplicationRunnerImpl implements ApplicationRunner {
 
    @Autowired
    private RefundOrderServiceImpl refundOrderService;
    
    //tomcat启动执行此方法
    @Override
    public void run(ApplicationArguments args) throws Exception {
        refundOrderService.refundOrder();
    }
}

4.多线程处理业务,先创建线程池

@Component
public class MyAsyncConfigurer implements AsyncConfigurer {
 
    private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class);
 
    @Override
    @Bean
    public ThreadPoolTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //设置核心线程数,默认为1
        threadPool.setCorePoolSize(4);
        // 当核心线程都在跑任务,还有多余的任务会存到此处。
        threadPool.setQueueCapacity(30);
        //最大线程数,默认为Integer.MAX_VALUE,如果queueCapacity存满了,还有任务就会启动更多的线程,直到线程数达到maxPoolSize。如果还有任务,则根据拒绝策略进行处理。
        threadPool.setMaxPoolSize(8);
        // 设置线程活跃时间(秒)
        threadPool.setKeepAliveSeconds(120);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        threadPool.setThreadNamePrefix("MyAsync-");
        threadPool.initialize();
        return threadPool;
    }
}

5.多线程消费延迟队列任务

@Service
@Transactional
public class RefundOrderServiceImpl implements RefundOrderService {
 
    private static final Logger logger = LoggerFactory.getLogger(RefundOrderServiceImpl.class);
 
    @Autowired
    private RefundOrderMapper refundOrderMapper;
 
    @Autowired
    private RedissonClient redissonClient;  //redis
 
    @Autowired
    private ThreadPoolTaskExecutor scheduledThreadPoolExecutor; //线程
 
    //读取阻塞队列,多线程执行
    public void refundOrder() {
        RBlockingDeque<Order> blockingDeque = redissonClient.getBlockingDeque("jiuyang_order_message");
        scheduledThreadPoolExecutor.execute(new SendingTask(blockingDeque));
    }
 
 
    private class SendingTask implements Runnable {
 
        RBlockingDeque<Order> blockingDeque = null;
 
        public SendingTask(RBlockingDeque<Order> blockingDeque) {
            this.blockingDeque = blockingDeque;
        }
 
        @Override
        public void run() {
            while (true) {
                try {
                    /**
                     * blockqueue,当拿到数据时,才往下执行.没有则等待
                     */
                    Order order = blockingDeque.take();
                    scheduledThreadPoolExecutor.execute(new BusiTask(order));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
 
 
    private class BusiTask implements Runnable {
 
        Order order = null;
 
        public BusiTask(Order order) {
            this.order = order;
        }
 
        @Override
        @Transactional
        public void run() {
            try {
                if (order != null) {
                    Long orderId = order.getId();
                    String orderSn = order.getOrderSn();
                    //查询订单状态
                    Order newOrder = refundOrderMapper.selectOrderById(orderId, orderSn);
                    //订单状态:0->待付款;1->待发货;2->已发货;3->已收货待评价;4->已关闭;5->无效订单;6->已评价;8->退款中
                    Integer status = newOrder.getStatus();
                    if (status == 0) {
                        refundOrderMapper.updateStatus(orderId, orderSn, 4); //关闭订单
                        logger.info("未支付订单已关闭");
                        System.err.println("未支付订单已关闭");
                        //退还商品库存  查询订单商品表退库存
                       List<OmsOrderItem> orderItemList= refundOrderMapper.selectQuantity(orderId,orderSn);
                       if(TargetUtils.listNotNull(orderItemList)){
                           for (OmsOrderItem orderItem:orderItemList){
                               String productId = orderItem.getProductId(); //商品id
                               Integer quantity = orderItem.getProductQuantity(); //购买数量
                               //修改库存,退款库存
                               refundOrderMapper.updateProductSku(productId,quantity);
                               logger.info("未支付订单购买商品库存已退还");
                               System.err.println("未支付订单购买商品库存已退还");
                           }
                       }
                       //退还优惠券
                        Long couponId = newOrder.getCouponId(); //获取优惠券id
                        if (null != couponId && couponId > 0) { //有优惠券
                            refundOrderMapper.updateCouponId(couponId); //退还优惠券
                            logger.info("优惠券已退还");
                            System.err.println("优惠券已退还");
                        }
                        //退还礼品卡钱
                        String cardSn = newOrder.getCardSn();//卡号
                        if(StringUtils.isNotEmpty(cardSn)){
                            BigDecimal cardAmount = newOrder.getCardAmount(); //消费金额
                            //退还礼品卡消费金额
                            refundOrderMapper.updateCardOrder(cardSn,cardAmount);
                            logger.info("礼品卡消费金额已退还");
                            System.err.println("礼品卡消费金额已退还");
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
 
}

相关推荐

Java面试题合集200道!

1.Java中操作字符串都有哪些类?它们之间有什么区别?String、StringBuffer、StringBuilder.String和StringBufer、StringBuilder的区别...

JAVA分布式锁的原理,及多种分布式实现优劣对比分析

引题比如在同一个节点上,两个线程并发的操作A的账户,都是取钱,如果不加锁,A的账户可能会出现负数,正确的方式是对账户acount进行加锁,即使用synchronized关键字,对其进行加锁后,当有线程...

百度Linux C++后台开发面试题(个人整理)

1、C/C++程序的内存分区其实C和C++的内存分区还是有一定区别的,但此处不作区分:1)、栈区(stack)—由编译器自动分配释放,存放函数的参数值,局部变量的值等。其操作方式类似于数据结构中...

什么是云计算?看这篇就够了(建议收藏)

一、什么是云?云,又称云端,指无数的大型机房或者大型数据中心。二、为什么需要云?1)从用户的角度来讲:传统应用的需求日益复杂,比如需要支持更多的用户,需要更强的计算能力等,为满足这些日益增长的需求,企...

写PHP框架需要具备那些知识?

如果没用过框架,讨论各个框架的内容都没有可讨论性,想自己写个框架涉及到的内容很多,个人觉得自己写一个框架对自己的逻辑思维,开发架构以及这门语言都有质的提升。可以参照其他框架的源代码,仅仅是看他们的思路...

不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理

我们平时开发中好像很少使用到BlockingQueue(阻塞队列),比如我们想要存储一组数据的时候会使用ArrayList,想要存储键值对数据会使用HashMap,在什么场景下需要用到Blocking...

Java性能优化指南—缓存那些事

由于笔者自身水平有限,如果有不对或者任何建议欢迎批评和指正本文预计阅读时间10分钟,分为前言、填坑两部分,主要包含缓存的基本使用到高级应用场景的介绍一、前言在处理高并发请求时,缓存几乎是无往不利的利器...

卓象科技:Nosql的介绍以及和关系型数据库的区别

Nosql介绍NoSQL(NotOnlySQL),泛指非关系型数据库。Nosql的全称是NotOnlySql,这个概念很早就有人提出,在09年的时候比较火。Nosql指的是非关系型数...

腾讯一面凉经(一面竟然就问了2小时,什么情况?)

这次一面感觉是在打心理战,哥们自己的心里防线基本是被击溃,面到怀疑人生的程度,所以过程感觉不是太好,很多题哥们自己也感觉没答好,要么答得“缺胳膊少腿”,要么就是“画蛇添足”。先是聊项目,从项目的架构设...

我凭借这份pdf,最终拿到了阿里,腾讯,京东等八家大厂offer

怎样才能拿到大厂的offer,没有掌握绝对的技术,那么就要不断的学习我是如何笑对金九银十,拿到阿里,腾讯等八家大厂的offer的呢,今天分享我的秘密武器,美团大神整理的Java核心知识点,面试时面试官...

高并发 异步解耦利器:RocketMQ究竟强在哪里?

本文带大家从以下几个方面详细了解RocketMQ:RocketMQ如何保证消息存储的可靠性?RocketMQ如何保证消息队列服务的高可用?如何构建一个高可用的RocketMQ双主双从最小集群?Rock...

阿里最新Java架构师成长笔记开源

下面先给大家上一个总的目录大纲,基础的东西就不进行过多的赘述,我们将会从JVM说起,同时由于每篇的内容过多,我们也只说重点,太过基础的内容谁都会,我就不多敲字浪费大家的时间了!JVM多线程与高并发Sp...

程序员失业2个月找不到工作,狂刷了5遍这份pdf终获字节跳动offer

写在前面1月初失业,找了近2个多月的工作了,还没找到心仪的工作,感觉心好慌,不知道该怎么办了?找不到工作的时候压力很大,有人说自信会很受打击,还有人说会很绝望,是人生的低谷……尽管很多时候我们自己知道...

Spring AI 模块架构与功能解析

SpringAI是Spring生态系统中的一个新兴模块,专注于简化人工智能和机器学习技术在Spring应用程序中的集成。本文将详细介绍SpringAI的核心组件、功能模块及其之间的关...

Nginx从入门到精通,超详细整理,含项目实战案例|运维必学

Nginx是免费的、开源的、高性能的HTTP和反向代理服务器、邮件代理服务器、以及TCP/UDP代理服务器。因为它的稳定性、丰富的模块库、灵活的配置和低系统资源的消耗而闻名。Nginx可以做静态HT...

取消回复欢迎 发表评论: