Redis客户端Lettuce源码「二」Lettuce是如何基于Netty建立连接的
mhr18 2024-11-28 08:35 14 浏览 0 评论
lettuce-core版本: 5.1.7.RELEASE
先看一下Lettuce的基本使用方法,使用Lettuce大概分为如下几步:
- 基于Redis连接信息创建RedisClient
- 基于RedisClient创建StatefulRedisConnection
- 从Connection中获取Command,基于Command执行Redis命令操作。
/**
* @author xiaobing
* @date 2019/12/20
*/
public class LettuceSimpleUse {
private void testLettuce() throws ExecutionException, InterruptedException {
//构建RedisClient对象,RedisClient包含了Redis的基本配置信息,可以基于RedisClient创建RedisConnection
RedisClient client = RedisClient.create("redis://localhost");
//创建一个线程安全的StatefulRedisConnection,可以多线程并发对该connection操作,底层只有一个物理连接.
StatefulRedisConnection<String, String> connection = client.connect();
//获取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三种command
RedisStringCommands<String, String> sync = connection.sync();
String value = sync.get("key");
System.out.println("get redis value with lettuce sync command, value is :" + value);
//获取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三种command
RedisAsyncCommands<String, String> async = connection.async();
RedisFuture<String> getFuture = async.get("key");
value = getFuture.get();
System.out.println("get redis value with lettuce async command, value is :" + value);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
new LettuceSimpleUse().testLettuce();
}
}
先看一张建立连接的时序图,有一个直观的印象。
lettuce源码--建立redis连接
RedisClient
一个可扩展、线程安全的RedisClient,支持sync、async、reactor执行模式。
RedisClient.create只是传入了一些配置信息,此时并没有创建连接。
// 使用默认的ClientResource
public static RedisClient create(String uri) {
LettuceAssert.notEmpty(uri, "URI must not be empty");
return new RedisClient(null, RedisURI.create(uri));
}
// ClientResources中包含了一些配置和线程池信息,是一个比较重的资源,多个RedisClient可以共享同一个ClientResource
protected RedisClient(ClientResources clientResources, RedisURI redisURI) {
super(clientResources);
assertNotNull(redisURI);
this.redisURI = redisURI;
setDefaultTimeout(redisURI.getTimeout());
}
RedisClient.connnect
可以看到connect方法有一些重载方法,默认的是用UTF8 String对key和value序列化,通过传入RedisCodec支持自定义的对Key和Value的序列化方式。
public StatefulRedisConnection<String, String> connect() {
return connect(newStringStringCodec());
}
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {
checkForRedisURI();
//connectStandaloneAsync是异步创建connection,返回的是Future对象,通过getConnection转为同步操作
return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));
}
//异步转同步操作
protected <T> T getConnection(ConnectionFuture<T> connectionFuture) {
try {
return connectionFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
} catch (Exception e) {
if (e instanceof ExecutionException) {
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e.getCause());
}
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
}
}
RedisClient.connectStandaloneAsync
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
RedisURI redisURI, Duration timeout) {
assertNotNull(codec);
checkValidRedisURI(redisURI);
logger.debug("Trying to get a Redis connection for: " + redisURI);
//创建一个有状态的EndPoint用于抽象底层channel的实现,DefaultEndpoint内部封装断线重连、重连后成功后回放连接失败期间的command。同时封装了AT_MOST_ONCE、AT_LEAST_ONCE的可靠性实现(该逻辑是基于内存的,所以并不可靠)。
DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources);
RedisChannelWriter writer = endpoint;
//进一步封装,添加支持过期时间的执行命令
if (CommandExpiryWriter.isSupported(clientOptions)) {
writer = new CommandExpiryWriter(writer, clientOptions, clientResources);
}
//创建StatefulRedisConnectionImpl对象,StatefulRedisConnectionImpl对外提供RedisCommand对象,内部基于writer发送命令。此时并没有真正的创建物理连接,该类本身是无状态、线程安全的。
StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, codec, timeout);
//异步创建Redis物理连接,返回future对象。后面可以看到future中返回的对象其实还是上面的connection
ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, codec, endpoint, redisURI,
() -> new CommandHandler(clientOptions, clientResources, endpoint));
future.whenComplete((channelHandler, throwable) -> {
if (throwable != null) {
connection.close();
}
});
return future;
}
//StatefulRedisConnectionImpl的构造函数,此时已经创建了sync、async、reactive三种类型的RedisCommand。基于RedisCodec对key和value序列化,通过write把命令真正的发出去。
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {
super(writer, timeout);
this.codec = codec;
this.async = newRedisAsyncCommandsImpl();
this.sync = newRedisSyncCommandsImpl();
this.reactive = newRedisReactiveCommandsImpl();
}
RedisClient.connectStatefulAsync
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
RedisCodec<K, V> codec, Endpoint endpoint,
RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
//构建ConnectionBuidler,通过ConnectionBuilder来创建connection
ConnectionBuilder connectionBuilder;
if (redisURI.isSsl()) {
SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
sslConnectionBuilder.ssl(redisURI);
connectionBuilder = sslConnectionBuilder;
} else {
connectionBuilder = ConnectionBuilder.connectionBuilder();
}
//填充StatefulRedisConnectionImpl
connectionBuilder.connection(connection);
//控制RedisClient行为的一些配置参数
connectionBuilder.clientOptions(clientOptions);
//ClientResource包含了一些EventLoopGroup信息
connectionBuilder.clientResources(clientResources);
//配置commandHandlerSupplier,这个commandHandler很重要,是实现StatefulRedisConnectionImpl线程安全的关键,后面会详细讲。
connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
//connectionBuilder填充Bootstrap等更多的信息
//getSocketAddressSupplier是根据redisURI获取真正的Redis连接信息,如:sentinel模式下,需要从sentinel获取到真实的redis连接地址
connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
//配置netty的channeltype
channelType(connectionBuilder, redisURI);
if (clientOptions.isPingBeforeActivateConnection()) {
if (hasPassword(redisURI)) {
connectionBuilder.enableAuthPingBeforeConnect();
} else {
connectionBuilder.enablePingBeforeConnect();
}
}
//初始化channel,在这一步才真正的异步的去创建物理连接
ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
ConnectionFuture<?> sync = future;
if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
//连接成功之后发送auth命令,做密码的验证
sync = sync.thenCompose(channelHandler -> {
CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getPassword());
return connection.async().dispatch(CommandType.AUTH, new StatusOutput<>(codec), args);
});
}
//设置clientName,从Redis服务端执行client list可以看到clientname
if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
sync = sync.thenApply(channelHandler -> {
connection.setClientName(redisURI.getClientName());
return channelHandler;
});
}
//选择db
if (redisURI.getDatabase() != 0) {
sync = sync.thenCompose(channelHandler -> {
CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getDatabase());
return connection.async().dispatch(CommandType.SELECT, new StatusOutput<>(codec), args);
});
}
//返回connection对象
return sync.thenApply(channelHandler -> (S) connection);
}
RedisClient.connectionBuilder
//为ConnectionBuidler填充更多的信息,如Bootstrap、channelGroup
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
RedisURI redisURI) {
//创建Netty客户端的Bootstrap对象
Bootstrap redisBootstrap = new Bootstrap();
//Bootstrap的一些配置参数,具体可以参考Netty的相关书籍(Netty权威指南)
redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
SocketOptions socketOptions = getOptions().getSocketOptions();
redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
if (LettuceStrings.isEmpty(redisURI.getSocket())) {
//keepAlive参数,默认为true
redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
//tcp_nodelay参数,默认为true
redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
}
connectionBuilder.timeout(redisURI.getTimeout());
connectionBuilder.password(redisURI.getPassword());
//把构建出来的bootStrap对象赋值给connectionBuidler,由connectionBuilder创建连接
connectionBuilder.bootstrap(redisBootstrap);
//Netty的相关参数配置,待研究
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
//配置socket地址提供者
connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}
RedisClient.initializeChannelAsync
//初始化redis连接,返回ChannelFuture对象
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
ConnectionBuilder connectionBuilder) {
Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();
if (clientResources.eventExecutorGroup().isShuttingDown()) {
throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
}
//创建socketAddressFuture 对象,当socketAddressSupplier异步获取SocketAddress成功之后会把SocketAddress数据放入该对象中
CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
//创建channelReadyFuture,当连接建立成功之后会把Channel对象放入该对象中
CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();
//配置获取SocketAddress异步操作之后的操作:
//1. 把SocketAddress对象放入socketAddressFuture中
//2. 基于SocketAddress调用initializeChannelAsync0方法真正去建立连接
socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete)
.subscribe(redisAddress -> {
if (channelReadyFuture.isCancelled()) {
return;
}
//异步建立真正的连接,如果建立成功会把生产的Channel对象放入channelReadyFuture中
initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
}, channelReadyFuture::completeExceptionally);
//建立连接成功之后返回的还是connectionBuilder的connection对象,即StatefulRedisConnectionImpl
return new DefaultConnectionFuture<>(socketAddressFuture, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
.connection()));
}
RedisClient.initializeChannelAsync0
//真正的去建立Redis物理连接,这里面有很多基于Future的异步操作,如果看不太懂,建议先看看Future的相关知识,多看几遍。
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
SocketAddress redisAddress) {
logger.debug("Connecting to Redis at {}", redisAddress);
Bootstrap redisBootstrap = connectionBuilder.bootstrap();
//创建PlainChannelInitializer对象,PlainChannelIntializer对象会在Channel初始化的时候添加很多Handlers(Netty的Handler概念可以参考Netty权威指南),如:CommandEncoder、CommandHandler(非常重要的Handler)、ConnectionWatchdog(实现断线重连)
RedisChannelInitializer initializer = connectionBuilder.build();
//RedisChannelInitializer配置到Bootstrap中
redisBootstrap.handler(initializer);
//调用一些通过ClientResources自定义的回调函数
clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
//获取initFuture 对象,如果Channel初始化完成,可以通过该对象获取到初始化的结果
CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
//真正的通过Netty异步的方式去建立物理连接,返回ChannelFuture对象
ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
//配置异常处理
channelReadyFuture.whenComplete((c, t) -> {
if (t instanceof CancellationException) {
connectFuture.cancel(true);
initFuture.cancel(true);
}
});
connectFuture.addListener(future -> {
//异常处理
if (!future.isSuccess()) {
logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
connectionBuilder.endpoint().initialState();
//赋值channelReadyFuture告知出现异常了
channelReadyFuture.completeExceptionally(future.cause());
return;
}
//当Channel初始化完成之后,根据初始化的结果做判断
initFuture.whenComplete((success, throwable) -> {
//如果异常为空,则初始化成功。
if (throwable == null) {
logger.debug("Connecting to Redis at {}: Success", redisAddress);
RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
connection.registerCloseables(closeableResources, connection);
//把成功之后的结果赋值给channelReadyFuture对象
channelReadyFuture.complete(connectFuture.channel());
return;
}
//如果初始化Channel的过程中出现异常的处理逻辑
logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
connectionBuilder.endpoint().initialState();
Throwable failure;
if (throwable instanceof RedisConnectionException) {
failure = throwable;
} else if (throwable instanceof TimeoutException) {
failure = new RedisConnectionException("Could not initialize channel within "
+ connectionBuilder.getTimeout(), throwable);
} else {
failure = throwable;
}
//赋值channelReadyFuture告知出现异常了
channelReadyFuture.completeExceptionally(failure);
});
});
}
至此,Redis的Connection的建立连接的主流程就结束了,具体的一些逻辑如:断线重连是如何实现的,Redis模式下是怎么基于Sentinel获取Redis实际连接的等等会在后续的文章中介绍。
- 上一篇:Redis Client常用命令
- 下一篇:【Redis】常用命令介绍
相关推荐
- 【预警通报】关于WebLogic存在远程代码执行高危漏洞的预警通报
-
近日,Oracle官方发布了2021年1月关键补丁更新公告CPU(CriticalPatchUpdate),共修复了包括CVE-2021-2109(WeblogicServer远程代码执行漏洞)...
- 医院信息系统突发应急演练记录(医院信息化应急演练)
-
信息系统突发事件应急预案演练记录演练内容信息系统突发事件应急预案演练参与人员信息科参与科室:全院各部门日期xxxx-xx-xx时间20:00至24:00地点信息科记录:xxx1、...
- 一文掌握怎么利用Shell+Python实现完美版的多数据源备份程序
-
简介:在当今数字化时代,无论是企业还是个人,数据的安全性和业务的连续性都是至关重要的。数据一旦丢失,可能会造成无法估量的损失。因此,如何有效地对分布在不同位置的数据进行备份,尤其是异地备份,成为了一个...
- docker搭建系统环境(docker搭建centos)
-
Docker安装(CentOS7)1.卸载旧版Docker#检查已安装版本yumlistinstalled|grepdocker#卸载旧版本yumremove-ydocker.x...
- 基础篇:数据库 SQL 入门教程(sql数据库入门书籍推荐)
-
SQL介绍什么是SQLSQL指结构化查询语言,是用于访问和处理数据库的标准的计算机语言。它使我们有能力访问数据库,可与多种数据库程序协同工作,如MSAccess、DB2、Informix、M...
- Java21杀手级新特性!3行代码性能翻倍
-
导语某券商系统用这招,交易延迟从12ms降到0.8ms!本文揭秘Oracle官方未公开的Record模式匹配+虚拟线程深度优化+向量API神操作,代码量直降70%!一、Record模式匹配(代码量↓8...
- 一文读懂JDK21的虚拟线程(java虚拟线程)
-
概述JDK21已于2023年9月19日发布,作为Oracle标准Java实现的一个LTS版本发布,发布了15想新特性,其中虚拟线程呼声较高。虚拟线程是JDK21中引入的一项重要特性,它是一种轻量级的...
- 效率!MacOS下超级好用的Linux虚拟工具:Lima
-
对于MacOS用户来说,搭建Linux虚拟环境一直是件让人头疼的事。无论是VirtualBox还是商业的VMware,都显得过于笨重且配置复杂。今天,我们要介绍一个轻巧方便的纯命令行Linux虚拟工具...
- 所谓SaaS(所谓三维目标一般都应包括)
-
2010年前后,一个科技媒体的主编写一些关于云计算的概念性问题,就可以作为头版头条了。那时候的云计算,更多的还停留在一些概念性的问题上。而基于云计算而生的SaaS更是“养在深闺人未识”,一度成为被IT...
- ORA-00600 「25027」 「x」报错(报错0xc0000001)
-
问题现象:在用到LOB大对象的业务中,进行数据的插入,失败了,在报警文件中报错:ORA-00600:内部错误代码,参数:[25027],[10],[0],[],[],[],[],[...
- 安卓7源码编译(安卓源码编译环境lunch失败,uname命令找不到)
-
前面已经下载好源码了,接下来是下载手机对应的二进制驱动执行编译源码命令下载厂商驱动https://developers.google.com/android/drivers?hl=zh-cn搜索NGI...
- 编译安卓源码(编译安卓源码 电脑配置)
-
前面已经下载好源码了,接下来是下载手机对应的二进制驱动执行编译源码命令下载厂商驱动https://developers.google.com/android/drivers?hl=zh-cn搜索NGI...
- 360 Vulcan Team首战告捷 以17.5万美金强势领跑2019“天府杯“
-
2019年11月16日,由360集团、百度、腾讯、阿里巴巴、清华大学与中科院等多家企业和研究机构在成都联合主办了2019“天府杯”国际网络安全大赛暨2019天府国际网络安全高峰论坛。而开幕当日最激荡人...
- Syslog 日志分析与异常检测技巧(syslog发送日志配置)
-
系统日志包含有助于分析网络设备整体运行状况的重要信息。然而,理解并从中提取有效数据往往颇具挑战。本文将详解从基础命令行工具到专业日志管理软件的全流程分析技巧,助你高效挖掘Syslog日志价值。Gr...
- 从Oracle演进看数据库技术的发展(从oracle演进看数据库技术的发展的过程)
-
数据库技术发展本质上是应用需求驱动与基础架构演进的双向奔赴,如何分析其技术发展的脉络和方向?考虑到oracle数据库仍然是这个领域的王者,以其为例,管中窥豹,对其从Oracle8i到23ai版本的核...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (74)
- oracle基目录 (50)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (53)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)