线程池中状态与线程数的设计分析(ThreadPoolExecutor中ctl变量)

预备知识

源码分析

我们把ThreadPoolExecutor中的状态和状态相关的方法复制出来,然后创建一个线程池,在运行中的时候分析线程池的状态和线程数,于是有了下面例子:

@Slf4j public class ThreadPoolExecutorCtlAnalysis {     private static final int COUNT_BITS = Integer.SIZE - 3;     private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 000,11111111111111111111111111111      // runState is stored in the high-order bits     private static final int RUNNING = -1 << COUNT_BITS;  // 111,00000000000000000000000000000     private static final int SHUTDOWN = 0 << COUNT_BITS;  // 000,00000000000000000000000000000     private static final int STOP = 1 << COUNT_BITS;      // 001,00000000000000000000000000000     private static final int TIDYING = 2 << COUNT_BITS;   // 010,00000000000000000000000000000     private static final int TERMINATED = 3 << COUNT_BITS;// 011,00000000000000000000000000000      // Packing and unpacking ctl      // RUNNING(3'thread) 111,00000000000000000000000000011     // ~CAPACITY         111,00000000000000000000000000000     // RESULT            111,00000000000000000000000000000     // 与操作取高位获取的就是ctl中保存的的线程池的状态     private static int runStateOf(int c) {         return c & ~CAPACITY;     }      // RUNNING(3'thread) 111,00000000000000000000000000011     // CAPACITY          000,11111111111111111111111111111     // RESULT            000,00000000000000000000000000011     // 与操作取低位获取的就是ctl中保存的worker数量     private static int workerCountOf(int c) {         return c & CAPACITY;     }      private static Runnable buildRunnableTask() {         return () -> {             try {                 Thread.sleep(3000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             log.info("Task finished.");         };     }      private static int getCtlValue(ThreadPoolExecutor executor, Field field) {         //noinspection ConstantConditions         return ((AtomicInteger) ReflectionUtils.getField(field, executor)).get();     }      private static String formatBinaryString(int state) {         StringBuilder binaryString = new StringBuilder(Integer.toBinaryString(state));         if (binaryString.length() < Integer.SIZE) {             for (int i = binaryString.length(); i < Integer.SIZE; i++) {                 binaryString.insert(0, "0");             }         }         return binaryString.substring(0, 3) + "," + binaryString.substring(3, Integer.SIZE);     }      private static void peekThreadPoolExecuteState(ThreadPoolExecutor executor, Field ctlField) {         log.info("------------------- ThreadPoolExecuteState -------------------");         int ctlValue = getCtlValue(executor, ctlField);         log.info("getCtlValue  : {}", formatBinaryString(ctlValue));         log.info("workerCountOf: {}", workerCountOf(ctlValue));         log.info("Is    RUNNING: {}", runStateOf(ctlValue) == RUNNING);         log.info("Is   SHUTDOWN: {}", runStateOf(ctlValue) == SHUTDOWN);         log.info("Is       STOP: {}", runStateOf(ctlValue) == STOP);         log.info("Is    TIDYING: {}", runStateOf(ctlValue) == TIDYING);         log.info("Is TERMINATED: {}", runStateOf(ctlValue) == TERMINATED);     }      public static void main(String[] args) throws NoSuchFieldException, InterruptedException {         // 打印出来看看几种状态的二进制表示         log.info("{} --> CAPACITY", formatBinaryString(CAPACITY));         log.info("{} --> RUNNING", formatBinaryString(RUNNING));         log.info("{} --> STOP", formatBinaryString(STOP));         log.info("{} --> TERMINATED", formatBinaryString(TERMINATED));           // 创建一个线程池,运行3个任务         ThreadPoolExecutor executor = new ThreadPoolExecutor(                 1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));         executor.submit(buildRunnableTask());         executor.submit(buildRunnableTask());         executor.submit(buildRunnableTask());         // 休眠一秒钟,可以拿到中间状态的ctl         Thread.sleep(1000);         log.info("getActiveCount(): {}", executor.getActiveCount());         // 通过反射能拿到ThreadPoolExecutor的ctl的值         Field ctlField = ThreadPoolExecutor.class.getDeclaredField("ctl");         ctlField.setAccessible(true);         // 线程池运行中的状态可通过ctl拿到         peekThreadPoolExecuteState(executor, ctlField);         // 终止线程池,再来看看线程池中ctl的状态         executor.shutdownNow();         peekThreadPoolExecuteState(executor, ctlField);         // 休眠2秒钟,看看线程池最终的状态         Thread.sleep(2000);         peekThreadPoolExecuteState(executor, ctlField);     } } 

在看运行结果之前,我们先看下ThreadPoolExecutor中的几处涉及到状态变更的方法实现。

submit()源码分析

public Future<?> submit(Runnable task) {     if (task == null) throw new NullPointerException();     RunnableFuture<Void> ftask = newTaskFor(task, null);     execute(ftask);     return ftask; } 

最终调用的是内部的execute方法:

public void execute(Runnable command) {     if (command == null)         throw new NullPointerException();       int c = ctl.get();     if (workerCountOf(c) < corePoolSize) {         if (addWorker(command, true))             return;         c = ctl.get();     }     if (isRunning(c) && workQueue.offer(command)) {         int recheck = ctl.get();         if (! isRunning(recheck) && remove(command))             reject(command);         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     else if (!addWorker(command, false))         reject(command); } 

这个方法不是特别复杂,我们本文的重点是要看看它的addWorker()方法,这个不复制太多逻辑,关键在两行:

private boolean addWorker(Runnable firstTask, boolean core) {     int c = ctl.get();     ...         compareAndIncrementWorkerCount(c)     ... } private boolean compareAndIncrementWorkerCount(int expect) {     return ctl.compareAndSet(expect, expect + 1); } 

这里控制的是ctl中工作线程数(wc:WorkerCount)的变更,即整形低29位的自增不会影响到高3位的状态:

RUNNING(0'wc) 111,00000000000000000000000000000 RUNNING(1'wc) 111,00000000000000000000000000001 

所以可预见的输出结果就是:

workerCountOf(): 1 Is Running: true Is Stop: false 

注意的是这些值都从ctl属性中得来。

shutdownNow()源码分析

在我们的例子中,我们调用了shutdownNow()方法来改变线程池的状态。

public List<Runnable> shutdownNow() {     List<Runnable> tasks;     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         checkShutdownAccess();         advanceRunState(STOP);         interruptWorkers();         tasks = drainQueue();     } finally {         mainLock.unlock();     }     tryTerminate();     return tasks; } 

这里我们

区块链毕设网(www.qklbishe.com)全网最靠谱的原创区块链毕设代做网站
部分资料来自网络,侵权联系删除!
资源收费仅为搬运整理打赏费用,用户自愿支付 !
qklbishe.com区块链毕设代做网专注|以太坊fabric-计算机|java|毕业设计|代做平台 » 线程池中状态与线程数的设计分析(ThreadPoolExecutor中ctl变量)

提供最优质的资源集合

立即查看 了解详情