@TOC
前言
学习完了阻塞队列之后,接下来,我们学习下面试必考的知识点–线程池。用过JAVA的同学一定听过线程池的大名。下面我们就来看看这个大名鼎鼎的家伙。
为啥要用线程池呢?
第一个问题来了,为啥要使用线程池呢?直接new一个线程它不香么?就像这样
new Thread(new Runnable() {
public void run() {
longTest.countTest();
}
}).start()
简单又快捷,其中run方法的作用是用来执行一个任务单元(也就是一段代码)。start方法的作用是用来创建一个新线程,同时设置好这个线程的上下文,比如这个线程的栈,线程的状态等一系列的信息。
这些信息处理好之后这个线程才可以被调度,一旦调度,就会执行run()
方法。
但是在实际项目中是禁止这样做了,在阿里出的JAVA开发手册中就明确说了原因:
所以,直接new一个线程不香,原因主要在于创建大量相同的线程会大量的消耗系统内存,甚至会导致系统内存耗尽;同时,大量的线程会竞争CPU的调度,导致CPU过度切换。
接下来我们就看看香香的线程池的优点。
线程池的优点
- 减少在创建和销毁线程上所花费的时间和系统资源的开销
- 提高响应速度,当任务到达之后,任务可以不需要等到线程创建就能被立即执行
- 提高线程的可管理性,线程是稀缺资源,如果无限制的的创建,不仅会消耗系统资源,还会降低系统性能,使用线程池可以进行统一分配,调优和监控。
线程池长啥样呢?
前面说了一堆线程池的好处,下面我们来看看线程池的结构。
创建线程的几种方式
- 创建一个缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
由于SynchronousQueue是一个无界的队列,当任务过多时,大量的任务堆积到队列里可能会发生OOM异常(Java内存溢出异常),同时线程池的最大线程数也没有限制,创建大量线程缺点前面也说了,综上所述不推荐使用这种方式创建线程池
2. 创建固定容量的线程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
这个线程池多了一个参数ThreadFactory,这个参数的作用可以自定义线程的名称如下:
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("测试-%s").build();
线程池的容量固定为传入值nThreads,任务的阻塞队列用的是LinkedBlockingQueue,没有指定队列的容量,所以队列的最大容量可达Integer.MAX_VALUE
。当有大量请求时,可能造成任务的大量堆积,发生OOM异常(Java内存溢出异常)。综上所说实际项目中不推荐使用这种方式创建线程池。
3. 创建单个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
这个线程池中线程的容量只有1个,可用于一些特殊的场景下。
从上,我们知道Executors类中各种创建线程池的方法,其实内部都调用的是ThreadPoolExecutor的构造器,那么我们就来看看ThreadPoolExecutor类。
ThreadPoolExecutor类
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor类的构造方法参数比较多,下面分别介绍下各个参数:
- corePoolSize: 核心线程数的大小,就是线程池中始终保留的存活线程的数量,这个就相当于项目组的常驻组员。
- maximumPoolSize: 线程池中允许最大的线程数,当任务比较多时,可以适当增加到线程,只要总的线程数量不超过最大线程数。就相当于项目组常驻组员数+外援组员数<=最大的组员数
- keepAliveTime :空闲线程允许的最大存活时间,当一个线程空闲超过这段时间,就会被回收
- unit :存活时间的时间单位
- workQueue :保存任务的阻塞队列,所有需要线程执行的任务都会保存到这个队列中,当线程被CPU调度之后就会从队列中取出一个任务来执行。
- threadFactory: 线程工厂用来创建线程,通过这个参数可以自定义如何创建线程,例如:你可以给线程指定一个有意义的名字
- handler: 拒绝策略,针对当队列满了是 新来任务的处理方式,如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接受,至于拒绝的策略,可以通过handler这个参数来指定,ThreadPoolExector已经提供了以下4中策略。
- CallerRunsPolicy: 提交任务的线程自己去执行该任务
- AbortPolicy: 默认的拒绝策略,会抛出 RejectedExecutionException。
- DiscardPolicy: 直接丢弃任务,没有任何异常抛出
- DiscardOldestPolicy: 丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
创建线程池的正确姿势是啥呢?
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("测试-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(10, 15, 1, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
就像如上所示,创建的线程池,指定了核心线程数以及最大线程数,同时,指定了任务队列的容量,这个很重要,最后,给线程指定了一个有意义的名字。
这里的相关的参数的设置要根据不同的场景进行不同的设置:
- 高并发,任务执行时间短的场景
这种场景应该是CPU密集型的计算场景,可以将核心线程数设置为CPU数+1,减少线程上下文之间的切换 - 并发不高,任务执行时间长的场景
- 假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以加大线程池中的线程数目,让CPU处理更多的业务。
- 假如是业务时间长集中在CPU操作上,也就是CPU密集型的任务,这就只能跟(1)一样,线程池中的线程数设置的少一点,减少线程上下文的切换
- 高并发,任务执行时间长的场景
解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,然后增加服务器是第二步,再是考虑线程池的设置。
线程池的执行
se.execute(new Job);
这样的方式提交一个任务到线程池,所以核心的逻辑就是execute()
函数。
源码解析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//1.获取线程池的状态
int c = ctl.get();
//2.当前线程数量小于corePoolSize时创建一个新的线程运行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//3.如果当前线程处于运行状态,并且写入阻塞队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//4.双重检查,再次获取线程状态,如果线程状态变了(非运行状态)
//就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕,同时执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//5.如果当前线程池为空就新创建一个线程并执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//6.如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
方法说明:该方法核心的逻辑主要是如下三步:
- 如果当前在可运行的线程数量小于corePoolSize时,则创建一个新线程运行。
- 如果任务成功的加入到队列中,则进行双重检查,再次获取线程状态,因为线程的状态可能变成了非运行状态
- 如果是非运行状态,则尝试创建一个线程,如果失败则执行拒绝策略。
线程池的配置
按照经验,我们首先可以分析线程池需要执行的任务是那种类型:
对于 IO 密集型任务:由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如CPU个数*2
对于CPU密集型任务(大量复杂的运算)应当分配较少的线程,比如CPU个数相当的大小。
如何优雅的关闭线程池?
关闭线程池无非就是两种方法shutdown()/shutdownNow()
。
这两个方法有着重要的区别:
shutdown()
执行后停止接受新任务,会把队列的任务执行完毕。shutdownNow()
也是停止接受新任务,但会中断所有的任务,将线程池的状态改成stop。shutdownNow()
更加的简单粗暴,可以根据实际场景来选用不同的方法。
如何SpringBoot中整合线程池?
首先是利用好SpringBoot的自动装配功能,配置好线程池的一些基本参数。
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
/*
* 线程池名前缀
*/
private static final String threadNamePrefix = "Api-Async-";
/**
* bean的名称, 默认为首字母小写的方法名
* @return
*/
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**
* 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
* 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
* 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
*/
/*
* 核心线程数(默认线程数)
*/
executor.setCorePoolSize(corePoolSize);
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//缓冲队列数
executor.setQueueCapacity(queueCapacity);
//允许线程空闲时间(单位是秒)
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
//用来设置线程池关闭时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//线程池对拒绝任务的处理策略,CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化
executor.initialize();
return executor;
}
}
配置好线程池的基本参数时候,我们就可以使用线程池了, 只要在一个限定域为public
的方法头部加上@Async
注解即可。
@Async
public void createOrder() {
System.out.println("执行任务");
}
总结
本文主要介绍了介绍了线程池的优点,其主要就是减少创建创建和销毁线程所花费的时间和系统资源的开销,然后,介绍了Executors
中几种创建线程池的方式,不过不推荐使用,接着介绍了正确创建线程池的姿势,着重介绍了ThreadPoolExecutor类的构造器和execute()
方法。最后提到了在SpringBoot中使用线程池。
参考
线程和线程池
如何优雅的使用和理解线程池
关于Java多线程及线程池的使用看这篇就够了
https://time.geekbang.org/column/article/90771