教你写一个Redis的库,redis 集群驱动
mhr18 2024-11-13 11:07 24 浏览 0 评论
最近跟同事请教了一下 redis 相关的事情,就找来了一下 redis 的驱动,看看这些库是怎么做 redis cluster的 pipeline 以及 transaction的,以下就把相关流程的代码剖析一下,还是有一些有意思的点的。
因为 C 语言比较底层,其他语言感觉描述性都差了一点,我找的是 elixir 的库来看的,质量很高。
事后才发现原来这个elixir 的 redis 库的作者是 elixir 这门语言的核心开发者;P
正文开始。
首先呢, Elixir 的这个库不支持 redis 集群,后来有人基于它扩展成支持简单的集群,所以先讲普通的怎么做,再扩展。
架构
这个库是单进程异步,当你发命令过来时,此库处理完后会马上发给 Redis 服务器,然后就可以接收新的命令,当 Redis Server 答复时,会返回此Reply给你。
一般连接池有通用的库,所以交给调用方来做,库只处理每个连接的请求。
RESP (REdis Serialization Protocol)
ps,上面这个标题就是来自 redis 官网的,明显 RE是 typo。
Redis 用的协议RESP是自己定的文本协议,客户端与服务端直接通过 TCP 连接通讯。
这个文本协议,其实就是对数据的序列化,以下就是规则:
- For Simple Strings the first byte of the reply is "+"
- For Errors the first byte of the reply is "-"
- For Integers the first byte of the reply is ":"
- For Bulk Strings the first byte of the reply is "#34;
- For Arrays the first byte of the reply is "*"
对于客户端而言,发过去给服务器的命令其实数据结构就是数组,所以只需要*数组长度\r\n$数组[0]里命令的长度\r\n 数组[0]里命令。
说起来有点抽象,看看实际例子:
- LLEN mylist 按照协议 encode就变成 *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n 的文本, 数组里有两个字符串,分别是 4 长度的LLEN以及 6 个字符的mylist
- SET mykey 1按协议 encode 就变成*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n" 数组里有三个字符串,分别是 3 长度的SET以及 5 个字符的mykey,还有 1 个字符的1 可以看看这个库是怎么做的,就是递归拼接,记录数组的长度,最后在最开头拼上*数组长度。
@doc ~S"""
Packs a list of Elixir terms to a Redis (RESP) array.
This function returns an iodata (instead of a binary) because the packed
result is usually sent to Redis through `:gen_tcp.send/2` or similar. It can
be converted to a binary with `IO.iodata_to_binary/1`.
All elements of `elems` are converted to strings with `to_string/1`, hence
this function supports encoding everything that implements `String.Chars`.
## Examples
iex> iodata = Redix.Protocol.pack(["SET", "mykey", 1])
iex> IO.iodata_to_binary(iodata)
"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n"
"""
@crlf_iodata [?\r, ?\n]
@spec pack([binary]) :: iodata
def pack(items) when is_list(items) do
pack(items, [], 0)
end
defp pack([item | rest], acc, count) do
item = to_string(item)
new_acc = [acc, [?$, Integer.to_string(byte_size(item)), @crlf_iodata, item, @crlf_iodata]]
pack(rest, new_acc, count + 1)
end
defp pack([], acc, count) do
[?*, Integer.to_string(count), @crlf_iodata, acc]
end
维护长连接
作为 client 的库,维护长连接,避免频繁创建连接,这个是常规操作。
而有趣的是,作者使用了 erlang OTP自带的状态机框架 gen_statem 来维持 TCP 长连接,这个功能是OTP 19也就是 16 年才推出的,在不知道此作者是 elixir 语言的贡献者前,我还小小的膜拜了一下。
状态机如下图,初始状态不是同步连接,就是connecting 状态;同步的话,成功就是处于 connected 状态。
状态的动作依靠 TCP 的事件消息来驱动,状态转移自己控制。
例子:
def disconnected({:timeout, :reconnect}, _timer_info, %__MODULE__{} = data) do
{:ok, socket_owner} = SocketOwner.start_link(self(), data.opts, data.table)
new_data = %{data | socket_owner: socket_owner}
{:next_state, :connecting, new_data}
end
以上代码就是在 discconected状态收到 TCP 的{:timeout, :reconnect}消息,创建一个新的TCP socket进程,将状态转移到:connecting。
而 socket 进程在初始化时,会发送connect消息给自己:
def handle_info(:connect, state) do
with {:ok, socket, address} <- Connector.connect(state.opts),
:ok <- setopts(state, socket, active: :once) do
send(state.conn, {:connected, self(), socket, address})
{:noreply, %{state | socket: socket}}
else
{:error, reason} -> stop(reason, state)
{:stop, reason} -> stop(reason, state)
end
end
成功了,就发送connected消息给原来的状态机进程(也就是 connection 进程),connection进程处于connecting状态时,接受此消息,更新 socket 信息,状态转移到 connected。
def connecting(
:info,
{:connected, owner, socket, address},
%__MODULE__{socket_owner: owner} = data
) do
if data.backoff_current do
:telemetry.execute([:redix, :reconnection], %{}, %{
connection: data.opts[:name] || self(),
address: address
})
end
data = %{data | socket: socket, backoff_current: nil, connected_address: address}
{:next_state, :connected, %{data | socket: socket}}
end
执行命令
Redis 执行命令主要有 Comand、Pipeline以及Trasaction三种概念:
- command:一问一答式的,客户端等待 server 返回消息;
- Pipeline:发送一连串命令,这些命令发往 server,不用一问一答,收到命令马上返回。sever 以队列执行,执行完后全部结果返回回来;
- Trasaction:依靠MULTI/EXEC命令,MULTI命令开始Trasaction,此后发送的命令都存到 server 的队列里,EXEC命令发送后马上这队列里所有命令;期间不会有其他命令影响这些命令的执行。
库里把 Command 命令用 Pipeline来做,其实本质是一样的。
Pipeline
以下的pipeline就是负责用户调用的函数,:gen_statem.cast就是把消息数据传给状态机,接着就是起了一个进程来监控这个连接,挂了就退出;同时阻塞等待状态机完成处理获得数据后发消息过来。
def pipeline(conn, commands, timeout) do
conn = GenServer.whereis(conn)
request_id = Process.monitor(conn)
# We cast to the connection process knowing that it will reply at some point,
# either after roughly timeout or when a response is ready.
cast = {:pipeline, commands, _from = {self(), request_id}, timeout}
:ok = :gen_statem.cast(conn, cast)
receive do
{^request_id, resp} ->
_ = Process.demonitor(request_id, [:flush])
resp
{:DOWN, ^request_id, _, _, reason} ->
exit(reason)
end
end
状态机这块的代码就是:
def connected(:cast, {:pipeline, commands, from, timeout}, data) do
{ncommands, data} = get_client_reply(data, commands)
if ncommands > 0 do
{counter, data} = get_and_update_in(data.counter, &{&1, &1 + 1})
row = {counter, from, ncommands, _timed_out? = false}
:ets.insert(data.table, row)
case data.transport.send(data.socket, Enum.map(commands, &Protocol.pack/1)) do
:ok ->
actions =
case timeout do
:infinity -> []
_other -> [{{:timeout, {:client_timed_out, counter}}, timeout, from}]
end
{:keep_state, data, actions}
{:error, _reason} ->
# The socket owner will get a closed message at some point, so we just move to the
# disconnected state.
:ok = data.transport.close(data.socket)
{:next_state, :disconnected, data}
end
else
reply(from, {:ok, []})
{:keep_state, data}
end
end
没什么特别的,get_client_reply就是处理客户端是否想得到服务器回复的命令的 CLIENT REPLY的各种指令,
defp get_client_reply([command | rest], ncommands, client_reply) do
case parse_client_reply(command) do
:off -> get_client_reply(rest, ncommands, :off)
:skip when client_reply == :off -> get_client_reply(rest, ncommands, :off)
:skip -> get_client_reply(rest, ncommands, :skip)
:on -> get_client_reply(rest, ncommands + 1, :on)
nil when client_reply == :on -> get_client_reply(rest, ncommands + 1, client_reply)
nil when client_reply == :off -> get_client_reply(rest, ncommands, client_reply)
nil when client_reply == :skip -> get_client_reply(rest, ncommands, :on)
end
end
接着就是把命令序列号成 RESP,使用data.transport.send发送给服务器,其实 Redis 除了 TCP 外还可以使用SSL/TLS 协议,所以就有了这一层抽象。
如果是 TCP,那么socket 服务就会在 redis 服务器返回消息后,此函数接收自动处理:
def handle_info({transport, socket, data}, %__MODULE__{socket: socket} = state)
when transport in [:tcp, :ssl] do
:ok = setopts(state, socket, active: :once)
state = new_data(state, data)
{:noreply, state}
end
支持Redis Cluster
Redis Cluster 的分布式算法
官网写的很好了,我简单说一下好了。
Redis Cluster does not use consistent hashing, but a different form of sharding where every key is conceptually part of what we call an hash slot.
Redis Cluster没有用一致性哈希算法,而是用了hash slot(哈希桶)
There are 16384 hash slots in Redis Cluster, and to compute what is the hash slot of a given key, we simply take the CRC16 of the key modulo 16384.
redis 会固定分配 16384 个 slots 到不同的节点,用的算法就是对 key 做 CRC16 然后对 16384取模: HASH_SLOT = CRC16(key) mod 16384
例子如下:
Every node in a Redis Cluster is responsible for a subset of the hash slots, so for example you may have a cluster with 3 nodes, where:
- Node A contains hash slots from 0 to 5500.
- Node B contains hash slots from 5501 to 11000.
- Node C contains hash slots from 11001 to 16383.
This allows to add and remove nodes in the cluster easily. For example if I want to add a new node D, I need to move some hash slot from nodes A, B, C to D. Similarly if I want to remove node A from the cluster I can just move the hash slots served by A to B and C. When the node A will be empty I can remove it from the cluster completely.
用这样的算法,比一致性哈希方便,更有操作性:
Redis Cluster implements a concept called hash tags that can be used in order to force certain keys to be stored in the same hash slot.
Because moving hash slots from a node to another does not require to stop operations, adding and removing nodes, or changing the percentage of hash slots hold by nodes, does not require any downtime.
对于 redis 或者对用户来说,可以轻松地分配移动 slots;
而一致性哈希就只能自己算虚拟节点,并且『祈求』之后请求量多了最终达到想要的平衡了。
#####redix-cluster
原版没有支持集群,zhongwencool/redix-cluster 写了一个简单的包装版本。
只需要看这段,就很清楚为了集群做了些啥:
@spec pipeline([command], Keyword.t) :: {:ok, term} |{:error, term}
def pipeline(pipeline, opts) do
case RedixCluster.Monitor.get_slot_cache do
{:cluster, slots_maps, slots, version} ->
pipeline
|> parse_keys_from_pipeline
|> keys_to_slot_hashs
|> is_same_slot_hashs
|> get_pool_by_slot(slots_maps, slots, version)
|> query_redis_pool(pipeline, :pipeline, opts)
{:not_cluster, version, pool_name} ->
query_redis_pool({version, pool_name}, pipeline, :pipeline, opts)
end
end
|> 就是类似 unix 的 管道 |,把函数返回值当做下个函数的第一个参数传给他。
get_slot_cache就是获取redis的cluster slots这个记录,并且缓存起来。
CLUSTER SLOTS returns details about which cluster slots map to which Redis instances.
- parse_keys_from_pipeline 将全部 keys 从Pineline 命令里提取出来
- keys_to_slot_hashs 找出 各个key 在哪个 hash slot
- is_same_slot_hashs 判断所有 key 是不是在同一个 hash slot,是的,这个还不支持跨 slot,我在准备帮他写一个
- get_pool_by_slot 项目用了连接池来管理,所以要根据名字找对应的连接
- query_redis_pool 就是调用 原来的 Redix 做处理了
简单来说,这个库就是残废的,哈哈哈。。。
不支持分布不同 slot,就是玩具。
作者:HioHio
链接:https://juejin.im/post/5ed9e0a8e51d457b3b2cab20
相关推荐
- Dubai's AI Boom Lures Global Tech as Emirate Reinvents Itself as Middle East's Silicon Gateway
-
AI-generatedimageAsianFin--Dubaiisrapidlytransformingitselffromadesertoilhubintoaglob...
- OpenAI Releases o3-pro, Cuts o3 Prices by 80% as Deal with Google Cloud Reported to Make for Compute Needs
-
TMTPOST--OpenAIisescalatingthepricewarinlargelanguagemodel(LLM)whileseekingpartnershi...
- 黄仁勋说AI Agent才是未来!但究竟有些啥影响?
-
,抓住风口(iOS用户请用电脑端打开小程序)本期要点:详解2025年大热点你好,我是王煜全,这里是王煜全要闻评论。最近,有个词被各个科技大佬反复提及——AIAgent,智能体。黄仁勋在CES展的发布...
- 商城微服务项目组件搭建(五)——Kafka、Tomcat等安装部署
-
1、本文属于mini商城系列文档的第0章,由于篇幅原因,这篇文章拆成了6部分,本文属于第5部分2、mini商城项目详细文档及代码见CSDN:https://blog.csdn.net/Eclipse_...
- Python+Appium环境搭建与自动化教程
-
以下是保姆级教程,手把手教你搭建Python+Appium环境并实现简单的APP自动化测试:一、环境搭建(Windows系统)1.安装Python访问Python官网下载最新版(建议...
- 零配置入门:用VSCode写Java代码的正确姿
-
一、环境准备:安装JDK,让电脑“听懂”Java目标:安装Java开发工具包(JDK),配置环境变量下载JDKJava程序需要JDK(JavaDevelopmentKit)才能运行和编译。以下是两...
- Mycat的搭建以及配置与启动(mycat2)
-
1、首先开启服务器相关端口firewall-cmd--permanent--add-port=9066/tcpfirewall-cmd--permanent--add-port=80...
- kubernetes 部署mysql应用(k8s mysql部署)
-
这边仅用于测试环境,一般生产环境mysql不建议使用容器部署。这里假设安装mysql版本为mysql8.0.33一、创建MySQL配置(ConfigMap)#mysql-config.yaml...
- Spring Data Jpa 介绍和详细入门案例搭建
-
1.SpringDataJPA的概念在介绍SpringDataJPA的时候,我们首先认识下Hibernate。Hibernate是数据访问解决技术的绝对霸主,使用O/R映射(Object-Re...
- 量子点格棋上线!“天衍”邀您执子入局
-
你是否能在策略上战胜量子智能?这不仅是一场博弈更是一次量子智力的较量——量子点格棋正式上线!试试你能否赢下这场量子智局!游戏玩法详解一笔一画间的策略博弈游戏目标:封闭格子、争夺领地点格棋的基本目标是利...
- 美国将与阿联酋合作建立海外最大的人工智能数据中心
-
当地时间5月15日,美国白宫宣布与阿联酋合作建立人工智能数据中心园区,据称这是美国以外最大的人工智能园区。阿布扎比政府支持的阿联酋公司G42及多家美国公司将在阿布扎比合作建造容量为5GW的数据中心,占...
- 盘后股价大涨近8%!甲骨文的业绩及指引超预期?
-
近期,美股的AI概念股迎来了一波上升行情,微软(MSFT.US)频创新高,英伟达(NVDA.US)、台积电(TSM.US)、博通(AVGO.US)、甲骨文(ORCL.US)等多股亦出现显著上涨。而从基...
- 甲骨文预计新财年云基础设施营收将涨超70%,盘后一度涨8% | 财报见闻
-
甲骨文(Oracle)周三盘后公布财报显示,该公司第四财季业绩超预期,虽然云基建略微逊于预期,但管理层预计2026财年云基础设施营收预计将增长超过70%,同时资本支出继上年猛增三倍后,新财年将继续增至...
- Springboot数据访问(整合MongoDB)
-
SpringBoot整合MongoDB基本概念MongoDB与我们之前熟知的关系型数据库(MySQL、Oracle)不同,MongoDB是一个文档数据库,它具有所需的可伸缩性和灵活性,以及所需的查询和...
- Linux环境下,Jmeter压力测试的搭建及报错解决方法
-
概述 Jmeter最早是为了测试Tomcat的前身JServ的执行效率而诞生的。到目前为止,它的最新版本是5.3,其测试能力也不再仅仅只局限于对于Web服务器的测试,而是涵盖了数据库、JM...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Dubai's AI Boom Lures Global Tech as Emirate Reinvents Itself as Middle East's Silicon Gateway
- OpenAI Releases o3-pro, Cuts o3 Prices by 80% as Deal with Google Cloud Reported to Make for Compute Needs
- 黄仁勋说AI Agent才是未来!但究竟有些啥影响?
- 商城微服务项目组件搭建(五)——Kafka、Tomcat等安装部署
- Python+Appium环境搭建与自动化教程
- 零配置入门:用VSCode写Java代码的正确姿
- Mycat的搭建以及配置与启动(mycat2)
- kubernetes 部署mysql应用(k8s mysql部署)
- Spring Data Jpa 介绍和详细入门案例搭建
- 量子点格棋上线!“天衍”邀您执子入局
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (56)
- oracle 乐观锁 (53)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)