百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Redis实现延迟队列

mhr18 2024-11-27 12:08 28 浏览 0 评论

什么是延迟队列

所谓的延迟队列就是,生产者的消息推送到队列中,消费者不会马上消费,而是到了设置的指定的时间才消费。可以采用Redis的zset来实现。将消息序列化成一个字符串作为zset的value。这个消息的到期处理时间作为score,然后用一个线程去轮询zset到期的任务处理,建议使用多线程,为了保障任务消费的可用性。不过多线程就要考虑并发抢任务。

废话不多说直接上代码。

代码逻辑

将任务放到队列中,设置延迟时间

/**
 * 任务放入队列中
 * @param msg    任务信息
 * @param afterTime   延迟时间
 */
public void delay(T msg, long afterTime) {
    TaskItem<T> task = new TaskItem<>();
    //随机生成的任务id
    task.id = UUID.randomUUID().toString();
    task.msg = msg;

    //这里将TaskItem转换成JSON格式
    String taskStr = JSON.toJSONString(task);
    Date now = new Date();
    log.info(">>>>>>>>>>开始将任务放入到zset中,当前时间:{}", now);
    //采用zset存入
    redisTemplate.opsForZSet().add(queueKey, taskStr, now.getTime() + afterTime);
}

消费代码

public void loop() {
    while (!Thread.interrupted()) {
        Date now = new Date();
        //从zset中获取score为当前时间的数据
        Set<String> values = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now.getTime(), 0, 1);
        //如果队列为空,sleep一下再重新循环,sleep是为了缓解redis压力
        if (values.isEmpty()) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            continue;
        }
        //不为空,获取值
        String next = values.iterator().next();
        //将redis中的值删掉,相当于被消费掉
        Long remove = redisTemplate.opsForZSet().remove(queueKey, next);
        if ((remove > 0)) {
            //Json 转 Object
            TaskItem<T> taskItem = JSON.parseObject(next, taskType);
            //将msg信息作处理
            this.handleMsg(taskItem.msg, now);
        }
    }
}

private void handleMsg(T msg, Date now) {
    log.info("延迟消费到数据:[{}],当前时间:[{}]", msg, now);
    System.out.println(String.format("延迟消费到数据:[%s]", msg));
}

测试

//延迟队列测试
@Test
public void testDelayQueue() {
    String queueKey = "redis_delay_queue";
    delayingQueue.setQueueKey(queueKey);
    Thread producer = new Thread(() -> delayingQueue.delay("I am Lvshe", 10000));

    Thread consumer = new Thread(() -> delayingQueue.loop());

    producer.start();
    consumer.start();

    try {
        producer.join();

        Thread.sleep(20000);
        //线程中断标记,跳出loop()方法循环
        consumer.interrupt();
        consumer.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

如上代码,生产了一个数据 “I am Lvshen”,要求10s 后消费

结果如下图:

源码见原文。

相关推荐

Spring Boot 分布式事务实现简单得超乎想象

环境:SpringBoot2.7.18+Atomikos4.x+MySQL5.71.简介关于什么是分布式事务,本文不做介绍。有需要了解的自行查找相关的资料。本篇文章将基于SpringBoot...

Qt编写可视化大屏电子看板系统15-曲线面积图

##一、前言曲线面积图其实就是在曲线图上增加了颜色填充,单纯的曲线可能就只有线条以及数据点,面积图则需要从坐标轴的左下角和右下角联合曲线形成完整的封闭区域路径,然后对这个路径进行颜色填充,为了更美观...

Doris大数据AI可视化管理工具SelectDB Studio重磅发布!

一、初识SelectDBStudioSelectDBStudio是专为ApacheDoris湖仓一体典型场景实战及其兼容数据库量身打造的GUI工具,简化数据开发与管理。二、Select...

RAD Studio 、Delphi或C++Builder设计代码编译上线缩短开发时间

#春日生活打卡季#本月,Embarcadero宣布RADStudio12.3Athens以及Delphi12.3和C++Builder12.3,提供下载。RADStudio12.3A...

Mybatis Plus框架学习指南-第三节内容

自动填充字段基本概念MyBatis-Plus提供了一个便捷的自动填充功能,用于在插入或更新数据时自动填充某些字段,如创建时间、更新时间等。原理自动填充功能通过实现com.baomidou.myba...

「数据库」Sysbench 数据库压力测试工具

sysbench是一个开源的、模块化的、跨平台的多线程性能测试工具,可以用来进行CPU、内存、磁盘I/O、线程、数据库的性能测试。目前支持的数据库有MySQL、Oracle和PostgreSQL。以...

如何选择适合公司的ERP(选erp系统的经验之谈)

很多中小公司想搞ERP,但不得要领。上ERP的目的都是歪的,如提高效率,减少人员,堵住财务漏洞等等。真正用ERP的目的是借机提升企业管理能力,找出管理上的问题并解决,使企业管理更规范以及标准化。上ER...

Manus放开注册,但Flowith才是Agent领域真正的yyds

大家好,我是运营黑客。前天,AIAgent领域的当红炸子鸡—Manus宣布全面放开注册,终于,不需要邀请码就能体验了。于是,赶紧找了个小号去确认一下。然后,额……就被墙在了外面。官方解释:中文版...

歌浓酒庄总酿酒师:我们有最好的葡萄园和最棒的酿酒师

中新网1月23日电1月18日,张裕董事长周洪江及总经理孙健一行在澳大利亚阿德莱德,完成了歌浓酒庄股权交割签约仪式,这也意味着张裕全球布局基本成型。歌浓:澳大利亚年度最佳酒庄据悉,此次张裕收购的...

软件测试进阶之自动化测试——python+appium实例

扼要:1、了解python+appium进行APP的自动化测试实例;2、能根据实例进行实训操作;本课程主要讲述用python+appium对APP进行UI自动化测试的例子。appium支持Androi...

为什么说Python是最伟大的语言?看图就知道了

来源:麦叔编程作者:麦叔测试一下你的分析能力,直接上图,自己判断一下为什么Python是最好的语言?1.有图有真相Java之父-JamesGoshlingC++之父-BjarneStrou...

如何在Eclipse中配置Python开发环境?

Eclipse是著名的跨平台集成开发环境(IDE),最初主要用来Java语言开发。但是我们通过安装不同的插件Eclipse可以支持不同的计算机语言。比如说,我们可以通过安装PyDev插件,使Eclip...

联合国岗位上新啦(联合国的岗位)

联合国人权事务高级专员办事处PostingTitleIntern-HumanRightsDutyStationBANGKOKDeadlineOct7,2025CategoryandL...

一周安全漫谈丨工信部:拟定超1亿条一般数据泄露属后果严重情节

工信部:拟定超1亿条一般数据泄露属后果严重情节11月23日,工信部官网公布《工业和信息化领域数据安全行政处罚裁量指引(试行)(征求意见稿)》。《裁量指引》征求意见稿明确了行政处罚由违法行为发生地管辖、...

oracle列转行以及C#执行语句时报错问题

oracle列转行的关键字:UNPIVOT,经常查到的怎么样转一列,多列怎么转呢,直接上代码(sshwomeyourcode):SELECTsee_no,diag_no,diag_code,...

取消回复欢迎 发表评论: