百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

.NET Core自带的"消息队列"、还有"定时"+"定量"消费,真妙

mhr18 2024-12-09 12:15 19 浏览 0 评论

【作者】科技、互联网行业优质创作者

【专注领域】.Net技术、软件架构、人工智能、数字化转型、DeveloperSharp、微服务、工业互联网、智能制造

点击右上方“关注”,里面有很多高价值技术文章,是你刻苦努力也积累不到的经验,能助你快速成长。升职+涨薪!!

背景

最近做一个项目,连接了很多设备,需要保存设备的心跳数据,刚开始的做法是直接接收到设备的数据之后进行心跳数据的保存,但是随着设备多了起来,然后设备的使用时长不断的加大,对数据库的压力也比较大,所以想着优化一下。

方案调研

1、使用第三方中间件

常见的使用redis,或者mq,只需要不断的向中间件发送数据即可,redis使用队列,如果是mq直接发送消息即可,使用起来简单方便,但是要引入这些中间件,目前的架构里面没有,需要自己去起服务,维护。

2、使用channel

System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它可以用来做消息队列,进行数据的生产和消费, 公开的 Writer 和 Reader api对应消息的生产者和消费者,也让Channel更加的简洁和易用,与Rabbit MQ 等其他队列不同的是,Channel 是进程内的队列

目前就介绍来看非常完美,不需要添加第三方中间件,直接添加现有的模块即可。

代码实现

选择了使用channel来做优化。拿到设备数据之后直接把消息丢入到channel,然后后台使用定时任务或者自己实现hostservice去不断的消费数据。

生产者代码

public async Task ProduceHeartBeat(string message)
{
    await channel.Writer.WriteAsync(message);
}

不断的向里面写入数据即可.

消费者代码

/// <summary>
/// timespan时间内消费多少数据
/// </summary>
/// <param name="count"></param>
/// <param name="timeSpan"></param>
/// <returns></returns>
public async Task<List<string>> ConsumeHeartBeatAsync(int count,TimeSpan timeSpan)
{
     var result = new List<string>(count);
     CancellationTokenSource cts = new CancellationTokenSource();
     var cancellationToken = cts.Token;
     cts.CancelAfter(timeSpan);
     int rcount = 0;
     while ( !cancellationToken.IsCancellationRequested && rcount<count)
     {
         //await Task.Delay(2000);
         if (channel.Reader.TryRead(out var number))
         {
             Console.WriteLine(number);
             result.Add(number);
             rcount++;
         }
         else
         {
             break;
         } 
     }  
    return result;
}

里面加入了一个cancellationToken,进行消费的时长限制。在此时长内消费多少条数据,超时直接结束。

这就是基本的代码

后台定时消费数据

public class HeartBeatService : BackgroundService
{
     private readonly HeartBeatsChannel heartBeatsChannel;

     public HeartBeatService(HeartBeatsChannel heartBeatsChannel)
     {
         this.heartBeatsChannel = heartBeatsChannel;
     }

     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
         try
         {

             Task.Factory.StartNew(() =>
             {
                 while (!stoppingToken.IsCancellationRequested)
                 {
                     //阻塞的队列使得一直在同一个线程运行
                     Process(15,heartBeatsChannel).Wait();
                 }

             }, TaskCreationOptions.LongRunning);

             Console.WriteLine("主线程 现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);

             }
         catch (Exception ex)
         {
             Console.WriteLine(ex.ToString());
         }
     }
     /// <summary>
     /// 消费数据
     /// </summary>
     /// <param name="count">一次消费数量</param>
     /// <param name="heartBeatsChannel"></param>
     /// <returns></returns>
     private async Task Process(int count ,HeartBeatsChannel heartBeatsChannel)
     {
         Console.WriteLine("子线程_现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);
         //每次消费三十个
         if (heartBeatsChannel.IsHasContent)
         {
             //int count = 15;
             //进行消费
             await heartBeatsChannel.ConsumeHeartBeatAsync(count, TimeSpan.FromSeconds(3));
         }           
         await Task.Delay(3000);
     }
}

使用的是BackgroundServic,直接实现要处理的业务逻辑就好了。

在这里使用的是TaskCreationOptions.LongRunning,新开一个线程去处理心跳数据。


总结


以上就是主要的实现全过程,完整的代码在github

https://github.com/lackguozi/LearnChannelWebApi

实际上完全可以不用后台去定时消费数据,channel有很多api可以去处理,比如WaitToReadAsync(),但是这里没有使用,主要是不想持续的占数据库资源?

总结的话学习了channel的用法,底层似乎使用了deque?只稍微看了下源码,但是看到了许多的lock,这个是必不可少的。还是巨硬轮子造的好

?请点击上方“关注”我,里面有很多高价值技术文章,是你刻苦努力也积累不到的经验,能助你快速成长。升职+涨薪!!

最后,再给你分享个全网最全.NET/C#视频学习教程:

领取方式:在我的个人主页的第一篇置顶文章中领取

相关推荐

一文读懂Prometheus架构监控(prometheus监控哪些指标)

介绍Prometheus是一个系统监控和警报工具包。它是用Go编写的,由Soundcloud构建,并于2016年作为继Kubernetes之后的第二个托管项目加入云原生计算基金会(C...

Spring Boot 3.x 新特性详解:从基础到高级实战

1.SpringBoot3.x简介与核心特性1.1SpringBoot3.x新特性概览SpringBoot3.x是建立在SpringFramework6.0基础上的重大版...

「技术分享」猪八戒基于Quartz分布式调度平台实践

点击原文:【技术分享】猪八戒基于Quartz分布式调度平台实践点击关注“八戒技术团队”,阅读更多技术干货1.背景介绍1.1业务场景调度任务是我们日常开发中非常经典的一个场景,我们时常会需要用到一些不...

14. 常用框架与工具(使用的框架)

本章深入解析Go生态中的核心开发框架与工具链,结合性能调优与工程化实践,提供高效开发方案。14.1Web框架(Gin,Echo)14.1.1Gin高性能实践//中间件链优化router:=...

SpringBoot整合MyBatis-Plus:从入门到精通

一、MyBatis-Plus基础介绍1.1MyBatis-Plus核心概念MyBatis-Plus(简称MP)是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,为简化开发、提...

Seata源码—5.全局事务的创建与返回处理

大纲1.Seata开启分布式事务的流程总结2.Seata生成全局事务ID的雪花算法源码3.生成xid以及对全局事务会话进行持久化的源码4.全局事务会话数据持久化的实现源码5.SeataServer创...

Java开发200+个学习知识路线-史上最全(框架篇)

1.Spring框架深入SpringIOC容器:BeanFactory与ApplicationContextBean生命周期:实例化、属性填充、初始化、销毁依赖注入方式:构造器注入、Setter注...

OpenResty 入门指南:从基础到动态路由实战

一、引言1.1OpenResty简介OpenResty是一款基于Nginx的高性能Web平台,通过集成Lua脚本和丰富的模块,将Nginx从静态反向代理转变为可动态编程的应用平台...

你还在为 Spring Boot3 分布式锁实现发愁?一文教你轻松搞定!

作为互联网大厂后端开发人员,在项目开发过程中,你有没有遇到过这样的问题:多个服务实例同时访问共享资源,导致数据不一致、业务逻辑混乱?没错,这就是分布式环境下常见的并发问题,而分布式锁就是解决这类问题的...

近2万字详解JAVA NIO2文件操作,过瘾

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。从classpath中读取过文件的人,都知道需要写一些读取流的方法,很是繁琐。最近使用IDEA在打出.这个符号的时候,一行代...

学习MVC之租房网站(十二)-缓存和静态页面

在上一篇<学习MVC之租房网站(十一)-定时任务和云存储>学习了Quartz的使用、发邮件,并将通过UEditor上传的图片保存到云存储。在项目的最后,再学习优化网站性能的一些技术:缓存和...

Linux系统下运行c++程序(linux怎么运行c++文件)

引言为什么要在Linux下写程序?需要更多关于Linux下c++开发的资料请后台私信【架构】获取分享资料包括:C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdf...

2022正确的java学习顺序(文末送java福利)

对于刚学习java的人来说,可能最大的问题是不知道学习方向,每天学了什么第二天就忘了,而课堂的讲解也是很片面的。今天我结合我的学习路线为大家讲解下最基础的学习路线,真心希望能帮到迷茫的小伙伴。(有很多...

一个 3 年 Java 程序员 5 家大厂的面试总结(已拿Offer)

前言15年毕业到现在也近三年了,最近面试了阿里集团(菜鸟网络,蚂蚁金服),网易,滴滴,点我达,最终收到点我达,网易offer,蚂蚁金服二面挂掉,菜鸟网络一个月了还在流程中...最终有幸去了网易。但是要...

多商户商城系统开发全流程解析(多商户商城源码免费下载)

在数字化商业浪潮中,多商户商城系统成为众多企业拓展电商业务的关键选择。这类系统允许众多商家在同一平台销售商品,不仅丰富了商品种类,还为消费者带来更多样的购物体验。不过,开发一个多商户商城系统是个复杂的...

取消回复欢迎 发表评论: