「一文搞懂」Java线程池实现原理
mhr18 2024-12-06 16:54 12 浏览 0 评论
本章内容
线程池简介
Java在使用线程执行程序时,需要调用操作系统内核的API创建一个内核线程,操作系统要为线程分配一系列的资源;当该Java线程被终止时,对应的内核线程也会被回收。因此,频繁的创建和销毁线程需要消耗大量资源。此外,由于CPU核数有限,大量的线程上下文切换会增加系统的性能开销,无限制地创建线程还可能导致内存溢出。为此,Java在JDK1.5版本中引入了线程池。
线程池是一种重用线程的机制,用于提高线程的利用率和管理线程的生命周期,常用于多线程编程和异步编程。
通过引入线程池:
- 1)降低资源消耗:线程池中的线程可以重复使用,避免因频繁创建和销毁线程而带来的性能开销。
- 2)提高响应速度:向线程池中提交任务时,无需创建线程即可执行任务处理。
- 3)方便线程管理:线程池可以对其中的线程进行统一管理、监控,避免因大量创建线程而导致内存溢出。
Java线程实现原理请移步主页查阅->「超级详细」Java线程实现原理。
线程池实现原理
Java线程池的核心实现类为ThreadPoolExecutor。
ThreadPoolExecutor依赖关系
ThreadPoolExecutor依赖关系图:
其中:
- Executor(接口):该接口线程池处理任务的顶级接口,定义了一个用于执行任务的方法execute(Runnable command)。其中参数command为实现了Runnable或Callable接口的Task任务。
- ExecutorService(接口):该接口继承了Executor接口,它扩展了Executor接口,并添加了一些管理线程池的方法(如:提交任务、关闭线程池等)。
- AbstractExecutorService(抽象类):该类实现了ExecutorService接口。
构造函数
如ThreadPoolExecutor依赖关系图所示,ThreadPoolExecutor类提供了四个构造函数,其中原始的构造函数(另外三个构造函数由原始构造函数衍生而来):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
核心参数:
- corePoolSize:线程池中的核心线程数,当向线程池中提交任务时,线程池创建新线程执行任务处理,直到当前线程数达到corePoolSize设定的值。此时提交的新任务会被添加到阻塞队列中,等待其他线程执行完成。默认情况下,空闲的核心线程并不会被回收(即:属性allowCoreThreadTimeOut的值默认为false),如果需要回收空闲的核心线程,设置属性allowCoreThreadTimeOut的值为true即可。
- maximumPoolSize:线程池中的最大线程数,当线程池中的线程数达到corePoolSize设定的值且核心线程均被占用时,如果阻塞队列已满并向线程池中继续提交任务,则创建新的线程执行任务处理,直到当前线程数达到maximumPoolSize设定的值。
- keepAliveTime:线程池中线程的存活时间,该参数只会在线程数大于corePoolSize设定的值时才生效,即:非核心线程的空闲时间超过keepAliveTime设定的值时会被回收。
- unit:线程池中参数keepAliveTime的时间单位,默认为TimeUnit.MILLISECONDS(毫秒),其他时间单位:
- TimeUnit.NANOSECONDS(纳秒)
- TimeUnit.MICROSECONDS(微秒)
- TimeUnit.MILLISECONDS(毫秒)
- TimeUnit.SECONDS(秒)
- TimeUnit.MINUTES(分钟)
- TimeUnit.HOURS(小时)
- TimeUnit.DAYS(天)
- workQueue:线程池中保存任务的阻塞队列,当线程池中的线程数达到corePoolSize设定的值,继续提交任务时会将任务添加到阻塞队列中等待。默认为LinkedBlockingQueue,可选择的阻塞队列:
- LinkedBlockingQueue(基于链表实现的阻塞队列)。
- ArrayBlockingQueue(基于数组实现的阻塞队列)。
- SynchronousQueue(只有一个元素的阻塞队列)。
- PriorityBlockingQueue(实现了优先级的阻塞队列)。
- DelayQueue(实现了延迟功能的阻塞队列)。
- threadFactory:线程池中创建线程的工厂,可以通过自定义线程工厂的方式为线程设置一个便于识别的线程名,默认为DefaultThreadFactory。
- handler:线程池的拒绝策略(又称饱和策略),当线程池中的线程数达到maximumPoolSize设定的值且阻塞队列已满时,继续向线程池中提交任务,就会触发拒绝策略,默认为AbortPolicy。可选的拒绝策略:
- AbortPolicy:丢弃任务,并抛出RejectedExecutionException异常。
- DiscardPolicy:丢弃任务,不抛出异常。
- DiscardOldestPolicy:丢弃队列中最早的未处理的任务,执行当前任务。
- CallerRunsPolicy:由调用者所在的线程来执行任务。
线程池生命周期
在ThreadPoolExecutor类中定义了线程池的五种状态。
源码如下:
// ctl共32位,其中高3位表示线程池运行状态,低29位表示线程池中的线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 二进制为0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
RUNNING状态:二进制为111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN状态:二进制为000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP状态:二进制为001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
TIDYING状态:二进制为010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED状态:二进制011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 线程池中的线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中:
ctl是一个AtomicInteger类型的变量:
- 高3位表示线程池的运行状态。
- 低29位表示线程池中的线程数量。
线程池运行状态:
- RUNNING:处于RUNNING状态的线程池会接受新任务提交,同时也会处理任务队列中的任务。
- SHUTDOWN:处于RUNNING状态的线程池不会接受新任务提交,但是会处理任务队列中的任务。
- STOP:处于STOP状态的线程池不会接受新任务提交,也不会处理任务队列中的任务,并且会中断正在执行的任务。
- TIDYING:当所有的任务已终止、workerCount数量为0时,线程池会进入TIDYING状态,进入TIDYING状态会执行钩子方法terminated()。
- TERMINATED:执行完钩子方法terminated()后进入TERMINATED状态,此时线程池已终止。
线程池状态转换,如图所示:
工作线程
Worker(工作线程)是ThreadPoolExecutor的一个内部类,它继承了AbstractQueuedSynchronizer(即:AQS)并实现了Runnable接口。其中:
- 通过继承AbstractQueuedSynchronizer类,可以控制在主线程中调用shutdown()方法时不会中断正在执行的工作线程。注意:调用shutdownNow()时会中断正在执行的工作线程。
- 通过实现Runnable接口,可以在run()方法中调用ThreadPoolExecutor#runWorker方法执行任务处理。
提交任务
向线程池中提交任务的方式有两种:
- 1)调用execute()方法。
- 2)调用submit()方法。
execute与submit的区别:
- 1)execute是Executor接口的方法,submit是ExecuteService接口的方法。
- 2)execute的入参为Runnable,submit的入参可以为Runnable、Callable、Runnable和一个返回值。
- 3)execute没有返回值,submit有返回值。
- 4)异常处理:execute会直接抛出异常,submit会在获取结果时抛出异常,如果不获取结果,submit不抛出异常。
以execute()方法为例,ThreadPoolExecutor实现了Executor接口定义的execute(Runnable command) 方法,该方法的主要作用是将任务提交到线程池中的执行。
execute(Runnable command) 方法源码如下:
public void execute(Runnable command) {
// 如果command为null,则抛出空指针异常
if (command == null)
throw new NullPointerException();
// 获取ctl变量值,ctl低29位用来表示线程池中的线程数量。
int c = ctl.get();
// 如果线程池中的线程数小于设定的核心线程数,则将任务封装成Worker线程并调用其start()方法启动线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池处于运行状态并向任务队列中添加任务成功,则执行如下逻辑(此处线程池中的线程数大于等于核心线程数)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 继续判断线程池处于运行状态,如果线程池不是运行状态且从线程池中移除任务成功,则执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池中线程数为0,则创建Worker线程并执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果创建Worker线程失败,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
ThreadPoolExecutor的execute()方法执行逻辑,如图所示:
处理流程:
- 主线程通过execute()方法向线程池中提交任务:
- 线程池中的线程数小于核心线程数,继续向线程池中提交任务时,创建线程执行任务处理。
- 线程池中的线程数大于等于核心线程数:
- 如果阻塞队列未满,继续向线程池中提交任务时,将任务添加到阻塞队列中,等待处理。
- 如果阻塞队列已满且线程数小于最大线程数,继续向线程池中提交任务时,创建线程执行任务处理。
- 如果阻塞队列已满且线程数大于等于最大线程数,继续向线程池中提交任务时,执行拒绝策略。
执行任务
任务提交到线程池中后,会执行工作线程(Worker)的ThreadPoolExecutor.Worker#run方法执行任务处理,而run()方法中会调用ThreadPoolExecutor#runWorker方法。
ThreadPoolExecutor#runWorker方法中,通过自旋的方式从Worker工作线程或阻塞队列中获取任务进行处理:
- 如果Worker工作线程中存在任务,则执行Worker工作线程中的任务。
- 如果Worker工作线程中不存在任务,则通过getTask()方法从获取任务,并执行该任务。
- 如果Worker工作线程和阻塞队列中没有可执行的任务,则退出并销毁Worker工作线程。
线程池工作流程
线程池工作流程,如图所示:
处理流程:
- 向线程池中提交任务,判断线程池中线程数是否小于核心线程数:
- 线程数小于核心线程数,则创建线程执行任务处理。
- 线程数大于等于核心线程数,则判断线程池中阻塞队列是否已满:
- 阻塞队列未满,则将任务添加到阻塞队列中,等待执行。
- 阻塞队列已满,则判断线程池中的线程数是否小于最大线程数:
- 线程数小于最大线程数,则创建线程执行任务处理。
- 线程数大于等于最大线程数,则执行拒绝策略。
线程池创建方式
线程池的创建方式一般分为两种:
- 通过Executors类创建线程池。
- 通过ThreadPoolExecutor类创建线程池。
通过Executors类创建线程池
Executors类是JDK提供的一个创建线程池的工具类,内部通过调用ThreadPoolExecutor构造函数来实现,通过该类提供的静态方法可以快速创建一些常用线程池。
newFixedThreadPool
创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
静态方法:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
其中:
- 核心线程数为初始化时指定的线程数,核心线程数等于最大线程数。
- 线程存活时间keepAliveTime为0,表示线程空闲时立刻被回收。
- 阻塞队列为LinkedBlockingQueue(默认值)。
newCachedThreadPool
创建一个可缓存的线程池,该线程池的大小为Integer.MAX_VALUE,对于提交的新任务:
- 如果存在空闲线程,则使用空闲线程来执行任务处理。
- 如果不存在空闲线程,则新建一个线程来执行任务处理。
- 如果线程池中的线程数超过处理任务需要的线程数,则空闲线程缓存一段时间(60s)后会被回收。
静态方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
其中:
- 核心线程数为0。
- 最大线程数为Integer.MAX_VALUE。
- 线程存活时间keepAliveTime为60s。
- 阻塞队列为SynchronousQueue(同步队列)。
newSingleThreadExecutor
创建一个单线程化的线程池,它只有一个工作线程来执行任务,所有任务按照先进先出的顺序执行。
静态方法:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
其中:
- 核心线程数为1。
- 最大线程数为1。
- 线程存活时间keepAliveTime为0,表示线程空闲时立刻被回收。
- 阻塞队列为LinkedBlockingQueue。
newScheduledThreadPool
创建一个计划线程池,支持定时或周期性的执行任务(如:延时任务)。
静态方法:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
其中:
- 核心线程数为初始化时指定的线程数。
- 最大线程数为Integer.MAX_VALUE。
- 线程存活时间keepAliveTime为0,表示线程空闲时立刻被回收。
- 阻塞队列为DelayedWorkQueue(延时队列)。
其他线程池
Executors工具类除了能快速创建以上的常用线程池外,还可以创建很多其他线程池。如:
- Executors.newSingleThreadScheduledExecutor:创建一个单线程化的计划线程池(newScheduledThreadPool的单线程版本)。
- Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定),该线程池JDK1.8添加。
代码示例:
/**
* @author 南秋同学
* Executors类创建线程池
*/
@Slf4j
public class ExecutorsExample {
/**
* 创建一个固定大小的线程池
*/
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
for (int i = 0; i < 10000; i++) {
// 在线程池中执行任务
executor.execute(new Task());
}
}
/**
* 任务线程
*/
static class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
log.info("执行线程:{}", Thread.currentThread().getName());
} catch (InterruptedException e) {
log.info("线程中断异常");
}
}
}
}
不推荐使用Executors类创建线程池,主要原因:Executors类创建线程池默认使用LinkedBlockingQueue,LinkedBlockingQueue默认大小为Integer.MAX_VALUE(相当于无界队列),在高负载情况下很容易导致OOM。因此,强烈建议使用有界队列创建线程池。
通过ThreadPoolExecutor类创建线程池
通过ThreadPoolExecutor类手动创建线程池(推荐方式)。
代码示例:
/**
* @author 南秋同学
* ThreadPoolExecutor类创建线程池
*/
@Slf4j
public class ThreadPoolExecutorExample {
/**
* 定义线程池
*/
private static ExecutorService pool ;
public static void main(String[] args) {
// 创建线程池
pool = new ThreadPoolExecutor(1, 2, 0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 在线程池中执行任务
for (int i = 0; i < 10000; i++) {
pool.execute(new ThreadPoolExecutorExample.Task());
}
}
/**
* 任务线程
*/
static class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
log.info("执行线程:{}", Thread.currentThread().getName());
} catch (InterruptedException e) {
log.info("线程中断异常");
}
}
}
}
其他推荐方式:通过commons-lang3、com.google.guava等工具包创建线程池,创建示例自行查阅相关资料即可。
线程池大小设置
在并发编程领域,提升性能本质上就是提升硬件的利用率(即:提升CPU和I/O设备综合利用率)的问题。因此,线程池大小需要根据CPU密集型和I/O密集型场景进行设置:
- 对于CPU密集型场景,线程池大小一般设置为:CPU核数 + 1。
- 对于I/O密集型场景,线程池大小一般设置为:CPU核数 * [1 +(I/O耗时 / CPU耗时)]。
以上公式仅作为参考值,具体情况需要根据压测情况进行设置。
【阅读推荐】
更多精彩内容,如:
- Redis系列
- 数据结构与算法系列
- Nacos系列
- MySQL系列
- JVM系列
- Kafka系列
- 并发编程系列
请移步【南秋同学】个人主页进行查阅。内容持续更新中......
【作者简介】
一枚热爱技术和生活的老贝比,专注于Java领域,关注【南秋同学】带你一起学习成长~
相关推荐
- B站收藏视频失效?mybili 收藏夹备份神器完整部署指南
-
本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙很多B站用户都有过类似经历:自己精心收藏的视频突然“消失”,点开一看不是“已被删除”,就是“因UP主设置不可见”。而B站并不会主动通知...
- 中间件推荐初始化配置
-
Redis推荐初始化配置bind0.0.0.0protected-modeyesport6379tcp-backlog511timeout300tcp-keepalive300...
- Redis中缓存穿透问题与解决方法
-
缓存穿透问题概述在Redis作为缓存使用时,缓存穿透是常见问题。正常查询流程是先从Redis缓存获取数据,若有则直接使用;若没有则去数据库查询,查到后存入缓存。但当请求的数据在缓存和数据库中都...
- 后端开发必看!Redis 哨兵机制如何保障系统高可用?
-
你是否曾在项目中遇到过Redis主服务器突然宕机,导致整个业务系统出现数据读取异常、响应延迟甚至服务中断的情况?面对这样的突发状况,作为互联网大厂的后端开发人员,如何快速恢复服务、保障系统的高可用...
- Redis合集-大Key处理建议
-
以下是Redis大Key问题的全流程解决方案,涵盖检测、处理、优化及预防策略,结合代码示例和最佳实践:一、大Key的定义与风险1.大Key判定标准数据类型大Key阈值风险场景S...
- 深入解析跳跃表:Redis里的"老六"数据结构,专治各种不服
-
大家好,我是你们的码农段子手,今天要给大家讲一个Redis世界里最会"跳科目三"的数据结构——跳跃表(SkipList)。这货表面上是个青铜,实际上是个王者,连红黑树见了都要喊声大哥。...
- Redis 中 AOF 持久化技术原理全解析,看完你就懂了!
-
你在使用Redis的过程中,有没有担心过数据丢失的问题?尤其是在服务器突然宕机、意外断电等情况发生时,那些还没来得及持久化的数据,是不是让你夜不能寐?别担心,Redis的AOF持久化技术就是...
- Redis合集-必备的几款运维工具
-
Redis在应用Redis时,经常会面临的运维工作,包括Redis的运行状态监控,数据迁移,主从集群、切片集群的部署和运维。接下来,从这三个方面,介绍一些工具。先来学习下监控Redis实时...
- 别再纠结线程池大小 + 线程数量了,没有固定公式的!
-
我们在百度上能很轻易地搜索到以下线程池设置大小的理论:在一台服务器上我们按照以下设置CPU密集型的程序-核心数+1I/O密集型的程序-核心数*2你不会真的按照这个理论来设置线程池的...
- 网络编程—IO多路复用详解
-
假如你想了解IO多路复用,那本文或许可以帮助你本文的最大目的就是想要把select、epoll在执行过程中干了什么叙述出来,所以具体的代码不会涉及,毕竟不同语言的接口有所区别。基础知识IO多路复用涉及...
- 5分钟学会C/C++多线程编程进程和线程
-
前言对线程有基本的理解简单的C++面向过程编程能力创造单个简单的线程。创造单个带参数的线程。如何等待线程结束。创造多个线程,并使用互斥量来防止资源抢占。会使用之后,直接跳到“汇总”,复制模板来用就行...
- 尽情阅读,技术进阶,详解mmap的原理
-
1.一句话概括mmapmmap的作用,在应用这一层,是让你把文件的某一段,当作内存一样来访问。将文件映射到物理内存,将进程虚拟空间映射到那块内存。这样,进程不仅能像访问内存一样读写文件,多个进程...
- C++11多线程知识点总结
-
一、多线程的基本概念1、进程与线程的区别和联系进程:进程是一个动态的过程,是一个活动的实体。简单来说,一个应用程序的运行就可以被看做是一个进程;线程:是运行中的实际的任务执行者。可以说,进程中包含了多...
- 微服务高可用的2个关键技巧,你一定用得上
-
概述上一篇文章讲了一个朋友公司使用SpringCloud架构遇到问题的一个真实案例,虽然不是什么大的技术问题,但如果对一些东西理解的不深刻,还真会犯一些错误。这篇文章我们来聊聊在微服务架构中,到底如...
- Java线程间如何共享与传递数据
-
1、背景在日常SpringBoot应用或者Java应用开发中,使用多线程编程有很多好处,比如可以同时处理多个任务,提高程序的并发性;可以充分利用计算机的多核处理器,使得程序能够更好地利用计算机的资源,...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracle位图索引 (63)
- oracle批量插入数据 (62)
- oracle事务隔离级别 (53)
- oracle 空为0 (50)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)