百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Spring-data-redis + Lettuce 如何使用 Pipeline

mhr18 2024-11-28 08:35 16 浏览 0 评论

最近,私信还有留言中,网友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通过抓包一看,并没有生效,这个如何配置才能生效呢?

首先,在上面的文章中,我们分析过 Spring-data-redis + Lettuce 的基本原理,在这种环境下 RedisTemplate 使用的连接内部包括:

  • asyncSharedConn:可以为空,如果开启了连接共享,则不为空,默认是开启的;所有 LettuceConnection 共享的 Redis 连接,对于每个 LettuceConnection 实际上都是同一个连接;用于执行简单命令,因为 Netty 客户端与 Redis 的单处理线程特性,共享同一个连接也是很快的。如果没开启连接共享,则这个字段为空,使用 asyncDedicatedConn 执行命令。
  • asyncDedicatedConn:私有连接,如果需要保持会话,执行事务,以及 Pipeline 命令,固定连接,则必须使用这个 asyncDedicatedConn 执行 Redis 命令。

execute(RedisCallback),流程是:

对于 executePipelined(RedisCallback),如果使用正确的话,会使用 asyncDedicatedConn 私有连接执行。那么怎么算使用正确呢?

需要使用回调的连接进行 Redis 调用,不能直接使用 redisTemplate 调用,否则 pipeline 不生效

Pipeline 生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.get("test".getBytes());
        connection.get("test2".getBytes());
        return null;
    }
});
复制代码

Pipeline 不生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        redisTemplate.opsForValue().get("test");
        redisTemplate.opsForValue().get("test2");
        return null;
    }
});
复制代码

这样我们就能使用保证 API 层正确使用 pipeline 了,但是默认配置的情况下, 底层还是没有执行 Pipeline,这是怎么回事呢?

Redis Pipeline 类比 Lettuce 中的 AutoFlushCommands

Redis Pipeline 是 Redis 中的 批量操作,它能将一组 Redis 命令进行组装,通过一次传输给 Redis 并返回结果集,大大减少了如果命令时一条条单独传输需要的 RTT 时间(包括 Redis 客户端,Redis 服务端切换系统调用发送接收数据的时间,以及网络传输时间)。

如果原来的命令是这么发送的:

Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4
复制代码

那么使用 PIPELINE 之后,命令就是类似于这么发送的

Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4
复制代码

我们可以看出,其实它的原理,就是客户端先将所有命令拼接在一起然后本地缓存起来,之后统一发到服务端,服务端执行所有命令之后,统一响应。

Lettuce 的连接有一个 AutoFlushCommands 配置,就是指在这个连接上执行的命令,如果发送到服务端。默认是 false,即收到一个命令就发到服务端一个。如果配置为 false,则将所有命令缓存起来,手动调用 flushCommands 的时候,将缓存的命令一起发到服务端,这样其实就是实现了 Pipeline。

配置 Spring-data-redis + Lettuce 使用 Pipeline

Spring-data-redis 从 2.3.0 版本开始,对于 Lettuce 也兼容了 Pipeline 配置,参考:

  • DATAREDIS-1011 - Allow configuration of Lettuce pipelining flush behavior
  • github.com/spring-proj…

我们可以这样配置:

@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            //在 LettuceConnectionFactory 这个 Bean 初始化之后,设置 PipeliningFlushPolicy 为 flushOnClose
            if (bean instanceof LettuceConnectionFactory) {
                LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;
                lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
            }
            return bean;
        }
    };
}
复制代码

我们来看下这个 PipeliningFlushPolicy 的源码就知道这个 flushOnClose 的意义:

public interface PipeliningFlushPolicy {
    //其实就是默认的每个命令都直接发到 Redis Server
    static PipeliningFlushPolicy flushEachCommand() {
		return FlushEachCommand.INSTANCE;
	}
	//在连接关闭的时候,将命令一起发到 Redis
	static PipeliningFlushPolicy flushOnClose() {
		return FlushOnClose.INSTANCE;
	}
	//手动设置在多少条命令之后,统一发到 Redis,但是同样的,连接关闭的时候也会发到 Redis
	static PipeliningFlushPolicy buffered(int bufferSize) {
		return () -> new BufferedFlushing(bufferSize);
	}
}
复制代码

这三个类也都实现了 PipeliningFlushState 接口:

public interface PipeliningFlushState {
    //对于 executePipelined,刚开始就会调用 connection.openPipeline(); 开启 pipeline,里面会调用这个方法
    void onOpen(StatefulConnection<?, ?> connection);
    //对于 executePipelined 中的每个命令都会调用这个方法
    void onCommand(StatefulConnection<?, ?> connection);
    //在 executePipelined 的最后会调用 connection.closePipeline(),里面会调用这个方法
    void onClose(StatefulConnection<?, ?> connection);
}
复制代码

默认的每个命令都直接发到 Redis Server 的实现是:其实就是方法里什么都不做。

private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
	INSTANCE;
	@Override
	public PipeliningFlushState newPipeline() {
		return INSTANCE;
	}
	@Override
	public void onOpen(StatefulConnection<?, ?> connection) {}
	@Override
	public void onCommand(StatefulConnection<?, ?> connection) {}
	@Override
	public void onClose(StatefulConnection<?, ?> connection) {}
}
复制代码

对于 flushOnClose:

private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
	INSTANCE;
	@Override
	public PipeliningFlushState newPipeline() {
		return INSTANCE;
	}
	@Override
	public void onOpen(StatefulConnection<?, ?> connection) {
	    //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis
		connection.setAutoFlushCommands(false);
	}
	@Override
	public void onCommand(StatefulConnection<?, ?> connection) {
        //收到命令时什么都不做
	}
	@Override
	public void onClose(StatefulConnection<?, ?> connection) {
	    //在 pipeline 关闭的时候发送所有命令
		connection.flushCommands();
		//恢复默认配置,这样连接如果退回连接池不会影响后续使用
		connection.setAutoFlushCommands(true);
	}
}
复制代码

对于 buffered:

private static class BufferedFlushing implements PipeliningFlushState {
	private final AtomicLong commands = new AtomicLong();
	private final int flushAfter;

	public BufferedFlushing(int flushAfter) {
		this.flushAfter = flushAfter;
	}

	@Override
	public void onOpen(StatefulConnection<?, ?> connection) {
	    //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis
		connection.setAutoFlushCommands(false);
	}

	@Override
	public void onCommand(StatefulConnection<?, ?> connection) {
	    //如果命令达到指定个数,就发到 Redis
		if (commands.incrementAndGet() % flushAfter == 0) {
			connection.flushCommands();
		}
	}

	@Override
	public void onClose(StatefulConnection<?, ?> connection) {
	    //在 pipeline 关闭的时候发送所有命令
		connection.flushCommands();
		//恢复默认配置,这样连接如果退回连接池不会影响后续使用
		connection.setAutoFlushCommands(true);
	}
}


作者:干货满满张哈希
链接:https://juejin.cn/post/7019847175232290824
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


相关推荐

【预警通报】关于WebLogic存在远程代码执行高危漏洞的预警通报

近日,Oracle官方发布了2021年1月关键补丁更新公告CPU(CriticalPatchUpdate),共修复了包括CVE-2021-2109(WeblogicServer远程代码执行漏洞)...

医院信息系统突发应急演练记录(医院信息化应急演练)

信息系统突发事件应急预案演练记录演练内容信息系统突发事件应急预案演练参与人员信息科参与科室:全院各部门日期xxxx-xx-xx时间20:00至24:00地点信息科记录:xxx1、...

一文掌握怎么利用Shell+Python实现完美版的多数据源备份程序

简介:在当今数字化时代,无论是企业还是个人,数据的安全性和业务的连续性都是至关重要的。数据一旦丢失,可能会造成无法估量的损失。因此,如何有效地对分布在不同位置的数据进行备份,尤其是异地备份,成为了一个...

docker搭建系统环境(docker搭建centos)

Docker安装(CentOS7)1.卸载旧版Docker#检查已安装版本yumlistinstalled|grepdocker#卸载旧版本yumremove-ydocker.x...

基础篇:数据库 SQL 入门教程(sql数据库入门书籍推荐)

SQL介绍什么是SQLSQL指结构化查询语言,是用于访问和处理数据库的标准的计算机语言。它使我们有能力访问数据库,可与多种数据库程序协同工作,如MSAccess、DB2、Informix、M...

Java21杀手级新特性!3行代码性能翻倍

导语某券商系统用这招,交易延迟从12ms降到0.8ms!本文揭秘Oracle官方未公开的Record模式匹配+虚拟线程深度优化+向量API神操作,代码量直降70%!一、Record模式匹配(代码量↓8...

一文读懂JDK21的虚拟线程(java虚拟线程)

概述JDK21已于2023年9月19日发布,作为Oracle标准Java实现的一个LTS版本发布,发布了15想新特性,其中虚拟线程呼声较高。虚拟线程是JDK21中引入的一项重要特性,它是一种轻量级的...

效率!MacOS下超级好用的Linux虚拟工具:Lima

对于MacOS用户来说,搭建Linux虚拟环境一直是件让人头疼的事。无论是VirtualBox还是商业的VMware,都显得过于笨重且配置复杂。今天,我们要介绍一个轻巧方便的纯命令行Linux虚拟工具...

所谓SaaS(所谓三维目标一般都应包括)

2010年前后,一个科技媒体的主编写一些关于云计算的概念性问题,就可以作为头版头条了。那时候的云计算,更多的还停留在一些概念性的问题上。而基于云计算而生的SaaS更是“养在深闺人未识”,一度成为被IT...

ORA-00600 「25027」 「x」报错(报错0xc0000001)

问题现象:在用到LOB大对象的业务中,进行数据的插入,失败了,在报警文件中报错:ORA-00600:内部错误代码,参数:[25027],[10],[0],[],[],[],[],[...

安卓7源码编译(安卓源码编译环境lunch失败,uname命令找不到)

前面已经下载好源码了,接下来是下载手机对应的二进制驱动执行编译源码命令下载厂商驱动https://developers.google.com/android/drivers?hl=zh-cn搜索NGI...

编译安卓源码(编译安卓源码 电脑配置)

前面已经下载好源码了,接下来是下载手机对应的二进制驱动执行编译源码命令下载厂商驱动https://developers.google.com/android/drivers?hl=zh-cn搜索NGI...

360 Vulcan Team首战告捷 以17.5万美金强势领跑2019“天府杯“

2019年11月16日,由360集团、百度、腾讯、阿里巴巴、清华大学与中科院等多家企业和研究机构在成都联合主办了2019“天府杯”国际网络安全大赛暨2019天府国际网络安全高峰论坛。而开幕当日最激荡人...

Syslog 日志分析与异常检测技巧(syslog发送日志配置)

系统日志包含有助于分析网络设备整体运行状况的重要信息。然而,理解并从中提取有效数据往往颇具挑战。本文将详解从基础命令行工具到专业日志管理软件的全流程分析技巧,助你高效挖掘Syslog日志价值。Gr...

从Oracle演进看数据库技术的发展(从oracle演进看数据库技术的发展的过程)

数据库技术发展本质上是应用需求驱动与基础架构演进的双向奔赴,如何分析其技术发展的脉络和方向?考虑到oracle数据库仍然是这个领域的王者,以其为例,管中窥豹,对其从Oracle8i到23ai版本的核...

取消回复欢迎 发表评论: