百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

并发与高并发线程池

mhr18 2025-05-14 14:56 30 浏览 0 评论

前言

之前的例子中基本上都用到了线程池,一般我们都是把任务初始化好之后直接丢到线程池就可以了,使用起来非常简单方便。

主体概要

  • 线程池与new Thread对比
  • 线程池的几个类介绍
  • 线程池的几种状态
  • ThreadPoolExecutor的几个方法
  • 线程池的合理配置

主体内容

一、线程池与new Thread对比

new Thread弊端

1.每次new Thread都要新建一个对象,性能差。

2.线程缺少统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或者OOM。

3.缺少更多功能,如更多执行、定期执行、线程中断。

所以,我们一般不常用Thread,这里就不再细讲了。

线程池好处

1.重用存在的线程,减少对象创建,消亡的开销,性能佳。

2.可有效的控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞。

3.提供定时执行、定期执行、单线程、并发数控制等功能。

二、线程池的几个类介绍

1.ThreadPoolExecutor

我们来看看ThreadPoolExecutor可以接收的几个参数来做初始化。

(1)corePoolSize:核心线程数量

(2)maximumPoolSize:线程最大线程数

(3)workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响

(4)keepAliveTime:线程没有任务执行时最多保持多久时间终止(当线程池的线程数量大于corePoolSize,如果这时没有新的任务提交,核心线程不会立即销毁,而是让他等待,直到超过这里的keepAliveTime)

(5)unit:keepAliveTime的时间单位

(6)ThreadFactoryL:线程工厂,用来创建线程

(7)rejectHandler:当拒绝处理任务时的策略

如果运行的线程数少于我们的corePoolSize,直接创建新线程来处理任务,即使线程池中的其他线程是空闲的;

如果运行的线程数大于我们的corePoolSize,且小于我们的maximumPoolSize,则只有当workQueue满了的时候,才创建新的线程去处理任务;

如果我们设置的corePoolSize和maximumPoolSize相同的话,那么创建的线程池的大小是固定的,这个时候如果有新任务提交,并且workQueue没满的时候,就把请求放到workQueue里面,等待有空闲的线程来这个队列取出任务;

如果运行的线程数大于maximumPoolSize的时候,这时workQueue也已经满了,那么它就要指定我们后面要讲的指定策略参数来处理。

接下来我们详细介绍一下workQueue队列:

它是保存等待执行的任务的一个阻塞队列,当我们提交一个新的任务到线程池的时候,线程池会根据当前线程池中正在运行着的数量来决定该任务的处理方式,处理方式总共有三种:直接切换,无界队列,有界队列。直接切换就是之前提到的SynchronousQueue,使用的无界队列一般是使用链表队列-LinkedBlockingQueue,如果采用无界队列,线程池中能创建的最大线程数就是corePoolSize。我们这里介绍的workQueue的有界队列,一般是ArrayBlockingQueue,使用这种方式呢我们可以把线程池最大线程数目限制为maximumPoolSize。

详细介绍一下rejectHandler的四种策略:

如果workQueue阻塞队列满了,并且没有空闲的线程池,此时,继续提交任务,需要采取一种策略来处理这个任务。
线程池总共提供了四种策略:

  • 直接抛出异常,这也是默认的策略。实现类为AbortPolicy。
  • 用调用者所在的线程来执行任务。实现类为CallerRunsPolicy。、
  • 丢弃队列中最靠前的任务并执行当前任务。实现类为DiscardOldestPolicy。
  • 直接丢弃当前任务。实现类为DiscardPolicy。

三、线程池的几种状态

如图,线程池的5种状态转换如下:


分别来介绍一下这5种状态:

1、RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对阻塞队列中已添加的任务进行处理。
(2) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

2、 SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理阻塞队列中已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3、STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理阻塞队列中已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4、TIDYING

(1) 状态说明:当所有的任务已终止,ctl(ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount))记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5、 TERMINATED

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

Linuxc/c++服务器开发高阶视频,电子书学习资料后台私信【架构】获取

内容包括C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,MongoDB,ZK,流媒体,P2P,K8S,Docker,TCP/IP,协程,DPDK多个高级知识点。

四、ThreadPoolExecutor的几个方法

1.基础方法

  • execute():提交任务,交给线程池执行
  • submit():提交任务,能够返回执行结果。(execute+Future)
  • shutdown():关闭线程池,等待任务都执行完
  • shutdownNow():关闭线程池,不等待任务执行完

2.监控方法

  • getTaskCount():线程池已执行和未执行的任务总数
  • getCompletedTaskCount():已完成的任务数量
  • getPoolSize():线程池当前的线程数量
  • getActiveCount():当前线程池中正在执行任务的线程数量

3.线程池类图

如图所示,J.U.C中有三个Executor接口:

  • Executor:一个运行新任务的简单接口;
  • ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;
  • ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。

而我们的ThreadPoolExecutor是功能最强大的,因为它可以自定义参数。

4.J.U.C框架是极其强大的,他还为我们提供了许多额外的方法

  • Executors.newCachedThreadPool():可以创建一个可缓存的线程池,如果线程池长度超过了处理的需要,可以灵活回收空闲线程;如果没有可以回收的,那么就新建线程。
  • Executors.newFixedThreadPool():它创建的是一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待。
  • Executors.newScheduledThreadPool():它创建的也是一个定长的线程池,支持定时以及周期性的任务执行。
  • Executors.newSingleThreadPool():它创建的是一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指令顺序执行。

5 接下来我们分别对这几个方法做一个了解

(1)这是newCachedThreadPool的基本方法

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

还有一个可以传入指定的ThreadFactory参数

 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

(2)这是newFixedThreadPool的基本方法

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

同样还有一个可以传入指定的ThreadFactory参数

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

(3)这是newScheduledThreadPool的基本方法

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

同样的,可以传入指定的threadFactory参数

 public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
 public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

(4)这是newSingleThreadExecutor的基本方法

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

同样的,也可以传入指定的threadFactory参数

 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

需要注意的是,他们返回值都是ExecutorService对象,而非ThreadPoolExecutor对象,因此缺少监控方法和部分基本方法,只有shutdown(),submit(),shutdownNow()基本方法,这是ExecutorService的局限性。

6.最后,分别演示一下代码例子

(1)newCachedThreadPool

mport lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class ThreadPoolExample1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        for(int i=0;i<10;i++){
            final int index = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
        executorService.shutdown();
    }
}

结果:

21:52:22.217 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample1 - task:1
21:52:22.217 [pool-1-thread-4] INFO com.practice.aqs.ThreadPoolExample1 - task:3
21:52:22.217 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample1 - task:0
21:52:22.217 [pool-1-thread-8] INFO com.practice.aqs.ThreadPoolExample1 - task:7
21:52:22.217 [pool-1-thread-6] INFO com.practice.aqs.ThreadPoolExample1 - task:5
21:52:22.217 [pool-1-thread-5] INFO com.practice.aqs.ThreadPoolExample1 - task:4
21:52:22.217 [pool-1-thread-7] INFO com.practice.aqs.ThreadPoolExample1 - task:6
21:52:22.217 [pool-1-thread-10] INFO com.practice.aqs.ThreadPoolExample1 - task:9
21:52:22.217 [pool-1-thread-9] INFO com.practice.aqs.ThreadPoolExample1 - task:8
21:52:22.217 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample1 - task:2

(2)

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ThreadPoolExample2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for(int i=0;i<10;i++){
            final int index = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
        executorService.shutdown();
    }
}

结果:

21:55:17.350 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample2 - task:0
21:55:17.350 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:1
21:55:17.350 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:2
21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:5
21:55:17.354 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:4
21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:6
21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:8
21:55:17.354 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:7
21:55:17.354 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample2 - task:3
21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:9

(3)newSingleThreadExecutor

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ThreadPoolExample3 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        for(int i=0;i<10;i++){
            final int index = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
        executorService.shutdown();
    }
}

结果:

21:57:01.913 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:0
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:1
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:2
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:3
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:4
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:5
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:6
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:7
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:8
21:57:01.917 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:9

(4)newScheduledThreadPool(注意:返回值与以上不同)

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);

        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                log.info("schedule run");
            }
        },3, TimeUnit.SECONDS);//延时3秒执行该输出任务
        scheduledExecutorService.shutdown();
    }
}

结果(这个结果是延时3秒出来的):

22:03:22.303 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run

Process finished with exit code 0

除了schedule()方法,他还有scheduleAtFixedRate(以指定的速率去执行任务),scheduleWithFixedDelay(以指定的一个延迟执行任务)

先说scheduleAtFixedRate

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);

     
       scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
           @Override
           public void run() {
               log.info("schedule run");
           }
       },1,3,TimeUnit.SECONDS);//延迟一秒,每隔3秒执行一次任务
    }
}

结果(注意观察时间间隔):

22:10:22.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
22:10:25.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
22:10:28.168 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
22:10:31.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
22:10:34.167 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample4 - schedule run

五、线程池的合理配置

1.CPU密集型

尽量使用较小的线程池,一般Cpu核心数+1

因为CPU密集型任务CPU的使用率很高,若开过多的线程,只能增加线程上下文的切换次数,带来额外的开销

2.IO密集型

方法一:可以使用较大的线程池,一般CPU核心数 * 2

IO密集型CPU使用率不高,可以让CPU等待IO的时候处理别的任务,充分利用cpu时间

方法二:线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
下面举个例子:
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

3、混合型

可以将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,按情况而定。

相关推荐

一文带您了解数据库的行列之争:行式与列式存储的异同

数据库存储格式是数据库管理系统中一个至关重要的方面,它直接影响到数据的组织和检索效率。在数据库中,有两种主要的存储格式,即行式存储和列式存储。这两者采用截然不同的方法来组织和存储数据,各自具有一系列优...

NL2SQL(三)开源项目怎么选:talk is cheap, show me the code!

老规矩,先看效果下面的demo来自试用的SuperSonic,将会在下面详细介绍:大模型时代Text-to-SQL特点随着基于LLM技术的发展,RAG/AIAgent/Fine...

JDK25长期支持版九月降临:18项王炸功能全解析

Java要放大招啦!9月份推出的JDK25长期支持版已经锁定18个超能力,从稳定值到结构化并发,还有Linux系统下的"预知未来"性能分析!下面我用打游戏的术语给你们掰扯明白:1、飞...

OceanBase 推出单机版 高度兼容MySQL和Oracle

【环球网科技综合报道】3月27日,独立数据库厂商OceanBase正式发布单机版产品。据悉,这一产品基于自主研发的单机分布式一体化架构设计,具备极简数据库架构和高度兼容性,为中小规模业务提供兼具性能与...

黄远邦:应对7月1日闰秒对Oracle数据库影响

由于今年7月1日全世界会多出一秒,这可能对时间敏感的IT系统造成较大影响。中亦科技数据库团队对此问题做了深入的研究,并对用户系统提出了相应的解决方法及建议。中亦科技数据库产品总监黄远邦认为,闰秒调整会...

MySQL数据库密码忘记了,怎么办?(mysql 数据库密码)

#头条创作挑战赛#MySQL数据库密码忘记了且没有其他可以修改账号密码的账户时怎么办呢?登录MySQL,密码输入错误/*密码错误,报如下错误*/[root@TESTDB~]#mysql-u...

Chinese AI Talent in Spotlight as Nvidia and Meta Escalate Talent War

OntherightisBanghuaZhu,ChiefResearchScientistatNVIDIATMTPOST--SiliconValley’stoptech...

用Cursor开启JAVA+AI生涯(javascirpt怎么开启)

Cursor是基于VSCode开发的一款编辑器,支持多种语言的开发编辑。与传统的开发工具相比,它有多种优势:与AI无缝集成,响应速度快,占用内存小。但很多同学在"起步"过程中遇到了...

毕业十年了,自从做了开发用了很多软件,但距离写开发工具还很远

办公系统类:办公软件Word、Excel、PowerPoint三大必备技能+腾讯/金山在线文档解压缩操作:7-zip/winrar文件文本处理:Notepad++(文本编辑器正则表达式超级好...

盘点Java中最没用的知识⑤:这3个老古董你还在代码里“考古”?

一、Stack类:“继承Vector”的历史bug,为何成了性能拖油瓶?你是不是在学Java集合时,老师说过“栈结构用Stack类”?是不是在老代码里见过"newStack<>(...

Gemini 2.5 Pro 0506发布,编程最强大模型, 碾压 Claude3.7 sonnent

一、Gemini2.5Pro(I/Oedition)发布1、为何叫I/Oedition?谷歌史上最强编程模型Gemini2.5Pro(I/Oedition)发布,具体型号是Gemin...

如何让无聊变得有趣(附本人大量美图)

文/图:金冬成在这条长300公里的公路上,我已经来回往返了无数次。3小时车程,一个人,想想都是多么无聊的一件事。其实,人生道路上,类似这种无聊的事情有很多很多。无聊的事情、枯燥的工作,往往让我们容易失...

Oracle 推出 Java 24,增强 AI 支持和后量子加密

导读:Oracle宣布正式发布Java24,该语言增加了几个新功能,例如StreamGatherersAPI和Class-FileAPI的可用性,以及专门为AI推理和量子安全设计...

公司ERP突然变慢?“索引重建”这颗“药”可不能随便吃!

各位老板、IT小哥、财务小姐姐,有没有遇到过公司ERP系统突然卡顿得像“老爷车”,点个按钮半天没反应,急得直跺脚?这时候,可能有人会跳出来说:“我知道,重建一下数据库索引就好了!”听起来像个“神操作”...

基于Java实现,支持在线发布API接口读取数据库,有哪些工具?

基于java实现,不需要编辑就能发布api接口的,有哪些工具、平台?还能一键发布、快速授权和开放提供给第三方请求调用接口的解决方案。架构方案设计:以下是一些基于Java实现的无需编辑或只需少量编辑...

取消回复欢迎 发表评论: