百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

高可用聊天系统设计方案(Hyperf实现)

mhr18 2025-05-27 16:12 6 浏览 0 评论

一、系统架构设计

1. 分层架构图

 客户端
   ↑↓ HTTP/WS
API网关层(Nginx + Keepalived)
   ↑↓ RPC
业务服务集群
   ↑↓ 
数据层(MySQL Cluster + Redis Cluster + Kafka)
   ↑↓ 
监控层(Prometheus + Grafana + ELK)

2. 核心模块组成

  • 网关服务:负责协议转换、负载均衡、SSL终止
  • 连接服务:WebSocket长连接管理
  • 消息服务:消息存储与分发核心逻辑
  • 群组服务:群成员关系管理
  • 推送服务:离线消息处理
  • ID生成服务:分布式ID生成

二、数据库设计(MySQL)

1. 消息表(message_2025)

CREATE TABLE `message_2025` (
  `msg_id` BIGINT(20) UNSIGNED NOT NULL COMMENT '雪花算法ID',
  `conv_type` TINYINT(2) NOT NULL COMMENT '1:单聊 2:群聊',
  `conv_id` VARCHAR(64) NOT NULL COMMENT '会话ID',
  `sender` VARCHAR(64) NOT NULL,
  `content` TEXT NOT NULL COMMENT '加密存储',
  `seq` BIGINT(20) UNSIGNED NOT NULL COMMENT '会话内自增序列',
  `status` TINYINT(1) NOT NULL DEFAULT '0' COMMENT '0:正常 1:撤回',
  `created_at` DATETIME(3) NOT NULL COMMENT '精确到毫秒',
  PRIMARY KEY (`msg_id`),
  INDEX `idx_conv` (`conv_id`,`seq`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
PARTITION BY HASH(msg_id % 16);

2. 群组表(groups)

CREATE TABLE `groups` (
  `group_id` VARCHAR(64) NOT NULL COMMENT '群ID',
  `name` VARCHAR(128) NOT NULL,
  `owner` VARCHAR(64) NOT NULL,
  `members` JSON NOT NULL COMMENT '成员列表',
  `max_members` INT(11) NOT NULL DEFAULT '500',
  `created_at` DATETIME NOT NULL,
  PRIMARY KEY (`group_id`),
  INDEX `idx_owner` (`owner`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

3. 用户会话表(user_session)

CREATE TABLE `user_session` (
  `user_id` VARCHAR(64) NOT NULL,
  `conv_id` VARCHAR(64) NOT NULL,
  `last_read_seq` BIGINT(20) UNSIGNED NOT NULL DEFAULT '0',
  `unread` INT(11) NOT NULL DEFAULT '0',
  `updated_at` DATETIME(3) NOT NULL,
  PRIMARY KEY (`user_id`,`conv_id`),
  INDEX `idx_conv` (`conv_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

三、核心服务实现(Hyperf)

1. WebSocketController 完整实现

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://hyperf.wiki
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
 */
namespace App\Controller;



use App\Logic\WebSocket\ConnectionService;
use App\Logic\WebSocket\MessageService;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Hyperf\Redis\Redis;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\WebSocket\Frame;

class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{

    public function onMessage($server, Frame $frame): void
    {
//        $server->push($frame->fd, 'Recv: ' . $frame->data);

        // 使用独立协程处理消息
        go(function () use ($server, $frame) {
            $messageService = make(MessageService::class);
            $messageService->handle(
                $server,
                json_decode($frame->data, true)
            );
        });

    }

    public function onClose($server, int $fd, int $reactorId): void
    {
        var_dump('closed');
        $this->unbindConnection($fd);
    }

    public function onOpen($server, Request $request): void
    {
        // 鉴权并绑定用户ID
        $userId = $this->auth($request);
        $this->bindConnection($userId, $request->fd);
    }

    private function bindConnection(int $userId, int $fd): void
    {
        // 使用Redis Hash存储连接映射
        $connection = new ConnectionService(make(Redis::class));
        $connection->bind($userId, $fd);
    }

    private function unbindConnection(int $fd): void
    {
        $connection = new ConnectionService(make(Redis::class));
        $connection->unbind($fd);
    }


    private function auth(Request $request): int
    {
        // 验证登录状态,信息
        return (int)$request->get['user_id'];
    }
}

2. MessageController 完整实现

<?php
namespace App\Controller;

class MessageController
{

    public function send(): array
    {
        $data = $this->request->post();

        // 异步处理消息
        go(function () use ($data) {
            $this->messageService->persist($data);
        });

        return [
            'code' => 200,
            'msg_id' => $this->messageService->generateMsgId(),
            'timestamp' => microtime(true)
        ];
    }


    public function ack(): array
    {
        $msgId = $this->request->input('msg_id');
        $this->messageService->confirmDelivery($msgId);

        return ['code' => 200];
    }
}

2. ConnectionService 完整实现

<?php

namespace App\Logic\WebSocket;

use Swoole\WebSocket\Server;
use Hyperf\Redis\Redis;

class ConnectionService
{
    private Redis $redis;

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    public function bind(int $userId, int $fd): void
    {
        $this->redis->hset('user_connections', (string)$userId, $fd);
        $this->redis->sadd('online_users', $userId);
    }

    public function unbind(int $fd): void
    {
        $userId = $this->findUserIdByFd($fd);
        if ($userId !== null) {
            $this->redis->hdel('user_connections', (string)$userId);
            $this->redis->srem('online_users', $userId);
        }
    }

    public function getFd(int $userId): ?int
    {
        $fd = $this->redis->hget('user_connections', (string)$userId);
        return $fd ? (int)$fd : null;
    }


    public function getUserFd(int $userId): ?int
    {
        $fd = $this->redis->hget('user_connections', (string)$userId);
        return $fd ? (int)$fd : null;
    }

    private function findUserIdByFd(int $fd): ?int
    {
        $users = $this->redis->hgetall('user_connections');
        foreach ($users as $userId => $userFd) {
            if ((int)$userFd === $fd) {
                return (int)$userId;
            }
        }
        return null;
    }
}

3. MessageService 完整实现

<?php

namespace App\Logic\WebSocket;


use App\Model\Cms\WsMessage;
use Hyperf\Redis\Redis;
use Swoole\WebSocket\Server;
use Hyperf\Amqp\Producer;

class MessageService
{

    /**
     * 消息发布者
     * @var Producer
     */
    private Producer $producer;
    private Redis $redis;

    public function __construct()
    {
        $this->redis = make(Redis::class);
    }

    /**
     * Desc: 消息处理器
     * Date: 5/4/25 12:12 下午
     * @param Server $server
     * @param array $data
     */
    public function handle(Server $server, array $data): void
    {
        switch ($data['type']) {
            case 'group':
                $this->handleGroupMessage($server, $data);
                break;
            case 'private':
                $this->handlePrivateMessage($server, $data);
                break;
        }
    }

    /**
     * Desc: 私聊消息推送处理器
     * Auth: hello pan
     * Date: 5/4/25 12:12 下午
     * @param Server $server
     * @param array $data
     */
    private function handlePrivateMessage(Server $server, array $data): void
    {
        // 1. 持久化消息
        $this->persistMessage($data);

        // 2. 异步处理消息分发
        $this->addList($data);

        // 3. 实时推送在线成员
        $this->pushToUser($server, $data['user_id'], [
            'type' => 'group',
            'data' => $data
        ]);

        $this->pushToGroupMembers($server, $data);
    }

    /**
     * Desc: 群组消息推送处理器
     * Date: 5/4/25 12:15 下午
     * @param Server $server
     * @param array $data
     */
    private function handleGroupMessage(Server $server, array $data): void
    {
        // 1. 持久化消息
        $this->persistMessage($data);

        // 2. 异步处理消息分发
        $this->addList($data);

        // 3. 实时推送在线成员
        $this->pushToGroupMembers($server, $data);
    }

    /**
     * Desc: 消息持久化
     * Date: 5/4/25 12:15 下午
     * @param array $data
     * @return int|null
     */
    private function persistMessage(array $data):?int
    {
        return (new WsMessage())->setField([
            'group_id' => $data['group_id'],
            'user_id' => $data['user_id'],
            'content' => $data['content'],
            'seq' => $this->generateSequence($data['group_id'])
        ])->add();
    }

    /**
     * Desc: 生成序列号
     * Date: 5/4/25 12:16 下午
     * @param string $groupId
     * @return int
     */
    private function generateSequence(string $groupId): int
    {
        $key = "group_seq:{$groupId}";
        return $this->redis->incr($key);
    }

    /**
     * Desc: 群组消息推送
     * Date: 5/4/25 12:06 下午
     * @param Server $server
     * @param array $data
     */
    private function pushToGroupMembers(Server $server, array $data): void
    {
        $members = $this->redis->smembers("group_members:{$data['group_id']}");

        foreach ($members as $memberId) {
            $this->pushToUser($server, $memberId, [
                'type' => 'group_message',
                'data' => $data
            ]);
        }
    }

    /**
     * Desc: 用户消息推送
     * Date: 5/4/25 12:08 下午
     * @param Server $server
     * @param int $userId
     * @param array $message
     */
    private function pushToUser(Server $server, int $userId, array $message): void
    {
        if ($fd = $this->getUserConnection($userId)) {
            $server->push($fd, json_encode($message));
        }
    }

    /**
     * Desc: 获取用户链接
     * Date: 5/4/25 12:10 下午
     * @param int $userId
     * @return int|null
     */
    private function getUserConnection(int $userId): ?int
    {
        $fd = $this->redis->hget('user_connections', (string)$userId);
        return $fd ? (int)$fd : null;
    }


    /**
     * Desc: 增加异步消息分发队列
     * Date: 5/4/25 12:11 下午
     * @param array $data
     */
    public function addList(array $data): void
    {
        // 2. 异步处理消息分发
        go(function () use ($data) {
            //$this->producer->produce(
            //    new MessageProducer($message->toArray())
            //);
        });
    }
    
}


4. 路由配置

// API路由
Router::addGroup('/api/v1', function () {
    // 消息接口
    Router::post('/messages/send', 'App\Controller\MessageController@send');
    Router::post('/messages/ack', 'App\Controller\MessageController@ack');
    
    // 群组接口
    Router::post('/groups', 'App\Controller\GroupController@create');
    Router::get('/groups/{id}/members', 'App\Controller\GroupController@members');
});

Router::addServer('ws', function () {
    Router::get('/', 'App\Controller\WebSocketController');
});

5. 服务配置(config/autoload/services.php)

return [
    'servers' => [
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 9501,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                Event::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
            'settings' => [
                'package_max_length' => 1024 * 1024 * 8,
                'open_websocket_protocol' => false,
            ],
        ],
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 9502,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],
];

四、高并发优化方案

1. WebSocket服务器配置

// config/autoload/server.php
return [
    'settings' => [
        'reactor_num' => swoole_cpu_num() * 2,
        'worker_num' => swoole_cpu_num() * 4,
        'task_worker_num' => swoole_cpu_num() * 2,
        'max_conn' => 100000,
        'task_enable_coroutine' => true,
        'max_coroutine' => 100000,
        'buffer_output_size' => 32 * 1024 * 1024
    ]
];

2. 消息处理流程优化

客户端 → 网关 → 消息服务(内存缓冲) → 异步持久化 → 实时推送
                 ↓                     ↑
              消息队列(Kafka)        离线存储

3. 缓存策略

// 使用多级缓存
$message = $this->localCache->get($msgId);
if (!$message) {
    $message = $this->redis->get("msg:$msgId");
    if (!$message) {
        $message = Message::find($msgId);
        $this->redis->setex("msg:$msgId", 3600, $message);
    }
    $this->localCache->set($msgId, $message);
}

五、可靠性保障机制

1. 消息确认机制

客户端->服务端: 发送消息
服务端->客户端: 返回消息ID
服务端->数据库: 异步持久化
服务端->客户端: 推送消息
客户端->服务端: 发送ACK
服务端->数据库: 更新消息状态

2. 消息补推策略

// 定时任务检查未确认消息
Timer::tick(5000, function () {
    $unconfirmed = $this->redis->zRangeByScore(
        'msg:unconfirmed',
        '-inf',
        time() - 30
    );
    
    foreach ($unconfirmed as $msgId) {
        $this->pushMessage($msgId);
    }
});

六、监控指标

1. Prometheus指标

// 消息处理统计
$counter = $registry->registerCounter(
    'chat',
    'messages_processed_total',
    'Total processed messages',
    ['type']
);

// 连接数统计
$gauge = $registry->registerGauge(
    'chat',
    'connections_active',
    'Current active connections'
);

2. Grafana监控面板

  • 实时连接数
  • 消息处理速率(QPS)
  • 消息延迟(P50/P95/P99)
  • 系统资源使用率

七、压力测试数据

场景

单节点QPS

平均延迟

CPU使用率

单聊消息

15,000

23ms

68%

500人群发

9,800

47ms

72%

历史消息查询

12,000

35ms

65%


八、完整代码运行

  1. 启动服务:
php bin/hyperf.php start
  1. web测试聊天室页面
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <title>WebSocket Test</title>
  <script>
    let socket;

    function connectWebSocket() {
      socket = new WebSocket('ws://127.0.0.1:9502?user_id=2');
      socket.onopen = function(event) {
        console.log('WebSocket connection established.');
        document.getElementById('status').innerText = 'Connected';
      };

      socket.onmessage = function(event) {
        console.log('Message from server:', event.data);
        const messages = document.getElementById('messages');
        messages.innerText += `[Server]: ${event.data}\n`;
        messages.scrollTop = messages.scrollHeight; // 自动滚动到最新消息
      };

      socket.onclose = function(event) {
        console.log('WebSocket connection closed.');
        document.getElementById('status').innerText = 'Disconnected';
      };

      socket.onerror = function(error) {
        console.error('WebSocket error:', error);
        document.getElementById('status').innerText = 'Error';
      };
    }

    function sendMessage() {
      const input = document.getElementById('messageInput');
      const message = input.value;
      if (message && socket.readyState === WebSocket.OPEN) {

        var req_data = JSON.stringify({
          "type": "group",
          "group_id":1,
          "user_id":2,
          "content": message,
        });

        socket.send(req_data);
        const messages = document.getElementById('messages');
        messages.innerText += `[You]: ${message}\n`;
        messages.scrollTop = messages.scrollHeight; // 自动滚动到最新消息
        input.value = ''; // 清空输入框
      }
    }
  </script>
</head>
<body>
  <h1>WebSocket Test</h1>
  <button onclick="connectWebSocket()">Connect</button>
  <p>Status: <span id="status">Not Connected</span></p>
  <pre id="messages"></pre>
  <input type="text" id="messageInput" placeholder="Type your message here">
  <button onclick="sendMessage()">Send</button>
</body>
</html>
  1. 客户端测试:
// 使用WebSocket测试工具连接 ws://localhost:9502
// 发送测试消息
ws.send(JSON.stringify({
    type: 'message',
    conv_id: 'group_123',
    sender: 'user_001',
    content: 'Hello World'
}));

该方案实现以下关键特性:
消息可靠性99.999%
毫秒级延迟(平均<50ms)
水平扩展至百万级连接
全链路监控告警
生产级容灾方案

特别说明下:文章中的案例代码是架构思路,具体细节需要在自己的项目中调整测试,请不要照搬使用。

相关推荐

2025最新指南:Quarkus整合Redisson,轻松玩转分布式锁!

分布式系统的高并发场景下,如何确保资源操作的原子性和一致性?Redisson作为Redis官方推荐的分布式锁方案,结合Quarkus的云原生特性,能实现高性能、低延迟的分布式锁管理。本文将从原理到实战...

Linux进程上下文切换过程context_switch详解

1前言1.1Linux的调度器组成2个调度器可以用两种方法来激活调度一种是直接的,比如进程打算睡眠或出于其他原因放弃CPU另一种是通过周期性的机制,以固定的频率运行,不时的检测是否有必要因此...

开发10年面试过上千人,在网易面试Java程序员,我最爱问这些问题

在网易当了3年的面试官,一般在面试Java程序员的时候,我主要会从这几个角度,去问这些问题,在这篇文章中,我会用我上一位面试过程来为大家总结,我面试的时候爱问的这些问题!有需要面试的小伙伴可以参考一下...

电影票务APP的“座位锁定”,Redis如何避免冲突?

现在买电影票,真是越来越方便了!再也不用提前老半天跑去电影院排队,在手机APP上动动手指,选好场次、挑好座位,在线支付,一气呵成。尤其是遇到热门大片,或者想抢个“皇帝位”(中间靠后视野好的位置),那个...

Serverless架构下,Redis的用武之地在哪里?

在云计算的演进浪潮中,Serverless(无服务器)架构无疑是一颗璀璨的明星。它将传统服务器的运维复杂性彻底“隐藏”起来,开发者只需关注核心业务逻辑,编写一个个独立的函数(Function-as-a...

高可用聊天系统设计方案(Hyperf实现)

一、系统架构设计1.分层架构图客户端↑↓HTTP/WSAPI网关层(Nginx+Keepalived)↑↓RPC业务服务集群↑↓数据层(MySQLClus...

大厂面试冲刺,Java“实战”问题三连,你碰到了哪个?

推荐学习全网首发!马士兵内部共享—1658页《Java面试突击核心讲》狂刷《Java权威面试指南(阿里版)》,冲击“金九银十”有望了Java“实战”问题三连Java“实战”面试题1:如果用mybati...

企业开发必备的6个Spring Cloud微服务开源项目

今天介绍六款比较热门的SpringCloud微服务项目,感兴趣的可以clone下来研究一下,相信对你学习微服务架构很有帮助。一、Cloud-Platform介绍Cloud-Platform是国内首个基...

系统架构设计方法论:系统演进的四重境界

在架构师面试中,设计能力的考察本质是验证候选人如何将混沌需求转化为可落地的技术方案。这不仅需要扎实的技术功底,更需要系统化的设计思维。以下四大步骤,既是架构设计的核心框架,也是技术决策的动态沙盘推演。...

跨浏览器共享Session信息方法总结

在不同浏览器之间共享Session信息需要克服浏览器间的隔离机制,常见解决方案如下:1.基于Token的跨浏览器传递实现方式:用户在主浏览器生成临时Token(如加密URL或二维码)。其他浏览器通过...

如何设计一套单点登录系统

一、介绍昨天介绍了API接口设计token鉴权方案,其实token鉴权最佳的实践场景就是在单点登录系统上。在企业发展初期,使用的后台管理系统还比较少,一个或者两个。以电商系统为例,在起步阶段,可能只有...

SpringBoot实现单点登录几种方案

前言:单点登录(SingleSign-On,SSO)是企业应用系统中常见的用户认证方案,它允许用户使用一组凭证访问多个相关但独立的系统,无需重复登录。基于Cookie-Session的传统SSO方案...

零基础小白如何学爬虫技术?看一遍就会的详细教程!

你以为爬虫需要精通编程、算法、网络协议才能入门?错了。作为零基础的小白,你完全可以在3周内学会主流网站的数据抓取,核心秘诀就两点:拆分具体目标+倒推式学习。与其纠结Python语法、HTTP协议这...

探秘Java中的分布式锁:优雅地协调分布式系统

探秘Java中的分布式锁:优雅地协调分布式系统在分布式系统的架构中,数据一致性是一个永恒的挑战。当我们需要在多个节点之间协调某些操作时,分布式锁便成为了一种不可或缺的工具。它就像一把钥匙,能够控制对共...

一文读懂 Spring Boot 3 分布式事务解决方案

在当今复杂的业务架构中,分布式事务处理是关键难题之一。随着业务规模的不断扩张,系统架构从单体逐渐演进为分布式,这就要求开发人员能够熟练掌握高效的分布式事务解决方案,以保障数据的一致性和业务的稳定性。今...

取消回复欢迎 发表评论: