我们知道线程池是通过复用线程来实现的,那么在线程池中,线程是怎么创建与销毁的呢?我们通过源码来一探究竟。
给线程池添加一个任务是通过execute方法来实现的,那我们就从这个方法入手:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //当前线程数小于核心线程数——新建核心线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //当前线程数大于或等于核心线程数,且线程池处于Running状态——任务加入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //队列添加失败(队列满了)——新增非核心线程 else if (!addWorker(command, false)) reject(command); }
参考代码及注释,在execute方法中,针对欲执行的任务command会有3种处理方法:(按照顺序对应代码中标红部分)
1、新增核心线程执行
2、添加到任务队列
3、新增非核心线程执行
线程池中线程的创建就在addWorker方法中:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
在线程池中,线程的表现形式为一个Worker类(实现了Runnable接口,并在构造函数中实例化了一个任务为自己this的线程),当成功创建一个Worker对象之后,会启动运行自己的线程,也就是说新创建的线程进入了工作状态。
在addWorker方法中有一个core参数用来区分是核心线程还是非核心线程,但是仅在第一处标红的地方使用到,作用就是用来判断当前线程池是否满载(核心线程满载依据为最大核心线程是;非核心线程满载依据为最大线程数)。也就是说,在线程池中,核心线程和非核心线程本质上没有任何区别。那么核心线程是如何做到完成任务后不被销毁,而非核心线程完成任务后就会被销毁呢?继续往下看。
经过上述操作。我们已经创建了一个线程并启动了他,而具体执行业务逻辑的代码则在Worker对象中的run方法,run方法最终调用runWorker方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
通过代码可以看到,在runWorker方法中,线程第一次会将创建时的构造参数firstTask赋值给task,并调用task的run方法。接下来会循环从getTask方法中继续获取task并调用task的run方法直到task为null停止。
所以线程池创建的线程会经历如下状态:创建——工作——工作——n次工作——无工作可做——结束(销毁)
也就是说线程池中的线程不是被动销毁的,而是线程自己本身发现无任务可做之后,整个生命周期自然而然的结束。到这里已经解释了非核心线程执行完任务之后的销毁操作,那么核心线程执行完任务之后是如何保持不销毁的呢?秘密就在于getTask方法中:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
在getTask方法中,通过allowCoreThreadTimeOut与当前线程数及核心线程数等配置获取一个布尔值timed,这个值就是用来控制线程超时时间的标记。
当timed为true时,会通过阻塞队列来实现线程的阻塞从而达到线程不销毁的目的;当timed为false时,则不会阻塞,此时如果获取到task为null,则线程自然结束销毁。
总结:
1、线程池是由一堆线程+阻塞队列组成。可以类比成MQ系统,线程就是消费者、阻塞队列就是MQ、execute方法就是生产者。
2、线程池中虽然有核心线程和非核心线程之分,但是本质上是没有区别的。线程最终是阻塞等待任务还是销毁取决于当前线程在getTask方法时刻线程池整体的状态。也就是说一个线程是核心线程还是非核心线程是不确定的。
3、线程池中线程的销毁不是线程池来销毁的,而是线程本身的行为,线程从创建开始就一直工作,直到无任务可工作之后会由getTask方法的阻塞状态来决定销毁(非核心线程)还是阻塞(核心线程)。
参考资料:
阻塞队列LinkedBlockingQueue和ArrayBlockingQueue的异同