百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Redis 使用 List 实现消息队列的利与弊

mhr18 2024-12-01 09:06 18 浏览 0 评论

什么是消息队列

消息队列是一种异步的服务间通信方式,适用于分布式和微服务架构。消息在被处理和删除之前一直存储在队列上。

每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。

消息队列

  • Producer:消息生产者,负责产生和发送消息到 Broker;
  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

?

消息队列的使用场景有哪些呢?

消息队列在实际应用中包括如下四个场景:

  • 应用耦合:发送方、接收方系统之间不需要了解双方,只需要认识消息。多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
  • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
  • 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
  • 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;

消息队列满足哪些特性

消息有序性

消息是异步处理的,但是消费者需要按照生产者发送消息的顺序来消费,避免出现后发送的消息被先处理的情况。

重复消息处理

生产者可能因为网络问题出现消息重传导致消费者可能会收到多条重复消息。

同样的消息重复多次的话可能会造成一业务逻辑多次执行,需要确保如何避免重复消费问题。

可靠性

一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。

当消费者重启后,可以继续读取消息进行处理,防止消息遗漏。

List 实现消息队列

Redis 的列表(List)是一种线性的有序结构,可以按照元素被推入列表中的顺序来存储元素,能满足「先进先出」的需求,这些元素既可以是文字数据,又可以是二进制数据。

LPUSH

生产者使用 LPUSH key element[element...] 将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。

如下,生产者向队列 queue 先后插入了 「Java」「码哥字节」「Go」,返回值表示消息插入队列后的个数。

> LPUSH queue Java 码哥字节 Go
(integer) 3

RPOP

消费者使用 RPOP key 依次读取队列的消息,先进先出,所以 「Java」会先读取消费:

> RPOP queue
"Java"
> RPOP queue
"码哥字节"
> RPOP queue
"Go"

List队列

实时消费问题

?

65 哥:这么简单就实现了么?

别高兴的太早,LPUSH、RPOP 存在一个性能风险,生产者向队列插入数据的时候,List 并不会主动通知消费者及时消费。

我们需要写一个 while(true) 不停地调用 RPOP 指令,当有新消息就会返回消息,否则返回空。

程序需要不断轮询并判断是否为空再执行消费逻辑,这就会导致即使没有新消息写入到队列,消费者也要不停地调用 RPOP 命令占用 CPU 资源。

?

65 哥:要如何避免循环调用导致的 CPU 性能损耗呢?

Redis 提供了 BLPOP、BRPOP 阻塞读取的命令,消费者在在读取队列没有数据的时候自动阻塞,直到有新的消息写入队列,才会继续读取新消息执行业务逻辑。

BRPOP queue 0

参数 0 表示阻塞等待时间无无限制

重复消费

  • 消息队列为每一条消息生成一个「全局 ID」;
  • 生产者为每一条消息创建一条「全局 ID」,消费者把一件处理过的消息 ID 记录下来判断是否重复。

其实这就是幂等,对于同一条消息,消费者收到后处理一次的结果和多次的结果是一致的。

消息可靠性

?

65 哥:消费者从 List 中读取一条在消息处理过程中宕机了就会导致消息没有处理完成,可是数据已经没有保存在 List 中了咋办?

本质就是消费者在处理消息的时候崩溃了,就无法再还原消息,缺乏一个消息确认机制。

Redis 提供了 RPOPLPUSH、BRPOPLPUSH(阻塞)两个指令,含义是从 List 从读取消息的同时把这条消息复制到另一个 List 中(备份),并且是原子操作。

我们就可以在业务流程正确处理完成后再删除队列消息实现消息确认机制。如果在处理消息的时候宕机了,重启后再从备份 List 中读取消息处理。

LPUSH redisMQ 公众号 码哥字节
BRPOPLPUSH redisMQ redisMQBack

生产者用 LPUSH 把消息插入到 redisMQ 队列中,消费者使用 BRPOPLPUSH 读取消息「公众号」,同时该消息会被插入到 「redisMQBack」队列中。

如果消费成功则把「redisMQBack」的消息删除即可,异常的话可以继续从 「redisMQBack」再次读取消息处理。

redis消息确认机制

需要注意的是,如果生产者消息发送的很快,而消费者处理速度慢就会导致消息堆积,给 Redis 的内存带来过大压力。

Redission 实战

在 Java 中,我们可以利用 Redission 封装的 API 来快速实现队列,接下来码哥基于 SpringBoot 2.1.4 版本来交大家如何整合并实战。

详细 API 文档大家可查阅:https://github.com/redisson/redisson/wiki/7.-Distributed-collections

添加依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.7</version>
</dependency>

添加 Redis 配置,码哥的 Redis 没有配置密码,大家根据实际情况配置即可。

spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false

Java 代码实战

RBlockingDeque 继承 java.util.concurrent.BlockingDeque ,在使用过程中我们完全可以根据接口文档来选择合适的 API 去实现业务逻辑。

主要方法如下

码哥采用了双端队列来举例

@Slf4j
@Service
public class QueueService {

    @Autowired
    private RedissonClient redissonClient;

    private static final String REDIS_MQ = "redisMQ";

    /**
     * 发送消息到队列头部
     *
     * @param message
     */
    public void sendMessage(String message) {
        RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ);

        try {
            blockingDeque.putFirst(message);
            log.info("将消息: {} 插入到队列。", message);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 从队列尾部阻塞读取消息,若没有消息,线程就会阻塞等待新消息插入,防止 CPU 空转
     */
    public void onMessage() {
        RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ);
        while (true) {
            try {
                String message = blockingDeque.takeLast();
                log.info("从队列 {} 中读取到消息:{}.", REDIS_MQ, message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

单元测试

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedissionApplication.class)
public class RedissionApplicationTests {

    @Autowired
    private QueueService queueService;

    @Test
    public void testQueue() throws InterruptedException {
        new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                queueService.sendMessage("消息" + i);
            }
        }).start();

        new Thread(() -> queueService.onMessage()).start();

        Thread.currentThread().join();
    }


}

总结

可以使用 List 数据结构来实现消息队列,满足先进先出。为了实现消息可靠性,Redis 提供了 BRPOPLPUSH 命令是解决。

Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。

而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。

需要注意的是,我们要避免生产者过快,消费者过慢导致的消息堆积占用 Redis 的内存。

在消息量不大的情况下使用 Redis 作为消息队列,他能给我们带来高性能的消息读写,这似乎也是一个很好消息队列解决方案。

大家觉得是否合适作为消息队列呢?点赞让我看看吧


来源:https://mp.weixin.qq.com/s/Pv7IXUQQnhGCGMM9n0VfkA

相关推荐

Spring Boot 分布式事务实现简单得超乎想象

环境:SpringBoot2.7.18+Atomikos4.x+MySQL5.71.简介关于什么是分布式事务,本文不做介绍。有需要了解的自行查找相关的资料。本篇文章将基于SpringBoot...

Qt编写可视化大屏电子看板系统15-曲线面积图

##一、前言曲线面积图其实就是在曲线图上增加了颜色填充,单纯的曲线可能就只有线条以及数据点,面积图则需要从坐标轴的左下角和右下角联合曲线形成完整的封闭区域路径,然后对这个路径进行颜色填充,为了更美观...

Doris大数据AI可视化管理工具SelectDB Studio重磅发布!

一、初识SelectDBStudioSelectDBStudio是专为ApacheDoris湖仓一体典型场景实战及其兼容数据库量身打造的GUI工具,简化数据开发与管理。二、Select...

RAD Studio 、Delphi或C++Builder设计代码编译上线缩短开发时间

#春日生活打卡季#本月,Embarcadero宣布RADStudio12.3Athens以及Delphi12.3和C++Builder12.3,提供下载。RADStudio12.3A...

Mybatis Plus框架学习指南-第三节内容

自动填充字段基本概念MyBatis-Plus提供了一个便捷的自动填充功能,用于在插入或更新数据时自动填充某些字段,如创建时间、更新时间等。原理自动填充功能通过实现com.baomidou.myba...

「数据库」Sysbench 数据库压力测试工具

sysbench是一个开源的、模块化的、跨平台的多线程性能测试工具,可以用来进行CPU、内存、磁盘I/O、线程、数据库的性能测试。目前支持的数据库有MySQL、Oracle和PostgreSQL。以...

如何选择适合公司的ERP(选erp系统的经验之谈)

很多中小公司想搞ERP,但不得要领。上ERP的目的都是歪的,如提高效率,减少人员,堵住财务漏洞等等。真正用ERP的目的是借机提升企业管理能力,找出管理上的问题并解决,使企业管理更规范以及标准化。上ER...

Manus放开注册,但Flowith才是Agent领域真正的yyds

大家好,我是运营黑客。前天,AIAgent领域的当红炸子鸡—Manus宣布全面放开注册,终于,不需要邀请码就能体验了。于是,赶紧找了个小号去确认一下。然后,额……就被墙在了外面。官方解释:中文版...

歌浓酒庄总酿酒师:我们有最好的葡萄园和最棒的酿酒师

中新网1月23日电1月18日,张裕董事长周洪江及总经理孙健一行在澳大利亚阿德莱德,完成了歌浓酒庄股权交割签约仪式,这也意味着张裕全球布局基本成型。歌浓:澳大利亚年度最佳酒庄据悉,此次张裕收购的...

软件测试进阶之自动化测试——python+appium实例

扼要:1、了解python+appium进行APP的自动化测试实例;2、能根据实例进行实训操作;本课程主要讲述用python+appium对APP进行UI自动化测试的例子。appium支持Androi...

为什么说Python是最伟大的语言?看图就知道了

来源:麦叔编程作者:麦叔测试一下你的分析能力,直接上图,自己判断一下为什么Python是最好的语言?1.有图有真相Java之父-JamesGoshlingC++之父-BjarneStrou...

如何在Eclipse中配置Python开发环境?

Eclipse是著名的跨平台集成开发环境(IDE),最初主要用来Java语言开发。但是我们通过安装不同的插件Eclipse可以支持不同的计算机语言。比如说,我们可以通过安装PyDev插件,使Eclip...

联合国岗位上新啦(联合国的岗位)

联合国人权事务高级专员办事处PostingTitleIntern-HumanRightsDutyStationBANGKOKDeadlineOct7,2025CategoryandL...

一周安全漫谈丨工信部:拟定超1亿条一般数据泄露属后果严重情节

工信部:拟定超1亿条一般数据泄露属后果严重情节11月23日,工信部官网公布《工业和信息化领域数据安全行政处罚裁量指引(试行)(征求意见稿)》。《裁量指引》征求意见稿明确了行政处罚由违法行为发生地管辖、...

oracle列转行以及C#执行语句时报错问题

oracle列转行的关键字:UNPIVOT,经常查到的怎么样转一列,多列怎么转呢,直接上代码(sshwomeyourcode):SELECTsee_no,diag_no,diag_code,...

取消回复欢迎 发表评论: