如何使用SSE实现服务器高性能实时发送通知
mhr18 2024-12-08 15:04 17 浏览 0 评论
技术方案
为了实现的实时通知系统,实现了以下内容:
- Server-Sent Events (SSE) :使用 SSE 在服务器和客户端之间建立实时、单向的通道,允许在通知发生时立即传递。
- Redis Pub/Sub: 使用 Redis pub/sub 作为消息系统,可以让多个发布者向多个订阅者发送消息。
- Notification Write API: 开发了一个通知写入 API 来收集来自其他应用程序的通知, 此 API 向 Redis 发送通知。
- Notification Read API: 也开发了通知读取API,订阅Redis,监听所有的通知。 收到通知后,读取 API 将它们发送到通过 SSE 连接到它的客户端。
为什么选择SSE
1.HTTP 轮询虽然是一个可行的选项,但不提供即时通知传递。 使用轮询,必须以特定时间间隔对服务器执行 ping 操作以检查是否有新事件,这可能会占用大量资源且效率低下,尤其是在许多用户订阅了通知的情况下。
2.另一方面,SSE 在服务器和客户端之间提供实时、单向的通道,允许在通知发生时立即传送,提供了实时性的保障。
3.虽然 WebSocket是实时通信的实现方案之一,但与 SSE 相比,它们需要更复杂的实现,虽然允许服务器和客户端之间进行双向通信,但由于只需要向客户端推送通知,因此选择 SSE 作为更简单、更高效的解决方案。
Redis 发布/订阅
为了从多个应用程序 pod 发送通知,使用了 Redis pub/sub。 Redis pub/sub 是一个消息系统,可以让多个发布者向多个订阅者发送消息。
import { Redis } from 'ioredis';
@Injectable()
export class RedisService implements OnModuleInit {
...
async onModuleInit() {
this.subscriber = new Redis({
sentinels: this.redisSentinelConfig.addresses,
name: this.redisSentinelConfig.masterName,
password: this.redisSentinelConfig.password,
});
await this.subscriber.subscribe(this.redisSentinelConfig.channelName, async (err, count) => {
if (err) {
this.logger.error(`Failed to subscribe: ${err.message}`);
return;
}
this.logger.log(`Subscribed successfully! This client is currently subscribed to ${count} channels`);
});
this.subscriber.on('message', async (channel, message: string) => {
const liveNotification: LiveNotification = JSON.parse(message);
await this.liveNotificationService.emit(liveNotification);
});
}
}
系统设计
为了收集来自其他应用程序的通知,开发了Notification Write API,此 API 向 Redis 发送通知。 Notification Read API订阅Redis,监听所有通知。 收到通知后,读取 API 将通过 SSE把消息发送到 连接的客户端。
由于多个读取 API 可能运行在不同的应用程序 Pod 中,因此每个读取 API 都会收到来自 Redis 的所有通知。Notification Read API将根据连接到当前 pod 的客户端过滤通知。 确保只向每个连接的客户端发送相关通知。
import { AuthGuard } from '@nestjs/passport';
import { Controller, Sse, UseGuards } from '@nestjs/common';
@Controller('/live-notification')
export class LiveNotificationController {
constructor(private readonly liveNotificationService: LiveNotificationService) {}
@Sse()
@ApiBearerAuth()
@UseGuards(AuthGuard('jwt'))
public getEventsBySeller(@Tracers() tracers: ITracers, @SellerId() sellerId: number) {
return this.liveNotificationService.subscribeForSeller(sellerId);
}
}
import { EventEmitter } from 'events';
import { filter, fromEvent } from 'rxjs';
@Injectable()
export class LiveNotificationService implements OnModuleInit {
private readonly emitter = new EventEmitter();
...
public async emit(data: LiveNotification) {
this.emitter.emit('liveNotification', { data });
}
public subscribeForSeller(sellerId: number) {
const source = fromEvent(this.emitter, 'liveNotification');
return source.pipe(
filter(({ data: liveNotification }) =>
liveNotification?.content == 'heartbeat' ||
liveNotification?.sellerId == sellerId)
);
}
}
心跳消息
为避免服务器和浏览器之间的连接关闭,需要发送心跳消息来检测断开连接。为了解决这个问题,实现了每 30 秒发送一次心跳消息,从而解决了这个问题。
onModuleInit (): any {
setInterval ( () => {
const emitterListenerCount = this . emitter . listenerCount ( 'liveNotification' );
this . logger . log ( `Heartbeat 消息发送到具有活动发射器侦听器计数的 SSE 客户端:${emitterListenerCount} ` );
this . emitter . emit ( 'liveNotification' , { data : { content : 'heartbeat' } });
}, 30000 );
}
性能表现
目前的通知系统有 90,000 个并发连接,在生产环境中运行 15 个 pod 来处理负载。每个 Kubernetes pod 消耗大约 800 MB 的内存和 300 Mi 的 CPU 资源。
相关推荐
- MySQL数据库中,数据量越来越大,有什么具体的优化方案么?
-
个人的观点,这种大表的优化,不一定上来就要分库分表,因为表一旦被拆分,开发、运维的复杂度会直线上升,而大多数公司和开发人员是欠缺这种能力的。所以MySQL中几百万甚至小几千万的表,先考虑做单表的优化。...
- Redis的Bitmap(位图):签到打卡、用户在线状态,用它一目了然
-
你是不是每天打开APP,第一时间就是去“签到打卡”?或者在社交软件里,看到你的朋友头像旁边亮着“在线”的绿灯?这些看似简单的功能背后,都隐藏着一个有趣而高效的数据结构。如果让你来设计一个签到系统:用户...
- 想知道有多少人看了你的文章?Redis HyperLogLog几KB就搞定!
-
作为一名内容创作者,你每天最期待的,除了文章阅读量蹭蹭上涨,是不是还特别想知道,到底有多少个“独立用户”阅读了你的文章?这个数字,我们通常称为“UV”(UniqueVisitors),它比总阅读量更...
- Redis的“HyperLogLog”:统计网站日活用户,省内存又高效的神器
-
你可能从未听过这个拗口的名字——“HyperLogLog”,它听起来就像是某个高深莫测的数学公式。但请相信我,理解它的核心思想并不难,而且一旦你掌握了它,你会发现它在处理大数据统计问题时,简直就是“救...
- 阿里云国际站:为什么我的云服务器运行缓慢?
-
本文由【云老大】TG@yunlaoda360撰写一、网络性能瓶颈带宽不足现象:上传/下载速度慢,远程连接卡顿。排查:通过阿里云控制台查看网络流量峰值是否接近带宽上限34。解决:升级带宽(如从1M提...
- Java 近期新闻:Jakarta EE 11和Spring AI更新、WildFly 36.0 Beta、Infinispan
-
作者|MichaelRedlich译者|明知山策划|丁晓昀OpenJDKJEP503(移除32位x86移植版本)已从“ProposedtoTarget”状态进入到“T...
- 腾讯云国际站:怎样设置自动伸缩应对流量高峰?
-
云计算平台服务以阿里云为例:开通服务与创建伸缩组:登录阿里云控制台,找到弹性伸缩服务并开通。创建伸缩组时,选择地域与可用区,定义伸缩组内最小/最大实例数,绑定已有VPC虚拟交换机。实例模板需...
- 【案例分享】如何利用京东云建设高可用业务架构
-
本文以2022年一个实际项目为基础,来演示在京东云上构建高可用业务的整个过程。公有云及私有云客户可通过使用京东云的弹性IAAS、PAAS服务,创建高可用、高弹性、高可扩展、高安全的云上业务环境,提升业...
- Spring Security在前后端分离项目中的使用
-
1文章导读SpringSecurity是Spring家族中的一个安全管理框架,可以和SpringBoot项目很方便的集成。SpringSecurity框架的两大核心功能:认证和授权认证:...
- Redis与Java集成的最佳实践
-
Redis与Java集成的最佳实践在当今互联网飞速发展的时代,缓存技术的重要性毋庸置疑。Redis作为一款高性能的分布式缓存数据库,与Java语言的结合更是如虎添翼。今天,我们就来聊聊Redis与Ja...
- Redis在Java项目中的应用与数据持久化
-
Redis在Java项目中的应用与数据持久化Redis简介:为什么我们需要它?在Java项目中,Redis就像一位不知疲倦的快跑选手,总能在关键时刻挺身而出。作为一个内存数据库,它在处理高并发请求时表...
- Redis 集群最大节点个数是多少?
-
Redis集群最大节点个数取决于Redis的哈希槽数量,因为每个节点可以负责多个哈希槽。在Redis3.0之前,Redis集群最多支持16384个哈希槽,因此最大节点数为16384个。但是在Redi...
- Java开发岗面试宝典:分布式相关问答详解
-
今天千锋广州Java小编就给大家分享一些就业面试宝典之分布式相关问题,一起来看看吧!1.Redis和Memcache的区别?1、存储方式Memecache把数据全部存在内存之中,断电后会挂掉,数据不...
- 当Redis内存不足时,除了加内存,还有哪些曲线救国的办法?
-
作为“速度之王”的Redis,其高性能的秘密武器之一就是将数据存储在内存中。然而,内存资源是有限且昂贵的。当你的Redis实例开始告警“内存不足”,或者写入请求被阻塞时,最直接的解决方案似乎就是“加内...
- 商品详情页那么多信息,Redis的“哈希”如何优雅存储?
-
你每天网购时,无论是打开淘宝、京东还是拼多多,看到的商品详情页都琳琅满目:商品名称、价格、库存、图片、描述、评价数量、销量。这些信息加起来,多的惊人。那么问题来了:这些海量的商品信息,程序是去哪里取出...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- MySQL数据库中,数据量越来越大,有什么具体的优化方案么?
- Redis的Bitmap(位图):签到打卡、用户在线状态,用它一目了然
- 想知道有多少人看了你的文章?Redis HyperLogLog几KB就搞定!
- Redis的“HyperLogLog”:统计网站日活用户,省内存又高效的神器
- 阿里云国际站:为什么我的云服务器运行缓慢?
- Java 近期新闻:Jakarta EE 11和Spring AI更新、WildFly 36.0 Beta、Infinispan
- 腾讯云国际站:怎样设置自动伸缩应对流量高峰?
- 【案例分享】如何利用京东云建设高可用业务架构
- Spring Security在前后端分离项目中的使用
- Redis与Java集成的最佳实践
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)