线程池的概述
线程池的内部构建了一个生产者-消费者模型,将线程和任务进行解耦,从而能够良好的缓冲任务,复用线程。同时线程池的运行主要分成两个部分:任务管理、线程管理。
那么生产者-消费者模型体现在哪儿?任务管理
可以视为生产者
,线程管理
视为消费者
。任务进入线程池中,线程池会判断其后续的流程:
- 直接申请线程执行任务
- 任务缓冲进入阻塞队列,等待线程执行
- 拒绝该任务
以上属于任务管理
,线程管理
主要为根据任务请求进行线程的分配,线程执行完之后,继续获取新的任务执行,最后线程获取不到任务,到一定时间之后,该线程被回收。
在Java中,线程池的主要实现类为ThreadPoolExecutor
,其实阅读其源码,就可以理解线程池中的所有的思想,以及如何实现。阅读套路:构造方法—->其他方法及属性。
四个构造方法:
其共有四中构造方法,参数分别有5、6、6、7个
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
...
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
...
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
...
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
必备的参数(5个)
corePoolSize:线程池中核心线程的数量
可以形象理解为餐馆就餐中的餐桌,正常情况下的餐桌数量就是corePoolSize。
maximumPoolSize:线程池中线程总数最大值
同上的例子,比作为就餐时间,因为人流量多,老板从后厨新加几个餐桌之后的数量。
在线程池中,从某方面可以将线程分为核心线程和非核心线程,核心线程默认会一直存在在线程池中,即使什么也不干。核心线程数量就是corePoolSize,非核心线程,如果长时间限制的话,会被回收(这和餐桌的例子很契合哇😄)。最大线程总数(maximumPoolSize)= 核心线程数量(corePoolSize) + 非核心线程数量
keepAliveTime:非核心线闲置超时时长
非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。
TimeUnit unit
枚举类型: 包含有纳秒、微妙、毫秒、秒、分、小时、天
BlockingQueue\
workQueue:阻塞队列,用于维护等待执行的Runnable任务对象 常用的几个阻塞队列:
LinkedBlockingQueue
链式阻塞队列,底层数据结构是链表,默认大小是
Integer.MAX_VALUE
,也可以指定大小。ArrayBlockingQueue
数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
SynchronousQueue
同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。
DelayQueue
延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。
非必备的核心参数(2个)
ThreadFactory threadFactory
- 用于自定义线程池的名字、以及兜底的异常处理策略等
RejectedExecutionHandler handler
拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :
- ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
使用线程池的主要原因有如下:
- 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
- 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
- 可以对线程做统一管理。
线程池的生命周期
线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。所以线程池中也有自己的状态。通过源码了解到ThreadPoolExecutor
类中通过volatile int 定义了runState变量表示线程池的状态。
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
运行状态 | 状态描述 | 转换方法 |
---|---|---|
RUNNING | 能够接受新的任务,并且也能处理阻塞队列中的任务 | 线程池创建之后 |
SHUTDOWN | 不再接收新提交的任务,清除一些空闲worker,会等待阻塞队列中存储的任务执行完成。 | shutdown() |
STOP | 停止接受新的任务,中断所有线程(包括正在执行的),阻塞队列中为执行的任务全部抛弃,此时poosize = 0, 阻塞队列size = 0; | shutdownNow() |
TIDYING | 所有的任务已经终止,workerCount(有效线程数量)数量为0 | |
TERMINATED | terminated()方法执行后,进入该状态 |
任务的管理
概述中已经提及,线程池内部就是一个生产者-消费者
模型,任务管理
视为生产者
,线程管理
视为消费者
,所以接下来详细阐述任务管理
和线程管理
任务的调度
任务的调度,感觉和操作系统中处理机的调度类似,任务提交之后,进行判断
- 判断线程池的状态是否为RUNNING,如不执行了,拒绝接收该任务(线程池要保证在RUNNING状态下执行任务)。
- 如果workerCount < corePoolSize, 创建并启动一个线程,执行该任务。
- 如果workerCount >= corePoolSize && 阻塞队列未满,将该任务添加到阻塞队列中。
- 如果workerCount >=corePoolSize && 阻塞队里已满&&workerCount < maximumPoolSize,创建并启动一个线程,执行该任务
- 如果workerCount >= maximumPoolSize && 阻塞队列已满, 根据拒策略进行拒绝,默认使用是
AbortPolicy
,放弃任务并抛出异常。
ThreadPoolExecutor中任务调度的实现
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取线程池状态
int c = ctl.get();
// ②
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// ③
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 3.1 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 3.2 线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// ④如果workerCount >=corePoolSize && 阻塞队里已满&&workerCount < maximumPoolSize,创建并启 // 动一个线程,执行该任务
else if (!addWorker(command, false))
reject(command);
}
任务的缓冲(BlockingQueue)
这是线程池的关键,其等同于生产者-消费者
中的缓冲区,在线程池中,核心是对任务和线程的管理,上述也讲过了核心就是任务和线程的解耦,解耦的方式就是添加缓冲区。在线程池中通过阻塞队列实现缓冲区,暂存任务,工作线程从阻塞队列中获取任务。
阻塞队列常用用于生产者-消费者场景,其除了支持存储任务的线程存储元素,获取任务的线程获取元素之外,还有两个附加的操作(自己编写生产者-消费者最麻烦的地方):
- 当队列为空的时候,获取任务的线程会等待队列非空
- 当队列满的时候,存放任务的线程会等待队列可用。
这样之后,我们就可以只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。简而言之,阻塞队列(BlockQueue就是生产者存放元素的容器),其提供了四中不同插入
、移除
、检测元素
的方法:
方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer() | put(e) | offer(e, time , unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() |
BlockingQueue的实现类有如下的一些
ArrayBlockingQueue
LinkedBlockingQueue
- DelayQueue
- PriorityBlockingQueue
- SynchronousQueue
后续会看相应的源码,并整理为博客。目前就不详细解释其区别。
任务的申请
由任务的调度可以总结出,任务的执行有两种可能
- 任务直接由新创建的线程执行
- 复用线程从阻塞队列中获取任务,执行
第一种情况出现在线程初始创建的时候,以及出现峰值的时候。第二种是线程获取任务绝大多数的情况。这里可以顺便分析ThreadPoolExecutor
是如何实现线程的复用的。
第二种的任务的申请使用的是线程池中的getTask
的方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
任务的拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
决绝策略在构造方法的时候,就已经讲解了,共四种:
- ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
线程的管理
Worker线程
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
...
}
Worker
继承了AbstractQueueSynchronnizer
,实现了Runnable
接口。
构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
通过ThreadFactory
方法创建线程,firstTask
可为空,为空的时候,也就对应线程去阻塞队列中去获取任务,若是不为空,对应的为直接执行任务。
run方法
public void run() {
runWorker(this);
}
Worker线程增加(涉及到线程的复用)
Worker
线程的增加是通过线程池中的addWorker()
方法
上篇
private boolean addWorker(Runnable firstTask, boolean core) {
// 源码中的上半部分
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 当前线程的总数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 1.如果core是ture,证明需要创建的线程为核心线程,则先判断当前线程是否大于核心线程
// 如果core是false,证明需要创建的是非核心线程,则先判断当前线程数是否大于总线程数
// 如果不小于,则返回false
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
以上的部分用于判断线程数量是否超出阀值,超过了就返回为false
下篇
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建worker对象
w = new Worker(firstTask);
// 实例化thread对象
final Thread t = w.thread;
if (t != null) {
// 全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 加入到workers set中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
创建worker
对象,并初始化一个Thread
对象,然后启动这个线程对象。t.start()
方法会调用Worker中的run
方法,run
方法又使用了runWorker()
方法,也就是线程的任务执行
线程的任务执行
runWorker()方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// Worker执行firstTask或从workQueue中获取任务,如果getTask方法不返回null,循环不退出
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while
循环中,worker会不断地调用getTask
方法从阻塞队列中获取任务然后调用task.run()
执行任务,从而达到复用线程的目的。只要getTask
方法不返回null
,此线程就不会退出。
常见的线程池
Executors
类中提供的几个静态方法来创建线程池。通过前面的ThreadPoolExecutor
的构造方法,就可以明白这些线程池具体干啥的,很可惜,阻塞队列的具体实现还没有看,而且线程池的demo写的也不多,认识不深刻,所以就暂时先不写。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}