码农翻身

Tomcat源码篇之线程池设计

- by MRyan, 2023-01-20



本系列针对于 Tomcat 版本为 8.5X

文章已收录至精进Tomcat系列 系列其它文章 https://www.wormholestack.com/tag/Tomcat/

源码阅读环境:https://gitee.com/M-Analysis/source_tomcat8 已填充关键注释


JUC 原生线程池前置知识可以参考深入Java线程池

原生线程池提交任务流程

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
             // 线程池状态和线程数的整数
        int c = ctl.get();
        // 如果当前线程数小于核心线程数,创建 Worker 线程并启动线程
        if (workerCountOf(c) < corePoolSize) { 
            // 添加任务成功,那么就结束了 结果会包装到 FutureTask 中
            if (addWorker(command, true)) 
                return;
            c = ctl.get();
        }
         // 要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 ,如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
              // 二次状态检查
            int recheck = ctl.get(); 
              // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
            if (! isRunning(recheck) && remove(command)) 
                reject(command);
              // 如果线程池还是 RUNNING 的,并且线程数为 0,重新创建一个新的线程 这里目的担心任务提交到队列中了,但是线程都关闭了
            else if (workerCountOf(recheck) == 0) 
                  // 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
                addWorker(null, false);
        }
             // 如果 workQueue 队列满了,那么进入到这个分支 以 maximumPoolSize 为界创建新的 worker线程并启动线程,如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false)) 
            reject(command);
 }
  • 当前线程数小于核心线程数,则创建一个新的线程来执行任务
  • 当前线程数大于等于核心线程数,且阻塞队列未满,则将任务添加到队列中
  • 如果阻塞队列已满,当前线程数大于等于核心线程数,当前线程数小于最大线程数,则创建并启动一个线程来执行新提交的任务(这里可以继续看下面的分析)
  • 若当前线程数大于等于最大线程数,且阻塞队列已满,此时会执行拒绝策略

如下图所示:

核心思想就是就是先让核心线程数的线程工作,多余的任务统统塞到阻塞队列,阻塞队列塞不下才再多创建线程来工作,这种情况下当大量请求提交时,大量的请求很有可能都会被阻塞在队列中,而线程还没有创建到最大线程数,导致用户请求处理很慢,用户体验很差,而且当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。

如何解决这个问题呢?进而提升吞吐量

我们有没有办法让线程池更激进一点呢,优先开启更多的线程,而把队列当成一个后备方案。

那 Tomcat 如何解决这个问题呢?

Tomcat 自己实现了 ThreadPoolExecutor 方法

重写了execute()方法

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
  
  ...
  
  public void execute(Runnable command, long timeout, TimeUnit unit) {
      submittedCount.incrementAndGet();
      try {
          //调用Java原生线程池的execute去执行任务
          super.execute(command);
      } catch (RejectedExecutionException rx) {
          // 判断如果任务队列类型是 TaskQueue,则尝试一次将任务添加到任务队列中,如果添加失败,证明队列已满,然后再执行拒绝策略
          if (super.getQueue() instanceof TaskQueue) {
              final TaskQueue queue = (TaskQueue)super.getQueue();
              try {
                  //继续尝试把任务放到任务队列中去
                  if (!queue.force(command, timeout, unit)) {
                      submittedCount.decrementAndGet();
                      //如果缓冲队列也满了,插入失败,执行拒绝策略。
                      throw new RejectedExecutionException("...");
                  }
              } 
          }
      }
}

从这个方法你可以看到,Tomcat 线程池的 execute 方法会调用 Java 原生线程池的 execute 去执行任务,如果总线程数达到 maximumPoolSize,Java 原生线程池的 execute 方法会抛出 RejectedExecutionException 异常,但是这个异常会被 Tomcat 线程池的 execute 方法捕获到,并继续尝试把这个任务放到任务队列中去;如果任务队列也满了,再执行拒绝策略。

简单来说就是:当总线程数达到最大线程 maximumPoolSize 时,不是立刻执行拒绝策略。而是先尝试将任务投递到任务队列中(再给一次机会)尝试往阻塞队列里插任务,尽最大努力的去执行任务,如果任务队列此时仍然是满的,再执行拒绝策略。

当然你会发现,在 Tomcat 线程池的 execute 方法最开始有这么一行:

submittedCount.incrementAndGet();

这行代码的意思把 submittedCount 这个原子变量加一,并且在任务执行失败,抛出拒绝异常时,将这个原子变量减一:

submittedCount.decrementAndGet();

其实 Tomcat 线程池是用这个变量 submittedCount 来维护已经提交到了线程池,但是还没有执行完的任务个数。Tomcat 为什么要维护这个变量呢?

这跟 Tomcat 的定制版的任务队列有关。Tomcat 的任务队列 TaskQueue 扩展了 Java 中的 LinkedBlockingQueue,我们知道 LinkedBlockingQueue 默认情况下长度是没有限制的,除非给它一个 capacity。因此 Tomcat 给了它一个 capacity,TaskQueue 的构造函数中有个整型的参数 capacity,TaskQueue 将 capacity 传给父类 LinkedBlockingQueue 的构造函数。

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

  public TaskQueue(int capacity) {
      super(capacity);
  }
  ...
}

这个 capacity 参数是通过 Tomcat 的 maxQueueSize 参数来设置的,但问题是默认情况下 maxQueueSize 的值是Integer.MAX_VALUE,等于没有限制,这样就带来一个问题:当前线程数达到核心线程数之后,再来任务的话线程池会把任务添加到任务队列,并且总是会成功,这样永远不会有机会创建新线程了。

为了解决这个问题,TaskQueue 重写了 LinkedBlockingQueue 的 offer 方法,在合适的时机返回 false,返回 false 表示任务添加失败,这时线程池会创建新的线程。那什么是合适的时机呢?请看下面 offer 方法的核心源码:

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

  ...
   @Override
  //线程池调用任务队列的方法时,当前线程数肯定已经大于核心线程数了
  public boolean offer(Runnable o) {

      //如果线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。
      if (parent.getPoolSize() == parent.getMaximumPoolSize()) 
          return super.offer(o);
          
      //执行到这里,表明当前线程数大于核心线程数,并且小于最大线程数。
      //表明是可以创建新线程的,那到底要不要创建呢?分两种情况:
      
      //1. 如果已提交的任务数小于当前线程数,表示还有空闲线程,无需创建新线程
      if (parent.getSubmittedCount()<=(parent.getPoolSize())) 
          return super.offer(o);
          
      //2. 如果已提交的任务数大于当前线程数,线程不够用了,返回false去创建新线程
      if (parent.getPoolSize()<parent.getMaximumPoolSize()) 
          return false;
          
      //默认情况下总是把任务添加到任务队列
      return super.offer(o);
  }
  
}

从上面的代码我们看到,只有当前线程数大于核心线程数、小于最大线程数,并且已提交的任务个数大于当前线程数时,也就是说线程不够用了,但是线程数又没达到极限,才会去创建新的线程。这就是为什么 Tomcat 需要维护已提交任务数这个变量,它的目的就是在任务队列的长度无限制的情况下,让线程池有机会创建新的线程。

外层 executeInternal 方法则执行 addWorker(command, false),创建新的线程来执行任务。

executeInternal 实现代码如下:

```java
   public void executeInternal(Runnable command) {
        if (command == null)
            throw new NullPointerException();
             // 线程池状态和线程数的整数
        int c = ctl.get();
        // 如果当前线程数小于核心线程数,创建 Worker 线程并启动线程
        if (workerCountOf(c) < corePoolSize) { 
            // 添加任务成功,那么就结束了 结果会包装到 FutureTask 中
            if (addWorker(command, true)) 
                return;
            c = ctl.get();
        }
         // 要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 ,如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
              // 二次状态检查
            int recheck = ctl.get(); 
              // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
            if (! isRunning(recheck) && remove(command)) 
                reject(command);
              // 如果线程池还是 RUNNING 的,并且线程数为 0,重新创建一个新的线程 这里目的担心任务提交到队列中了,但是线程都关闭了
            else if (workerCountOf(recheck) == 0) 
                  // 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
                addWorker(null, false);
        }
             // workQueue.offer(command)返回false,以 maximumPoolSize 为界创建新的 worker线程并启动线程,如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false)) 
            reject(command);
   }
```

当然默认情况下 Tomcat 的任务队列是没有限制的,你可以通过设置 maxQueueSize 参数来限制任务队列的长度。

如下图所示:

img

  1. 判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务
  2. 如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务
  3. 如果当前线程数等于最大线程数,则将任务放入任务队列等待执行
  4. 如果队列已满,则执行拒绝策略

Tomcat 线程池和 Java 原生线程池的区别是

  1. Tomcat 线程池当核心线程数满了,但是还未达到最大线程数量的时候,直接触发非核心线程的创建,而不是和 JUC 线程池一样加入队列。
  2. Tomcat 在线程总数达到最大数时,不是立即执行拒绝策略,而是再尝试向任务队列添加任务,添加失败后再执行拒绝策略。

为什么要这么设计呢?

默认情况下,tomcat的 任务队列 TaskQueue的 capacity 是 Integer.MAX_VALUE。

这样的话,当线程数达到核心线程数以后,再来新的任务都会被投递到任务队列中,就没有办法再创建新线程了,这样肯定是不行的。

所以,tomcat重写了LinkedBlockingQueue的offer方法,如果当前提交的任务数submittedCount大于核心线程数,并且小于最大线程数的情况下,就去创建一个新线程。

这样,就避免了无限往队列中投递任务的情况。

添加了重试机制的好处:

因为有可能第一次尝试放队列是满的,失败,再尝试创建临时线程,也满了,但是这个过程中,队列中的任务可能被临时线程消费了一部分,再往队列中送可能会成功。


简单来说对于一个新任务,有空闲的线程就先用空闲的线程,线程不够用又没达到上限就去创建新线程,线程达到上限就扔到队列排队去,队列满了执行重试机制,重试失败那就没办法了,执行拒绝策略。

白话说Tomcat的线程池总是优先尝试新建线程,如果达到上限了,再尝试将任务放入阻塞队列。由于是IO密集型任务,执行时间一般都不会太长,所以阻塞队列大概率不会排队太多造成OOM。

作者:MRyan


本文采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
转载时请注明本文出处及文章链接。本文链接:https://www.wormholestack.com/archives/669/
2025 © MRyan 40 ms