如何使用redis实现多种类型的消息队列?
mhr18 2024-11-21 17:53 20 浏览 0 评论
场景说明:
- 异步处理,可用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时。
- 高并发场景进行削峰,例如当某个时刻请求瞬间增加时,可以把请求写入到队列,后台再去慢慢处理这些请求。
- 微服务架构中多模块之间解耦,例如订单模块下单之后,可以直接调用消息模块,仓库模块,但是当后续增多一个模块时,那么需要订单模块修改调用代码。可以将订单消息写入队列中,再由其他模块主动去队列获取订单消息然后进行相应处理。
以上场景适合使用消息队列来解决,消息队列的作用简单来说有三种作用,分别为异步,削峰,解耦。本遍文章主要讲解用redis实现多种类型的消息队列。
命令:
rpush + blpop 或 lpush + brpop
rpush : 往列表右侧推入数据
blpop : 客户端阻塞直到队列有值输出
简单队列:
simple.php
//simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
$redis->rPush('goods:task', json_encode($row));
}
$redis->close();
获取20000万个商品,并把json化后的数据推入goods:task队列
queueBlpop.php
// 出队
while (true) {
// 阻塞设置超时时间为3秒
$task = $redis->blPop(array('goods:task'), 3);
if ($task) {
$redis->rPush('goods:success:task', $task[1]);
$task = json_decode($task[1], true);
echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
echo PHP_EOL;
} else {
echo 'nothing' . PHP_EOL;
sleep(5);
}
}
设置blpop阻塞时间为3秒,当有数据出队时保存到goods:success:task表示执行成功,当队列没有数据时,程序睡眠10秒重新检查goods:task是否有数据出队
cli 模式执行命令:
php simple.php
php queueBlpop.php
优先级队列
思路:
blpop 有多个键时,blpop会从左至右遍历键,一旦一个键能弹出元素,客户端立即返回。例如:
blpop key1 key2 key3 key4
从key1到key4遍历,如果哪个key有值,则弹出这个值,若多个key同时有值时,优先弹出排在左边的key。
priority.php
// 设置优先级队列
$high = 'goods:high:task';
$mid = 'goods:mid:task';
$low = 'goods:low:task';
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
// cid 小于100放在低级队列
if ($row['cid'] < 100) {
$redis->rPush($low, json_encode($row));
}
// cid 100到600之间放在中级队列
elseif ($row['cid'] > 100 && $row['cid'] < 600) {
$redis->rPush($mid, json_encode($row));
}
// cid 大于600放在高级队列
else {
$redis->rPush($high, json_encode($row));
}
}
$redis->close();
priorityBlop.php
// 优先级队列
$high = 'goods:high:task';
$mid = 'goods:mid:task';
$low = 'goods:low:task';
// 出队
while(true){
// 优先级高的队列放在左侧
$task = $redis->blPop(array($high, $mid, $low), 3);
if ($task) {
$task = json_decode($task[1], true);
echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
echo PHP_EOL;
} else {
echo 'nothing' . PHP_EOL;
sleep(5);
}
}
优先级高的队列放在blpop命令左侧,依次排序,blpop命令会依次弹出high, mid, low队列的值
cli 模式执行命令:
php priority.php
php priorityBlpop.php
延迟队列
思路:
可以用一个有序集合来保存延迟任务,member保存任务内容,score保存(当前时间 + 延时时间)。用时间作为score。程序只要用有序集合的第一条任务的score和当前时间做比较,如果当前时间比score小,说明有序集合的所有任务还没到执行时间。
delay.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
$redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row));
}
将20万条任务导入有序集合goods:delay:task,所有任务延迟到之后的1秒到300秒内执行
delayHandle.php
while (true) {
// 因为是有序集合,只要判断第一条记录的延时时间,例如第一条未到执行时间
// 相对说明集合的其他任务未到执行时间
$rs = $redis->zRange('goods:delay:task', 0, 0, true);
// 集合没有任务,睡眠时间设置为5秒
if (empty($rs)) {
echo 'no tasks , sleep 5 seconds' . PHP_EOL;
sleep(5);
continue;
}
$taskJson = key($rs);
$delay = $rs[$taskJson];
$task = json_decode($taskJson, true);
$now = time();
// 到时间执行延时任务
if ($delay <= $now) {
// 对当前任务加锁,避免移动移动延时任务到任务队列时被其他客户端修改
if (!($identifier = acquireLock($task['id']))) {
continue;
}
// 移动延时任务到任务队列
$redis->zRem('goods:delay:task', $taskJson);
$redis->rPush('goods:task', $taskJson);
echo $task['id'] . ' run ' . PHP_EOL;
// 释放锁
releaseLock($task['id'], $identifier);
} else {
// 延时任务未到执行时间
$sleep = $delay - $now;
// 最大值设置为2秒,保证如果有新的任务(延时时间1秒)进入集合时能够及时的被处理
// $sleep = $sleep > 2 ? 2 :$sleep;
echo 'wait ' . $sleep . ' seconds ' . PHP_EOL;
sleep($sleep);
}
}
这个文件对有序集合内的延迟任务做处理,如果延迟任务到了执行时间,则把延迟任务移动到任务队列中
queueBlpop.php
// 出队
while (true) {
// 阻塞设置超时时间为3秒
$task = $redis->blPop(array('goods:task'), 3);
if ($task) {
$redis->rPush('goods:success:task', $task[1]);
$task = json_decode($task[1], true);
echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
echo PHP_EOL;
} else {
echo 'nothing' . PHP_EOL;
sleep(5);
}
}
处理任务队列中的任务
cli模式下执行命令:
php delay.php
php delayHanlde.php
php queueBlpop.php
总结
以上分别讲解了简单队列,优先级队列,延迟队列的使用的思路。redis作为队列使用非常的简单,但是存在数据丢失的情况,如果你的系统不允许数据丢失,可以使用专门的消息中间件,如rabbitmq,kafka,rocketmq,nsq等等。关于队列的使用我们还需要考虑消息的可靠性,有序性,重复消费等等问题。今天就先到这里了,如果喜欢我的文章,可以关注我,我将继续为大家带来优质的文章。
相关推荐
- 【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库
-
如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...
- Pure Storage推出统一数据管理云平台及新闪存阵列
-
PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...
- 对Java学习的10条建议(对java课程的建议)
-
不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...
- SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!
-
官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...
- JDK21有没有什么稳定、简单又强势的特性?
-
佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...
- 「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了
-
在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...
- Java面试题及答案最全总结(2025版)
-
大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...
- 数据库日常运维工作内容(数据库日常运维 工作内容)
-
#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...
- 分布式之系统底层原理(上)(底层分布式技术)
-
作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...
- oracle 死锁了怎么办?kill 进程 直接上干货
-
1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...
- SpringBoot 各种分页查询方式详解(全网最全)
-
一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...
- 《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略
-
《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...
- LoadRunner(loadrunner录制不到脚本)
-
一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...
- Redis数据类型介绍(redis 数据类型)
-
介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...
- RMAN备份监控及优化总结(rman备份原理)
-
今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)