百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

用redis做异步队列,原来还可以这样

mhr18 2024-10-21 05:47 25 浏览 0 评论

Redis设计主要是用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列。

它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列;


另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可;


Redis能做消息队列还得益于其list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口,它们都是阻塞版的,所以可以用来做消息队列。(List : lpush / rpop)


方式一:生产者与消费者模式
使用list结构作为队列,rpush生产消息,lpop消费消息,当lpop没有消息的时候,要适当sleep一会再重试。


或者,不用sleep,直接用blpop指令,在没有消息的时候,它会阻塞住直到消息到来。


具体文章可查看:PHP与redis队列实现电商订单自动确认收货


方式二:发布订阅者模式
使用pub/sub主题订阅者模式,可以实现1:N的消息队列。


基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。


消息发布者,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)


消息订阅者,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。


redis做异步队列的缺点

在消费者下线的情况下,生产的消息会丢失。此场景,建议用MQ。

使用MQ可查看:在分布式系统上,你是如何处理数据的一致性的


下面用代码实际实现,用发布订阅者模式,给大家讲解讲解

消息订阅者subscribe.php

<?php
ini_set('default_socket_timeout', -1);  //php配置设置不超时
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
//$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);  //redis方式设置不超时,推荐

$redis->subscribe(['chan'],'callback');     //callback为回调函数名称
//$redis->subscribe(['chan'],array(new TestCall(),'callback') ); //如果回调函数是类中的方法名,这样写

// 回调函数,这里写处理逻辑
function callback($instance, $channelName, $message)
{
         echo $channelName, "==>", $message, PHP_EOL;

         //$instance,即为上面创建的redis实例对象,在回调函数中,默认的这个参数就是,因此不需专门传参。 这里除了SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE这4条命令之外其它命令都不能使用
         //如果要使用redis中的其他命令,这样实现
         $newredis = new Redis();
        $newredis->connect("127.0.0.1", 6379);
        echo $newredis->get('test') . PHP_EOL;
        $newredis->close();

          //可以根据$channelName, $message,处理不同的业务逻辑
          switch($chan) {
               case 'chan-1':
                  ...
                  break;

               case 'chan-2':
                              ...
                   break;
           }

           switch($message) {
               case 'msg1':
                  ...
                  break;

               case 'msg2':
                              ...
                   break;
           }

}


subscribe.php中设置不超时

方法1:ini_set('default_socket_timeout', -1);

方法2:$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

如果不设置不超时,60s后会报一个错误

PHP Fatal error: Uncaught RedisException: read error on connection to 127.0.0.1:6379 in subscribe.php:6

方式一的实现,是通过临时修改ini的配置值,default_socket_timeout默认为60s,default_socket_timeout是socket流的超时参数,即socket流从建立到传输再到关闭整个过程必须要在这个参数设置的时间以内完成,如果不能完成,那么PHP将自动结束这个socket并返回一个警告。

方式二是通过修改redis的配置项,因此仅对redis连接生效,相对于方式1,不会产生意外的对其他方法的影响。

消息发布者publish.php

<?php

$redis = new Redis();
$redis->connect("127.0.0.1",6379);

$redis->publish('chan','this is a message');

批量订阅

redis的psubscribe支持通过模式匹配的方式实现批量订阅,订阅方式

$redis->psubscribe(['my*'],'psubscribe'); //回调函数写函数名
或者
$redis->psubscribe(['my*'],array(new TestCall(),'psubscribe')); //回调函数为类中的方法,类名写你自己定义的类

subscribe.php

<?php
//ini_set('default_socket_timeout', -1);  //不超时
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

//匹配方式1:发布可用$redis->publish('mymest','this is a message');
//$redis->psubscribe(['my*'],'psubscribe');    

//匹配方式2:发布可用$redis->publish('mydest','this is a message');
//$redis->psubscribe(['my?est'],'psubscribe');

//匹配方式3:发布可用$redis->publish('myaest','this is a message');或$redis->publish('myeest','this is a message');
$redis->psubscribe(['my[ae]est'],'psubscribe');

function psubscribe($redis, $pattern, $chan, $msg) {
      echo "Pattern: $pattern\n";
      echo "Channel: $chan\n";
      echo "Payload: $msg\n";
}


模式匹配规则

支持以下几种,以hello举例:

h?llo subscribes to hello, hallo and hxllo
h*llo subscribes to hllo and heeeello
h[ae]llo subscribes to hello and hallo, but not hillo
特殊字符用\转义


pubsub方法介绍

public function pubsub( $keyword, $argument )

pubsub获取pub/sub系统的信息,$keyword可用为"channels", "numsub", 或者"numpat",三种,传入不同的keyword返回的数据不同

* $redis->pubsub('channels'); // All channels 获取所有的频道,返回数组
* $redis->pubsub('channels', '*pattern*'); // Just channels matching your pattern,返回符合匹配模式的频道
* $redis->pubsub('numsub', array('chan1', 'chan2')); // Get subscriber counts for 'chan1' and 'chan2'    //返回每个订阅频道的数量,返回数组
* $redis->pubsub('numpat'); // Get the number of pattern subscribers 获取模式匹配方式的订阅的数量,即$redis->psubscribe(['my[ae]est'],'psubscribe');返回数量为1,$redis->subscribe(['chan'],'callback');    这种方式获取不到,因此返回数量为0

毕竟绝大部分写业务的程序员,在实际开发中使用 Redis 的时候,基本是用哪学哪,对 Redis 缺乏了一个整体认知


那么如何才能正确的掌握Redis呢?

为了帮大家快速建立Redis的体系,分享一套Redis实战教程【Redis6.0全套实战教程】

课程目录

1 redis的五种数据结构讲解


2 string结构与应用场景讲解


3 hash结构与应用场景讲解


4 list结构与应用场景讲解


5 set结构与应用场景讲解


6 实例讲解:集合操作实现微博微信关注模型


7 zset有序集合结构与应用场景


8 一线大厂redis深度学习路线指引


9 互联网公司分布式锁场景实例讲解


10 分布式架构下如何实现redis分布式锁


11 redis主从架构锁失效问题及redlock详解


12 基于redisson框架实现分布式锁


13 redisson分布式锁实战原理讲解


14 双十一大促如何将分布式锁性能提升100倍


15 Redis特性与应用场景解析


16 Redis Db数据结构讲解


17 string数据结构讲解


18 字符串数据结构讲解


19 面试题:如何实现亿级用户日活统计


20 Redis6.0多线程工作模式解析


21 Redis阻塞队列实现原理


22 list数据结构讲解


23 hash数据结构讲解


24 一线大厂Redis面试核心知识点梳理


25 redis集群方案比较-哨兵模式详解


26 redis集群方案比较-高可用集群模式


27 redis安装


28 redis集群搭建过程详解


29 验证集群


30 轻松搭建redis缓存高可用集群


非常抱歉由于头条规则无法发放资料外链,领取方式:点赞关注小编后私信【资料】获取资料领取方式!

部分资料展示:

领取方式:点赞关注小编后私信【资料】获取资料领取方式!

相关推荐

Redis合集-使用benchmark性能测试

采用开源Redis的redis-benchmark工具进行压测,它是Redis官方的性能测试工具,可以有效地测试Redis服务的性能。本次测试使用Redis官方最新的代码进行编译,详情请参见Redis...

Java简历总被已读不回?面试挂到怀疑人生?这几点你可能真没做好

最近看了几十份简历,发现大部分人不是技术差,而是不会“卖自己”——一、简历死穴:你写的不是经验,是岗位说明书!反面教材:ד使用SpringBoot开发项目”ד负责用户模块功能实现”救命写法:...

redission YYDS(redission官网)

每天分享一个架构知识Redission是一个基于Redis的分布式Java锁框架,它提供了各种锁实现,包括可重入锁、公平锁、读写锁等。使用Redission可以方便地实现分布式锁。red...

从数据库行锁到分布式事务:电商库存防超卖的九重劫难与破局之道

2023年6月18日我们维护的电商平台在零点刚过3秒就遭遇了严重事故。监控大屏显示某爆款手机SKU_IPHONE13_PRO_MAX在库存仅剩500台时,订单系统却产生了1200笔有效订单。事故复盘发...

SpringBoot系列——实战11:接口幂等性的形而上思...

欢迎关注、点赞、收藏。幂等性不仅是一种技术需求,更是数字文明对确定性追求的体现。在充满不确定性的网络世界中,它为我们建立起可依赖的存在秩序,这或许正是技术哲学最深刻的价值所在。幂等性的本质困境在支付系...

如何优化系统架构设计缓解流量压力提升并发性能?Java实战分享

如何优化系统架构设计缓解流量压力提升并发性能?Java实战分享在高流量场景下。首先,我需要回忆一下常见的优化策略,比如负载均衡、缓存、数据库优化、微服务拆分这些。不过,可能还需要考虑用户的具体情况,比...

Java面试题: 项目开发中的有哪些成长?该如何回答

在Java面试中,当被问到“项目中的成长点”时,面试官不仅想了解你的技术能力,更希望看到你的问题解决能力、学习迭代意识以及对项目的深度思考。以下是回答的策略和示例,帮助你清晰、有说服力地展示成长点:一...

互联网大厂后端必看!Spring Boot 如何实现高并发抢券逻辑?

你有没有遇到过这样的情况?在电商大促时,系统上线了抢券活动,结果活动刚一开始,服务器就不堪重负,出现超卖、系统崩溃等问题。又或者用户疯狂点击抢券按钮,最后却被告知无券可抢,体验极差。作为互联网大厂的后...

每日一题 |10W QPS高并发限流方案设计(含真实代码)

面试场景还原面试官:“如果系统要承载10WQPS的高并发流量,你会如何设计限流方案?”你:“(稳住,我要从限流算法到分布式架构全盘分析)…”一、为什么需要限流?核心矛盾:系统资源(CPU/内存/数据...

Java面试题:服务雪崩如何解决?90%人栽了

服务雪崩是指微服务架构中,由于某个服务出现故障,导致故障在服务之间不断传递和扩散,最终造成整个系统崩溃的现象。以下是一些解决服务雪崩问题的常见方法:限流限制请求速率:通过限流算法(如令牌桶算法、漏桶算...

面试题官:高并发经验有吗,并发量多少,如何回复?

一、有实际高并发经验(建议结构)直接量化"在XX项目中,系统日活用户约XX万,核心接口峰值QPS达到XX,TPS处理能力为XX/秒。通过压力测试验证过XX并发线程下的稳定性。"技术方案...

瞬时流量高并发“保命指南”:这样做系统稳如泰山,老板跪求加薪

“系统崩了,用户骂了,年终奖飞了!”——这是多少程序员在瞬时大流量下的真实噩梦?双11秒杀、春运抢票、直播带货……每秒百万请求的冲击,你的代码扛得住吗?2025年了,为什么你的系统一遇高并发就“躺平”...

其实很多Java工程师不是能力不够,是没找到展示自己的正确姿势。

其实很多Java工程师不是能力不够,是没找到展示自己的正确姿势。比如上周有个小伙伴找我,五年经验但简历全是'参与系统设计''优化接口性能'这种空话。我就问他:你做的秒杀...

PHP技能评测(php等级考试)

公司出了一些自我评测的PHP题目,现将题目和答案记录于此,以方便记忆。1.魔术函数有哪些,分别在什么时候调用?__construct(),类的构造函数__destruct(),类的析构函数__cal...

你的简历在HR眼里是青铜还是王者?

你的简历在HR眼里是青铜还是王者?兄弟,简历投了100份没反应?面试总在第三轮被刷?别急着怀疑人生,你可能只是踩了这些"隐形求职雷"。帮3630+程序员改简历+面试指导和处理空窗期时间...

取消回复欢迎 发表评论: