百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

教你写一个Redis的库,redis 集群驱动

mhr18 2024-11-13 11:07 15 浏览 0 评论



最近跟同事请教了一下 redis 相关的事情,就找来了一下 redis 的驱动,看看这些库是怎么做 redis cluster的 pipeline 以及 transaction的,以下就把相关流程的代码剖析一下,还是有一些有意思的点的。

因为 C 语言比较底层,其他语言感觉描述性都差了一点,我找的是 elixir 的库来看的,质量很高。

事后才发现原来这个elixir 的 redis 库的作者是 elixir 这门语言的核心开发者;P

正文开始。

首先呢, Elixir 的这个库不支持 redis 集群,后来有人基于它扩展成支持简单的集群,所以先讲普通的怎么做,再扩展。

架构

这个库是单进程异步,当你发命令过来时,此库处理完后会马上发给 Redis 服务器,然后就可以接收新的命令,当 Redis Server 答复时,会返回此Reply给你。

一般连接池有通用的库,所以交给调用方来做,库只处理每个连接的请求。

RESP (REdis Serialization Protocol)

ps,上面这个标题就是来自 redis 官网的,明显 RE是 typo。

Redis 用的协议RESP是自己定的文本协议,客户端与服务端直接通过 TCP 连接通讯。

这个文本协议,其实就是对数据的序列化,以下就是规则:

  • For Simple Strings the first byte of the reply is "+"
  • For Errors the first byte of the reply is "-"
  • For Integers the first byte of the reply is ":"
  • For Bulk Strings the first byte of the reply is "#34;
  • For Arrays the first byte of the reply is "*"

对于客户端而言,发过去给服务器的命令其实数据结构就是数组,所以只需要*数组长度\r\n$数组[0]里命令的长度\r\n 数组[0]里命令。

说起来有点抽象,看看实际例子:

  • LLEN mylist 按照协议 encode就变成 *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n 的文本, 数组里有两个字符串,分别是 4 长度的LLEN以及 6 个字符的mylist
  • SET mykey 1按协议 encode 就变成*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n" 数组里有三个字符串,分别是 3 长度的SET以及 5 个字符的mykey,还有 1 个字符的1 可以看看这个库是怎么做的,就是递归拼接,记录数组的长度,最后在最开头拼上*数组长度。
  @doc ~S"""
  Packs a list of Elixir terms to a Redis (RESP) array.

  This function returns an iodata (instead of a binary) because the packed
  result is usually sent to Redis through `:gen_tcp.send/2` or similar. It can
  be converted to a binary with `IO.iodata_to_binary/1`.

  All elements of `elems` are converted to strings with `to_string/1`, hence
  this function supports encoding everything that implements `String.Chars`.

  ## Examples

      iex> iodata = Redix.Protocol.pack(["SET", "mykey", 1])
      iex> IO.iodata_to_binary(iodata)
      "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n"

  """
  @crlf_iodata [?\r, ?\n]
  @spec pack([binary]) :: iodata
  def pack(items) when is_list(items) do
    pack(items, [], 0)
  end

  defp pack([item | rest], acc, count) do
    item = to_string(item)
    new_acc = [acc, [?$, Integer.to_string(byte_size(item)), @crlf_iodata, item, @crlf_iodata]]
    pack(rest, new_acc, count + 1)
  end

  defp pack([], acc, count) do
    [?*, Integer.to_string(count), @crlf_iodata, acc]
  end

维护长连接

作为 client 的库,维护长连接,避免频繁创建连接,这个是常规操作。

而有趣的是,作者使用了 erlang OTP自带的状态机框架 gen_statem 来维持 TCP 长连接,这个功能是OTP 19也就是 16 年才推出的,在不知道此作者是 elixir 语言的贡献者前,我还小小的膜拜了一下。

状态机如下图,初始状态不是同步连接,就是connecting 状态;同步的话,成功就是处于 connected 状态。

状态的动作依靠 TCP 的事件消息来驱动,状态转移自己控制。

例子:

  def disconnected({:timeout, :reconnect}, _timer_info, %__MODULE__{} = data) do
    {:ok, socket_owner} = SocketOwner.start_link(self(), data.opts, data.table)
    new_data = %{data | socket_owner: socket_owner}
    {:next_state, :connecting, new_data}
  end

以上代码就是在 discconected状态收到 TCP 的{:timeout, :reconnect}消息,创建一个新的TCP socket进程,将状态转移到:connecting。

而 socket 进程在初始化时,会发送connect消息给自己:

  def handle_info(:connect, state) do
    with {:ok, socket, address} <- Connector.connect(state.opts),
         :ok <- setopts(state, socket, active: :once) do
      send(state.conn, {:connected, self(), socket, address})
      {:noreply, %{state | socket: socket}}
    else
      {:error, reason} -> stop(reason, state)
      {:stop, reason} -> stop(reason, state)
    end
  end

成功了,就发送connected消息给原来的状态机进程(也就是 connection 进程),connection进程处于connecting状态时,接受此消息,更新 socket 信息,状态转移到 connected。

  def connecting(
        :info,
        {:connected, owner, socket, address},
        %__MODULE__{socket_owner: owner} = data
      ) do
    if data.backoff_current do
      :telemetry.execute([:redix, :reconnection], %{}, %{
        connection: data.opts[:name] || self(),
        address: address
      })
    end

    data = %{data | socket: socket, backoff_current: nil, connected_address: address}
    {:next_state, :connected, %{data | socket: socket}}
  end

执行命令

Redis 执行命令主要有 Comand、Pipeline以及Trasaction三种概念:

  • command:一问一答式的,客户端等待 server 返回消息;
  • Pipeline:发送一连串命令,这些命令发往 server,不用一问一答,收到命令马上返回。sever 以队列执行,执行完后全部结果返回回来;
  • Trasaction:依靠MULTI/EXEC命令,MULTI命令开始Trasaction,此后发送的命令都存到 server 的队列里,EXEC命令发送后马上这队列里所有命令;期间不会有其他命令影响这些命令的执行。

库里把 Command 命令用 Pipeline来做,其实本质是一样的。

Pipeline

以下的pipeline就是负责用户调用的函数,:gen_statem.cast就是把消息数据传给状态机,接着就是起了一个进程来监控这个连接,挂了就退出;同时阻塞等待状态机完成处理获得数据后发消息过来。

  def pipeline(conn, commands, timeout) do
    conn = GenServer.whereis(conn)

    request_id = Process.monitor(conn)

    # We cast to the connection process knowing that it will reply at some point,
    # either after roughly timeout or when a response is ready.
    cast = {:pipeline, commands, _from = {self(), request_id}, timeout}
    :ok = :gen_statem.cast(conn, cast)

    receive do
      {^request_id, resp} ->
        _ = Process.demonitor(request_id, [:flush])
        resp

      {:DOWN, ^request_id, _, _, reason} ->
        exit(reason)
    end
  end

状态机这块的代码就是:

  def connected(:cast, {:pipeline, commands, from, timeout}, data) do
    {ncommands, data} = get_client_reply(data, commands)

    if ncommands > 0 do
      {counter, data} = get_and_update_in(data.counter, &{&1, &1 + 1})

      row = {counter, from, ncommands, _timed_out? = false}
      :ets.insert(data.table, row)

      case data.transport.send(data.socket, Enum.map(commands, &Protocol.pack/1)) do
        :ok ->
          actions =
            case timeout do
              :infinity -> []
              _other -> [{{:timeout, {:client_timed_out, counter}}, timeout, from}]
            end

          {:keep_state, data, actions}

        {:error, _reason} ->
          # The socket owner will get a closed message at some point, so we just move to the
          # disconnected state.
          :ok = data.transport.close(data.socket)
          {:next_state, :disconnected, data}
      end
    else
      reply(from, {:ok, []})
      {:keep_state, data}
    end
  end

没什么特别的,get_client_reply就是处理客户端是否想得到服务器回复的命令的 CLIENT REPLY的各种指令,

  defp get_client_reply([command | rest], ncommands, client_reply) do
    case parse_client_reply(command) do
      :off -> get_client_reply(rest, ncommands, :off)
      :skip when client_reply == :off -> get_client_reply(rest, ncommands, :off)
      :skip -> get_client_reply(rest, ncommands, :skip)
      :on -> get_client_reply(rest, ncommands + 1, :on)
      nil when client_reply == :on -> get_client_reply(rest, ncommands + 1, client_reply)
      nil when client_reply == :off -> get_client_reply(rest, ncommands, client_reply)
      nil when client_reply == :skip -> get_client_reply(rest, ncommands, :on)
    end
  end

接着就是把命令序列号成 RESP,使用data.transport.send发送给服务器,其实 Redis 除了 TCP 外还可以使用SSL/TLS 协议,所以就有了这一层抽象。

如果是 TCP,那么socket 服务就会在 redis 服务器返回消息后,此函数接收自动处理:

  def handle_info({transport, socket, data}, %__MODULE__{socket: socket} = state)
      when transport in [:tcp, :ssl] do
    :ok = setopts(state, socket, active: :once)
    state = new_data(state, data)
    {:noreply, state}
  end

支持Redis Cluster

Redis Cluster 的分布式算法

官网写的很好了,我简单说一下好了。

Redis Cluster does not use consistent hashing, but a different form of sharding where every key is conceptually part of what we call an hash slot.

Redis Cluster没有用一致性哈希算法,而是用了hash slot(哈希桶)

There are 16384 hash slots in Redis Cluster, and to compute what is the hash slot of a given key, we simply take the CRC16 of the key modulo 16384.

redis 会固定分配 16384 个 slots 到不同的节点,用的算法就是对 key 做 CRC16 然后对 16384取模: HASH_SLOT = CRC16(key) mod 16384

例子如下:

Every node in a Redis Cluster is responsible for a subset of the hash slots, so for example you may have a cluster with 3 nodes, where:

- Node A contains hash slots from 0 to 5500.
- Node B contains hash slots from 5501 to 11000.
- Node C contains hash slots from 11001 to 16383.

This allows to add and remove nodes in the cluster easily. For example if I want to add a new node D, I need to move some hash slot from nodes A, B, C to D. Similarly if I want to remove node A from the cluster I can just move the hash slots served by A to B and C. When the node A will be empty I can remove it from the cluster completely.

用这样的算法,比一致性哈希方便,更有操作性:

Redis Cluster implements a concept called hash tags that can be used in order to force certain keys to be stored in the same hash slot.

Because moving hash slots from a node to another does not require to stop operations, adding and removing nodes, or changing the percentage of hash slots hold by nodes, does not require any downtime.

对于 redis 或者对用户来说,可以轻松地分配移动 slots;

而一致性哈希就只能自己算虚拟节点,并且『祈求』之后请求量多了最终达到想要的平衡了。

#####redix-cluster

原版没有支持集群,zhongwencool/redix-cluster 写了一个简单的包装版本。

只需要看这段,就很清楚为了集群做了些啥:

  @spec pipeline([command], Keyword.t) :: {:ok, term} |{:error, term}
  def pipeline(pipeline, opts) do
    case RedixCluster.Monitor.get_slot_cache do
      {:cluster, slots_maps, slots, version} ->
         pipeline
           |> parse_keys_from_pipeline
           |> keys_to_slot_hashs
           |> is_same_slot_hashs
           |> get_pool_by_slot(slots_maps, slots, version)
           |> query_redis_pool(pipeline, :pipeline, opts)
      {:not_cluster, version, pool_name} ->
         query_redis_pool({version, pool_name}, pipeline, :pipeline, opts)
    end
  end

|> 就是类似 unix 的 管道 |,把函数返回值当做下个函数的第一个参数传给他。

get_slot_cache就是获取redis的cluster slots这个记录,并且缓存起来。

CLUSTER SLOTS returns details about which cluster slots map to which Redis instances.

  • parse_keys_from_pipeline 将全部 keys 从Pineline 命令里提取出来
  • keys_to_slot_hashs 找出 各个key 在哪个 hash slot
  • is_same_slot_hashs 判断所有 key 是不是在同一个 hash slot,是的,这个还不支持跨 slot,我在准备帮他写一个
  • get_pool_by_slot 项目用了连接池来管理,所以要根据名字找对应的连接
  • query_redis_pool 就是调用 原来的 Redix 做处理了

简单来说,这个库就是残废的,哈哈哈。。。

不支持分布不同 slot,就是玩具。

作者:HioHio
链接:https://juejin.im/post/5ed9e0a8e51d457b3b2cab20

相关推荐

MySQL数据库中,数据量越来越大,有什么具体的优化方案么?

个人的观点,这种大表的优化,不一定上来就要分库分表,因为表一旦被拆分,开发、运维的复杂度会直线上升,而大多数公司和开发人员是欠缺这种能力的。所以MySQL中几百万甚至小几千万的表,先考虑做单表的优化。...

Redis的Bitmap(位图):签到打卡、用户在线状态,用它一目了然

你是不是每天打开APP,第一时间就是去“签到打卡”?或者在社交软件里,看到你的朋友头像旁边亮着“在线”的绿灯?这些看似简单的功能背后,都隐藏着一个有趣而高效的数据结构。如果让你来设计一个签到系统:用户...

想知道有多少人看了你的文章?Redis HyperLogLog几KB就搞定!

作为一名内容创作者,你每天最期待的,除了文章阅读量蹭蹭上涨,是不是还特别想知道,到底有多少个“独立用户”阅读了你的文章?这个数字,我们通常称为“UV”(UniqueVisitors),它比总阅读量更...

Redis的“HyperLogLog”:统计网站日活用户,省内存又高效的神器

你可能从未听过这个拗口的名字——“HyperLogLog”,它听起来就像是某个高深莫测的数学公式。但请相信我,理解它的核心思想并不难,而且一旦你掌握了它,你会发现它在处理大数据统计问题时,简直就是“救...

阿里云国际站:为什么我的云服务器运行缓慢?

本文由【云老大】TG@yunlaoda360撰写一、网络性能瓶颈带宽不足现象:上传/下载速度慢,远程连接卡顿。排查:通过阿里云控制台查看网络流量峰值是否接近带宽上限34。解决:升级带宽(如从1M提...

Java 近期新闻:Jakarta EE 11和Spring AI更新、WildFly 36.0 Beta、Infinispan

作者|MichaelRedlich译者|明知山策划|丁晓昀OpenJDKJEP503(移除32位x86移植版本)已从“ProposedtoTarget”状态进入到“T...

腾讯云国际站:怎样设置自动伸缩应对流量高峰?

云计算平台服务以阿里云为例:开通服务与创建伸缩组:登录阿里云控制台,找到弹性伸缩服务并开通。创建伸缩组时,选择地域与可用区,定义伸缩组内最小/最大实例数,绑定已有VPC虚拟交换机。实例模板需...

【案例分享】如何利用京东云建设高可用业务架构

本文以2022年一个实际项目为基础,来演示在京东云上构建高可用业务的整个过程。公有云及私有云客户可通过使用京东云的弹性IAAS、PAAS服务,创建高可用、高弹性、高可扩展、高安全的云上业务环境,提升业...

Spring Security在前后端分离项目中的使用

1文章导读SpringSecurity是Spring家族中的一个安全管理框架,可以和SpringBoot项目很方便的集成。SpringSecurity框架的两大核心功能:认证和授权认证:...

Redis与Java集成的最佳实践

Redis与Java集成的最佳实践在当今互联网飞速发展的时代,缓存技术的重要性毋庸置疑。Redis作为一款高性能的分布式缓存数据库,与Java语言的结合更是如虎添翼。今天,我们就来聊聊Redis与Ja...

Redis在Java项目中的应用与数据持久化

Redis在Java项目中的应用与数据持久化Redis简介:为什么我们需要它?在Java项目中,Redis就像一位不知疲倦的快跑选手,总能在关键时刻挺身而出。作为一个内存数据库,它在处理高并发请求时表...

Redis 集群最大节点个数是多少?

Redis集群最大节点个数取决于Redis的哈希槽数量,因为每个节点可以负责多个哈希槽。在Redis3.0之前,Redis集群最多支持16384个哈希槽,因此最大节点数为16384个。但是在Redi...

Java开发岗面试宝典:分布式相关问答详解

今天千锋广州Java小编就给大家分享一些就业面试宝典之分布式相关问题,一起来看看吧!1.Redis和Memcache的区别?1、存储方式Memecache把数据全部存在内存之中,断电后会挂掉,数据不...

当Redis内存不足时,除了加内存,还有哪些曲线救国的办法?

作为“速度之王”的Redis,其高性能的秘密武器之一就是将数据存储在内存中。然而,内存资源是有限且昂贵的。当你的Redis实例开始告警“内存不足”,或者写入请求被阻塞时,最直接的解决方案似乎就是“加内...

商品详情页那么多信息,Redis的“哈希”如何优雅存储?

你每天网购时,无论是打开淘宝、京东还是拼多多,看到的商品详情页都琳琅满目:商品名称、价格、库存、图片、描述、评价数量、销量。这些信息加起来,多的惊人。那么问题来了:这些海量的商品信息,程序是去哪里取出...

取消回复欢迎 发表评论: