彻底搞懂Java线程池
Table of Contents
线程池概述
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。 线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。 线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,会降低系统的稳定性。
使用场景
- 需要频繁使用线程做一些并发的工作(创建异步线程做一些异步的工作)
- 线程的创建成本高 (数据库连接池)
线程池核心原理
对于一些初始化比较麻烦且耗时的工作,通过池化技术维护一些状态是一种比较常用的手段。减少创建步骤,可以提高重复利用率,让程序运行更高效。大致的流程如下图所示。
源码分析
理解源码的最好的方式,是自己去实现一下核心流程,才能体会到作者在写代码时候的思考有多细致,代码写的有多优雅。不然你是感受不出来的。
核心流程
- 状态管理
- 任务处理
- 任务执行
状态管理
八股文开始,线程池有几种状态(RUNNING,SHUTDOWN,STOP, TIDYING,TERMINATED),他们的之间的状态是怎么扭转的?
问问自己:
- shutdown和stop有什么区别?
- shutdown或者shutdownNow之后,正在工作的线程,队列中剩余任务怎么处理的?
- shutdown过程中怎么保证并发?
- TIDYING是什么状态,什么情况会变成这种状态?
- TIDYING和 TERMINATED 状态有什么区别?
状态解读
- RUNNING:运行状态,接受新的任务并且处理队列中的任务。
- SHUTDOWN:关闭状态(调用了shutdown方法)。不接受新任务,,但是要处理队列中的任务。
- STOP:停止状态(调用了shutdownNow方法)。不接受新任务,也不处理队列中的任务,并且要中断正在处理的任务。
- TIDYING:所有的任务都已终止了,workerCount为0,线程池进入该状态后会调 terminated() 方法进入TERMINATED 状态。
- TERMINATED:终止状态,terminated() 方法调用结束后的状态。
在状态的值的设计上,Doug Lea也很细节。欣赏一下。
// 通过一个原子变量保存状态和工作线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// int的后29位来作为工作线程数的最大容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 前3位表示状态。
* 这几个状态设计就很巧妙。
* RUNING 111刚好是个负数
* 其他值是正数,而且刚好是递增的,
* 下面通过位运算来判断和更改线程状态和线程数非常高效。
* 太细节了呀。
*/
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
任务处理
脱离状态管理和并发处理,任务的处理其实很简单。用一个小小的流程图来帮助理解一下。但如果加上状态处理和并发处理的逻辑,就变得很复杂很多了。
但是我们可以思考一下,上面那些过程会出现并发问题?
void execute(Runnable command)
咋一看也还好,不就一些判断吗?
任务处理分为两个关键方法:
- execute
- addWorker
上面只是execute方法
boolean addWorker(Runnable firstTask, boolean core)
每次添加都有可能面临并发,从而导致失败,那如何保证添加的原子性,正常来讲,加锁嘛。但是加锁会不会有点太重了。每次都要触发系统调用。Doug Lea大神用的是自旋的CAS,来欣赏一下源码。
retry:
for (;;) {
int c = ctl.get();
//线程状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//工作线程数超过了最大值
return false;
if (compareAndIncrementWorkerCount(c))
//CAS成功了,可以去下面创建线程了
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// 线程池状态发生了改变,从头开始
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
任务执行
注意
比较需要注意的,什么时候需要处理并发场景。这个是需要小心翼翼思考的问题。
- 添加任务,核心线程接近满了或者队列接近满了 【execute方法】
- 添加过程中,线程池状态变了【addWorker方法】
- cas增加线程数成功了,但是添加任务的时候失败了(可能是线程池状态变更了)【addWorkerFailed�方法】
- 添加线程成功了,但是启动失败了,需要回滚状态【addWorkerFailed�】
- 工作线程执行完了,或者执行失败了,应该怎么处理【runWorker方法】
总结
线程池的核心还是体现在ThreadPoolExecutor�这个类,主要体现在这几个流程
- 状态管理
- 任务处理(execute , addWorker方法)
- 任务执行 (runWorker , getTask方法)
在状态管理上,作者用了大量的自旋CAS操作来管理状态和线程数的变更。在任务添加和执行过程中甚至也用到了一下recheck的方式。 整体的逻辑,刚开始看可能会比较手足无措,但是当你尝试自己去写的时候,你会发现作者的实现思路可能不一致,你就会去想作者为什么这样写。然后追着主流程慢慢捋,你才能慢慢感受到作者的思考的细致,以及借鉴一些思路,这才是你的学习与成长。享受这个过程!