百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

深入剖析Redis客户端Jedis的特性和原理

mhr18 2024-12-01 08:53 16 浏览 0 评论

一、开篇


Redis作为目前通用的缓存选型,因其高性能而倍受欢迎。Redis的2.x版本仅支持单机模式,从3.0版本开始引入集群模式。


Redis的Java生态的客户端当中包含Jedis、Redisson、Lettuce,不同的客户端具备不同的能力是使用方式,本文主要分析Jedis客户端。


Jedis客户端同时支持单机模式、分片模式、集群模式的访问模式,通过构建Jedis类对象实现单机模式下的数据访问,通过构建ShardedJedis类对象实现分片模式的数据访问,通过构建JedisCluster类对象实现集群模式下的数据访问。


Jedis客户端支持单命令和Pipeline方式访问Redis集群,通过Pipeline的方式能够提高集群访问的效率。


本文的整体分析基于Jedis的3.5.0版本进行分析,相关源码均参考此版本。


二、Jedis访问模式对比


Jedis客户端操作Redis主要分为三种模式,分表是单机模式、分片模式、集群模式。


  • 单机模式主要是创建Jedis对象来操作单节点的Redis,只适用于访问单个Redis节点。


  • 分片模式(ShardedJedis)主要是通过创建ShardedJedisPool对象来访问分片模式的多个Redis节点,是Redis没有集群功能之前客户端实现的一个数据分布式方案,本质上是客户端通过一致性哈希来实现数据分布式存储。


  • 集群模式(JedisCluster)主要是通过创建JedisCluster对象来访问集群模式下的多个Redis节点,是Redis3.0引入集群模式后客户端实现的集群访问访问,本质上是通过引入槽(slot)概念以及通过CRC16哈希槽算法来实现数据分布式存储。


单机模式不涉及任何分片的思想,所以我们着重分析分片模式集群模式的理念。


2.1 分片模式


  • 分片模式本质属于基于客户端的分片,在客户端实现如何根据一个key找到Redis集群中对应的节点的方案。


  • Jedis的客户端分片模式采用一致性Hash来实现,一致性Hash算法的好处是当Redis节点进行增减时只会影响新增或删除节点前后的小部分数据,相对于取模等算法来说对数据的影响范围较小。


  • Redis在大部分场景下作为缓存进行使用,所以不用考虑数据丢失致使缓存穿透造成的影响,在Redis节点增减时可以不用考虑部分数据无法命中的问题。


分片模式的整体应用如下图所示,核心在于客户端的一致性Hash策略。


(引用自:www.cnblogs.com)


2.2 集群模式


  • 集群模式本质属于服务器分片技术,由Redis集群本身提供分片功能,从Redis 3.0版本开始正式提供。


  • 集群的原理是:一个 Redis 集群包含16384 个哈希槽(Hash slot), Redis保存的每个键都属于这16384个哈希槽的其中一个, 集群使用公式CRC16(key)%16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键key的CRC16校验和 。


  • 集群中的每个节点负责处理一部分哈希槽。举个例子, 一个集群可以有三个哈希槽, 其中:

节点 A 负责处理 0 号至 5500 号哈希槽。

节点 B 负责处理 5501 号至 11000 号哈希槽。

节点 C 负责处理 11001 号至 16383 号哈希槽。


  • Redis在集群模式下对于key的读写过程首先将对应的key值进行CRC16计算得到对应的哈希值,将哈希值对槽位总数取模映射到对应的槽位,最终映射到对应的节点进行读写。以命令set("key", "value")为例子,它会使用CRC16算法对key进行计算得到哈希值28989,然后对16384进行取模得到12605,最后找到12605对应的Redis节点,最终跳转到该节点执行set命令。


  • 集群模式的整体应用如下图所示,核心在于集群哈希槽的设计以及重定向命令。


(引用自:https://www.jianshu.com)


三、Jedis的基础用法


// Jedis单机模式的访问public void main(String[] args) { // 创建Jedis对象 jedis = new Jedis("localhost", 6379); // 执行hmget操作 jedis.hmget("foobar", "foo"); // 关闭Jedis对象 jedis.close();} // Jedis分片模式的访问public void main(String[] args) { HostAndPort redis1 = HostAndPortUtil.getRedisServers().get(0); HostAndPort redis2 = HostAndPortUtil.getRedisServers().get(1); List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(2); JedisShardInfo shard1 = new JedisShardInfo(redis1); JedisShardInfo shard2 = new JedisShardInfo(redis2); // 创建ShardedJedis对象 ShardedJedis shardedJedis = new ShardedJedis(shards); // 通过ShardedJedis对象执行set操作 shardedJedis.set("a", "bar");} // Jedis集群模式的访问public void main(String[] args) { // 构建redis的集群池 Set<HostAndPort> nodes = new HashSet<>(); nodes.add(new HostAndPort("127.0.0.1", 7001)); nodes.add(new HostAndPort("127.0.0.1", 7002)); nodes.add(new HostAndPort("127.0.0.1", 7003));  // 创建JedisCluster JedisCluster cluster = new JedisCluster(nodes);  // 执行JedisCluster对象中的方法 cluster.set("cluster-test", "my jedis cluster test"); String result = cluster.get("cluster-test");}


Jedis通过创建Jedis的类对象来实现单机模式下的数据访问,通过构建JedisCluster类对象来实现集群模式下的数据访问。


要理解Jedis的访问Redis的整个过程,可以通过先理解单机模式下的访问流程,在这个基础上再分析集群模式的访问流程会比较合适。


四、Jedis单机模式的访问


Jedis访问单机模式Redis的整体流程图如下所示,从图中可以看出核心的流程包含Jedis对象的创建以及通过Jedis对象实现Redis的访问。


熟悉Jedis访问单机Redis的过程,本身就是需要了解Jedis的创建过程以及执行Redis命令的过程。


  • Jedis的创建过程核心在于创建Jedis对象以及Jedis内部变量Client对象。

  • Jedis访问Redis的过程在于通过Jedis内部的Client对象访问Redis。



4.1 创建过程


Jedis本身的类关系图如下图所示,从图中我们能够看到Jedis继承自BinaryJedis类。


在BinaryJedis类中存在和Redis对接的Client类对象,Jedis通过父类的BinaryJedis的Client对象实现Redis的读写。



Jedis类在创建过程中通过父类BinaryJedis创建了Client对象,而了解Client对象是进一步理解访问过程的关键。

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {  protected JedisPoolAbstract dataSource = ;  public Jedis(final String host, final int port) { // 创建父类BinaryJedis对象 super(host, port); }} public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands, AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {  // 访问redis的Client对象 protected Client client = ;  public BinaryJedis(final String host, final int port) { // 创建Client对象访问redis client = new Client(host, port); }}


Client类的类关系图如下图所示,Client对象继承自BinaryClient和Connection类。在BinaryClient类中存在Redis访问密码等相关参数,在Connection类在存在访问Redis的socket对象以及对应的输入输出流。本质上Connection是和Redis进行通信的核心类。



Client类在创建过程中初始化核心父类Connection对象,而Connection是负责和Redis直接进行通信。

public class Client extends BinaryClient implements Commands { public Client(final String host, final int port) { super(host, port); }} public class BinaryClient extends Connection { // 存储和Redis连接的相关信息 private boolean isInMulti; private String user; private String password; private int db; private boolean isInWatch;  public BinaryClient(final String host, final int port) { super(host, port); }} public class Connection implements Closeable { // 管理和Redis连接的socket信息及对应的输入输出流 private JedisSocketFactory jedisSocketFactory; private Socket socket; private RedisOutputStream outputStream; private RedisInputStream inputStream; private int infiniteSoTimeout = 0; private boolean broken = false;  public Connection(final String host, final int port, final boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) { // 构建DefaultJedisSocketFactory来创建和Redis连接的Socket对象 this(new DefaultJedisSocketFactory(host, port, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, ssl, sslSocketFactory, sslParameters, hostnameVerifier)); }}


4.2 访问过程


以Jedis执行set命令为例,整个过程如下:

  • Jedis的set操作是通过Client的set操作来实现的。

  • Client的set操作是通过父类Connection的sendCommand来实现。

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands, AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands { @Override public String set(final String key, final String value) { checkIsInMultiOrPipeline(); // client执行set操作 client.set(key, value); return client.getStatusCodeReply(); }} public class Client extends BinaryClient implements Commands { @Override public void set(final String key, final String value) { // 执行set命令 set(SafeEncoder.encode(key), SafeEncoder.encode(value)); }} public class BinaryClient extends Connection { public void set(final byte[] key, final byte[] value) { // 发送set指令 sendCommand(SET, key, value); }} public class Connection implements Closeable { public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { try { // socket连接redis connect(); // 按照redis的协议发送命令 Protocol.sendCommand(outputStream, cmd, args); } catch (JedisConnectionException ex) { } }}


五、Jedis分片模式的访问


基于前面已经介绍的Redis分片模式的一致性Hash的原理来理解Jedis的分片模式的访问。


关于Redis分片模式的概念:Redis在3.0版本之前没有集群模式的概念,这导致单节点能够存储的数据有限,通过Redis的客户端如Jedis在客户端通过一致性Hash算法来实现数据的分片存储。


本质上Redis的分片模式跟Redis本身没有任何关系,只是通过客户端来解决单节点数据有限存储的问题。


ShardedJedis访问Redis的核心在于构建对象的时候初始化一致性Hash对象,构建一致性Hash经典的Hash值和node的映射关系。构建完映射关系后执行set等操作就是Hash值到node的寻址过程,寻址完成后直接进行单节点的操作。



5.1 创建过程


ShardedJedis的创建过程在于父类的Sharded中关于一致性Hash相关的初始化过程,核心在于构建一致性的虚拟节点以及虚拟节点和Redis节点的映射关系。


源码中最核心的部分代码在于根据根据权重映射成未160个虚拟节点,通过虚拟节点来定位到具体的Redis节点。

public class Sharded<R, S extends ShardInfo<R>> {  public static final int DEFAULT_WEIGHT = 1; // 保存虚拟节点和redis的node节点的映射关系 private TreeMap<Long, S> nodes; // hash算法 private final Hashing algo; // 保存redis节点和访问该节点的Jedis的连接信息 private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();  public Sharded(List<S> shards, Hashing algo) { this.algo = algo; initialize(shards); }  private void initialize(List<S> shards) { nodes = new TreeMap<>(); // 遍历每个redis的节点并设置hash值到节点的映射关系 for (int i = 0; i != shards.size(); ++i) { final S shardInfo = shards.get(i); // 根据权重映射成未160个虚拟节点 int N = 160 * shardInfo.getWeight(); if (shardInfo.getName() == ) for (int n = 0; n < N; n++) { // 构建hash值和节点映射关系 nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo); } else for (int n = 0; n < N; n++) { nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo); } // 保存每个节点的访问对象 resources.put(shardInfo, shardInfo.createResource()); } }}


5.2 访问过程


ShardedJedis的访问过程就是一致性Hash的计算过程,核心的逻辑就是:通过Hash算法对访问的key进行Hash计算生成Hash值,根据Hash值获取对应Redis节点,根据对应的Redis节点获取对应的访问对象Jedis。


获取访问对象Jedis之后就可以直接进行命令操作。

public class Sharded<R, S extends ShardInfo<R>> {  public static final int DEFAULT_WEIGHT = 1; private TreeMap<Long, S> nodes; private final Hashing algo; // 保存redis节点和访问该节点的Jedis的连接信息 private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();  public R getShard(String key) { // 根据redis节点找到对应的访问对象Jedis return resources.get(getShardInfo(key)); }  public S getShardInfo(String key) { return getShardInfo(SafeEncoder.encode(getKeyTag(key))); }  public S getShardInfo(byte[] key) { // 针对访问的key生成对应的hash值 // 根据hash值找到对应的redis节点 SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key)); if (tail.isEmpty()) { return nodes.get(nodes.firstKey()); } return tail.get(tail.firstKey()); }}


六、Jedis集群模式的访问


基于前面介绍的Redis的集群原理来理解Jedis的集群模式的访问。


Jedis能够实现key和哈希槽的定位的核心机制在于哈希槽和Redis节点的映射,而这个发现过程基于Redis的cluster slot命令。


关于Redis集群操作的命令:Redis通过cluster slots会返回Redis集群的整体状况。返回每一个Redis节点的信息包含:


  • 哈希槽起始编号

  • 哈希槽结束编号

  • 哈希槽对应master节点,节点使用IP/Port表示

  • master节点的第一个副本

  • master节点的第二个副本


127.0.0.1:30001> cluster slots1) 1) (integer) 0 // 开始槽位 2) (integer) 5460 // 结束槽位 3) 1) "127.0.0.1" // master节点的host 2) (integer) 30001 // master节点的port 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" // 节点的编码 4) 1) "127.0.0.1" // slave节点的host 2) (integer) 30004 // slave节点的port 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" // 节点的编码2) 1) (integer) 5461 2) (integer) 10922 3) 1) "127.0.0.1" 2) (integer) 30002 3) "c9d93d9f2c0c524ff34cc11838c2003d8c29e013" 4) 1) "127.0.0.1" 2) (integer) 30005 3) "faadb3eb99009de4ab72ad6b6ed87634c7ee410f"3) 1) (integer) 10923 2) (integer) 16383 3) 1) "127.0.0.1" 2) (integer) 30003 3) "044ec91f325b7595e76dbcb18cc688b6a5b434a1" 4) 1) "127.0.0.1" 2) (integer) 30006 3) "58e6e48d41228013e5d9c1c37c5060693925e97e"


Jedis访问集群模式Redis的整体流程图如下所示,从图中可以看出核心的流程包含JedisCluster对象的创建以及通过JedisCluster对象实现Redis的访问。


JedisCluster对象的创建核心在于创建JedisClusterInfoCache对象并通过集群发现来建立slot和集群节点的映射关系。


JedisCluster对Redis集群的访问在于获取key所在的Redis节点并通过Jedis对象进行访问。



6.1 创建过程


JedisCluster的类关系如下图所示,在图中可以看到核心变量JedisSlotBasedConnectionHandler对象。



JedisCluster的父类BinaryJedisCluster创建了JedisSlotBasedConnectionHandler对象,该对象负责和Redis的集群进行通信。

public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands, MultiKeyJedisClusterCommands, JedisClusterScriptingCommands { public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {  // 访问父类BinaryJedisCluster super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap); }} public class BinaryJedisCluster implements BinaryJedisClusterCommands, MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable { public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String user, String password, String clientName, GenericObjectPoolConfig poolConfig, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {  // 创建JedisSlotBasedConnectionHandler对象 this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, connectionTimeout, soTimeout, user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);  this.maxAttempts = maxAttempts; }}


JedisSlotBasedConnectionHandler的核心在于创建并初始化JedisClusterInfoCache对象,该对象缓存了Redis集群的信息。


JedisClusterInfoCache对象的初始化过程通过initializeSlotsCache来完成,主要目的用于实现集群节点和槽位发现。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler { public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String user, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {  super(nodes, poolConfig, connectionTimeout, soTimeout, user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap); }} public abstract class JedisClusterConnectionHandler implements Closeable { public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {  // 创建JedisClusterInfoCache对象 this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout, user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);  // 初始化jedis的Slot信息 initializeSlotsCache(nodes, connectionTimeout, soTimeout, infiniteSoTimeout, user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier); }   private void initializeSlotsCache(Set<HostAndPort> startNodes, int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) { for (HostAndPort hostAndPort : startNodes) {  try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier)) {  // 通过discoverClusterNodesAndSlots进行集群发现 cache.discoverClusterNodesAndSlots(jedis); return; } catch (JedisConnectionException e) { } } }}


JedisClusterInfoCache的nodes用来保存Redis集群的节点信息,slots用来保存槽位和集群节点的信息。


nodes和slots维持的对象都是JedisPool对象,该对象维持了和Redis的连接信息。集群的发现过程由discoverClusterNodesAndSlots来实现,本质是执行Redis的集群发现命令cluster slots实现的。

public class JedisClusterInfoCache { // 负责保存redis集群的节点信息 private final Map<String, JedisPool> nodes = new HashMap<>(); // 负责保存redis的槽位和redis节点的映射关系 private final Map<Integer, JedisPool> slots = new HashMap<>();  // 负责集群的发现逻辑 public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock();  try { reset(); List<Object> slots = jedis.clusterSlots();  for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj;  if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } // 获取redis节点对应的槽位信息 List<Integer> slotNums = getAssignedSlotArray(slotInfo);  // hostInfos int size = slotInfo.size(); for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); if (hostInfos.isEmpty()) { continue; }  HostAndPort targetNode = generateHostAndPort(hostInfos); // 负责保存redis节点信息 setupNodeIfNotExist(targetNode); if (i == MASTER_NODE_INDEX) { // 负责保存槽位和redis节点的映射关系 assignSlotsToNode(slotNums, targetNode); } } } } finally { w.unlock(); } }  public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = setupNodeIfNotExist(targetNode); // 保存槽位和对应的JedisPool对象 for (Integer slot : targetSlots) { slots.put(slot, targetPool); } } finally { w.unlock(); } }  public JedisPool setupNodeIfNotExist(HostAndPort node) { w.lock(); try { // 生产redis节点对应的nodeKey String nodeKey = getNodeKey(node); JedisPool existingPool = nodes.get(nodeKey); if (existingPool != ) return existingPool; // 生产redis节点对应的JedisPool JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), connectionTimeout, soTimeout, infiniteSoTimeout, user, password, 0, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier); // 保存redis节点的key和对应的JedisPool对象 nodes.put(nodeKey, nodePool); return nodePool; } finally { w.unlock(); } }}


JedisPool的类关系如下图所示,其中内部internalPool是通过apache common pool来实现的池化。



JedisPool内部的internalPool通过JedisFactory的makeObject来创建Jedis对象。


每个Redis节点都会对应一个JedisPool对象,通过JedisPool来管理Jedis的申请释放复用等。

public class JedisPool extends JedisPoolAbstract {  public JedisPool() { this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT); }} public class JedisPoolAbstract extends Pool<Jedis> {  public JedisPoolAbstract() { super(); }} public abstract class Pool<T> implements Closeable { protected GenericObjectPool<T> internalPool;  public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) { if (this.internalPool != ) { try { closeInternalPool(); } catch (Exception e) { } } this.internalPool = new GenericObjectPool<>(factory, poolConfig); }} class JedisFactory implements PooledObjectFactory<Jedis> {  @Override public PooledObject<Jedis> makeObject() throws Exception { // 创建Jedis对象 final HostAndPort hp = this.hostAndPort.get(); final Jedis jedis = new Jedis(hp.getHost(), hp.getPort(), connectionTimeout, soTimeout, infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);  try { // Jedis对象连接 jedis.connect(); if (user != ) { jedis.auth(user, password); } else if (password != ) { jedis.auth(password); } if (database != 0) { jedis.select(database); } if (clientName != ) { jedis.clientSetname(clientName); } } catch (JedisException je) { jedis.close(); throw je; } // 将Jedis对象包装成DefaultPooledObject进行返回 return new DefaultPooledObject<>(jedis); }}


6.2 访问过程


JedisCluster访问Redis的过程通过JedisClusterCommand来实现重试机制,最终通过Jedis对象来实现访问。


从实现的角度来说JedisCluster是在Jedis之上封装了一层,进行集群节点定位以及重试机制等。


以set命令为例,整个访问通过JedisClusterCommand实现如下:

  • 计算key所在的Redis节点。

  • 获取Redis节点对应的Jedis对象。

  • 通过Jedis对象进行set操作。


public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands, MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {  @Override public String set(final String key, final String value, final SetParams params) { return new JedisClusterCommand<String>(connectionHandler, maxAttempts) { @Override public String execute(Jedis connection) { return connection.set(key, value, params); } }.run(key); }}


JedisClusterCommand的run方法核心主要定位Redis的key所在的Redis节点,然后获取与该节点对应的Jedis对象进行访问。


在Jedis对象访问异常后,JedisClusterCommand会进行重试操作并按照一定策略执行renewSlotCache方法进行重集群节点重发现动作。

public abstract class JedisClusterCommand<T> { public T run(String key) { // 针对key进行槽位的计算 return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, ); }  private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {  Jedis connection = ; try {  if (redirect != ) { connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode()); if (redirect instanceof JedisAskDataException) { connection.asking(); } } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { // 根据slot去获取Jedis对象 connection = connectionHandler.getConnectionFromSlot(slot); } } // 执行真正的Redis的命令 return execute(connection); } catch (JedisNoReachableClusterNodeException jnrcne) { throw jnrcne; } catch (JedisConnectionException jce) {  releaseConnection(connection); connection = ;  if (attempts <= 1) { // 保证最后两次机会去重新刷新槽位和节点的对应的信息 this.connectionHandler.renewSlotCache(); } // 按照重试次数进行重试操作 return runWithRetries(slot, attempts - 1, tryRandomNode, redirect); } catch (JedisRedirectionException jre) { // 针对返回Move命令立即触发重新刷新槽位和节点的对应信息 if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); }  releaseConnection(connection); connection = ;  return runWithRetries(slot, attempts - 1, false, jre); } finally { releaseConnection(connection); } }}


JedisSlotBasedConnectionHandler的cache对象维持了slot和node的映射关系,通过getConnectionFromSlot方法来获取该slot对应的Jedis对象。

public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {  protected final JedisClusterInfoCache cache;  @Override public Jedis getConnectionFromSlot(int slot) { // 获取槽位对应的JedisPool对象 JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != ) { // 从JedisPool对象中获取Jedis对象 return connectionPool.getResource(); } else { // 获取失败就重新刷新槽位信息 renewSlotCache(); connectionPool = cache.getSlotPool(slot); if (connectionPool != ) { return connectionPool.getResource(); } else { //no choice, fallback to new connection to random node return getConnection(); } } }}


七、Jedis的Pipeline实现


Pipeline的技术核心思想是将多个命令发送到服务器而不用等待回复,最后在一个步骤中读取该答复。这种模式的好处在于节省了请求响应这种模式的网络开销。


Redis的普通命令如set和Pipeline批量操作的核心的差别在于set命令的操作会直接发送请求到Redis并同步等待结果返回,而Pipeline的操作会发送请求但不立即同步等待结果返回,具体的实现可以从Jedis的源码一探究竟。


原生的Pipeline在集群模式下相关的key必须Hash到同一个节点才能生效,原因在于Pipeline下的Client对象只能其中的一个节点建立了连接。


在集群模式下归属于不同节点的key能够使用Pipeline就需要针对每个key保存对应的节点的client对象,在最后执行获取数据的时候一并获取。本质上可以认为在单节点的Pipeline的基础上封装成一个集群式的Pipeline


7.1 Pipeline用法分析


Pipeline访问单节点的Redis的时候,通过Jedis对象的Pipeline方法返回Pipeline对象,其他的命令操作通过该Pipeline对象进行访问。


Pipeline从使用角度来分析,会批量发送多个命令并最后统一使用syncAndReturnAll来一次性返回结果。

public void pipeline() { jedis = new Jedis(hnp.getHost(), hnp.getPort(), 500); Pipeline p = jedis.pipelined(); // 批量发送命令到redis p.set("foo", "bar"); p.get("foo"); // 同步等待响应结果 List<Object> results = p.syncAndReturnAll();  assertEquals(2, results.size()); assertEquals("OK", results.get(0)); assertEquals("bar", results.get(1)); }  public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {  @Override public Response<String> set(final String key, final String value) { // 发送命令 getClient(key).set(key, value); // pipeline的getResponse只是把待响应的请求聚合到pipelinedResponses对象当中 return getResponse(BuilderFactory.STRING); }}  public class Queable {  private Queue<Response<?>> pipelinedResponses = new LinkedList<>(); protected <T> Response<T> getResponse(Builder<T> builder) { Response<T> lr = new Response<>(builder); // 统一保存到响应队列当中 pipelinedResponses.add(lr); return lr; }}  public class Pipeline extends MultiKeyPipelineBase implements Closeable {  public List<Object> syncAndReturnAll() { if (getPipelinedResponseLength() > 0) { // 根据批量发送命令的个数即需要批量返回命令的个数,通过client对象进行批量读取 List<Object> unformatted = client.getMany(getPipelinedResponseLength()); List<Object> formatted = new ArrayList<>(); for (Object o : unformatted) { try { // 格式化每个返回的结果并最终保存在列表中进行返回 formatted.add(generateResponse(o).get()); } catch (JedisDataException e) { formatted.add(e); } } return formatted; } else { return java.util.Collections.<Object> emptyList(); } }}


普通set命令发送请求给Redis后立即通过getStatusCodeReply来获取响应结果,所以这是一种请求响应的模式。


getStatusCodeReply在获取响应结果的时候会通过flush()命令强制发送报文到Redis服务端然后通过读取响应结果。

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands, AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {  @Override public String set(final byte[] key, final byte[] value) { checkIsInMultiOrPipeline(); // 发送命令 client.set(key, value); // 等待请求响应 return client.getStatusCodeReply(); }}  public class Connection implements Closeable { public String getStatusCodeReply() { // 通过flush立即发送请求 flush(); // 处理响应请求 final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); if ( == resp) { return ; } else { return SafeEncoder.encode(resp); } }}  public class Connection implements Closeable { protected void flush() { try { // 针对输出流进行flush操作保证报文的发出 outputStream.flush(); } catch (IOException ex) { broken = true; throw new JedisConnectionException(ex); } }}


八、结束语


Jedis作为Redis官方首选的Java客户端开发包,支持绝大部分的Redis的命令,也是日常中使用较多的Redis客户端。


了解了Jedis的实现原理,除了能够支持Redis的日常操作外,还能更好的应对Redis的额外操作诸如扩容时的技术选型。


通过介绍Jedis针对单机模式、分配模式、集群模式三种场景访问方式,让大家有个从宏观到微观的理解过程,掌握Jedis的核心思想并更好的应用到实践当中。


  • 终于!12年后Golang支持泛型了!(内含10个实例)

  • 浅谈 Apache APISIX 的可观测性

  • 百度爱番番数据分析体系的架构与实践

  • 云原生环境下对“多活”架构的思考

  • 从C++转向Rust需要注意哪些问题?


技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿。


高可用架构
改变互联网的构建方式

相关推荐

【推荐】一个开源免费、AI 驱动的智能数据管理系统,支持多数据库

如果您对源码&技术感兴趣,请点赞+收藏+转发+关注,大家的支持是我分享最大的动力!!!.前言在当今数据驱动的时代,高效、智能地管理数据已成为企业和个人不可或缺的能力。为了满足这一需求,我们推出了这款开...

Pure Storage推出统一数据管理云平台及新闪存阵列

PureStorage公司今日推出企业数据云(EnterpriseDataCloud),称其为组织在混合环境中存储、管理和使用数据方式的全面架构升级。该公司表示,EDC使组织能够在本地、云端和混...

对Java学习的10条建议(对java课程的建议)

不少Java的初学者一开始都是信心满满准备迎接挑战,但是经过一段时间的学习之后,多少都会碰到各种挫败,以下北风网就总结一些对于初学者非常有用的建议,希望能够给他们解决现实中的问题。Java编程的准备:...

SQLShift 重大更新:Oracle→PostgreSQL 存储过程转换功能上线!

官网:https://sqlshift.cn/6月,SQLShift迎来重大版本更新!作为国内首个支持Oracle->OceanBase存储过程智能转换的工具,SQLShift在过去一...

JDK21有没有什么稳定、简单又强势的特性?

佳未阿里云开发者2025年03月05日08:30浙江阿里妹导读这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上...

「松勤软件测试」网站总出现404 bug?总结8个原因,不信解决不了

在进行网站测试的时候,有没有碰到过网站崩溃,打不开,出现404错误等各种现象,如果你碰到了,那么恭喜你,你的网站出问题了,是什么原因导致网站出问题呢,根据松勤软件测试的总结如下:01数据库中的表空间不...

Java面试题及答案最全总结(2025版)

大家好,我是Java面试陪考员最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、MySQL、JVM、Redis、Linux、Sprin...

数据库日常运维工作内容(数据库日常运维 工作内容)

#数据库日常运维工作包括哪些内容?#数据库日常运维工作是一个涵盖多个层面的综合性任务,以下是详细的分类和内容说明:一、数据库运维核心工作监控与告警性能监控:实时监控CPU、内存、I/O、连接数、锁等待...

分布式之系统底层原理(上)(底层分布式技术)

作者:allanpan,腾讯IEG高级后台工程师导言分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持。本文从分布式事务这个概念切入,尝试对分布式事务...

oracle 死锁了怎么办?kill 进程 直接上干货

1、查看死锁是否存在selectusername,lockwait,status,machine,programfromv$sessionwheresidin(selectsession...

SpringBoot 各种分页查询方式详解(全网最全)

一、分页查询基础概念与原理1.1什么是分页查询分页查询是指将大量数据分割成多个小块(页)进行展示的技术,它是现代Web应用中必不可少的功能。想象一下你去图书馆找书,如果所有书都堆在一张桌子上,你很难...

《战场兄弟》全事件攻略 一般事件合同事件红装及隐藏职业攻略

《战场兄弟》全事件攻略,一般事件合同事件红装及隐藏职业攻略。《战场兄弟》事件奖励,事件条件。《战场兄弟》是OverhypeStudios制作发行的一款由xcom和桌游为灵感来源,以中世纪、低魔奇幻为...

LoadRunner(loadrunner录制不到脚本)

一、核心组件与工作流程LoadRunner性能测试工具-并发测试-正版软件下载-使用教程-价格-官方代理商的架构围绕三大核心组件构建,形成完整测试闭环:VirtualUserGenerator(...

Redis数据类型介绍(redis 数据类型)

介绍Redis支持五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及Zset(sortedset:有序集合)。1、字符串类型概述1.1、数据类型Redis支持...

RMAN备份监控及优化总结(rman备份原理)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

取消回复欢迎 发表评论: