百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

如何使用SSE实现服务器高性能实时发送通知

mhr18 2024-12-08 15:04 17 浏览 0 评论

技术方案

为了实现的实时通知系统,实现了以下内容:

  • Server-Sent Events (SSE) :使用 SSE 在服务器和客户端之间建立实时、单向的通道,允许在通知发生时立即传递。
  • Redis Pub/Sub: 使用 Redis pub/sub 作为消息系统,可以让多个发布者向多个订阅者发送消息。
  • Notification Write API: 开发了一个通知写入 API 来收集来自其他应用程序的通知, 此 API 向 Redis 发送通知。
  • Notification Read API: 也开发了通知读取API,订阅Redis,监听所有的通知。 收到通知后,读取 API 将它们发送到通过 SSE 连接到它的客户端。

为什么选择SSE

1.HTTP 轮询虽然是一个可行的选项,但不提供即时通知传递。 使用轮询,必须以特定时间间隔对服务器执行 ping 操作以检查是否有新事件,这可能会占用大量资源且效率低下,尤其是在许多用户订阅了通知的情况下。

2.另一方面,SSE 在服务器和客户端之间提供实时、单向的通道,允许在通知发生时立即传送,提供了实时性的保障。

3.虽然 WebSocket是实时通信的实现方案之一,但与 SSE 相比,它们需要更复杂的实现,虽然允许服务器和客户端之间进行双向通信,但由于只需要向客户端推送通知,因此选择 SSE 作为更简单、更高效的解决方案。

Redis 发布/订阅

为了从多个应用程序 pod 发送通知,使用了 Redis pub/sub。 Redis pub/sub 是一个消息系统,可以让多个发布者向多个订阅者发送消息。

import { Redis } from 'ioredis';

@Injectable()
export class RedisService implements OnModuleInit {

  ...

  async onModuleInit() {
    this.subscriber = new Redis({
      sentinels: this.redisSentinelConfig.addresses,
      name: this.redisSentinelConfig.masterName,
      password: this.redisSentinelConfig.password,
    });
    
    await this.subscriber.subscribe(this.redisSentinelConfig.channelName, async (err, count) => {
      if (err) {
        this.logger.error(`Failed to subscribe: ${err.message}`);
        return;
      }
      this.logger.log(`Subscribed successfully! This client is currently subscribed to ${count} channels`);
    });
    
    this.subscriber.on('message', async (channel, message: string) => {
      const liveNotification: LiveNotification = JSON.parse(message);
      await this.liveNotificationService.emit(liveNotification);
    });
  }
}

系统设计

为了收集来自其他应用程序的通知,开发了Notification Write API,此 API 向 Redis 发送通知。 Notification Read API订阅Redis,监听所有通知。 收到通知后,读取 API 将通过 SSE把消息发送到 连接的客户端。

由于多个读取 API 可能运行在不同的应用程序 Pod 中,因此每个读取 API 都会收到来自 Redis 的所有通知。Notification Read API将根据连接到当前 pod 的客户端过滤通知。 确保只向每个连接的客户端发送相关通知。


import { AuthGuard } from '@nestjs/passport';
import { Controller, Sse, UseGuards } from '@nestjs/common';

@Controller('/live-notification')
export class LiveNotificationController {
  constructor(private readonly liveNotificationService: LiveNotificationService) {}

  @Sse()
  @ApiBearerAuth()
  @UseGuards(AuthGuard('jwt'))
  public getEventsBySeller(@Tracers() tracers: ITracers, @SellerId() sellerId: number) {
    return this.liveNotificationService.subscribeForSeller(sellerId);
  }
}
 

import { EventEmitter } from 'events';
import { filter, fromEvent } from 'rxjs';

@Injectable()
export class LiveNotificationService implements OnModuleInit {
  
  private readonly emitter = new EventEmitter();
  
  ...
  
  public async emit(data: LiveNotification) {
    this.emitter.emit('liveNotification', { data });
  }
  
  public subscribeForSeller(sellerId: number) {
    const source = fromEvent(this.emitter, 'liveNotification');
    return source.pipe(
      filter(({ data: liveNotification }) => 
        liveNotification?.content == 'heartbeat' || 
        liveNotification?.sellerId == sellerId)
    );
  }
}
    

心跳消息

为避免服务器和浏览器之间的连接关闭,需要发送心跳消息来检测断开连接。为了解决这个问题,实现了每 30 秒发送一次心跳消息,从而解决了这个问题。

onModuleInit (): any { 
  setInterval ( () => { 
    const emitterListenerCount = this . emitter . listenerCount ( 'liveNotification' ); 
    this . logger . log ( `Heartbeat 消息发送到具有活动发射器侦听器计数的 SSE 客户端:${emitterListenerCount} ` ); 
    this . emitter . emit ( 'liveNotification' , { data : { content : 'heartbeat' } });
  }, 30000 ); 
}

性能表现

目前的通知系统有 90,000 个并发连接,在生产环境中运行 15 个 pod 来处理负载。每个 Kubernetes pod 消耗大约 800 MB 的内存和 300 Mi 的 CPU 资源。

相关推荐

MySQL数据库中,数据量越来越大,有什么具体的优化方案么?

个人的观点,这种大表的优化,不一定上来就要分库分表,因为表一旦被拆分,开发、运维的复杂度会直线上升,而大多数公司和开发人员是欠缺这种能力的。所以MySQL中几百万甚至小几千万的表,先考虑做单表的优化。...

Redis的Bitmap(位图):签到打卡、用户在线状态,用它一目了然

你是不是每天打开APP,第一时间就是去“签到打卡”?或者在社交软件里,看到你的朋友头像旁边亮着“在线”的绿灯?这些看似简单的功能背后,都隐藏着一个有趣而高效的数据结构。如果让你来设计一个签到系统:用户...

想知道有多少人看了你的文章?Redis HyperLogLog几KB就搞定!

作为一名内容创作者,你每天最期待的,除了文章阅读量蹭蹭上涨,是不是还特别想知道,到底有多少个“独立用户”阅读了你的文章?这个数字,我们通常称为“UV”(UniqueVisitors),它比总阅读量更...

Redis的“HyperLogLog”:统计网站日活用户,省内存又高效的神器

你可能从未听过这个拗口的名字——“HyperLogLog”,它听起来就像是某个高深莫测的数学公式。但请相信我,理解它的核心思想并不难,而且一旦你掌握了它,你会发现它在处理大数据统计问题时,简直就是“救...

阿里云国际站:为什么我的云服务器运行缓慢?

本文由【云老大】TG@yunlaoda360撰写一、网络性能瓶颈带宽不足现象:上传/下载速度慢,远程连接卡顿。排查:通过阿里云控制台查看网络流量峰值是否接近带宽上限34。解决:升级带宽(如从1M提...

Java 近期新闻:Jakarta EE 11和Spring AI更新、WildFly 36.0 Beta、Infinispan

作者|MichaelRedlich译者|明知山策划|丁晓昀OpenJDKJEP503(移除32位x86移植版本)已从“ProposedtoTarget”状态进入到“T...

腾讯云国际站:怎样设置自动伸缩应对流量高峰?

云计算平台服务以阿里云为例:开通服务与创建伸缩组:登录阿里云控制台,找到弹性伸缩服务并开通。创建伸缩组时,选择地域与可用区,定义伸缩组内最小/最大实例数,绑定已有VPC虚拟交换机。实例模板需...

【案例分享】如何利用京东云建设高可用业务架构

本文以2022年一个实际项目为基础,来演示在京东云上构建高可用业务的整个过程。公有云及私有云客户可通过使用京东云的弹性IAAS、PAAS服务,创建高可用、高弹性、高可扩展、高安全的云上业务环境,提升业...

Spring Security在前后端分离项目中的使用

1文章导读SpringSecurity是Spring家族中的一个安全管理框架,可以和SpringBoot项目很方便的集成。SpringSecurity框架的两大核心功能:认证和授权认证:...

Redis与Java集成的最佳实践

Redis与Java集成的最佳实践在当今互联网飞速发展的时代,缓存技术的重要性毋庸置疑。Redis作为一款高性能的分布式缓存数据库,与Java语言的结合更是如虎添翼。今天,我们就来聊聊Redis与Ja...

Redis在Java项目中的应用与数据持久化

Redis在Java项目中的应用与数据持久化Redis简介:为什么我们需要它?在Java项目中,Redis就像一位不知疲倦的快跑选手,总能在关键时刻挺身而出。作为一个内存数据库,它在处理高并发请求时表...

Redis 集群最大节点个数是多少?

Redis集群最大节点个数取决于Redis的哈希槽数量,因为每个节点可以负责多个哈希槽。在Redis3.0之前,Redis集群最多支持16384个哈希槽,因此最大节点数为16384个。但是在Redi...

Java开发岗面试宝典:分布式相关问答详解

今天千锋广州Java小编就给大家分享一些就业面试宝典之分布式相关问题,一起来看看吧!1.Redis和Memcache的区别?1、存储方式Memecache把数据全部存在内存之中,断电后会挂掉,数据不...

当Redis内存不足时,除了加内存,还有哪些曲线救国的办法?

作为“速度之王”的Redis,其高性能的秘密武器之一就是将数据存储在内存中。然而,内存资源是有限且昂贵的。当你的Redis实例开始告警“内存不足”,或者写入请求被阻塞时,最直接的解决方案似乎就是“加内...

商品详情页那么多信息,Redis的“哈希”如何优雅存储?

你每天网购时,无论是打开淘宝、京东还是拼多多,看到的商品详情页都琳琅满目:商品名称、价格、库存、图片、描述、评价数量、销量。这些信息加起来,多的惊人。那么问题来了:这些海量的商品信息,程序是去哪里取出...

取消回复欢迎 发表评论: