百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

并发与高并发线程池

mhr18 2025-05-14 14:56 6 浏览 0 评论

前言

之前的例子中基本上都用到了线程池,一般我们都是把任务初始化好之后直接丢到线程池就可以了,使用起来非常简单方便。

主体概要

  • 线程池与new Thread对比
  • 线程池的几个类介绍
  • 线程池的几种状态
  • ThreadPoolExecutor的几个方法
  • 线程池的合理配置

主体内容

一、线程池与new Thread对比

new Thread弊端

1.每次new Thread都要新建一个对象,性能差。

2.线程缺少统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或者OOM。

3.缺少更多功能,如更多执行、定期执行、线程中断。

所以,我们一般不常用Thread,这里就不再细讲了。

线程池好处

1.重用存在的线程,减少对象创建,消亡的开销,性能佳。

2.可有效的控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞。

3.提供定时执行、定期执行、单线程、并发数控制等功能。

二、线程池的几个类介绍

1.ThreadPoolExecutor

我们来看看ThreadPoolExecutor可以接收的几个参数来做初始化。

(1)corePoolSize:核心线程数量

(2)maximumPoolSize:线程最大线程数

(3)workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响

(4)keepAliveTime:线程没有任务执行时最多保持多久时间终止(当线程池的线程数量大于corePoolSize,如果这时没有新的任务提交,核心线程不会立即销毁,而是让他等待,直到超过这里的keepAliveTime)

(5)unit:keepAliveTime的时间单位

(6)ThreadFactoryL:线程工厂,用来创建线程

(7)rejectHandler:当拒绝处理任务时的策略

如果运行的线程数少于我们的corePoolSize,直接创建新线程来处理任务,即使线程池中的其他线程是空闲的;

如果运行的线程数大于我们的corePoolSize,且小于我们的maximumPoolSize,则只有当workQueue满了的时候,才创建新的线程去处理任务;

如果我们设置的corePoolSize和maximumPoolSize相同的话,那么创建的线程池的大小是固定的,这个时候如果有新任务提交,并且workQueue没满的时候,就把请求放到workQueue里面,等待有空闲的线程来这个队列取出任务;

如果运行的线程数大于maximumPoolSize的时候,这时workQueue也已经满了,那么它就要指定我们后面要讲的指定策略参数来处理。

接下来我们详细介绍一下workQueue队列:

它是保存等待执行的任务的一个阻塞队列,当我们提交一个新的任务到线程池的时候,线程池会根据当前线程池中正在运行着的数量来决定该任务的处理方式,处理方式总共有三种:直接切换,无界队列,有界队列。直接切换就是之前提到的SynchronousQueue,使用的无界队列一般是使用链表队列-LinkedBlockingQueue,如果采用无界队列,线程池中能创建的最大线程数就是corePoolSize。我们这里介绍的workQueue的有界队列,一般是ArrayBlockingQueue,使用这种方式呢我们可以把线程池最大线程数目限制为maximumPoolSize。

详细介绍一下rejectHandler的四种策略:

如果workQueue阻塞队列满了,并且没有空闲的线程池,此时,继续提交任务,需要采取一种策略来处理这个任务。
线程池总共提供了四种策略:

  • 直接抛出异常,这也是默认的策略。实现类为AbortPolicy。
  • 用调用者所在的线程来执行任务。实现类为CallerRunsPolicy。、
  • 丢弃队列中最靠前的任务并执行当前任务。实现类为DiscardOldestPolicy。
  • 直接丢弃当前任务。实现类为DiscardPolicy。

三、线程池的几种状态

如图,线程池的5种状态转换如下:


分别来介绍一下这5种状态:

1、RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对阻塞队列中已添加的任务进行处理。
(2) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

2、 SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理阻塞队列中已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3、STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理阻塞队列中已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4、TIDYING

(1) 状态说明:当所有的任务已终止,ctl(ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount))记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5、 TERMINATED

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

Linuxc/c++服务器开发高阶视频,电子书学习资料后台私信【架构】获取

内容包括C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,MongoDB,ZK,流媒体,P2P,K8S,Docker,TCP/IP,协程,DPDK多个高级知识点。

四、ThreadPoolExecutor的几个方法

1.基础方法

  • execute():提交任务,交给线程池执行
  • submit():提交任务,能够返回执行结果。(execute+Future)
  • shutdown():关闭线程池,等待任务都执行完
  • shutdownNow():关闭线程池,不等待任务执行完

2.监控方法

  • getTaskCount():线程池已执行和未执行的任务总数
  • getCompletedTaskCount():已完成的任务数量
  • getPoolSize():线程池当前的线程数量
  • getActiveCount():当前线程池中正在执行任务的线程数量

3.线程池类图

如图所示,J.U.C中有三个Executor接口:

  • Executor:一个运行新任务的简单接口;
  • ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;
  • ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。

而我们的ThreadPoolExecutor是功能最强大的,因为它可以自定义参数。

4.J.U.C框架是极其强大的,他还为我们提供了许多额外的方法

  • Executors.newCachedThreadPool():可以创建一个可缓存的线程池,如果线程池长度超过了处理的需要,可以灵活回收空闲线程;如果没有可以回收的,那么就新建线程。
  • Executors.newFixedThreadPool():它创建的是一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待。
  • Executors.newScheduledThreadPool():它创建的也是一个定长的线程池,支持定时以及周期性的任务执行。
  • Executors.newSingleThreadPool():它创建的是一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指令顺序执行。

5 接下来我们分别对这几个方法做一个了解

(1)这是newCachedThreadPool的基本方法

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

还有一个可以传入指定的ThreadFactory参数

 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

(2)这是newFixedThreadPool的基本方法

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

同样还有一个可以传入指定的ThreadFactory参数

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

(3)这是newScheduledThreadPool的基本方法

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

同样的,可以传入指定的threadFactory参数

 public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
 public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

(4)这是newSingleThreadExecutor的基本方法

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

同样的,也可以传入指定的threadFactory参数

 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

需要注意的是,他们返回值都是ExecutorService对象,而非ThreadPoolExecutor对象,因此缺少监控方法和部分基本方法,只有shutdown(),submit(),shutdownNow()基本方法,这是ExecutorService的局限性。

6.最后,分别演示一下代码例子

(1)newCachedThreadPool

mport lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class ThreadPoolExample1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        for(int i=0;i<10;i++){
            final int index = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
        executorService.shutdown();
    }
}

结果:

21:52:22.217 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample1 - task:1
21:52:22.217 [pool-1-thread-4] INFO com.practice.aqs.ThreadPoolExample1 - task:3
21:52:22.217 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample1 - task:0
21:52:22.217 [pool-1-thread-8] INFO com.practice.aqs.ThreadPoolExample1 - task:7
21:52:22.217 [pool-1-thread-6] INFO com.practice.aqs.ThreadPoolExample1 - task:5
21:52:22.217 [pool-1-thread-5] INFO com.practice.aqs.ThreadPoolExample1 - task:4
21:52:22.217 [pool-1-thread-7] INFO com.practice.aqs.ThreadPoolExample1 - task:6
21:52:22.217 [pool-1-thread-10] INFO com.practice.aqs.ThreadPoolExample1 - task:9
21:52:22.217 [pool-1-thread-9] INFO com.practice.aqs.ThreadPoolExample1 - task:8
21:52:22.217 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample1 - task:2

(2)

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ThreadPoolExample2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for(int i=0;i<10;i++){
            final int index = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
        executorService.shutdown();
    }
}

结果:

21:55:17.350 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample2 - task:0
21:55:17.350 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:1
21:55:17.350 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:2
21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:5
21:55:17.354 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:4
21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:6
21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:8
21:55:17.354 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:7
21:55:17.354 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample2 - task:3
21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:9

(3)newSingleThreadExecutor

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ThreadPoolExample3 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        for(int i=0;i<10;i++){
            final int index = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
        executorService.shutdown();
    }
}

结果:

21:57:01.913 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:0
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:1
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:2
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:3
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:4
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:5
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:6
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:7
21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:8
21:57:01.917 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:9

(4)newScheduledThreadPool(注意:返回值与以上不同)

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);

        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                log.info("schedule run");
            }
        },3, TimeUnit.SECONDS);//延时3秒执行该输出任务
        scheduledExecutorService.shutdown();
    }
}

结果(这个结果是延时3秒出来的):

22:03:22.303 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run

Process finished with exit code 0

除了schedule()方法,他还有scheduleAtFixedRate(以指定的速率去执行任务),scheduleWithFixedDelay(以指定的一个延迟执行任务)

先说scheduleAtFixedRate

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);

     
       scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
           @Override
           public void run() {
               log.info("schedule run");
           }
       },1,3,TimeUnit.SECONDS);//延迟一秒,每隔3秒执行一次任务
    }
}

结果(注意观察时间间隔):

22:10:22.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
22:10:25.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
22:10:28.168 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
22:10:31.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
22:10:34.167 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample4 - schedule run

五、线程池的合理配置

1.CPU密集型

尽量使用较小的线程池,一般Cpu核心数+1

因为CPU密集型任务CPU的使用率很高,若开过多的线程,只能增加线程上下文的切换次数,带来额外的开销

2.IO密集型

方法一:可以使用较大的线程池,一般CPU核心数 * 2

IO密集型CPU使用率不高,可以让CPU等待IO的时候处理别的任务,充分利用cpu时间

方法二:线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
下面举个例子:
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

3、混合型

可以将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,按情况而定。

相关推荐

B站收藏视频失效?mybili 收藏夹备份神器完整部署指南

本内容来源于@什么值得买APP,观点仅代表作者本人|作者:羊刀仙很多B站用户都有过类似经历:自己精心收藏的视频突然“消失”,点开一看不是“已被删除”,就是“因UP主设置不可见”。而B站并不会主动通知...

中间件推荐初始化配置

Redis推荐初始化配置bind0.0.0.0protected-modeyesport6379tcp-backlog511timeout300tcp-keepalive300...

Redis中缓存穿透问题与解决方法

缓存穿透问题概述在Redis作为缓存使用时,缓存穿透是常见问题。正常查询流程是先从Redis缓存获取数据,若有则直接使用;若没有则去数据库查询,查到后存入缓存。但当请求的数据在缓存和数据库中都...

后端开发必看!Redis 哨兵机制如何保障系统高可用?

你是否曾在项目中遇到过Redis主服务器突然宕机,导致整个业务系统出现数据读取异常、响应延迟甚至服务中断的情况?面对这样的突发状况,作为互联网大厂的后端开发人员,如何快速恢复服务、保障系统的高可用...

Redis合集-大Key处理建议

以下是Redis大Key问题的全流程解决方案,涵盖检测、处理、优化及预防策略,结合代码示例和最佳实践:一、大Key的定义与风险1.大Key判定标准数据类型大Key阈值风险场景S...

深入解析跳跃表:Redis里的&quot;老六&quot;数据结构,专治各种不服

大家好,我是你们的码农段子手,今天要给大家讲一个Redis世界里最会"跳科目三"的数据结构——跳跃表(SkipList)。这货表面上是个青铜,实际上是个王者,连红黑树见了都要喊声大哥。...

Redis 中 AOF 持久化技术原理全解析,看完你就懂了!

你在使用Redis的过程中,有没有担心过数据丢失的问题?尤其是在服务器突然宕机、意外断电等情况发生时,那些还没来得及持久化的数据,是不是让你夜不能寐?别担心,Redis的AOF持久化技术就是...

Redis合集-必备的几款运维工具

Redis在应用Redis时,经常会面临的运维工作,包括Redis的运行状态监控,数据迁移,主从集群、切片集群的部署和运维。接下来,从这三个方面,介绍一些工具。先来学习下监控Redis实时...

别再纠结线程池大小 + 线程数量了,没有固定公式的!

我们在百度上能很轻易地搜索到以下线程池设置大小的理论:在一台服务器上我们按照以下设置CPU密集型的程序-核心数+1I/O密集型的程序-核心数*2你不会真的按照这个理论来设置线程池的...

网络编程—IO多路复用详解

假如你想了解IO多路复用,那本文或许可以帮助你本文的最大目的就是想要把select、epoll在执行过程中干了什么叙述出来,所以具体的代码不会涉及,毕竟不同语言的接口有所区别。基础知识IO多路复用涉及...

5分钟学会C/C++多线程编程进程和线程

前言对线程有基本的理解简单的C++面向过程编程能力创造单个简单的线程。创造单个带参数的线程。如何等待线程结束。创造多个线程,并使用互斥量来防止资源抢占。会使用之后,直接跳到“汇总”,复制模板来用就行...

尽情阅读,技术进阶,详解mmap的原理

1.一句话概括mmapmmap的作用,在应用这一层,是让你把文件的某一段,当作内存一样来访问。将文件映射到物理内存,将进程虚拟空间映射到那块内存。这样,进程不仅能像访问内存一样读写文件,多个进程...

C++11多线程知识点总结

一、多线程的基本概念1、进程与线程的区别和联系进程:进程是一个动态的过程,是一个活动的实体。简单来说,一个应用程序的运行就可以被看做是一个进程;线程:是运行中的实际的任务执行者。可以说,进程中包含了多...

微服务高可用的2个关键技巧,你一定用得上

概述上一篇文章讲了一个朋友公司使用SpringCloud架构遇到问题的一个真实案例,虽然不是什么大的技术问题,但如果对一些东西理解的不深刻,还真会犯一些错误。这篇文章我们来聊聊在微服务架构中,到底如...

Java线程间如何共享与传递数据

1、背景在日常SpringBoot应用或者Java应用开发中,使用多线程编程有很多好处,比如可以同时处理多个任务,提高程序的并发性;可以充分利用计算机的多核处理器,使得程序能够更好地利用计算机的资源,...

取消回复欢迎 发表评论: