百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Spring-Data-Redis 连接泄漏,我 TM 人傻了

mhr18 2024-12-08 14:47 16 浏览 0 评论

本文基于 Spring Data Redis 2.4.9

最近线上又出事儿了,新上线了一个微服务系统,上线之后就开始报各种发往这个系统的请求超时,这是咋回事呢

还是经典的通过 JFR 去定位,对于历史某些请求响应慢,我一般按照如下流程去看:

  1. 是否有 STW(Stop-the-world,参考我的另一篇文章:JVM相关 - SafePoint 与 Stop The World 全解):
  2. 是否有 GC 导致的长时间 STW
  3. 是否有其他原因导致进程所有线程进入 safepoint 导致 STW
  4. 是否 IO 花了太长时间,例如调用其他微服务,访问各种存储(硬盘,数据库,缓存等等)
  5. 是否在某些锁上面阻塞太长时间?
  6. 是否 CPU 占用过高,哪些线程导致的?

通过 JFR 发现是很多 HTTP 线程在一个锁上面阻塞了,这个锁是从 Redis 连接池获取连接的锁。我们的项目使用的 spring-data-redis,底层客户端使用 lettuce。为何会阻塞在这里呢?经过分析,我发现 spring-data-redis 存在连接泄漏的问题

我们先来简单介绍下 Lettuce,简单来说 Lettuce 就是使用 Project Reactor + Netty 实现的 Redis 非阻塞响应式客户端。spring-data-redis 是针对 Redis 操作的统一封装。我们项目使用的是 spring-data-redis + Lettuce 的组合。

为了和大家尽量说明白问题的原因,这里先将 spring-data-redis + lettuce API 结构简单介绍下。

首先 lettuce 官方,是不推荐使用连接池的,但是官方没有说,这是什么情况下的决定。这里先放上结论:

  • 如果你的项目中,使用的 spring-data-redis + lettuce,并且使用的都是 Redis 简单命令,没有使用 Redis 事务,Pipeline 等等,那么不使用连接池,是最好的(并且你没有关闭 Lettuce 连接共享,这个默认是开启的)。
  • 如果你的项目中,大量使用了 Redis 事务,那么最好还是使用连接池
  • 其实更准确地说,如果你使用了大量会触发 execute(SessionCallback) 的命令,最好使用连接池,如果你使用的都是 execute(RedisCallback) 的命令,就不太有必要使用连接池了。如果大量使用 Pipeline,最好还是使用连接池。

接下来介绍下 spring-data-redis 的 API 原理。在我们的项目中,主要使用 spring-data-redis 的两个核心 API,即同步的 RedisTemplate 和异步的 ReactiveRedisTemplate。我们这里主要以同步的 RedisTemplate 为例子,说明原理。ReactiveRedisTemplate 其实就是做了异步封装,Lettuce 本身就是异步客户端,所以 ReactiveRedisTemplate 其实实现更简单。

RedisTemplate 的一切 Redis 操作,最终都会被封装成两种操作对象,一是 RedisCallback<T>

public interface RedisCallback<T> {
	@Nullable
	T doInRedis(RedisConnection connection) throws DataAccessException;
}

是一个 Functional Interface,入参是 RedisConnection,可以通过使用 RedisConnection 操作 Redis。可以是若干个 Redis 操作的集合。大部分 RedisTemplate 的简单 Redis 操作都是通过这个实现的。例如 Get 请求的源码实现就是:

//在 RedisCallback 的基础上增加统一反序列化的操作
abstract class ValueDeserializingRedisCallback implements RedisCallback<V> {
	private Object key;

	public ValueDeserializingRedisCallback(Object key) {
		this.key = key;
	}

	public final V doInRedis(RedisConnection connection) {
		byte[] result = inRedis(rawKey(key), connection);
		return deserializeValue(result);
	}

	@Nullable
	protected abstract byte[] inRedis(byte[] rawKey, RedisConnection connection);
}

//Redis Get 命令的实现

public V get(Object key) {

	return execute(new ValueDeserializingRedisCallback(key) {

		@Override
		protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
		    //使用 connection 执行 get 命令
			return connection.get(rawKey);
		}
	}, true);
}

另一种是SessionCallback<T>

public interface SessionCallback<T> {

	@Nullable
	<K, V> T execute(RedisOperations<K, V> operations) throws DataAccessException;
}

SessionCallback也是一个 Functional Interface,方法体也是可以放若干个命令。顾名思义,即在这个方法中的所有命令,都是会共享同一个会话,即使用的 Redis 连接是同一个并且不能被共享的。一般如果使用 Redis 事务则会使用这个实现。

RedisTemplate 的 API 主要是以下这几个,所有的命令底层实现都是这几个 API:

  • execute(RedisCallback<?> action) 和 executePipelined(final SessionCallback<?> session):执行一系列 Redis 命令,是所有方法的基础,里面使用的连接资源会在执行后自动释放
  • executePipelined(RedisCallback<?> action) 和 executePipelined(final SessionCallback<?> session):使用 PipeLine 执行一系列命令,连接资源会在执行后自动释放
  • executeWithStickyConnection(RedisCallback<T> callback):执行一系列 Redis 命令,连接资源不会自动释放,各种 Scan 命令就是通过这个方法实现的,因为 Scan 命令会返回一个 Cursor,这个 Cursor 需要保持连接(会话),同时交给用户决定什么时候关闭。

通过源码我们可以发现,RedisTemplate 的三个 API 在实际应用的时候,经常会发生互相嵌套递归的情况。

例如如下这种:

redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        orders.forEach(order -> {
            connection.hashCommands().hSet(orderKey.getBytes(), order.getId().getBytes(), JSON.toJSONBytes(order));
        });
        return null;
    }
});

redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        orders.forEach(order -> {
            redisTemplate.opsForHash().put(orderKey, order.getId(), JSON.toJSONString(order));
        });
        return null;
    }
});

是等价的。redisTemplate.opsForHash().put()其实调用的是 execute(RedisCallback) 方法,这种就是 executePipelined 与 execute(RedisCallback) 嵌套,由此我们可以组合出各种复杂的情况,但是里面使用的连接是怎么维护的呢?

其实这几个方法获取连接的时候,使用的都是:RedisConnectionUtils.doGetConnection 方法,去获取连接并执行命令。对于 Lettuce 客户端,获取的是一个 org.springframework.data.redis.connection.lettuce.LettuceConnection. 这个连接封装包含两个实际 Lettuce Redis 连接,分别是:

private final @Nullable StatefulConnection<byte[], byte[]> asyncSharedConn;

private @Nullable StatefulConnection<byte[], byte[]> asyncDedicatedConn;
  • asyncSharedConn:可以为空,如果开启了连接共享,则不为空,默认是开启的;所有 LettuceConnection 共享的 Redis 连接,对于每个 LettuceConnection 实际上都是同一个连接;用于执行简单命令,因为 Netty 客户端与 Redis 的单处理线程特性,共享同一个连接也是很快的。如果没开启连接共享,则这个字段为空,使用 asyncDedicatedConn 执行命令。
  • asyncDedicatedConn:私有连接,如果需要保持会话,执行事务,以及 Pipeline 命令,固定连接,则必须使用这个 asyncDedicatedConn 执行 Redis 命令。

我们通过一个简单例子来看一下执行流程,首先是一个简单命令:redisTemplate.opsForValue().get("test"),根据之前的源码分析,我们知道,底层其实就是 execute(RedisCallback),流程是:

可以看出,如果使用的是 RedisCallback,那么其实不需要绑定连接,不涉及事务。Redis 连接会在回调内返回。需要注意的是,如果是调用 executePipelined(RedisCallback),需要使用回调的连接进行 Redis 调用,不能直接使用 redisTemplate 调用,否则 pipeline 不生效

Pipeline 生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.get("test".getBytes());
        connection.get("test2".getBytes());
        return null;
    }
});

Pipeline 不生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        redisTemplate.opsForValue().get("test");
        redisTemplate.opsForValue().get("test2");
        return null;
    }
});

然后,我们尝试将其加入事务中,由于我们的目的不是真的测试事务,只是为了演示问题,所以,仅仅是用 SessionCallback 将 GET 命令包装起来:

redisTemplate.execute(new SessionCallback<Object>() {
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        return operations.opsForValue().get("test");
    }
});

这里最大的区别就是,外层获取连接的时候,这次是 bind = true 即将连接与当前线程绑定,用于保持会话连接。外层流程如下:

里面的 SessionCallback 其实就是 redisTemplate.opsForValue().get("test"),使用的是共享的连接,而不是独占的连接,因为我们这里还没开启事务(即执行 multi 命令),如果开启了事务使用的就是独占的连接,流程如下:

由于 SessionCallback 需要保持连接,所以流程有很大变化,首先需要绑定连接,其实就是获取连接放入 ThreadLocal 中。同时,针对 LettuceConnection 进行了封装,我们主要关注这个封装有一个引用计数的变量。每嵌套一次 execute 就会将这个计数 + 1,执行完之后,就会将这个计数 -1, 同时每次 execute 结束的时候都会检查这个引用计数,如果引用计数归零,就会调用 LettuceConnection.close()

接下来再来看,如果是 executePipelined(SessionCallback) 会怎么样:

List<Object> objects = redisTemplate.executePipelined(new SessionCallback<Object>() {
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        operations.opsForValue().get("test");
        return null;
    }
});

其实与第二个例子在流程上的主要区别在于,使用的连接不是共享连接,而是直接是独占的连接

最后我们再来看一个例子,如果是在 execute(RedisCallback) 中执行基于 executeWithStickyConnection(RedisCallback<T> callback) 的命令会怎么样,各种 SCAN 就是基于 executeWithStickyConnection(RedisCallback<T> callback) 的,例如:

redisTemplate.execute(new SessionCallback<Object>() {
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        Cursor<Map.Entry<Object, Object>> scan = operations.opsForHash().scan((K) "key".getBytes(), ScanOptions.scanOptions().match("*").count(1000).build());
        //scan 最后一定要关闭,这里采用 try-with-resource
        try (scan) {
            
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
});

这里 Session callback 的流程,如下图所示,因为处于 SessionCallback,所以 executeWithStickyConnection 会发现当前绑定了连接,于是标记 + 1,但是并不会标记 - 1,因为 executeWithStickyConnection 可以将资源暴露到外部,例如这里的 Cursor,需要外部手动关闭。

在这个例子中,会发生连接泄漏,首先执行:

redisTemplate.execute(new SessionCallback<Object>() {
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        Cursor<Map.Entry<Object, Object>> scan = operations.opsForHash().scan((K) "key".getBytes(), ScanOptions.scanOptions().match("*").count(1000).build());
        //scan 最后一定要关闭,这里采用 try-with-resource
        try (scan) {
            
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
});

这样呢,LettuceConnection 会和当前线程绑定,并且在结束时,引用计数不为零,而是 1。并且 cursor 关闭时,会调用 LettuceConnection 的 close。但是 LettuceConnection 的 close 的实现,其实只是标记状态,并且把独占的连接 asyncDedicatedConn 关闭,由于当前没有使用到独占的连接,所以为空,不需要关闭;如下面源码所示:

LettuceConnection:

@Override
public void close() throws DataAccessException {
	super.close();

	if (isClosed) {
		return;
	}

	isClosed = true;

	if (asyncDedicatedConn != null) {
		try {
			if (customizedDatabaseIndex()) {
				potentiallySelectDatabase(defaultDbIndex);
			}
			connectionProvider.release(asyncDedicatedConn);
		} catch (RuntimeException ex) {
			throw convertLettuceAccessException(ex);
		}
	}

	if (subscription != null) {
		if (subscription.isAlive()) {
			subscription.doClose();
		}
		subscription = null;
	}

	this.dbIndex = defaultDbIndex;
}

之后我们继续执行一个 Pipeline 命令:

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.get("test".getBytes());
        redisTemplate.opsForValue().get("test");
        return null;
    }
});

这时候由于连接已经绑定到当前线程,同时同上上一节分析我们知道第一步解开释放这个绑定,但是调用了 LettuceConnection 的 close。执行这个代码,会创建一个独占连接,并且,由于计数不能归零,导致连接一直与当前线程绑定,这样,这个独占连接一直不会关闭(如果有连接池的话,就是一直不返回连接池)

即使后面我们手动关闭这个链接,但是根据源码,由于状态 isClosed 已经是 true,还是不能将独占链接关闭。这样,就会造成连接泄漏

针对这个 Bug,我已经向 spring-data-redis 一个 Issue:Lettuce Connection Leak while using execute(SessionCallback) and executeWithStickyConnection in same thread by random turn

  • 尽量避免使用 SessionCallback,尽量仅在需要使用 Redis 事务的时候,使用 SessionCallback。
  • 使用 SessionCallback 的函数单独封装,将事务相关的命令单独放在一起,并且外层尽量避免再继续套 RedisTemplate 的 execute 相关函数。

相关推荐

「完结13章」MySQL、Redis、MongoDB 数据库一课通

「完结13章」MySQL、Redis、MongoDB数据库一课通获课》jzit.top/14766/ACID与CAP的博弈主要体现在分布式系统的一致性实现上。ACID是数据库事务正确执行所必须满足的...

Redis和传统数据库(比如MySQL)是对手还是队友?

聊技术圈的事儿,有时候就像看武侠小说。总有那么几位“成名已久”的老前辈,也有一些“天赋异禀”的后起之秀。咱们今天聊的两位主角,就是这样的存在:传统关系型数据库(比如MySQL、Oracle、SQLS...

高性能异步io机制:io_uring

io_uring是linux内核5.10引入的异步io接口。相比起用户态的DPDK、SPDK,io_uring作为内核的一部分,通过mmap的方式实现用户和内核共享内存,并基于m...

Linux 应用开发 - 必须掌握的 5 个底层 IO 函数

底层输入输出(Low-LevelInput/Output)这篇文章主要介绍Linux原生的IO操作(LowIO),你可能会想不是有跨平台的ANSIC可以使用么,为啥还要学习底层IO...

超专业解析!10分钟带你搞懂Linux中直接I/O原理

我们先看一张图:这张图大体上描述了Linux系统上,应用程序对磁盘上的文件进行读写时,从上到下经历了哪些事情。这篇文章就以这张图为基础,介绍Linux在I/O上做了哪些事情。文件系统什么是...

Java面试题及答案总结(2025版持续更新)

大家好,我是Java面试分享最近很多小伙伴在忙着找工作,给大家整理了一份非常全面的Java面试题及答案。涉及的内容非常全面,包含:Spring、Redis、Mybatis、JVM、Nginx、Kafk...

物联网(IoT)设备产生海量数据,Redis如何助力实时处理?

咱们这“Redis三部曲”可算是聊到点子上了!前面咱说了它在云时代的风光,也盘了它和国产数据库的“爱恨情仇”,还展望了它在边缘计算的“新战场”。今天,咱们聚焦一个更具体、更火爆的领域——物联网(IoT...

几种 TCP 连接中出现 RST 的情况

现在是一个网络时代了。应该不少程序员在编程中需要考虑多机、局域网、广域网的各种问题。所以网络知识也是避免不了学习的。而且笔者一直觉得TCP/IP网络知识在一个程序员知识体系中必需占有一席之地的。在...

Redis连接使用报RDB error错误

该错误信息:Errorinexecution;nestedexceptionisio.lettuce.core.RedisCommandExecutionException:MISC...

lua 语法介绍与 NGINX lua 高级用法实战操作

一、概述lua是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网:https://www.lua.org/二、l...

Python教程——20.协程 - 2

异步编程asyncio.Future对象Task继承Future,Task对象内部中的await结果的处理基于Future对象来的在Future对象中会保存当前执行的这个协程任务的状态,如果当...

“我的足迹”、“浏览历史”,Redis如何快速记录与展示?

咱们在网上“买买买”、“逛逛逛”的时候,总会留下各种各样的“足迹”。无论是电商APP里你最近浏览过的商品,视频网站上你刚刚看过的剧集,还是新闻客户端里你点开过的文章……这些“历史记录”,有时候还真挺有...

你手机上的“消息推送”,Redis可能参与其中

手机上那些时不时就“叮咚”一下的消息推送,确实是咱们数字生活里不可或缺的一部分。这篇咱们就来聊聊,Redis这位“消息灵通人士”,是如何在这场“信息接力赛”中大显身手,确保那些重要的、有趣的通知,能够...

短视频APP的“附近的人”,Redis如何快速匹配?

刷短视频,除了看各种搞笑段子、才艺展示,有时候是不是也想看看“同城”或者“附近”的人都在发些啥有意思的内容?或者,平台也会时不时地给你推荐一些“附近正在直播”的主播,让你感觉一下子拉近了和这个虚拟世界...

微信朋友圈的点赞、评论,Redis在背后默默付出

微信朋友圈,这片小小的“自留地”,承载了我们多少喜怒哀乐、生活点滴啊!一张精心修饰的照片,一段随感而发的文字,发出去之后,最期待的是什么?那必须是屏幕下方不断冒出来的小红心和一条条真诚(或者商业互吹)...

取消回复欢迎 发表评论: