百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

如何设计一款基于 MySQL 实现的 Message Queue

mhr18 2024-12-04 13:11 18 浏览 0 评论

EMS

Extend MySQL Stream;

一种基于 MySQL 实现的 stream 队列.

功能

  1. 集群消费、广播消费
  2. 自动重试、死信队列
  3. 快速重置消息位点,快速回放消息,快速查询消息
  4. 消息可基于磁盘积压、消息可快速清理
  5. 监控 group 积压,topic 消息量排行,消息链路追踪,消息消费超时告警;
  6. 读写性能 1200-3000 QPS 左右

写入设计

msg id 就是 topic 维度的自增 id,可对多个 topic 并发写入

  • 针对一个 topic,需要有物理 physics offset, 每次写入,topic 维度的 physics_offset 自增加一 写入需要上锁吗? 看怎么写, 如果使用非原子的形式自增 id,比如数据的的方式,先查出最大 id,再加一,那么必须加锁 如果使用 redis 自增特性实现, 为每个 topic 配置一个自增 key, 则可以避免加锁. redis 实现虽然性能好, 如为配置aof,宕机则可能导致丢失数据, 此时,会出现 offset 重复异常, 过一会随着继续自增, 也就恢复了. topic 维度的自增 id 如果使用 mysql 实现, 性能不堪受辱,因此,此处使用 redis 自增实现(可配置为 mysql 实现); 经过测试,笔记本电脑,单 topic 20 并发写入,qps 在 1000-1500 左右(local mysql & local redis),基本满足业务需求。 考虑到高可用性和业务场景,此处无法使用批量插入
  • 所有的 topic 和 msg 都写入的这一张表中,表数据定时清理,消费完的消息,可提前删除。
  • 注意,本方案写入性能瓶颈是 MySQL Server 的性能瓶颈。

读取设计

  1. 假设针对一个 topic,只有一个 consumer,只需循环读取,然后更新 offset 即可。 但结合实际业务场景,这种基本不存在,所以,忽略这种场景。 通常,一个 topic 有多个 consumer group(简称 tg), 一个 consumer group 有多个 client(jvm or thread)
  2. 如果一个 topic + group(简称 tg),有多个 consumer,每个 consumer 有多个线程,读取和更新 offset 则会有并发问题, 如下图。
  • 这个 client id,我们将其设计为,ip + pid + uuid + thread id; ip 和 pid 可帮助我们追溯问题 uuid 简单防重复 thread id,一种性能优化,下面细说。
  • 结合实际业务场景,且遵循 simple is better 原则,读取时,使用上锁的方式解决并发问题。锁的粒度就是 tg
  • 考虑到要实现基本的顺序读取和防止重复消费,多线程并发时,我们应当实现基于自增的形式读取 msg;每个 clientid 读取消息后,都会记录一个简单的log,并在 tg 维度增加一个 max offset
  • 每次读取消息时,每个 client 都需要去检查当前想要读取的 tg 是否已经有【其他 client】在操作 max offset。即,我们将锁的粒度缩小到了 max offset; 整体原则是,一个 t + g 的 max offset,同时只能有一个 thread 操作(写和更新) 如果有其他人在读取,则阻塞 如果没有其他人在读取,则锁住这个 tg, 并批量拉取一定数量的消息 id, 对这个 tg 维度的 max offset + n 批量插入这个 tg + clientid offset log,表明这个消息被这个 clientid 读取了,同时也间接更新了 max offset(order by offset) 释放锁 拉取刚刚读取的 msg id list 里面的消息体 交给业务处理消息
  1. ack
    1. 对于集群消息,如何保证在断电情况下,消息不丢失,使用数据库存储消息, 写入即不会丢失, 但消费时, 如果刚刚读进内存就立刻宕机,则需要在重启时恢复消息.
    2. 每个 client get 到消息后,都需要记录 msg pid,consumer group,state(start、done,retry)为 start 状态
    3. ack success,将 log update 为 done 状态
    4. ack fail 后,将 log update 为 retry 状态,同时将消息存入重试队列 这里需要上锁吗?其实是不需要的,因为更新的维度是 client id 的 log,不存在并发更新. 这里更新状态是表示这些消息已经处理结束了,否则无法判定宕机场景。 对于 start 状态的消息,定时任务会去检查 如果 client 还存活,超过 1 分钟(可配),则将其捞出,放进重试队列,并在 10s 进行第一次重试 如果 client 还存活,则立刻将其捞出,放进重试队列,并在 10s 进行第一次重试
    5. ack 是批量的,ack 失败,仅会导致重复消费。
  2. 广播消息
    1. 是否为广播消息由 topic 确定
    2. 广播消息不需要上锁,每一个订阅该 topic 的 client 都会读取到该消息
    3. 广播消息不需要 ack,不需要记录成功或失败或重试,仅需要内存里记录 offset
    4. 推荐尽可能使用集群模式,使用集群模式模拟广播模式
  3. client id
    1. 只有 consumer 需要 client id
    2. client id 由 ip pid uuid + thread id 组成, 可溯源.
    3. client id 需要续约(5s),如果机器宕机,则会被自动清除,且他的 start 状态的消息会进入重试队列,交给同 group 的其他 client
    4. client id 可以自己主动注销,注销前,自己内存的消息应当被优雅消费结束,一般来讲,kill -15 的 jvm 都会主动注销 client id;

核心表设计

  1. topic 表:记录topic 元信息
  2. group 表:记录 group 订阅元信息
  3. msg 表:msg总表,记录写入的信息,包含 body 和 topic 维度的自增 offset,类似 rocketmq commit log 该表会被多个 consumer 消费的消息 该表会被定制删除过期数据
  4. retry msg 表,消费失败、超时的消息,会进入该表,并按阶梯定时消费
  5. dead msg 表,消费重试 16(any config) 次的消息,会进入该表
  6. topic_group_log 表:记录 consumer group client 的 msg 消费记录,包含 state(start、done,retry) 字段,可 ack 该表的记录行数会非常多,单行数据较少,可自动删除 done 的记录

如上文所说,由于本方案未采用常见的多 queue 和多 partition 的设计,因此瓶颈在于上图提到的分布式锁的设计上,具体链路为 consumer group client 在集群消费时, 为了让并发读取的 thread 拉取到的消息尽可能准确,使用上锁的方式来实现。

总体看下来, 可以简单理解为, ems 失去了性能, 却拥有了所有.


作者:莫那鲁道
链接:https://juejin.cn/post/7274555781486231609

相关推荐

【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库

如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...

Pure Storage推出统一数据管理云平台及新闪存阵列

PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...

对Java学习的10条建议(对java课程的建议)

不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...

SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!

官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...

JDK21有没有什么稳定、简单又强势的特性?

佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...

「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了

在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...

Java面试题及答案最全总结(2025版)

大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...

数据库日常运维工作内容(数据库日常运维 工作内容)

#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...

分布式之系统底层原理(上)(底层分布式技术)

作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...

oracle 死锁了怎么办?kill 进程 直接上干货

1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...

SpringBoot 各种分页查询方式详解(全网最全)

一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...

《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略

《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...

LoadRunner(loadrunner录制不到脚本)

一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...

Redis数据类型介绍(redis 数据类型)

介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...

RMAN备份监控及优化总结(rman备份原理)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

取消回复欢迎 发表评论: