自己动手实现线程池 jdk线程池ThreadPoolExecutor工作原理解析(一)( 五 )


MyThreadPoolExecutorV1的execute方法(相比jdk的实现v1版本去掉了关于优雅停止的逻辑)/*** 提交任务,并执行* */public void execute(Runnable command) {if (command == null){throw new NullPointerException("command参数不能为空");}int currentCtl = this.ctl.get();if (workerCountOf(currentCtl) < this.corePoolSize) {// 如果当前存在的worker线程数量低于指定的核心线程数量,则创建新的核心线程boolean addCoreWorkerSuccess = addWorker(command,true);if(addCoreWorkerSuccess){// addWorker添加成功,直接返回即可return;}}// 走到这里有两种情况// 1 因为核心线程超过限制(workerCountOf(currentCtl) < corePoolSize == false),需要尝试尝试将任务放入阻塞队列// 2 addWorker返回false,创建核心工作线程失败if(this.workQueue.offer(command)){// workQueue.offer入队成功if(workerCountOf(currentCtl) == 0){// 在corePoolSize为0的情况下,当前不存在存活的核心线程// 一个任务在入队之后,如果当前线程池中一个线程都没有,则需要兜底的创建一个非核心线程来处理入队的任务// 因此firstTask为null,目的是先让任务先入队后创建线程去拉取任务并执行addWorker(null,false);}else{// 加入队列成功,且当前存在worker线程,成功返回return;}}else{// 阻塞队列已满,尝试创建一个新的非核心线程处理boolean addNonCoreWorkerSuccess = addWorker(command,false);if(!addNonCoreWorkerSuccess){// 创建非核心线程失败,执行拒绝策略(失败的原因和前面创建核心线程addWorker的原因类似)reject(command);}else{// 创建非核心线程成功,成功返回return;}}}/*** 根据指定的拒绝处理器,执行拒绝策略* */private void reject(Runnable command) {this.handler.rejectedExecution(command, this);}可以看到,execute方法源码中对于任务处理的逻辑很清晰,也能与ThreadPoolExecutor运行时工作流程中所介绍的流程所匹配 。
addWorker方法(创建新的工作线程)在execute方法中当需要创建核心线程或普通线程时,便需要通过addWorker方法尝试创建一个新的工作线程 。
/*** 向线程池中加入worker* */private boolean addWorker(Runnable firstTask, boolean core) {// retry标识外层循环retry:for (;;) {int currentCtl = ctl.get();// 用于cas更新workerCount的内层循环(注意这里面与jdk的写法不同,改写成了逻辑一致但更可读的形式)for (;;) {// 判断当前worker数量是否超过了限制int workerCount = workerCountOf(currentCtl);if (workerCount >= CAPACITY) {// 当前worker数量超过了设计上允许的最大限制return false;}if (core) {// 创建的是核心线程,判断当前线程数是否已经超过了指定的核心线程数if (workerCount >= this.corePoolSize) {// 超过了核心线程数,创建核心worker线程失败return false;}} else {// 创建的是非核心线程,判断当前线程数是否已经超过了指定的最大线程数if (workerCount >= this.maximumPoolSize) {// 超过了最大线程数,创建非核心worker线程失败return false;}}// cas更新workerCount的值boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);if (casSuccess) {// cas成功,跳出外层循环break retry;}// compareAndIncrementWorkerCount方法cas争抢失败,重新执行内层循环}}boolean workerStarted = false;MyWorker newWorker = null;try {// 创建一个新的workernewWorker = new MyWorker(firstTask);final Thread myWorkerThread = newWorker.thread;if (myWorkerThread != null) {// MyWorker初始化时内部线程创建成功// 加锁,防止并发更新final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (myWorkerThread.isAlive()) {// 预检查线程的状态,刚初始化的worker线程必须是未唤醒的状态throw new IllegalThreadStateException();}// 加入worker集合this.workers.add(newWorker);int workerSize = workers.size();if (workerSize > largestPoolSize) {// 如果当前worker个数超过了之前记录的最大存活线程数,将其更新largestPoolSize = workerSize;}// 创建成功} finally {// 无论是否发生异常,都先将主控锁解锁mainLock.unlock();}// 加入成功,启动worker线程myWorkerThread.start();// 标识为worker线程启动成功,并作为返回值返回workerStarted = true;}}finally {if (!workerStarted) {addWorkerFailed(newWorker);}}return workerStarted;}

经验总结扩展阅读