好长时间木有写java,怕忘光了😄,今天抽空翻翻源码做些总结。总的来说实现逻辑还是比较简单清晰的。

ThreadPoolExecutor 中有维护了队列,和Worker(对应一个线程)的池子,提交的任务会交给Worker执行。

corePoolSize: 核心线程(即开启了就会常驻的线程)的数量
workQueue: 提交任务的队列(当核心池用完就会先放在队列里面)
maximumPoolSize: 线程池大的size,如果队列里放不下,就会接着起线程运行任务
RejectedExecutionHandler: 超过线程池大的size,队列也放不下就开始执行Rejected逻辑
keepAliveTime: 队列消费完数据后,线程大的keepAlive时间 ,默认allowCoreThreadTimeOut 是false,只会清理maxpool的线程。
控制
ctl: 记录线程池生命周期的状态位和运行的线程个数 ,高三位记录状态,低位记录Worker个数。通过乐观锁的方式控制线程池(查询线程个数的方法workerCountOf和workQueue中有些混淆,前者是运行的线程数,后者是提交上来存放任务)
private static final int RUNNING = -1<< COUNT_BITS; //运行中 private static final int SHUTDOWN = 0<< COUNT_BITS; //对应shutdown()方法,队列和Worker现有的方法会继续运行,清理掉空闲的Worker(是否空闲看能否持有到Woker的锁),甚至还能添加Worker private static final int STOP = 1<< COUNT_BITS; //对应shutdownNow()方法,所有的Worker都会发interrupt信号(不会保证线程被释放掉),队列也会清空 private static final int TIDYING = 2<< COUNT_BITS; //当所有的任务到停掉,ctl记录的任务数为0 private static final int TERMINATED = 3<< COUNT_BITS; //线程池正式停掉。TiDYING状态后执行terminated()(空方法)变成TERMINATED状态4.提交流程:
int c = ctl.get();
if (workerCountOf(c)< corePoolSize) {
//当前Worker数小于corePoolSize时候,会创建个新的Worker
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//corePool的线程已经起完了,就会将其提交到队列中,由Worker消费
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command); //重新检查状态,如果池子如果不是运行中,且task未来的及消费,发起拒绝策略
else if (workerCountOf(recheck) == 0) // 如果从队列中删除未成功,而且Worker数等于0
addWorker(null, false); //加个Worker把这个任务消化掉
}
else if (!addWorker(command, false)) //如果队列也满了,会向maximumPool增加新的Worker,去运行线程
reject(command); //maximumPool也装不下触发拒绝策略5.创建Worker的逻辑5.1 Check是否可以添加Worker
根据上面提交流程我们知道:
判断是否需要添加Worker由ctl中保存的Worker数决定是否addWorkder,但是多个线程之间会有竞争,进入addWorker方法时,是会存在池子已经满了的情况。
这个时候会有个乐观锁的方式,重试检查线程池状态和Worker数量,保证其他提交线程的性能不会受影响:
如果池子真的满了只能通知调用者,addWorker失败,进行其他策略
如果池子未满,就可以创建一个新的Worker
private boolean addWorker(Runnable firstTask, boolean core) {//bool core 表示核心池还是maxium池
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}5.2 开始添加Worker
这里没啥好说的,创建个Worker并放到Woker的池子中,
这里用到互斥锁,不过锁的粒度很小,只会是在加入池子的时候上锁
入池子前还是会检查池子的状态
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs< SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s >largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}6. Woker 介绍Worker 对应线程池里的一个工作线程
持续从队列里获取任务执行
boolean timed = allowCoreThreadTimeOut || wc >corePoolSize;
if ((wc >maximumPoolSize || (timed && timedOut))
&& (wc >1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}7.Worker生命周期创建: 在上文有介绍
会根据线程池的的状态自行结束线程
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();
如果是异常退出,会退出后会重新起一个新的woker(注意这个一般代码的逻辑异常不会导致Worker异常,会被封装到submit方法返回的future中,jdk留了空方法可以监听到这种异常)
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// completedAbruptly = true 会被任务是异常退出
addWorker(null, false);
}maxium池子的空闲Worker会被释放
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc >corePoolSize;
if ((wc >maximumPoolSize || (timed && timedOut))
&& (wc >1 || workQueue.isEmpty())) { //判断超时
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}被主动释放(看shutdownNow()方法)
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
Q1: 在创建线程池时,在ThreadFactory接口中指定setUncaughtExceptionHandler之后,如果线程池中的任务抛异常,是否会被UncaughtExceptionHandler捕获到
不会,在线程池的实现中,Worker的会一直持有申请到的线程,不会处理提交任务的异常,异常可以在submit 方法返回的future上获取到。
Q2:showdownNow()执行之后线程就一定都执行完嘛
只是发interrupt 信号,看上文介绍只有TYDING ,TERMINATED状态,所有任务才会执行完。一定要保持代码的健壮性,控制业务代码的生命周期。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
网站名称:JDK学习笔记-线程池的实现-创新互联
路径分享:http://www.jxjierui.cn/article/jeese.html


咨询
建站咨询
