码农翻身

Tomcat源码篇之线程池设计

- by MRyan, 2023-01-20



本系列针对于 Tomcat 版本为 8.5X

文章已收录至精进Tomcat系列 系列其它文章 https://www.wormholestack.com/tag/Tomcat/

源码阅读环境:https://gitee.com/M-Analysis/source_tomcat8 已填充关键注释


JUC 原生线程池前置知识可以参考深入Java线程池

原生线程池提交任务流程

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
             // 线程池状态和线程数的整数
        int c = ctl.get();
        // 如果当前线程数小于核心线程数,创建 Worker 线程并启动线程
        if (workerCountOf(c) < corePoolSize) { 
            // 添加任务成功,那么就结束了 结果会包装到 FutureTask 中
            if (addWorker(command, true)) 
                return;
            c = ctl.get();
        }
         // 要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 ,如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
              // 二次状态检查
            int recheck = ctl.get(); 
              // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
            if (! isRunning(recheck) && remove(command)) 
                reject(command);
              // 如果线程池还是 RUNNING 的,并且线程数为 0,重新创建一个新的线程 这里目的担心任务提交到队列中了,但是线程都关闭了
            else if (workerCountOf(recheck) == 0) 
                  // 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
                addWorker(null, false);
        }
             // 如果 workQueue 队列满了,那么进入到这个分支 以 maximumPoolSize 为界创建新的 worker线程并启动线程,如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false)) 
            reject(command);
 }
  • 当前线程数小于核心线程数,则创建一个新的线程来执行任务
  • 当前线程数大于等于核心线程数,且阻塞队列未满,则将任务添加到队列中
  • 如果阻塞队列已满,当前线程数大于等于核心线程数,当前线程数小于最大线程数,则创建并启动一个线程来执行新提交的任务(这里可以继续看下面的分析)
  • 若当前线程数大于等于最大线程数,且阻塞队列已满,此时会执行拒绝策略

如下图所示:

核心思想就是就是先让核心线程数的线程工作,多余的任务统统塞到阻塞队列,阻塞队列塞不下才再多创建线程来工作,这种情况下当大量请求提交时,大量的请求很有可能都会被阻塞在队列中,而线程还没有创建到最大线程数,导致用户请求处理很慢,用户体验很差,而且当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。

如何解决这个问题呢?

我们有没有办法让线程池更激进一点呢,优先开启更多的线程,而把队列当成一个后备方案。

那 Tomcat 如何解决这个问题呢?

Tomcat 自己实现了 ThreadPoolExecutor 方法

重写了execute()方法

 public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            executeInternal(command);
        } catch (RejectedExecutionException rx) {
            // 判断如果任务队列类型是 TaskQueue,则尝试一次将任务添加到任务队列中,如果添加失败,证明队列已满,然后再执行拒绝策略
            if (getQueue() instanceof TaskQueue) {
                 final TaskQueue queue = (TaskQueue) getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    

以上代码其实就是当抛出异常拒绝策略了再给一次机会,尝试往阻塞队列里插任务,尽最大努力的去执行任务。

executeInternal 实现代码如下:

   public void executeInternal(Runnable command) {
        if (command == null)
            throw new NullPointerException();
             // 线程池状态和线程数的整数
        int c = ctl.get();
        // 如果当前线程数小于核心线程数,创建 Worker 线程并启动线程
        if (workerCountOf(c) < corePoolSize) { 
            // 添加任务成功,那么就结束了 结果会包装到 FutureTask 中
            if (addWorker(command, true)) 
                return;
            c = ctl.get();
        }
         // 要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 ,如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
              // 二次状态检查
            int recheck = ctl.get(); 
              // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
            if (! isRunning(recheck) && remove(command)) 
                reject(command);
              // 如果线程池还是 RUNNING 的,并且线程数为 0,重新创建一个新的线程 这里目的担心任务提交到队列中了,但是线程都关闭了
            else if (workerCountOf(recheck) == 0) 
                  // 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
                addWorker(null, false);
        }
             // workQueue.offer(command)返回false,以 maximumPoolSize 为界创建新的 worker线程并启动线程,如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false)) 
            reject(command);
   }

新增阻塞队列 TaskQueue 继承了 LinkedBlockingQueue,重写了offer()方法,

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

     @Override
    public boolean offer(@NonNull Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor.");
        }
        int currentPoolThreadSize = executor.getPoolSize();
        // 线程池中的线程数等于最大线程数的时候,就将任务放进队列等待工作线程处理
        if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
            return super.offer(runnable);
        }
        // 如果当前未执行的任务数量小于等于当前线程数,说明还有剩余的worker线程,就将任务放进队列等待工作线程处理
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }
        // 如果当前线程数大于核心线程,但小于最大线程数量,则直接返回false,外层会让线程池创建新的线程来执行任务
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }
        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

}

上述代码我们看到,每次向队列插入任务,判断如果当前线程数小于最大线程数则插入失败(currentPoolThreadSize < executor.getMaximumPoolSize()) 返回 false。

外层 executeInternal 方法则执行 addWorker(command, false),创建新的线程来执行任务。

如下图所示:

img

简单来说就是重写了execute()方法,当抛出拒绝策略了尝试一次往阻塞队列里插入任务,尽最大努力的去执行任务,新增阻塞队列继承了 LinkedBlockingQueue,重写了offer()方法,重写了offer()方法,每次向队列插入任务,判断如果当前线程数小于最大线程数则插入失败。进而让线程池创建新线程来处理任务。

作者:MRyan


本文采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
转载时请注明本文出处及文章链接。本文链接:https://www.wormholestack.com/archives/669/
2024 © MRyan 40 ms