百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

net core Redis消息队列中间件「InitQ」

mhr18 2024-11-26 12:18 18 浏览 0 评论

前言

这是一篇拖更很久的博客,不知不觉InitQ在nuget下载量已经过15K了,奈何胸无点墨也不晓得怎么写(懒),随便在github上挂了个md,现在好好唠唠如何在redis里使用队列


队列缓存分布式 异步调优堆配置 ------(来自某位不知名码友)

诞生背景

redis在项目中使用的越来越频繁,通常我们是用来做缓存,使用较多的就是String,Hash这两种类型,以及分布式锁,redis的List类型,就可以用于消息队列,使用起来更加简单,且速度更快,非常适合子服务内部之间的消息流转,创造灵感来自于杨老板的CAP(地址:https://www.cnblogs.com/tibos/p/11858095.html),采用注解的方式消费队列,让业务逻辑更加的清晰,方便维护

安装环境

.net core版本:2.1redis版本:3.0以上

特点

1.通过注解的方式,订阅队列
2.可以设置消费消息的频次
3.支持消息广播
4.支持延迟队列

使用介绍

  • 1.获取initQ包

    方案A. install-package InitQ
    方案B. nuget包管理工具搜索 InitQ
  • 2.添加中间件(该中间件依赖 StackExchange.Redis)


services.AddInitQ(m=> { m.SuspendTime = 1000; m.IntervalTime = 1000; m.ConnectionString = "127.0.0.1,connectTimeout=15000,syncTimeout=5000,password=123456"; m.ListSubscribe = new List<Type>() { typeof(RedisSubscribeA), typeof(RedisSubscribeB) }; m.ShowLog = false; });

  • 3.配置说明public class InitQOptions { /// <summary> /// redis连接字符串 /// </summary> public string ConnectionString { get; set; } /// <summary> /// 没消息时挂起时长(毫秒) /// </summary> public int SuspendTime { get; set; } /// <summary> /// 每次消费消息间隔时间(毫秒) /// </summary> public int IntervalTime { get; set; } /// <summary> /// 是否显示日志 /// </summary> public bool ShowLog { get; set; } /// <summary> /// 需要注入的类型 /// </summary> public IList<Type> ListSubscribe { get; set; } public InitQOptions() { ConnectionString = ""; IntervalTime = 0; SuspendTime = 1000; ShowLog = false; } }

消息发布/订阅

消息的发布/订阅是最基础的功能,这里做了几个优化

  1. 采用的是长轮询模式,可以控制消息消费的频次,以及轮询空消息的间隔,避免资源浪费
  2. 支持多个类订阅消息,可以很方便的根据业务进行分类,前提是这些类 必须注册
  3. 支持多线程消费消息(在执行耗时任务的时候,非常有用)

示例如下(Thread.Sleep):

    public class RedisSubscribeA: IRedisSubscribe
    {
        [Subscribe("tibos_test_1")]
        private async Task SubRedisTest(string msg)
        {
            Console.WriteLine(#34;A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
            Thread.Sleep(3000); //使用堵塞线程模式,同步延时
            Console.WriteLine(#34;A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
        }
    }
    public class RedisSubscribeA: IRedisSubscribe
    {
        [Subscribe("tibos_test_1")]
        private async Task SubRedisTest(string msg)
        {
            Console.WriteLine(#34;A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
            Thread.Sleep(3000); //使用堵塞线程模式,同步延时
            Console.WriteLine(#34;A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
        }
        [Subscribe("tibos_test_1")]
        private async Task SubRedisTest2(string msg)
        {
            Console.WriteLine(#34;A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
            Thread.Sleep(3000); //使用堵塞线程模式,同步延时
            Console.WriteLine(#34;A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
        }
    }

示例如下(Task.Delay):

    [Subscribe("tibos_test_1")]
    private async Task SubRedisTest(string msg)
    {
        Console.WriteLine(#34;A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
        await Task.Delay(3000); //使用非堵塞线程模式,异步延时
        Console.WriteLine(#34;A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
    }

根据业务情况,合理的选择堵塞模式

  • 1.订阅发布者 using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis对象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); //循环向 tibos_test_1 队列发送消息 for (int i = 0; i < 1000; i++) { await _redis.ListRightPushAsync("tibos_test_1", #34;我是消息{i + 1}号"); } }
  • 2.定义消费者类 RedisSubscribeApublic class RedisSubscribeA: IRedisSubscribe { [Subscribe("tibos_test_1")] private async Task SubRedisTest(string msg) { Console.WriteLine(#34;A类--->订阅者A消息消息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest1(string msg) { Console.WriteLine(#34;A类--->订阅者A1消息消息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest2(string msg) { Console.WriteLine(#34;A类--->订阅者A2消息消息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest3(string msg) { Console.WriteLine(#34;A类--->订阅者A3消息消息:{msg}"); } }
  • 3.定义消费者类 RedisSubscribeBpublic class RedisSubscribeB : IRedisSubscribe { /// <summary> /// 测试 /// </summary> /// <param name="msg"></param> /// <returns></returns> [Subscribe("tibos_test_1")] private async Task SubRedisTest(string msg) { Console.WriteLine(#34;B类--->订阅者B消费消息:{msg}"); } }

消息广播/订阅

消息广播是StackExchange.Redis已经封装好的,我们只用起个线程监听即可,只要监听了这个key的线程,都会收到消息

  • 1.订阅消息通道,订阅者需要在程序初始化的时候启动一个线程侦听通道,这里使用HostedService来实现,并注册到容器 public class ChannelSubscribeA : IHostedService, IDisposable { private readonly IServiceProvider _provider; private readonly ILogger _logger; public ChannelSubscribeA(ILogger<TestMain> logger, IServiceProvider provider) { _logger = logger; _provider = provider; } public void Dispose() { _logger.LogInformation("退出"); } public Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("程序启动"); Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis对象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) => { Console.WriteLine("test_channel" + " 订阅服务A收到消息:" + message); })); } }); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("结束"); return Task.CompletedTask; } } public class ChannelSubscribeB : IHostedService, IDisposable { private readonly IServiceProvider _provider; private readonly ILogger _logger; public ChannelSubscribeB(ILogger<TestMain> logger, IServiceProvider provider) { _logger = logger; _provider = provider; } public void Dispose() { _logger.LogInformation("退出"); } public Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("程序启动"); Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis对象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) => { Console.WriteLine("test_channel" + " 订阅服务B收到消息:" + message); })); } }); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("结束"); return Task.CompletedTask; } }
  • 2.将HostedService类注入到容器 services.AddHostedService<ChannelSubscribeA>(); services.AddHostedService<ChannelSubscribeB>();
  • 3.广播消息 using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis对象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); for (int i = 0; i < 1000; i++) { await _redis.PublishAsync("test_channel", #34;往通道发送第{i}条消息"); } }

延迟消息

延迟消息非常适用处理一些定时任务的场景,如订单15分钟未付款,自动取消, xxx天后,自动续费...... 这里使用zset+redis锁来实现,这里的操作方式,跟发布/订阅非常类似
写入延迟消息:
SortedSetAddAsync
注解使用:SubscribeDelay

  • 1.定义发布者 Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis对象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); for (int i = 0; i < 100; i++) { var dt = DateTime.Now.AddSeconds(3 * (i + 1)); //key:redis里的key,唯一 //msg:任务 //time:延时执行的时间 await _redis.SortedSetAddAsync("test_0625", #34;延迟任务,第{i + 1}个元素,执行时间:{dt.ToString("yyyy-MM-dd HH:mm:ss")}", dt); } } });
  • 2.定义消费者 //延迟队列 [SubscribeDelay("test_0625")] private async Task SubRedisTest1(string msg) { Console.WriteLine(#34;A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者延迟队列消息开始--->{msg}"); //模拟任务执行耗时 await Task.Delay(TimeSpan.FromSeconds(3)); Console.WriteLine(#34;A类--->{msg} 结束<---"); }

版本

  • V1.0 更新时间:2019-12-30


原文地址:https://www.cnblogs.com/tibos/p/14944832.html

相关推荐

【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库

如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...

Pure Storage推出统一数据管理云平台及新闪存阵列

PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...

对Java学习的10条建议(对java课程的建议)

不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...

SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!

官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...

JDK21有没有什么稳定、简单又强势的特性?

佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...

「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了

在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...

Java面试题及答案最全总结(2025版)

大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...

数据库日常运维工作内容(数据库日常运维 工作内容)

#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...

分布式之系统底层原理(上)(底层分布式技术)

作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...

oracle 死锁了怎么办?kill 进程 直接上干货

1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...

SpringBoot 各种分页查询方式详解(全网最全)

一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...

《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略

《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...

LoadRunner(loadrunner录制不到脚本)

一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...

Redis数据类型介绍(redis 数据类型)

介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...

RMAN备份监控及优化总结(rman备份原理)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

取消回复欢迎 发表评论: