Redis(单机+集群) 批量操作之 pipeline
mhr18 2024-11-10 09:49 40 浏览 0 评论
业务场景
最近项目中场景需要get一批key的value,因为redis的get操作(不单单是get命令)是阻塞的,如果循环取值的话,就算是内网,耗时也是巨大的。所以想到了redis的pipeline命令。
pipeline简介
非pipeline:client一个请求,redis server一个响应,期间client阻塞
Pipeline:redis的管道命令,允许client将多个请求依次发给服务器(redis的客户端,如jedisCluster,lettuce等都实现了对pipeline的封装),过程中而不需要等待请求的回复,在最后再一并读取结果即可。
单机版
单机版比较简单,直接上代码
//换成真实的redis实例
Jedis jedis = new Jedis();
//获取管道
Pipeline p = jedis.pipelined();
for (int i = 0; i < 10000; i++) {
p.get(i + "");
}
//获取结果
List<Object> results = p.syncAndReturnAll();
集群版
因为 JedisCluster 本身不支持 pipeline ,所以我们需要对 JedisCluster 进行一些封装。
还是一样,直接上代码
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.util.*;
import java.util.function.BiConsumer;
@Slf4j
public class JedisClusterPipeline extends PipelineBase implements Closeable {
/**
* 用于获取 JedisClusterInfoCache
*/
private JedisSlotBasedConnectionHandler connectionHandler;
/**
* 根据hash值获取连接
*/
private JedisClusterInfoCache clusterInfoCache;
/**
* 也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口
* JedisCluster继承于BinaryJedisCluster
* 在BinaryJedisCluster,connectionHandler属性protected修饰的,所以需要反射
*
*
* 而 JedisClusterInfoCache 属性在JedisClusterConnectionHandler中,但是这个类是抽象类,
* 但它有一个实现类JedisSlotBasedConnectionHandler
*/
private static final Field FIELD_CONNECTION_HANDLER;
private static final Field FIELD_CACHE;
static {
FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
}
/**
* 根据顺序存储每个命令对应的Client
*/
private Queue<Client> clients = new LinkedList<>();
/**
* 用于缓存连接
* 一次pipeline过程中使用到的jedis缓存
*/
private Map<JedisPool, Jedis> jedisMap = new HashMap<>();
/**
* 是否有数据在缓存区
*/
private boolean hasDataInBuf = false;
/**
* 根据jedisCluster实例生成对应的JedisClusterPipeline
* 通过此方式获取pipeline进行操作的话必须调用close()关闭管道
* 调用本类里pipelineXX方法则不用close(),但建议最好还是在finally里调用一下close()
* @param
* @return
*/
public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
JedisClusterPipeline pipeline = new JedisClusterPipeline();
pipeline.setJedisCluster(jedisCluster);
return pipeline;
}
public JedisClusterPipeline() {
}
public void setJedisCluster(JedisCluster jedis) {
connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
}
/**
* 刷新集群信息,当集群信息发生变更时调用
* @param
* @return
*/
public void refreshCluster() {
connectionHandler.renewSlotCache();
}
/**
* 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
*/
public void sync() {
innerSync(null);
}
/**
* 同步读取所有数据 并按命令顺序返回一个列表
*
* @return 按照命令的顺序返回所有的数据
*/
public List<Object> syncAndReturnAll() {
List<Object> responseList = new ArrayList<>();
innerSync(responseList);
return responseList;
}
@Override
public void close() {
clean();
clients.clear();
for (Jedis jedis : jedisMap.values()) {
if (hasDataInBuf) {
flushCachedData(jedis);
}
jedis.close();
}
jedisMap.clear();
hasDataInBuf = false;
}
private void flushCachedData(Jedis jedis) {
try {
jedis.getClient().getAll();
} catch (RuntimeException ex) {
}
}
@Override
protected Client getClient(String key) {
byte[] bKey = SafeEncoder.encode(key);
return getClient(bKey);
}
@Override
protected Client getClient(byte[] key) {
Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
Client client = jedis.getClient();
clients.add(client);
return client;
}
private Jedis getJedis(int slot) {
JedisPool pool = clusterInfoCache.getSlotPool(slot);
// 根据pool从缓存中获取Jedis
Jedis jedis = jedisMap.get(pool);
if (null == jedis) {
jedis = pool.getResource();
jedisMap.put(pool, jedis);
}
hasDataInBuf = true;
return jedis;
}
public static void pipelineSetEx(String[] keys, String[] values, int[] exps,JedisCluster jedisCluster) {
operate(new Command() {
@Override
public List execute() {
JedisClusterPipeline p = pipelined(jedisCluster);
for (int i = 0, len = keys.length; i < len; i++) {
p.setex(keys[i], exps[i], values[i]);
}
return p.syncAndReturnAll();
}
});
}
public static List<Map<String, String>> pipelineHgetAll(String[] keys,JedisCluster jedisCluster) {
return operate(new Command() {
@Override
public List execute() {
JedisClusterPipeline p = pipelined(jedisCluster);
for (int i = 0, len = keys.length; i < len; i++) {
p.hgetAll(keys[i]);
}
return p.syncAndReturnAll();
}
});
}
public static List<Boolean> pipelineSismember(String[] keys, String members,JedisCluster jedisCluster) {
return operate(new Command() {
@Override
public List execute() {
JedisClusterPipeline p = pipelined(jedisCluster);
for (int i = 0, len = keys.length; i < len; i++) {
p.sismember(keys[i], members);
}
return p.syncAndReturnAll();
}
});
}
public static <O> List pipeline(BiConsumer<O, JedisClusterPipeline> function, O obj,JedisCluster jedisCluster) {
return operate(new Command() {
@Override
public List execute() {
JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jedisCluster);
function.accept(obj, jcp);
return jcp.syncAndReturnAll();
}
});
}
private void innerSync(List<Object> formatted) {
HashSet<Client> clientSet = new HashSet<>();
try {
for (Client client : clients) {
// 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。
// 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了
Object data = generateResponse(client.getOne()).get();
if (null != formatted) {
formatted.add(data);
}
// size相同说明所有的client都已经添加,就不用再调用add方法了
if (clientSet.size() != jedisMap.size()) {
clientSet.add(client);
}
}
} catch (JedisRedirectionException jre) {
if (jre instanceof JedisMovedDataException) {
// if MOVED redirection occurred, rebuilds cluster's slot cache,
// recommended by Redis cluster specification
refreshCluster();
}
throw jre;
} finally {
if (clientSet.size() != jedisMap.size()) {
// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染
for (Jedis jedis : jedisMap.values()) {
if (clientSet.contains(jedis.getClient())) {
continue;
}
flushCachedData(jedis);
}
}
hasDataInBuf = false;
close();
}
}
private static Field getField(Class<?> cls, String fieldName) {
try {
Field field = cls.getDeclaredField(fieldName);
field.setAccessible(true);
return field;
} catch (NoSuchFieldException | SecurityException e) {
throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
}
}
@SuppressWarnings({"unchecked" })
private static <T> T getValue(Object obj, Field field) {
try {
return (T)field.get(obj);
} catch (IllegalArgumentException | IllegalAccessException e) {
log.error("get value fail", e);
throw new RuntimeException(e);
}
}
private static <T> T operate(Command command) {
try {
return command.execute();
} catch (Exception e) {
log.error("redis operate error");
throw new RuntimeException(e);
}
}
interface Command {
/**
* 具体执行命令
*
* @param <T>
* @return
*/
<T> T execute();
}
}
使用demo
public Object testPipelineOperate() {
// String[] keys = {"dylan1","dylan2"};
// String[] values = {"dylan1-v1","dylan2-v2"};
// int[] exps = {100,200};
// JedisClusterPipeline.pipelineSetEx(keys, values, exps, jedisCluster);
long start = System.currentTimeMillis();
List<String> keyList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
keyList.add(i + "");
}
// List<String> pipeline = JedisClusterPipeline.pipeline(this::getValue, keyList, jedisCluster);
// List<String> pipeline = JedisClusterPipeline.pipeline(this::getHashValue, keyList, jedisCluster);
String[] keys = {"dylan-test1", "dylan-test2"};
List<Map<String, String>> all = JedisClusterPipeline.pipelineHgetAll(keys, jedisCluster);
long end = System.currentTimeMillis();
System.out.println("testPipelineOperate cost:" + (end-start));
return Response.success(all);
}
预览
相关推荐
- Java面试宝典之问答系列(java面试回答)
-
以下内容,由兆隆IT云学院就业部根据多年成功就业服务经验提供:1.写出从数据库表Custom中查询No、Name、Num1、Num2并将Name以姓名显示、计算出的和以总和显示的SQL。SELECT...
- ADG (Active Data Guard) 数据容灾架构下,如何配置 Druid 连接池?
-
如上图的数据容灾架构下,上层应用如果使用Druid连接池,应该如何配置,才能在数据库集群节点切换甚至主备数据中心站点切换的情况下,上层应用不需要变动(无需修改配置也无需重启);即数据库节点宕机/...
- SpringBoot多数据源dynamic-datasource快速入门
-
一、简介dynamic-datasourc是一个基于SpringBoot的快速集成多数据源的启动器,其主要特性如下:支持数据源分组,适用于多种场景纯粹多库读写分离一主多从混合模式。支持...
- SpringBoot项目快速开发框架JeecgBoot——项目简介及系统架构!
-
项目简介及系统架构JeecgBoot是一款基于SpringBoot的开发平台,它采用前后端分离架构,集成的框架有SpringBoot2.x、SpringCloud、AntDesignof...
- 常见文件系统格式有哪些(文件系统类型有哪几种)
-
PART.01常见文件系统格式有哪些常见的文件系统格式有很多,通常根据使用场景(Windows、Linux、macOS、移动设备、U盘、硬盘等)有所不同。以下是一些主流和常见的文件系统格式及其特点:一...
- Oracle MySQL Operator部署集群(oracle mysql group by)
-
以下是使用OracleMySQLOperator部署MySQL集群的完整流程及关键注意事项:一、部署前准备安装MySQLOperator通过Helm安装Operator到Ku...
- LibreOffice加入"转向Linux"运动
-
LibreOffice项目正准备削减部分Windows支持,并鼓励用户切换到Linux系统。自Oracle放弃OpenOffice后,支持和指导LibreOffice开发的文档基金会对未来有着明确的观...
- Oracle Linux 10发布:UEK 8.1、后量子加密、增强开发工具等
-
IT之家6月28日消息,科技媒体linuxiac昨日(6月27日)发布博文,报道称OracleLinux10正式发布,完全二进制兼容(binarycompatibility...
- 【mykit-data】 数据库同步工具(数据库同步工具 开源)
-
项目介绍支持插件化、可视化的数据异构中间件,支持的数据异构方式如下MySQL<——>MySQL(增量、全量)MySQL<——>Oracle(增量、全量)Oracle...
- oracle关于xml的解析(oracle读取xml节点的属性值)
-
有时需要在存储过程中处理xml,oracle提供了相应的函数来进行处理,xmltype以及相关的函数。废话少说,上代码:selectxmltype(SIConfirmOutput).extract...
- 如何利用DBSync实现数据库同步(通过dblink同步数据库)
-
DBSync是一款通用型的数据库同步软件,能侦测数据表之间的差异,能实时同步差异数据,从而使双方始终保持一致。支持各种数据库,支持异构同步、增量同步,且提供永久免费版。本文介绍其功能特点及大致用法,供...
- MYSQL存储引擎InnoDB(八十):InnoDB静态数据加密
-
InnoDB支持独立表空间、通用表空间、mysql系统表空间、重做日志和撤消日志的静态数据加密。从MySQL8.0.16开始,还支持为模式和通用表空间设置加密默认值,这允许DBA控制在这些模...
- JDK高版本特性总结与ZGC实践(jdk高版本兼容低版本吗)
-
美团信息安全技术团队核心服务升级JDK17后,性能与稳定性大幅提升,机器成本降低了10%。高版本JDK与ZGC技术令人惊艳,且JavaAISDK最低支持JDK17。本文总结了JDK17的主要...
- 4 种 MySQL 同步 ES 方案,yyds!(两个mysql数据库自动同步的方法)
-
本文会先讲述数据同步的4种方案,并给出常用数据迁移工具,干货满满!不BB,上文章目录:1.前言在实际项目开发中,我们经常将MySQL作为业务数据库,ES作为查询数据库,用来实现读写分离,...
- 计算机Java培训课程包含哪些内容?其实就这六大块
-
不知不觉秋天已至,如果你还处于就业迷茫期,不如来学习Java。对于非科班小白来说,Java培训会更适合你。提前了解下计算机Java培训课程内容,会有助于你后续学习。下面,我就从六个部分为大家详细介绍...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Java面试宝典之问答系列(java面试回答)
- ADG (Active Data Guard) 数据容灾架构下,如何配置 Druid 连接池?
- SpringBoot多数据源dynamic-datasource快速入门
- SpringBoot项目快速开发框架JeecgBoot——项目简介及系统架构!
- 常见文件系统格式有哪些(文件系统类型有哪几种)
- Oracle MySQL Operator部署集群(oracle mysql group by)
- LibreOffice加入"转向Linux"运动
- Oracle Linux 10发布:UEK 8.1、后量子加密、增强开发工具等
- 【mykit-data】 数据库同步工具(数据库同步工具 开源)
- oracle关于xml的解析(oracle读取xml节点的属性值)
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)