百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

基于Redis实现简单的延时消息队列

mhr18 2024-11-27 12:09 16 浏览 0 评论


说到消息队列相信作为开发人员的大家都不陌生,在实际的工作中我们可能在很多场景下都会用到消息队列,消息队列不仅仅是用于收发消息,而且也可以用于解耦我们的应用系统设计,在大型的应用系统或者分布式应用系统中,我们必然会用到消息队列。

总结下,消息队列的应用场景一般有以下几种场景:

  1. 异步处理任务;
  2. 应用系统解耦;
  3. 大流量削峰;
  4. 日志处理系统;
  5. 消息通讯;


目前主流的消息队列框架有:

  • Apache的ActiveMQ;
  • Erlang语言实现的RabbitMQ;
  • Apache的RocketMQ;
  • Apache的Kafka;

这几种主流的消息队列框架各有各的优势,也有略微的不同。



当然,消息队列也有一些特殊的使用场景,比如:一些电商系统中,当用户下单后,需要在规定的时间内对订单发起支付,如果在规定的时间内没有支付订单,那么该订单将自动取消。这个问题的常规解决方案有两个:

  1. 使用定时任务定时去扫描表,修改过期订单的状态。这种方案当然存在许多局限性,当订单数量不多,而且对系统性能要求不高的情况下可以考虑使用。这种方案也不够优雅;
  2. 使用基于消息队列的延时消息队列,这种方案可以对整个消息队列设置一个消息过期时间,也可以给每一个消息设置一个过期时间,这个时候消息的过期时间取决于设置的最小时间;

延时消息队列我们可以采用上面所说的消息队列框架去实现,也可以采用比较简单的基于Redis的方式去实现,众所周知Redis并不是一个消息队列框架,但是Redis在某些应用场景下可以采用其高级特性为我们提供消息队列的特性。


Rdis在常规的应用场景下,我们使用它作为高速缓存框架,搜索百度百科我们可以发现,对Redis的定义如下:

Redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis的最基本用法是我们使用它的Key-Value特性,存储我们的热点数据或者高频访问数据,来达到提高整个应用系统的吞吐量。

就像上面对Redis的定义,zset是对有序集合的操作,我们可以利用这一特性来实现我们的延时消息队列功能。大致实现的原理如下:

Zset本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它便会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。



有了实现原理,下面我们通过一个简单的例子来演示下具体的操作过程,可能对基于Redis实现延时消息队列这一主题有更好的理解,演示的源代码链接在文章末尾附上。

新建工程:delay-message-queue-redis

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.delay.message.queue</groupId>
        <artifactId>delay-message-queue</artifactId>
        <version>0.0.1</version>
    </parent>
    <groupId>com.delay.message.queue</groupId>
    <artifactId>delay-message-queue-redis</artifactId>
    <version>0.0.1</version>
    <name>delay-message-queue-redis</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

操作Redis我们使用RedisTemplate,简单的配置如下:

@Configuration
public class RedisConfig {
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("10.0.0.50", 6379));
    }

    @Bean
    public RedisTemplate redisTemplate() {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        return redisTemplate;
    }
}

定义我们的消息生产者:

@Slf4j
@Service
public class ProducerService {
    private static final String QUEUE_NAME = "delay_order_queue";
    @Autowired
    private RedisTemplate redisTemplate;

    public void produce(String orderId, long expiredTime) {
        redisTemplate.opsForZSet().add(QUEUE_NAME, orderId, expiredTime);
        //log.info("order id:{} set success", orderId);
        long length = redisTemplate.opsForZSet().size(QUEUE_NAME);
        //log.info("[produce]{} length:{}", QUEUE_NAME, length);
    }
}

消息生产者主要完成的功能:以订单的过期时间为元素(value)的得分,将数据添加到队列中去;

定义我们的消息消费者:

@Slf4j
@Service
public class ConsumerService {
    private static final String QUEUE_NAME = "delay_order_queue";
    @Autowired
    private RedisTemplate redisTemplate;

    public void consume() {
        while (true) {
            Set<String> set = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis(), 0, 1);
            if (set == null || set.isEmpty()) {
                try {
                    //log.info("no data will sleep");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.error("InterruptedException", e);
                }
                continue;
            }
            String orderId = set.iterator().next();
            if (redisTemplate.opsForZSet().remove(QUEUE_NAME, orderId) > 0) {
                log.info("order id:{} handle success", orderId);
                long length = redisTemplate.opsForZSet().size(QUEUE_NAME);
                log.info("[consume]{} length:{}", QUEUE_NAME, length);
            }
        }
    }
}

消息消费者的主要功能如下:

  • 循环从Redis的Zset中拿取一个0<=score<=当前时间的元素(value);
  • 如果没有取到值,则整个线程休眠一秒钟;
  • 如果取到值,则从该队列(key)删除取到的元素(value),删除的目的是防止一个数据被重复的消费;然后对取到的值进行后续的数据处理,这里是将数据打印出来。

整个生产者和消费者我们已经实现了,下面我们通过Junit来生产消息和消费消息;

生产消息的过程:

@Slf4j
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducerServiceTest {
    @Autowired
    private ProducerService producerService;

    @Test
    public void produce() {
        Random random = new Random(1);
        for (int i = 0; i < 10; i++) {
            Calendar calendar = Calendar.getInstance();
            // 产生一个随机数
            int time = random.nextInt(100);
            calendar.add(Calendar.SECOND, time);
            String orderId = "order-id-" + i;
            long expired = calendar.getTimeInMillis();
            log.info("order id:{}, expired:{}", orderId, time);
            producerService.produce(orderId, expired);
            try {
                //log.info("no data will sleep");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error("InterruptedException", e);
            }
        }
    }
}

为了方便我们观察,我们采用在当前时间的基础上加上一个随机的时间作为订单的过期时间,每产生一个订单线程休眠0.5秒,总共产生10个订单。

消息的消费过程:

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ConsumerServiceTest {
    @Autowired
    private ConsumerService consumerService;

    @Test
    public void consume() {
        consumerService.consume();
    }
}

该过程比较简单,它主要是启动我们的循环函数从Redis中取数据。

先启动我们的消息消费过程ConsumerServiceTest,然后再启动我们的消息生产过程ProducerServiceTest,观察日志的打印输出。

消息生产过程:


消息消费过程:


仔细对比我们可以很容易发现:

  1. order-id-5的过期时间最短,它也是最先被消费掉的,其次是order-id-7;
  2. 当生产消息的过程完成后共产生了10个消息,消息消费过程中,每消费一个消息,又没有产生新的消息的时候,整个消息队列的长度在变小;


通过上面的演示,我们实现了基于Redis实现简单的延时消息队列,最主要我们使用了Redis的Zset特性来完成延时消息队列的功能。

上面的演示在高并发场景下,可能会存在问题:

  • 高并发场景下对Redis的操作不够原子性;
  • 不适合分布式应用场景下使用;
  • 适合单体应用中延时消息队列的使用;

基于上面所提到的问题,最好的解决方案是采用消息队列框架去实现,例如:RabbitMQ给延时队列统一设置消息过期时间,过期的消息将被路由到另一个队列中;也可以给每一个消息设置一个过期时间,过期的消息同样路由到另一个队列中。


源代码GitHub地址:

https://github.com/bq-xiao/delay-message-queue


不积跬步,无以至千里;不积小流,无以成江海!

相关推荐

使用 Docker 部署 Java 项目(通俗易懂)

前言:搜索镜像的网站(推荐):DockerDocs1、下载与配置Docker1.1docker下载(这里使用的是Ubuntu,Centos命令可能有不同)以下命令,默认不是root用户操作,...

Spring Boot 3.3.5 + CRaC:从冷启动到秒级响应的架构实践与踩坑实录

去年,我们团队负责的电商订单系统因扩容需求需在10分钟内启动200个Pod实例。当运维组按下扩容按钮时,传统SpringBoot应用的冷启动耗时(平均8.7秒)直接导致流量洪峰期出现30%的请求超时...

《github精选系列》——SpringBoot 全家桶

1简单总结1SpringBoot全家桶简介2项目简介3子项目列表4环境5运行6后续计划7问题反馈gitee地址:https://gitee.com/yidao620/springbo...

Nacos简介—1.Nacos使用简介

大纲1.Nacos的在服务注册中心+配置中心中的应用2.Nacos2.x最新版本下载与目录结构3.Nacos2.x的数据库存储与日志存储4.Nacos2.x服务端的startup.sh启动脚...

spring-ai ollama小试牛刀

序本文主要展示下spring-aiollama的使用示例pom.xml<dependency><groupId>org.springframework.ai<...

SpringCloud系列——10Spring Cloud Gateway网关

学习目标Gateway是什么?它有什么作用?Gateway中的断言使用Gateway中的过滤器使用Gateway中的路由使用第1章网关1.1网关的概念简单来说,网关就是一个网络连接到另外一个网络的...

Spring Boot 自动装配原理剖析

前言在这瞬息万变的技术领域,比了解技术的使用方法更重要的是了解其原理及应用背景。以往我们使用SpringMVC来构建一个项目需要很多基础操作:添加很多jar,配置web.xml,配置Spr...

疯了!Spring 再官宣惊天大漏洞

Spring官宣高危漏洞大家好,我是栈长。前几天爆出来的Spring漏洞,刚修复完又来?今天愚人节来了,这是和大家开玩笑吗?不是的,我也是猝不及防!这个玩笑也开的太大了!!你之前看到的这个漏洞已...

「架构师必备」基于SpringCloud的SaaS型微服务脚手架

简介基于SpringCloud(Hoxton.SR1)+SpringBoot(2.2.4.RELEASE)的SaaS型微服务脚手架,具备用户管理、资源权限管理、网关统一鉴权、Xss防跨站攻击、...

SpringCloud分布式框架&amp;分布式事务&amp;分布式锁

总结本文承接上一篇SpringCloud分布式框架实践之后,进一步实践分布式事务与分布式锁,其中分布式事务主要是基于Seata的AT模式进行强一致性,基于RocketMQ事务消息进行最终一致性,分布式...

SpringBoot全家桶:23篇博客加23个可运行项目让你对它了如指掌

SpringBoot现在已经成为Java开发领域的一颗璀璨明珠,它本身是包容万象的,可以跟各种技术集成。本项目对目前Web开发中常用的各个技术,通过和SpringBoot的集成,并且对各种技术通...

开发好物推荐12之分布式锁redisson-sb

前言springboot开发现在基本都是分布式环境,分布式环境下分布式锁的使用必不可少,主流分布式锁主要包括数据库锁,redis锁,还有zookepper实现的分布式锁,其中最实用的还是Redis分...

拥抱Kubernetes,再见了Spring Cloud

相信很多开发者在熟悉微服务工作后,才发现:以为用SpringCloud已经成功打造了微服务架构帝国,殊不知引入了k8s后,却和CloudNative的生态发展脱轨。从2013年的...

Zabbix/J监控框架和Spring框架的整合方法

Zabbix/J是一个Java版本的系统监控框架,它可以完美地兼容于Zabbix监控系统,使得开发、运维等技术人员能够对整个业务系统的基础设施、应用软件/中间件和业务逻辑进行全方位的分层监控。Spri...

SpringBoot+JWT+Shiro+Mybatis实现Restful快速开发后端脚手架

作者:lywJee来源:cnblogs.com/lywJ/p/11252064.html一、背景前后端分离已经成为互联网项目开发标准,它会为以后的大型分布式架构打下基础。SpringBoot使编码配置...

取消回复欢迎 发表评论: