Tomcat Source Code Series: The HTTP Request Processing Flow

- by MRyan, 2023-01-15



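This series targets Tomcat 8.5.x.

This article is part of the Mastering Tomcat series. Other articles in the series: https://www.wormholestack.com/tag/Tomcat/

Source reading environment: https://gitee.com/M-Analysis/source_tomcat8 (key comments already added)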


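After a client sends a request, how does Tomcat process it?

1. Tomcat's thread model

You may have noticed in the previous article, "Tomcat Source Code Series: The Startup Flow", that once Tomcat has started successfully it runs 6 core threads.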

[Figure: the core threads running after Tomcat starts]

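Of these, 1 is a user thread and 5 are daemon threads.

Daemon threads

A daemon thread is a thread that provides a general-purpose service in the background while the program runs; the garbage collection thread is an example. Such threads are not an essential part of the program. User threads and daemon threads are otherwise almost identical; the only difference is in when the JVM exits: once every user thread has finished and only daemon threads remain, the JVM exits. With nothing left to serve, the daemon threads have no work to do, so there is no reason to keep the program running. Conversely, as long as any non-daemon thread is still running, the program does not terminate.

To make a thread a daemon thread, call setDaemon(true) on the Thread object before starting it.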
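To make the setDaemon(true) call concrete, here is a minimal sketch. The class name DaemonThreadDemo and the thread name are made up for illustration and have nothing to do with Tomcat's own threads.

```java
class DaemonThreadDemo {

    /** Creates (but does not start) a daemon thread that would otherwise loop forever. */
    static Thread newBackgroundThread() {
        Thread t = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(100); // stand-in for periodic background work
                } catch (InterruptedException e) {
                    return; // stop when interrupted
                }
            }
        }, "background-demo");
        // Must be called before start(); on a live thread it throws
        // IllegalThreadStateException.
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        Thread t = newBackgroundThread();
        t.start();
        System.out.println(t.getName() + " daemon=" + t.isDaemon());
        // main is the only user thread. When it returns, the JVM exits
        // even though the daemon thread is still looping.
    }
}
```

If the setDaemon(true) line is removed, the same program never exits, because the looping thread then counts as a user thread.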


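"Tomcat Source Code Series: The Startup Flow" briefly described how these threads are created.

When the Engine starts, it launches the ContainerBackgroundProcessor daemon thread, which calls the backgroundProcess method of the child containers.

When the Connector starts, it launches the AsyncTimeout daemon thread, which detects timed-out requests and forwards them back to the worker thread pool for processing.

When the Endpoint starts, it creates the Work-exec worker thread pool, with 10 threads waiting for work by default; these threads do the actual task processing.

It also creates the Poller daemon thread, which processes the messages (or events) produced by the Acceptor thread.

Finally, it creates the Acceptor daemon thread, which listens on the socket for port 8080 and hands each connected socket over to the Poller thread.


The Connector is one of Tomcat's two most central components. Its main job is to accept client connections and to process client requests, and most of the core threads above work on behalf of the Connector.

In "The Secrets of Tomcat's Architecture" we took a brief look at the Connector's architecture and the parts it is made of, as shown in the figure below: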

[Figure: the parts that make up the Connector component]


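We start with a detailed look at the Connector source code to see what these core threads actually do.

1.1 The Acceptor thread

The Acceptor is the thread that listens for and accepts incoming connections, which makes it the entry point for every request Tomcat handles.

// Non-essential code removed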
public class Acceptor<U> implements Runnable {

    private static final Log log = LogFactory.getLog(Acceptor.class);
    private static final StringManager sm = StringManager.getManager(Acceptor.class);

    private static final int INITIAL_ERROR_DELAY = 50;
    private static final int MAX_ERROR_DELAY = 1600;

    private final AbstractEndpoint<?, U> endpoint;

    public Acceptor(AbstractEndpoint<?, U> endpoint) {
        this.endpoint = endpoint;
    }


    /**
     * Accepts the connections arriving on the listening port
     */
    @SuppressWarnings("deprecation")
    @Override
    public void run() {

        int errorDelay = 0;
        long pauseStart = 0;

        try {
            // Loop until we receive a shutdown command
            while (!stopCalled) {

                // While running, if the `Endpoint` is paused, the `Acceptor` spins here
                while (endpoint.isPaused() && !stopCalled) {
                    if (state != AcceptorState.PAUSED) {
                        pauseStart = System.nanoTime();
                        // Entered pause state
                        state = AcceptorState.PAUSED;
                    }
                    if ((System.nanoTime() - pauseStart) > 1_000_000) {
                        // Paused for more than 1ms
                        try {
                            if ((System.nanoTime() - pauseStart) > 10_000_000) {
                                Thread.sleep(10);
                            } else {
                                Thread.sleep(1);
                            }
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }
                }

                // If the `Endpoint` has been stopped, the `Acceptor` stops as well
                if (stopCalled) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    // If we have reached max connections, wait until the
                    // connection count drops
                    endpoint.countUpOrAwaitConnection();

                    // Endpoint might have been paused while waiting for latch
                    // If that is the case, don't accept new connections
                    if (endpoint.isPaused()) {
                        continue;
                    }

                    U socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket (listening on port 8080)
                        socket = endpoint.serverSocketAccept();
                    } catch (Exception ioe) {
                        // We didn't get a socket
                        endpoint.countDownConnection();
                        if (endpoint.isRunning()) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (!stopCalled && !endpoint.isPaused()) {
 
                        // Configures the socket and passes it to the poller queue as an event
                        if (!endpoint.setSocketOptions(socket)) {
                            // Configuring the socket failed, so close it
                            endpoint.closeSocket(socket);
                        }
                    } else {
                        endpoint.destroySocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    String msg = sm.getString("endpoint.accept.fail");
                    // APR specific.
                    // Could push this down but not sure it is worth the trouble.
                    if (t instanceof org.apache.tomcat.jni.Error) {
                        org.apache.tomcat.jni.Error e = (org.apache.tomcat.jni.Error) t;
                        if (e.getError() == 233) {
                            // Not an error on HP-UX so log as a warning
                            // so it can be filtered out on that platform
                            // See bug 50273
                            log.warn(msg, t);
                        } else {
                            log.error(msg, t);
                        }
                    } else {
                        log.error(msg, t);
                    }
                }
            }
        } finally {
            stopLatch.countDown();
        }
        state = AcceptorState.ENDED;
    }


}

The key lines of this loop are covered one at a time below.

 endpoint.countUpOrAwaitConnection();

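This line checks the current connection count: if it is below maxConnections, the count is incremented by 1; if the limit has been reached, the Acceptor waits until the count drops.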
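The count-up-or-wait behaviour can be sketched with a plain java.util.concurrent.Semaphore. This is not how Tomcat implements it; the class ConnectionLimiter and its method names are hypothetical and only mirror the idea described above.

```java
import java.util.concurrent.Semaphore;

class ConnectionLimiter {
    private final int maxConnections;
    private final Semaphore permits;

    ConnectionLimiter(int maxConnections) {
        this.maxConnections = maxConnections;
        this.permits = new Semaphore(maxConnections);
    }

    /** Counts one connection up, blocking while the limit is reached. */
    void countUpOrAwait() {
        permits.acquireUninterruptibly();
    }

    /** Non-blocking variant: returns false when the limit is already reached. */
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    /** Counts one connection down, for example when a socket is closed. */
    void countDown() {
        permits.release();
    }

    /** Number of connections currently counted. */
    int current() {
        return maxConnections - permits.availablePermits();
    }

    public static void main(String[] args) {
        ConnectionLimiter limiter = new ConnectionLimiter(2);
        limiter.countUpOrAwait();
        limiter.countUpOrAwait();
        System.out.println("limit reached, third accepted: " + limiter.tryCountUp());
        limiter.countDown();
        System.out.println("after one close, accepted: " + limiter.tryCountUp());
    }
}
```

In the Acceptor loop the blocking variant corresponds to the wait before accept(), and countDown() corresponds to the endpoint.countDownConnection() call made when accept() fails.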


socket = endpoint.serverSocketAccept();

The serverSocket behind this call is the ServerSocketChannel opened in NioEndpoint's bind method. The call accepts the next incoming connection on port 8080 and returns its socket.
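For readers less familiar with NIO, the following sketch shows a ServerSocketChannel being opened, bound, and used to accept one connection. It is standalone JDK code and not Tomcat's bind logic; the class AcceptDemo is hypothetical, and it binds to an ephemeral loopback port instead of 8080 so that it can run anywhere.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class AcceptDemo {

    /**
     * Binds a server channel, connects one client to it, and accepts that
     * connection. Returns true if the accepted channel is connected and
     * still in blocking mode.
     */
    static boolean acceptOne() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            try (SocketChannel client = SocketChannel.open(address);
                 SocketChannel accepted = server.accept()) {
                // A freshly accepted channel is blocking by default
                return accepted.isConnected() && accepted.isBlocking();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted a blocking, connected channel: " + acceptOne());
    }
}
```

The accepted channel starts out in blocking mode, which is why setSocketOptions later calls socket.configureBlocking(false) before handing the socket to the Poller.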


endpoint.setSocketOptions(socket)

Next, step into the Endpoint's setSocketOptions method:

@Override
    protected boolean setSocketOptions(SocketChannel socket) {
        NioSocketWrapper socketWrapper = null;
        try {
            // Allocate channel and wrapper
            NioChannel channel = null;
            if (nioChannels != null) {
                channel = nioChannels.pop();
            }
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(bufhandler, this);
                } else {
                    channel = new NioChannel(bufhandler);
                }
            }
            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            // Cache the wrapper, keyed by its socket
            connections.put(socket, newWrapper);
            socketWrapper = newWrapper;

            // Set socket properties
            // Disable blocking, polling will be used
            socket.configureBlocking(false);
            socketProperties.setProperties(socket.socket());

            socketWrapper.setReadTimeout(getConnectionTimeout());
            socketWrapper.setWriteTimeout(getConnectionTimeout());
            socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            // Register the socket with the poller queue
            poller.register(socketWrapper);
            return true;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            if (socketWrapper == null) {
                destroySocket(socket);
            }
        }
        // Tell to close the socket if needed
        return false;
    }

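As you can see, the Socket is wrapped in a NioSocketWrapper (socketWrapper), and poller.register(socketWrapper) then registers that wrapper with the Poller's queue.

The registration works as follows: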


public void register(final NioSocketWrapper socketWrapper) {
    socketWrapper.interestOps(SelectionKey.OP_READ); // this is what OP_REGISTER turns into.
    PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
    // Add the event to the events queue
    addEvent(pollerEvent);
}

// The Poller's event queue
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

private void addEvent(PollerEvent event) {
    events.offer(event);
    if (wakeupCounter.incrementAndGet() == 0) {
        selector.wakeup();
    }
}

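The socketWrapper, itself a wrapper around the Socket, is wrapped once more in a PollerEvent, and that event is added to the events queue. No data has been read from the socket at this point.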
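The wakeupCounter check in addEvent is easy to read past. The sketch below models it without a real Selector, assuming the counter is used the way the Poller's run() loop shown in section 1.2 uses it: set to -1 just before a blocking select and reset to 0 afterwards. The class WakeupProtocolDemo and its methods are hypothetical.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class WakeupProtocolDemo {
    private final ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();
    private final AtomicLong wakeupCounter = new AtomicLong(0);
    // Counts how often a real selector.wakeup() would have been issued
    final AtomicInteger wakeups = new AtomicInteger();

    /** Producer side: queue the event and wake the consumer only if it is blocked. */
    void addEvent(String event) {
        events.offer(event);
        // The counter is -1 only while the consumer is in a blocking select,
        // so the increment reaches 0 exactly for the first event that arrives then.
        if (wakeupCounter.incrementAndGet() == 0) {
            wakeups.incrementAndGet(); // stands in for selector.wakeup()
        }
    }

    /** Consumer side: returns true if events arrived, so select must not block. */
    boolean prepareSelect() {
        return wakeupCounter.getAndSet(-1) > 0;
    }

    /** Consumer side: called once the select call has returned. */
    void finishSelect() {
        wakeupCounter.set(0);
    }

    /** Drains the queue and returns how many events were pending. */
    int drain() {
        int n = 0;
        while (events.poll() != null) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        WakeupProtocolDemo demo = new WakeupProtocolDemo();
        demo.prepareSelect();    // consumer is about to block
        demo.addEvent("socket"); // first event wakes it
        demo.addEvent("socket"); // later events do not need to
        System.out.println("wakeups issued: " + demo.wakeups.get());
    }
}
```

The point of the counter is to avoid calling wakeup() for every event: only the first event that arrives while the Poller is blocked pays for it.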

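In short, the Acceptor thread's main job is to listen on the server socket and hand connected sockets over to the Poller thread.

1.2 The Poller thread

The Poller thread is defined inside NioEndpoint: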

public class Poller implements Runnable {
    // Some code omitted

    private Selector selector;

    // The Poller's event queue
    private final SynchronizedQueue<PollerEvent> events =
        new SynchronizedQueue<>();

    public Poller() throws IOException {
        // Open a new Selector for this Poller
        this.selector = Selector.open();
    }
}

The Poller constructor opens a new Selector for each Poller.

public class Poller implements Runnable {
/**
         * The background thread that adds sockets to the Poller, checks the
         * poller for triggered events and hands the associated socket off to an
         * appropriate processor as events occur.
         * <p>
         * The Poller thread's run method: event processing
         */
        @Override
        public void run() {
            // Loop until destroy() is called
            // Main loop
            while (true) {

                boolean hasEvents = false;

                try {
                    if (!close) {
                        hasEvents = events();
                        // events() has registered the queued channels with this Poller's Selector
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            // If we are here, means we have other stuff to do
                            // Do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                    // Either we timed out or we woke up, process events first
                    if (keyCount == 0) {
                        hasEvents = (hasEvents | events());
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }

                // Get the selected keys: the registered keys whose events are now ready
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper != null) {
                        // This is where the event is actually processed
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount, hasEvents);
            }

            getStopLatch().countDown();
        }
}

We looked at Poller#register() above. The Poller maintains a synchronized queue named events: the PollerEvents (socket listening events) that the Acceptor creates are placed in this queue, and the events() method takes them back out.

hasEvents = events();

public boolean events() {
            boolean result = false;

            PollerEvent pe = null;
            // Take the events off the queue
            for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
                result = true;
                NioSocketWrapper socketWrapper = pe.getSocketWrapper();
                SocketChannel sc = socketWrapper.getSocket().getIOChannel();
                int interestOps = pe.getInterestOps();
                if (sc == null) {
                    log.warn(sm.getString("endpoint.nio.nullSocketChannel"));
                    socketWrapper.close();
                } else if (interestOps == OP_REGISTER) {
                    try {
                        sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
                    } catch (Exception x) {
                        log.error(sm.getString("endpoint.nio.registerFail"), x);
                    }
                } else {
                    final SelectionKey key = sc.keyFor(getSelector());
                    if (key == null) {
                         socketWrapper.close();
                    } else {
                        final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
                        if (attachment != null) {
                            try {
                                int ops = key.interestOps() | interestOps;
                                attachment.interestOps(ops);
                                key.interestOps(ops);
                            } catch (CancelledKeyException ckx) {
                                cancelledKey(key, socketWrapper);
                            }
                        } else {
                            cancelledKey(key, socketWrapper);
                        }
                    }
                }
                if (running && eventCache != null) {
                    pe.reset();
                    eventCache.push(pe);
                }
            }

            return result;
        }

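Once events() has drained the queue and registered the channels with the Selector, the Poller thread goes on to process the keys that are ready: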

// Get the selected keys: the registered keys whose events are now ready
Iterator<SelectionKey> iterator =
    keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
    SelectionKey sk = iterator.next();
    iterator.remove();
    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
    if (socketWrapper != null) {
        // This is where the event is actually processed
        processKey(sk, socketWrapper);
    }
}

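This takes the SelectionKeys of the connected socket channels that are registered with the Poller thread's Selector and have become ready, and processes the event for each one.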
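The register, select, and iterate cycle can be tried outside Tomcat with a few lines of JDK code. The sketch below uses a java.nio.channels.Pipe in place of a network socket so that it needs no port; the class SelectorLoopDemo and the attachment string "wrapper-1" are made up, with the string playing the role of the NioSocketWrapper attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SelectorLoopDemo {

    /** Registers one channel, makes it readable, and returns the attachments of the ready keys. */
    static List<String> selectOnce() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                // A channel must be non-blocking before it can be registered
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ, "wrapper-1");

                // Writing to the sink makes the source readable
                sink.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

                List<String> ready = new ArrayList<>();
                int keyCount = selector.select(1000);
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove(); // the selector never removes selected keys itself
                    if (key.isReadable()) {
                        ready.add((String) key.attachment());
                    }
                }
                return ready;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready attachments: " + selectOnce());
    }
}
```

The iterator.remove() call matters: the selected-key set is only ever added to by the selector, so a key that is not removed would be reported again on the next pass.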


Next, look at processKey(). Depending on which operations the key is ready for, it handles the read and the write separately:

  protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
            try {
                if (close) {
                    cancelledKey(sk, socketWrapper);
                } else if (sk.isValid()) {
                    if (sk.isReadable() || sk.isWritable()) {
                        if (socketWrapper.getSendfileData() != null) {
                            processSendfile(sk, socketWrapper, false);
                        } else {
                            unreg(sk, socketWrapper, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write
                            // Handle the read event; this leads to the creation of the Request object
                            if (sk.isReadable()) {
                                if (socketWrapper.readOperation != null) {
                                    if (!socketWrapper.readOperation.process()) {
                                        closeSocket = true;
                                    }
                                } else if (socketWrapper.readBlocking) {
                                    synchronized (socketWrapper.readLock) {
                                        socketWrapper.readBlocking = false;
                                        socketWrapper.readLock.notify();
                                    }
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            // Handle the write event; writes the generated Response back to the client through the socket
                            if (!closeSocket && sk.isWritable()) {
                                if (socketWrapper.writeOperation != null) {
                                    if (!socketWrapper.writeOperation.process()) {
                                        closeSocket = true;
                                    }
                                } else if (socketWrapper.writeBlocking) {
                                    synchronized (socketWrapper.writeLock) {
                                        socketWrapper.writeBlocking = false;
                                        socketWrapper.writeLock.notify();
                                    }
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk, socketWrapper);
                            }
                        }
                    }
                } else {
                    // Invalid key
                    cancelledKey(sk, socketWrapper);
                }
            } catch (CancelledKeyException ckx) {
                cancelledKey(sk, socketWrapper);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
            }
        }

Now look at the processSocket() method:

public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = null;
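            // To improve performance, the Processor object of each valid connection is cached. In addition, when the
            // connection is closed its Processor is released to a recycling queue (upgrade protocols are not recycled),
            // so that later connections can reset and reuse it, which reduces object construction
            // If the cache holds a Processor, take it from there
            if (processorCache != null) {
                // Take a Processor from processorCache to handle the socket; the implementation is SocketProcessor
                sc = processorCache.pop();
            }
            // If the cache is empty, create a new SocketProcessor
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            // Hand the Processor to the Work worker thread pool for execution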
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper), ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
             getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

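It really does only two things:

  1. Take a Processor from processorCache to handle the socket; if the cache is empty, create a new one with createSocketProcessor(). The Processor implementation here is SocketProcessor.
  2. Hand the Processor to the Work worker thread pool for execution.

The dispatch parameter says whether the socket should be processed on a worker thread; every call in processKey above passes true.

  • If dispatch is true and a worker thread pool exists, executor.execute(sc) is called and the worker thread pool processes the connected socket from then on;
  • otherwise the Poller thread goes on to process the connected socket itself.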
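The two branches are easiest to tell apart by checking which thread ends up running the task. The following is a minimal sketch of that decision, not Tomcat code; the class DispatchDemo and the thread name "worker-exec-1" are made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DispatchDemo {

    /** Runs the task on the executor if dispatch is requested and an executor exists, otherwise inline. */
    static void process(Runnable task, boolean dispatch, Executor executor) {
        if (dispatch && executor != null) {
            executor.execute(task);
        } else {
            task.run();
        }
    }

    /** Returns the name of the thread that actually ran the task. */
    static String runAndReportThread(boolean dispatch) {
        ExecutorService pool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-exec-1"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            process(() -> ranOn.complete(Thread.currentThread().getName()), dispatch, pool);
            return ranOn.join(); // waits for the task if it runs on the pool
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("dispatch=true  ran on " + runAndReportThread(true));
        System.out.println("dispatch=false ran on " + runAndReportThread(false));
    }
}
```

With dispatch set to false the task runs on the calling thread, which in Tomcat would be the Poller thread, so a slow task there would stall event processing for every other connection handled by that Poller.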

createSocketProcessor is an abstract method of AbstractEndpoint, and NioEndpoint implements it:

  @Override
    protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        return new SocketProcessor(socketWrapper, event);
    }

SocketProcessor extends SocketProcessorBase, which in turn implements Runnable.


public abstract class SocketProcessorBase<S> implements Runnable {

    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;

    @Override
    public final void run() {
        synchronized (socketWrapper) {
             if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }


    protected abstract void doRun();
}

When it runs, SocketProcessorBase calls the abstract method doRun(), which subclasses implement; here that is SocketProcessor's doRun(). In other words, a Work worker thread ends up executing SocketProcessor.doRun().
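Two patterns meet here: a final run() that delegates to an abstract doRun() (the template method pattern), and a cache from which task objects are popped, reset, and pushed back. The sketch below combines both in a simplified, single-threaded form. TaskBase, TaskPoolDemo, and EchoTask are hypothetical names, and an ArrayDeque stands in for Tomcat's thread-safe cache.

```java
import java.util.ArrayDeque;
import java.util.Deque;

abstract class TaskBase implements Runnable {
    protected String input;

    void reset(String input) {
        this.input = input;
    }

    @Override
    public final void run() {
        if (input == null) {
            return; // nothing to process; plays the role of the isClosed() guard
        }
        doRun();
    }

    protected abstract void doRun();
}

class TaskPoolDemo {
    private final Deque<EchoTask> cache = new ArrayDeque<>();
    int created = 0;
    String lastOutput;

    class EchoTask extends TaskBase {
        @Override
        protected void doRun() {
            try {
                lastOutput = "handled " + input;
            } finally {
                input = null;
                cache.push(this); // return to cache for the next event
            }
        }
    }

    /** Reuses a cached task if there is one, otherwise creates a new one. */
    void submit(String input) {
        EchoTask task = cache.poll();
        if (task == null) {
            task = new EchoTask();
            created++;
        }
        task.reset(input);
        task.run();
    }

    public static void main(String[] args) {
        TaskPoolDemo demo = new TaskPoolDemo();
        demo.submit("a");
        demo.submit("b");
        System.out.println(demo.lastOutput + ", tasks created: " + demo.created);
    }
}
```

Because run() is final, subclasses cannot skip the guard, and because the task returns itself to the cache in a finally block, it is recycled even when processing throws.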

   protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

        public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
            super(socketWrapper, event);
        }

        @Override
        protected void doRun() {
          
            Poller poller = NioEndpoint.this.poller;
            if (poller == null) {
                socketWrapper.close();
                return;
            }

            try {
                // HTTPS (TLS) handshake
                int handshake = -1;
                try {
                    if (socketWrapper.getSocket().isHandshakeComplete()) {
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                        event == SocketEvent.ERROR) {    
                        handshake = -1;
                    } else {
                        handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
                        event = SocketEvent.OPEN_READ;
                    }
                } catch (IOException x) {
                    handshake = -1;
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("endpoint.err.handshake"), x);
                    }
                } catch (CancelledKeyException ckx) {
                    handshake = -1;
                }
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;        
                    // Hand processing over to the `Handler`; a null event is treated as an `OPEN_READ` event
                    if (event == null) {
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        poller.cancelledKey(getSelectionKey(), socketWrapper);
                    }
                } else if (handshake == -1) {
                    getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
                    poller.cancelledKey(getSelectionKey(), socketWrapper);
                } else if (handshake == SelectionKey.OP_READ) {
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE) {
                    socketWrapper.registerWriteInterest();
                }
            } catch (CancelledKeyException cx) {
                poller.cancelledKey(getSelectionKey(), socketWrapper);
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            } catch (Throwable t) {
                log.error(sm.getString("endpoint.processing.fail"), t);
                poller.cancelledKey(getSelectionKey(), socketWrapper);
            } finally {
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && processorCache != null) {
                    processorCache.push(this);
                }
            }
        }
    }

The Handler's key method is process():

state = getHandler().process(socketWrapper, event);

Its main job is to call Processor.process().

 protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {

        @Override
        public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
          // Some code omitted
          
            // Get the socket from the NIO wrapper: the connected socket accepted on port 8080
            S socket = wrapper.getSocket();

            Processor processor = (Processor) wrapper.takeCurrentProcessor();

            if (processor == null) {
                // Create a Processor for the current protocol
                processor = getProtocol().createProcessor();
                register(processor);
            }

            SocketState state = SocketState.CLOSED;
            // Key line: the processor processes the socket
            state = processor.process(wrapper, status);

            return state;
        }
 }

public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
    @Override
    protected Processor createProcessor() {
        Http11Processor processor = new Http11Processor(this, getEndpoint());
        processor.setAdapter(getAdapter());
        // With keep-alive, the maximum number of requests handled per socket
        processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
        // Default timeout for HTTP file uploads (300 * 1000 ms)
        processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
        processor.setDisableUploadTimeout(getDisableUploadTimeout());
        processor.setRestrictedUserAgents(getRestrictedUserAgents());
        // Maximum size of a POST body that will be saved (default 4 * 1024 bytes)
        processor.setMaxSavePostSize(getMaxSavePostSize());
        return processor;
    }
}

The code is long, so go straight to the key line:

  state = processor.process(wrapper, status);

public abstract class AbstractProcessorLight implements Processor {

    private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();


    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {

        // Start from the CLOSED socket state
        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
                }
                state = dispatch(nextDispatch.getSocketStatus());
                if (!dispatches.hasNext()) {
                    state = checkForPipelinedData(state, socketWrapper);
                }
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                state = checkForPipelinedData(state, socketWrapper);
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ) {
                // Call the `service()` method
                state = service(socketWrapper);
            } else if (status == SocketEvent.CONNECT_FAIL) {
                logAccess(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }
            
        } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);

        return state;
    }
}

For a read event, the service() method is called:

 public class Http11Processor extends AbstractProcessor {
  
        @Override
    public SocketState service(SocketWrapperBase<?> socketWrapper)
        throws IOException {
       // Some code omitted; see the source if you are interested
      
       // Key line: hand the request to CoyoteAdapter, which performs request mapping
        getAdapter().service(request, response);
    }
 }
     

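The code above really does only 3 things:

  1. Read the message and parse the request line, request headers, and request body, wrapping them in a Request object
  2. Create the Response object
  3. Call Adapter.service() with the Request and Response objects, so that the connector can dispatch the request to the components that carry out the processing flow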
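To give a feel for step 1, here is a deliberately naive sketch that splits the head of an HTTP/1.1 message into its request line and headers. It is not Tomcat's parser, which works incrementally on byte buffers; the class RequestHeadParser is hypothetical and operates on a complete String only.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class RequestHeadParser {
    final String method;
    final String uri;
    final String protocol;
    // Header names are stored in lower case because they are case-insensitive
    final Map<String, String> headers = new LinkedHashMap<>();

    RequestHeadParser(String raw) {
        // Everything before the first empty line is the request head
        String head = raw.split("\r\n\r\n", 2)[0];
        String[] lines = head.split("\r\n");

        // Request line: METHOD SP REQUEST-TARGET SP HTTP-VERSION
        String[] requestLine = lines[0].split(" ", 3);
        if (requestLine.length != 3) {
            throw new IllegalArgumentException("malformed request line: " + lines[0]);
        }
        method = requestLine[0];
        uri = requestLine[1];
        protocol = requestLine[2];

        // Header lines: NAME ":" VALUE
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("malformed header: " + lines[i]);
            }
            String name = lines[i].substring(0, colon).trim().toLowerCase(Locale.ROOT);
            headers.put(name, lines[i].substring(colon + 1).trim());
        }
    }

    public static void main(String[] args) {
        RequestHeadParser parsed = new RequestHeadParser(
            "GET /index.html?x=1 HTTP/1.1\r\nHost: localhost:8080\r\n\r\n");
        System.out.println(parsed.method + " " + parsed.uri + " " + parsed.headers);
    }
}
```

A real connector also has to cope with partial reads, size limits, and malformed input arriving byte by byte, which is why the actual implementation looks nothing like this.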

1.3 Summary

Let's recap the flow we just analyzed, in which Tomcat accepts a request.

[Figure: how Tomcat accepts a request]


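At this point the structure of Tomcat's thread model should be quite clear. A brief summary:

There is the Acceptor daemon thread, the Poller daemon thread, and the pool of Worker threads.

  1. Once Tomcat has started, the Acceptor thread keeps accepting connections on port 8080. For each one it calls endpoint.setSocketOptions(socket), which wraps the socket in a socketWrapper, creates a PollerEvent, and adds it to the Poller's event queue, a SynchronizedQueue<PollerEvent>.
  2. The Poller thread keeps checking the queue with events.poll(). For each event it registers the socket channel with its Selector, and then it calls select(). For each ready key it calls processKey(), which in turn calls processSocket() to wrap the socketWrapper in a SocketProcessor and submit it to the Worker thread pool.
  3. A Worker thread runs the SocketProcessor, which leads to processor.process(). process() calls service(), which calls the connector's adapter, CoyoteAdapter.service(), and the adapter finally dispatches the request to the container components.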


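2. How a single HTTP request is processed

2.1 How the connector processes a request

The CoyoteAdapter component ties the Connector to the Engine container: it passes the generated Request object and Response object into the Engine container and invokes its Pipeline.

As analyzed above, the Processor calls Adapter.service(), so the request passes through CoyoteAdapter, as the following code shows.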

   @Override
    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {
        // From the coyote request and response objects, create the connector's request and response objects (which implement HttpServletRequest and HttpServletResponse)
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);

        if (request == null) {
            // Create objects
            request = connector.createRequest();
            request.setCoyoteRequest(req);
            response = connector.createResponse();
            response.setCoyoteResponse(res);

            // Link objects
            request.setResponse(response);
            response.setRequest(request);

            // Set as notes
            req.setNote(ADAPTER_NOTES, request);
            res.setNote(ADAPTER_NOTES, response);

            // Set query string encoding
            req.getParameters().setQueryStringCharset(connector.getURICharset());
        }

        // Add the X-Powered-By header if it is enabled
        if (connector.getXpoweredBy()) {
            response.addHeader("X-Powered-By", POWERED_BY);
        }

        boolean async = false;
        boolean postParseSuccess = false;

        req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
        req.setRequestThread();

        try {
            // Parse and set Catalina and configuration specific
            // request parameters
            // Handles request mapping (resolves the host, context, and wrapper, parses the parameters after the URI, and the session ID)
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                // Check whether the Valves support async
                request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
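                // Get the Engine's first Valve
                // Besides doing its own work, the basic Valve of each Container level must start the Valve chain of the next Container level. Request mapping has already stored its result in the request object, so a Valve can read the next-level Container straight from the request
                // This is where the request really enters the container: invoke the first Valve of the Engine container's pipeline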
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
            }
            if (request.isAsync()) {
                async = true;
                ReadListener readListener = req.getReadListener();
                if (readListener != null && request.isFinished()) {
                    // Possible the all data may have been read during service()
                    // method so this needs to be checked here
                    ClassLoader oldCL = null;
                    try {
                        oldCL = request.getContext().bind(false, null);
                        if (req.sendAllDataReadEvent()) {
                            req.getReadListener().onAllDataRead();
                        }
                    } finally {
                        request.getContext().unbind(false, oldCL);
                    }
                }

                Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

                if (!request.isAsyncCompleting() && throwable != null) {
                    request.getAsyncContextInternal().setErrorState(throwable, true);
                }
            } else {
                // Finish the request via request.finishRequest() and
                // response.finishResponse(), which writes the OutputBuffer data to the browser
                request.finishRequest();
                response.finishResponse();
            }

        } catch (IOException e) {
            // Ignore
        } finally {
            AtomicBoolean error = new AtomicBoolean(false);
            res.action(ActionCode.IS_ERROR, error);

            if (request.isAsyncCompleting() && error.get()) {
                // Connection will be forcibly closed which will prevent
                // completion happening at the usual point. Need to trigger
                // call to onComplete() here.
                res.action(ActionCode.ASYNC_POST_PROCESS, null);
                async = false;
            }

            req.getRequestProcessor().setWorkerThreadName(null);
            req.clearRequestThread();

            // Recycle the wrapper request and response
            if (!async) {
                updateWrapperErrorCount(request, response);
                request.recycle();
                response.recycle();
            }
        }
    }

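The code above mainly does the following:

  1. Maps the request: resolves the host, context, and wrapper, parses the path parameters that follow the URI, and extracts the session ID.
  2. Parses the request; this step also handles proxy server settings, sets the necessary headers, and so on.
  3. Invokes the first valve of the Engine container's pipeline, handing the request over to the container.
  4. Completes the request through request.finishRequest and response.finishResponse (which writes the data in the OutputBuffer to the browser).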

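2.2 How each container processes the request

As noted in 《Tomcat源码篇之启动流程》, the StandardEngine constructor adds the basic valve StandardEngineValve to its own Pipeline.

Engine request processing

So let's go straight to the invoke method of StandardEngineValve.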

final class StandardEngineValve extends ValveBase {

    //------------------------------------------------------ Constructor
    public StandardEngineValve() {
        super(true);
    }


    // --------------------------------------------------------- Public Methods
    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Get the Host from the request
        Host host = request.getHost();
        // If there is no Host, return 404
        if (host == null) {
            if (!response.isError()) {
                response.sendError(404);
            }
            return;
        }
        if (request.isAsyncSupported()) {
            request.setAsyncSupported(host.getPipeline().isAsyncSupported());
        }

        // Invoke the first valve of the Host container's pipeline
        host.getPipeline().getFirst().invoke(request, response);
    }
}

This method reads the Host that request mapping has already selected for the request, then calls invoke() on the first Valve in that Host's pipeline.
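
The hand-off from one container's basic valve to the next container's pipeline can be sketched in isolation. This is a minimal sketch, not Tomcat code: Container, Valve, first() and run() are hypothetical names, and the child container is fixed at construction time here, whereas Tomcat reads it from the request after mapping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Tomcat's containers and valves.
class ValveChainSketch {

    /** One processing step in a pipeline. */
    interface Valve {
        void invoke(List<String> trace);
    }

    /** A container owns a pipeline; here the pipeline is just an ordered list of valves. */
    static final class Container {
        final String name;
        final List<Valve> pipeline = new ArrayList<>();

        Container(String name, Container child) {
            this.name = name;
            // Basic valve: do this level's work, then hand over to the child container.
            pipeline.add(trace -> {
                trace.add(name);
                if (child != null) {
                    child.first().invoke(trace);
                }
            });
        }

        /** Mirrors getPipeline().getFirst() in the listings. */
        Valve first() {
            return pipeline.get(0);
        }
    }

    /** Builds Engine -> Host -> Context -> Wrapper and sends one request through. */
    static List<String> run() {
        Container wrapper = new Container("Wrapper", null);
        Container context = new Container("Context", wrapper);
        Container host = new Container("Host", context);
        Container engine = new Container("Engine", host);

        List<String> trace = new ArrayList<>();
        engine.first().invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Engine, Host, Context, Wrapper]
    }
}
```

Each level only knows how to reach the first valve of the next level, which is why extra valves can be added in front of a basic valve without changing the containers above it.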


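Host request processing

As host.getPipeline().getFirst().invoke(request, response) shows, the Host container first gets its own pipeline and then the pipeline's first valve.

The StandardHost constructor adds the basic valve StandardHostValve to its own Pipeline. The Host pipeline also contains two other Valves, AccessLogValve (configured in the default server.xml) and ErrorReportValve, which sit at the front of the pipeline; the basic valve is always last.

ErrorReportValve checks whether an exception occurred while the HTTP request was being processed. If one did, it assembles an HTML page and writes it to the client.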

public class ErrorReportValve extends ValveBase {
    public ErrorReportValve() {
        super(true);
    }


    // --------------------------------------------------------- Public Methods

    @Override
    public void invoke(Request request, Response response) throws IOException, ServletException {

        // Pass the request on to the next Valve
        getNext().invoke(request, response);

        // Processing finished normally and the response is already committed
        if (response.isCommitted()) {
            if (response.setErrorReported()) {
                // Error wasn't previously reported but we can't write an error
                // page because the response has already been committed.

                // See if IO is allowed
                AtomicBoolean ioAllowed = new AtomicBoolean(true);
                response.getCoyoteResponse().action(ActionCode.IS_IO_ALLOWED, ioAllowed);

                if (ioAllowed.get()) {
                    // I/O is currently still allowed. Flush any data that is
                    // still to be written to the client.
                    try {
                        response.flushBuffer();
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                    }
                    // Now close immediately to signal to the client that
                    // something went wrong
                    response.getCoyoteResponse().action(ActionCode.CLOSE_NOW,
                            request.getAttribute(RequestDispatcher.ERROR_EXCEPTION));
                }
            }
            return;
        }

        // Check whether an exception occurred while the request was being processed
        Throwable throwable = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

        if (request.isAsync() && !request.isAsyncCompleting()) {
            return;
        }

        if (throwable != null && !response.isError()) {
            // Reset the data held in the response
            response.reset();
            // 500 status code
            response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
        }

        response.setSuspended(false);

        try {
            // Build an HTML page from the exception stack trace and send it to the client
            report(request, response, throwable);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
    }

}
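
The shape of ErrorReportValve, calling the next valve first and inspecting the outcome on the way back, can be shown with a stripped-down sketch. Everything below is hypothetical: the request is a plain Map, the response is a StringBuilder, and the "error" key stands in for the RequestDispatcher.ERROR_EXCEPTION attribute.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these types are Tomcat classes.
class ErrorReportSketch {

    interface Valve {
        void invoke(Map<String, Object> request, StringBuilder response);
    }

    /** Calls the next valve first, then inspects the request on the way back. */
    static final class ReportingValve implements Valve {
        private final Valve next;

        ReportingValve(Valve next) {
            this.next = next;
        }

        @Override
        public void invoke(Map<String, Object> request, StringBuilder response) {
            next.invoke(request, response);

            Object error = request.get("error");
            if (error instanceof Throwable) {
                // Discard partial output (compare response.reset()) and report the failure.
                response.setLength(0);
                response.append("500: ").append(((Throwable) error).getMessage());
            }
        }
    }

    /** Runs one request through ReportingValve followed by the given downstream valve. */
    static String handle(Valve downstream) {
        Map<String, Object> request = new HashMap<>();
        StringBuilder response = new StringBuilder();
        new ReportingValve(downstream).invoke(request, response);
        return response.toString();
    }

    public static void main(String[] args) {
        System.out.println(handle((req, res) -> res.append("ok")));
        System.out.println(handle((req, res) -> {
            res.append("partial");
            req.put("error", new IllegalStateException("boom"));
        }));
    }
}
```

Because the check runs after the downstream call returns, a valve placed at the front of the pipeline sees errors raised anywhere further down the chain.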

Next, look at how StandardHostValve implements invoke.

final class StandardHostValve extends ValveBase {
  
    public StandardHostValve() {
        super(true);
    }

    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Get the Context from the request
        Context context = request.getContext();
        if (context == null) {
            // Don't overwrite an existing error
            if (!response.isError()) {
                response.sendError(404);
            }
            return;
        }

        if (request.isAsyncSupported()) {
            request.setAsyncSupported(context.getPipeline().isAsyncSupported());
        }

        boolean asyncAtStart = request.isAsync();

        try {
            context.bind(Globals.IS_SECURITY_ENABLED, MY_CLASSLOADER);

            if (!asyncAtStart && !context.fireRequestInitEvent(request.getRequest())) {
                return;
            }

            try {
                if (!response.isErrorReportRequired()) {
                    // Invoke the first valve of the Context container's pipeline
                    context.getPipeline().getFirst().invoke(request, response);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                container.getLogger().error("Exception Processing " + request.getRequestURI(), t);
                if (!response.isErrorReportRequired()) {
                    request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
                    throwable(request, response, t);
                }
            }

            response.setSuspended(false);

            Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            if (!context.getState().isAvailable()) {
                return;
            }

            // Look for (and render if found) an application level error page
            if (response.isErrorReportRequired()) {
                // If an error has occurred that prevents further I/O, don't waste time
                // producing an error report that will never be read
                AtomicBoolean result = new AtomicBoolean(false);
                response.getCoyoteResponse().action(ActionCode.IS_IO_ALLOWED, result);
                if (result.get()) {
                    if (t != null) {
                        throwable(request, response, t);
                    } else {
                        status(request, response);
                    }
                }
            }

            if (!request.isAsync() && !asyncAtStart) {
                context.fireRequestDestroyEvent(request.getRequest());
            }
        } finally {
            // Access a session (if present) to update last accessed time, based
            // on a strict interpretation of the specification
            if (ACCESS_SESSION) {
                request.getSession(false);
            }

            context.unbind(Globals.IS_SECURITY_ENABLED, MY_CLASSLOADER);
        }
    }
}

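StandardHostValve first checks whether the Request has a Context (set while CoyoteAdapter.postParseRequest runs). If there is no Context, it returns a 404 status code; otherwise it invokes the first valve of the Context container's pipeline.

As context.getPipeline().getFirst().invoke(request, response); shows, the Context container first gets its own pipeline and then the pipeline's first valve.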


Context request processing

In the same way, let's look at the invoke implementation of StandardContextValve.

final class StandardContextValve extends ValveBase {

    private static final StringManager sm = StringManager.getManager(StandardContextValve.class);

    public StandardContextValve() {
        super(true);
    }


    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Disallow any direct access to resources under WEB-INF or META-INF
        MessageBytes requestPathMB = request.getRequestPathMB();
        if ((requestPathMB.startsWithIgnoreCase("/META-INF/", 0))
            || (requestPathMB.equalsIgnoreCase("/META-INF"))
            || (requestPathMB.startsWithIgnoreCase("/WEB-INF/", 0))
            || (requestPathMB.equalsIgnoreCase("/WEB-INF"))) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        // Select the Wrapper to be used for this Request
        // Get the Wrapper from the request
        Wrapper wrapper = request.getWrapper();
        if (wrapper == null || wrapper.isUnavailable()) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        // Acknowledge the request
        try {
            response.sendAcknowledgement(ContinueResponseTiming.IMMEDIATELY);
        } catch (IOException ioe) {
            container.getLogger().error(sm.getString(
                "standardContextValve.acknowledgeException"), ioe);
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ioe);
            response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            return;
        }

        if (request.isAsyncSupported()) {
            request.setAsyncSupported(wrapper.getPipeline().isAsyncSupported());
        }
        // Invoke the first valve of the Wrapper container's pipeline
        wrapper.getPipeline().getFirst().invoke(request, response);
    }
}

In the same way, it invokes the first valve of the Wrapper container's pipeline.


Wrapper request processing

The Wrapper's basic valve is StandardWrapperValve. Here is its invoke implementation:

final class StandardWrapperValve extends ValveBase {
    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Initialize local variables we may need
        boolean unavailable = false;
        Throwable throwable = null;
        // This should be a Request attribute...
        long t1 = System.currentTimeMillis();
        requestCount.incrementAndGet();
        StandardWrapper wrapper = (StandardWrapper) getContainer();
        Servlet servlet = null;
        Context context = (Context) wrapper.getParent();

        // Check for the application being marked unavailable
        if (!context.getState().isAvailable()) {
            response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                sm.getString("standardContext.isUnavailable"));
            unavailable = true;
        }

        // Check for the servlet being marked unavailable
        if (!unavailable && wrapper.isUnavailable()) {
            container.getLogger().info(sm.getString("standardWrapper.isUnavailable",
                wrapper.getName()));
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                    sm.getString("standardWrapper.isUnavailable",
                        wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                    sm.getString("standardWrapper.notFound",
                        wrapper.getName()));
            }
            unavailable = true;
        }

        // Allocate a servlet instance to process this request
        try {
            if (!unavailable) {
                // Call the Wrapper's allocate() method to obtain a Servlet instance
                servlet = wrapper.allocate();
            }
        } catch (UnavailableException e) {
            container.getLogger().error(
                sm.getString("standardWrapper.allocateException",
                    wrapper.getName()), e);
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                    sm.getString("standardWrapper.isUnavailable",
                        wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                    sm.getString("standardWrapper.notFound",
                        wrapper.getName()));
            }
        } catch (ServletException e) {
            container.getLogger().error(sm.getString("standardWrapper.allocateException",
                wrapper.getName()), StandardWrapper.getRootCause(e));
            throwable = e;
            exception(request, response, e);
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            container.getLogger().error(sm.getString("standardWrapper.allocateException",
                wrapper.getName()), e);
            throwable = e;
            exception(request, response, e);
            servlet = null;
        }

        MessageBytes requestPathMB = request.getRequestPathMB();
        DispatcherType dispatcherType = DispatcherType.REQUEST;
        if (request.getDispatcherType() == DispatcherType.ASYNC) {
            dispatcherType = DispatcherType.ASYNC;
        }
        request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, dispatcherType);
        request.setAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR,
            requestPathMB);
        // Create the filter chain for this request
        // The filter chain plays a role similar to a Pipeline
        ApplicationFilterChain filterChain =
            ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

        // Call the filter chain for this request
        // NOTE: This also calls the servlet's service() method
        Container container = this.container;
        try {
            if ((servlet != null) && (filterChain != null)) {
                // Swallow output if needed
                if (context.getSwallowOutput()) {
                    try {
                        SystemLogHandler.startCapture();
                        if (request.isAsyncDispatching()) {
                            request.getAsyncContextInternal().doInternalDispatch();
                        } else {
                            // Call doFilter on the filter chain, which eventually
                            // calls the Servlet's service method
                            filterChain.doFilter(request.getRequest(),
                                response.getResponse());
                        }
                    } finally {
                        String log = SystemLogHandler.stopCapture();
                        if (log != null && log.length() > 0) {
                            context.getLogger().info(log);
                        }
                    }
                } else {
                    if (request.isAsyncDispatching()) {
                        request.getAsyncContextInternal().doInternalDispatch();
                    } else {
                        // Call doFilter on the filter chain, which eventually
                        // calls the Servlet's service method
                        filterChain.doFilter
                            (request.getRequest(), response.getResponse());
                    }
                }

            }
        } catch (ClientAbortException | CloseNowException e) {
            if (container.getLogger().isDebugEnabled()) {
                container.getLogger().debug(sm.getString(
                    "standardWrapper.serviceException", wrapper.getName(),
                    context.getName()), e);
            }
            throwable = e;
            exception(request, response, e);
        } catch (IOException e) {
            container.getLogger().error(sm.getString(
                "standardWrapper.serviceException", wrapper.getName(),
                context.getName()), e);
            throwable = e;
            exception(request, response, e);
        } catch (UnavailableException e) {
            container.getLogger().error(sm.getString(
                "standardWrapper.serviceException", wrapper.getName(),
                context.getName()), e);
            //            throwable = e;
            //            exception(request, response, e);
            wrapper.unavailable(e);
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                    sm.getString("standardWrapper.isUnavailable",
                        wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                    sm.getString("standardWrapper.notFound",
                        wrapper.getName()));
            }
            // Do not save exception in 'throwable', because we
            // do not want to do exception(request, response, e) processing
        } catch (ServletException e) {
            Throwable rootCause = StandardWrapper.getRootCause(e);
            if (!(rootCause instanceof ClientAbortException)) {
                container.getLogger().error(sm.getString(
                        "standardWrapper.serviceExceptionRoot",
                        wrapper.getName(), context.getName(), e.getMessage()),
                    rootCause);
            }
            throwable = e;
            exception(request, response, e);
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            container.getLogger().error(sm.getString(
                "standardWrapper.serviceException", wrapper.getName(),
                context.getName()), e);
            throwable = e;
            exception(request, response, e);
        } finally {
            // Release the filter chain (if any) for this request
            if (filterChain != null) {
                filterChain.release();
            }

            // Deallocate the allocated servlet instance
            try {
                if (servlet != null) {
                    // Release the Servlet and its related resources
                    wrapper.deallocate(servlet);
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                container.getLogger().error(sm.getString("standardWrapper.deallocateException",
                    wrapper.getName()), e);
                if (throwable == null) {
                    throwable = e;
                    exception(request, response, e);
                }
            }

            // If this servlet has been marked permanently unavailable,
            // unload it and release this instance
            try {
                if ((servlet != null) &&
                    (wrapper.getAvailable() == Long.MAX_VALUE)) {
                    // The servlet is marked permanently unavailable, so unload it
                    // and release this servlet instance
                    wrapper.unload();
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                container.getLogger().error(sm.getString("standardWrapper.unloadException",
                    wrapper.getName()), e);
                if (throwable == null) {
                    exception(request, response, e);
                }
            }
            long t2 = System.currentTimeMillis();

            long time = t2 - t1;
            processingTime += time;
            if (time > maxTime) {
                maxTime = time;
            }
            if (time < minTime) {
                minTime = time;
            }
        }
    }
 
}

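As we know, a Wrapper is the upper-level wrapper around a servlet and is the container that deals with the servlet directly.

The code above does the following:

  1. Allocates a Servlet instance: `servlet = wrapper.allocate();`
  2. Creates the filter chain: `ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);`
  3. Calls the filter chain's doFilter, which eventually calls the Servlet's service method: `filterChain.doFilter(request.getRequest(), response.getResponse());`
  4. Releases the filter chain and its resources, then releases the Servlet and its resources. The request path ends here.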
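
How a filter chain ends up calling the servlet can be sketched with a position counter that advances on every call. This is a minimal sketch under simplified assumptions: Filter and Chain are hypothetical types, the request and response are replaced by a shared log, and a Runnable stands in for the Servlet's service method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a filter chain; not the javax.servlet API.
class FilterChainSketch {

    interface Filter {
        void doFilter(List<String> log, Chain chain);
    }

    static final class Chain {
        private final Filter[] filters;
        private final Runnable servlet; // stands in for servlet.service(...)
        private int pos = 0;            // index of the next filter to run

        Chain(Runnable servlet, Filter... filters) {
            this.servlet = servlet;
            this.filters = filters;
        }

        void doFilter(List<String> log) {
            if (pos < filters.length) {
                // Run the next filter; it decides whether to continue the chain.
                Filter next = filters[pos++];
                next.doFilter(log, this);
                return;
            }
            // Every filter has run, so the request reaches the servlet.
            servlet.run();
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Filter a = (l, chain) -> { l.add("A before"); chain.doFilter(l); l.add("A after"); };
        Filter b = (l, chain) -> { l.add("B before"); chain.doFilter(l); l.add("B after"); };
        Chain chain = new Chain(() -> log.add("service"), a, b);
        chain.doFilter(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [A before, B before, service, B after, A after]
    }
}
```

The nesting in the output shows why a filter can act both before and after the servlet: its code after chain.doFilter runs once the rest of the chain has returned.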


First, let's look in detail at how the Servlet instance is allocated.

    @Override
    public Servlet allocate() throws ServletException {

        // If we are currently unloading this servlet, throw an exception
        // A Servlet cannot be allocated while the Wrapper is unloading
        if (unloading) {
            throw new ServletException(sm.getString("standardWrapper.unloading", getName()));
        }

        boolean newInstance = false;

        // If not SingleThreadedModel, return the same instance every time
        // If the Servlet does not implement SingleThreadModel, the same instance is returned every time
        if (!singleThreadModel) {
            // Load and initialize our instance if necessary
            // The instance is null or not yet initialized; synchronized keeps the
            // check-and-create step atomic under concurrency
            if (instance == null || !instanceInitialized) {
                synchronized (this) {
                    if (instance == null) {
                        try {
                            if (log.isDebugEnabled()) {
                                log.debug("Allocating non-STM instance");
                            }

                            // Note: We don't know if the Servlet implements
                            // SingleThreadModel until we have loaded it.
                            // Load the Servlet
                            instance = loadServlet();
                            newInstance = true;
                            if (!singleThreadModel) {
                                // For non-STM, increment here to prevent a race
                                // condition with unload. Bug 43683, test case
                                // #3
                                countAllocated.incrementAndGet();
                            }
                        } catch (ServletException e) {
                            throw e;
                        } catch (Throwable e) {
                            ExceptionUtils.handleThrowable(e);
                            throw new ServletException(sm.getString("standardWrapper.allocate"), e);
                        }
                    }
                    // Initialize the Servlet
                    if (!instanceInitialized) {
                        initServlet(instance);
                    }
                }
            }
            // In the non-STM case (the else branch below) the already created Servlet is
            // returned directly, so only one Servlet instance is ever created
            if (singleThreadModel) {
                if (newInstance) {
                    // Have to do this outside of the sync above to prevent a
                    // possible deadlock
                    synchronized (instancePool) {
                        instancePool.push(instance);
                        nInstances++;
                    }
                }
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("  Returning non-STM instance");
                }
                // For new instances, count will have been incremented at the
                // time of creation
                if (!newInstance) {
                    countAllocated.incrementAndGet();
                }
                return instance;
            }
        }

        // In single-thread mode, use a servlet instance pool to hold multiple Servlet instances
        synchronized (instancePool) {
            while (countAllocated.get() >= nInstances) {
                // Allocate a new instance if possible, or else wait
                if (nInstances < maxInstances) {
                    try {
                        instancePool.push(loadServlet());
                        nInstances++;
                    } catch (ServletException e) {
                        throw e;
                    } catch (Throwable e) {
                        ExceptionUtils.handleThrowable(e);
                        throw new ServletException(sm.getString("standardWrapper.allocate"), e);
                    }
                } else {
                    try {
                        instancePool.wait();
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
            }
            if (log.isTraceEnabled()) {
                log.trace("  Returning allocated STM instance");
            }
            countAllocated.incrementAndGet();
            return instancePool.pop();
        }
    }

    public synchronized Servlet loadServlet() throws ServletException {

        // Nothing to do if we already have an instance or an instance pool
        if (!singleThreadModel && (instance != null)) {
            return instance;
        }

        PrintStream out = System.out;
        if (swallowOutput) {
            SystemLogHandler.startCapture();
        }

        Servlet servlet;
        try {
            long t1 = System.currentTimeMillis();
            // Complain if no servlet class has been specified
            if (servletClass == null) {
                unavailable(null);
                throw new ServletException
                    (sm.getString("standardWrapper.notClass", getName()));
            }

            // The key step: create the Servlet instance through the InstanceManager,
            // which uses a dedicated class loader to load the given class
            InstanceManager instanceManager = ((StandardContext) getParent()).getInstanceManager();
            try {
                servlet = (Servlet) instanceManager.newInstance(servletClass);
            } catch (ClassCastException e) {
                unavailable(null);
                // Restore the context ClassLoader
                throw new ServletException
                    (sm.getString("standardWrapper.notServlet", servletClass), e);
            } catch (Throwable e) {
                e = ExceptionUtils.unwrapInvocationTargetException(e);
                ExceptionUtils.handleThrowable(e);
                unavailable(null);

                // Added extra log statement for Bugzilla 36630:
                // https://bz.apache.org/bugzilla/show_bug.cgi?id=36630
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("standardWrapper.instantiate", servletClass), e);
                }

                // Restore the context ClassLoader
                throw new ServletException
                    (sm.getString("standardWrapper.instantiate", servletClass), e);
            }

            if (multipartConfigElement == null) {
                MultipartConfig annotation =
                    servlet.getClass().getAnnotation(MultipartConfig.class);
                if (annotation != null) {
                    multipartConfigElement =
                        new MultipartConfigElement(annotation);
                }
            }

            // Special handling for ContainerServlet instances
            // Note: The InstanceManager checks if the application is permitted
            //       to load ContainerServlets
            if (servlet instanceof ContainerServlet) {
                ((ContainerServlet) servlet).setWrapper(this);
            }

            classLoadTime = (int) (System.currentTimeMillis() - t1);

            if (servlet instanceof SingleThreadModel) {
                if (instancePool == null) {
                    instancePool = new Stack<>();
                }
                singleThreadModel = true;
            }

            // Call the Servlet's init method
            initServlet(servlet);

            fireContainerEvent("load", this);

            loadTime = System.currentTimeMillis() - t1;
        } finally {
            if (swallowOutput) {
                String log = SystemLogHandler.stopCapture();
                if (log != null && log.length() > 0) {
                    if (getServletContext() != null) {
                        getServletContext().log(log);
                    } else {
                        out.println(log);
                    }
                }
            }
        }
        return servlet;

    }

    private synchronized void initServlet(Servlet servlet)
        throws ServletException {

        if (instanceInitialized && !singleThreadModel) {
            return;
        }

        // Call the initialization method of this servlet
        try {
            if (Globals.IS_SECURITY_ENABLED) {
                boolean success = false;
                try {
                    Object[] args = new Object[]{facade};
                    SecurityUtil.doAsPrivilege("init",
                        servlet,
                        classType,
                        args);
                    success = true;
                } finally {
                    if (!success) {
                        // destroy() will not be called, thus clear the reference now
                        SecurityUtil.remove(servlet);
                    }
                }
            } else {
                servlet.init(facade);
            }

            instanceInitialized = true;
        } catch (UnavailableException f) {
            unavailable(f);
            throw f;
        } catch (ServletException f) {
            // If the servlet wanted to be unavailable it would have
            // said so, so do not call unavailable(null).
            throw f;
        } catch (Throwable f) {
            ExceptionUtils.handleThrowable(f);
            getServletContext().log(sm.getString("standardWrapper.initException", getName()), f);
            // If the servlet wanted to be unavailable it would have
            // said so, so do not call unavailable(null).
            throw new ServletException
                (sm.getString("standardWrapper.initException", getName()), f);
        }
    }
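
The non-STM branch of allocate() follows a check, lock, re-check pattern so that concurrent first requests still load only one instance. The following is a minimal sketch of that pattern alone, with hypothetical names (LazyAllocateSketch, loads, loadsAfterConcurrentAllocate) and a plain Object standing in for the loaded and initialized Servlet; it leaves out the instance pool and the unloading check.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of lazy, shared-instance allocation; not Tomcat's StandardWrapper.
class LazyAllocateSketch {

    private volatile Object instance;                              // shared instance, created on first use
    private final AtomicInteger loads = new AtomicInteger();       // how often the "servlet" was loaded
    private final AtomicInteger countAllocated = new AtomicInteger();

    Object allocate() {
        if (instance == null) {                 // fast path, no lock once loaded
            synchronized (this) {
                if (instance == null) {         // re-check under the lock
                    loads.incrementAndGet();
                    instance = new Object();    // stands in for loadServlet() plus init
                }
            }
        }
        countAllocated.incrementAndGet();
        return instance;
    }

    void deallocate() {
        countAllocated.decrementAndGet();
    }

    /** Starts the given number of threads that all call allocate(), then reports the load count. */
    static int loadsAfterConcurrentAllocate(int threads) {
        LazyAllocateSketch wrapper = new LazyAllocateSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(wrapper::allocate);
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return wrapper.loads.get();
    }

    public static void main(String[] args) {
        System.out.println(loadsAfterConcurrentAllocate(8)); // prints 1
    }
}
```

The volatile field matters here: without it, a thread on the fast path could observe a reference to an instance that is not yet fully constructed.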


Next, let's look at how the filter chain is executed.

public final class ApplicationFilterChain implements FilterChain {
    @Override
    public void doFilter(ServletRequest request, ServletResponse response)
        throws IOException, ServletException {

        if (Globals.IS_SECURITY_ENABLED) {
            final ServletRequest req = request;
            final ServletResponse res = response;
            try {
                java.security.AccessController.doPrivileged(
                    new java.security.PrivilegedExceptionAction<Void>() {
                        @Override
                        public Void run()
                            throws ServletException, IOException {
                            // Run the filter chain
                            internalDoFilter(req, res);
                            return null;
                        }
                    }
                );
            } catch (PrivilegedActionException pe) {
                Exception e = pe.getException();
                if (e instanceof ServletException) {
                    throw (ServletException) e;
                } else if (e instanceof IOException) {
                    throw (IOException) e;
                } else if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                } else {
                    throw new ServletException(e.getMessage(), e);
                }
            }
        } else {
            // Run the filter chain
            internalDoFilter(request, response);
        }
    }

    /**
     * `internalDoFilter` uses pos and n to invoke each filter in the chain: pos is the index of the current filter, n is the total number of filters.
     * `internalDoFilter` eventually calls servlet.service().
     *
     * @param request
     * @param response
     * @throws IOException
     * @throws ServletException
     */
    private void internalDoFilter(ServletRequest request,
                                  ServletResponse response)
        throws IOException, ServletException {

        // Call the next filter if there is one
        // While pos is less than n, run the next filter
        if (pos < n) {
            // Fetch the current filter and advance the index (pos++)
            ApplicationFilterConfig filterConfig = filters[pos++];
            try {
                Filter filter = filterConfig.getFilter();

                if (request.isAsyncSupported() && "false".equalsIgnoreCase(
                    filterConfig.getFilterDef().getAsyncSupported())) {
                    request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE);
                }
                if (Globals.IS_SECURITY_ENABLED) {
                    final ServletRequest req = request;
                    final ServletResponse res = response;
                    Principal principal =
                        ((HttpServletRequest) req).getUserPrincipal();

                    Object[] args = new Object[]{req, res, this};
                    SecurityUtil.doAsPrivilege("doFilter", filter, classType, args, principal);
                } else {
                    // Invoke the filter
                    filter.doFilter(request, response, this);
                }
            } catch (IOException | ServletException | RuntimeException e) {
                throw e;
            } catch (Throwable e) {
                e = ExceptionUtils.unwrapInvocationTargetException(e);
                ExceptionUtils.handleThrowable(e);
                throw new ServletException(sm.getString("filterChain.filter"), e);
            }
            return;
        }

        // When pos equals n, every filter has run, so the code below executes
        // We fell off the end of the chain -- call the servlet instance
        try {
            if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
                lastServicedRequest.set(request);
                lastServicedResponse.set(response);
            }

            if (request.isAsyncSupported() && !servletSupportsAsync) {
                request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR,
                    Boolean.FALSE);
            }
            // Use potentially wrapped request from this point
            if ((request instanceof HttpServletRequest) &&
                (response instanceof HttpServletResponse) &&
                Globals.IS_SECURITY_ENABLED) {
                final ServletRequest req = request;
                final ServletResponse res = response;
                Principal principal =
                    ((HttpServletRequest) req).getUserPrincipal();
                Object[] args = new Object[]{req, res};
                SecurityUtil.doAsPrivilege("service",
                    servlet,
                    classTypeUsedInService,
                    args,
                    principal);
            } else {
                // Let the servlet's service() method handle the request
                servlet.service(request, response);
            }
        } catch (IOException | ServletException | RuntimeException e) {
            throw e;
        } catch (Throwable e) {
            e = ExceptionUtils.unwrapInvocationTargetException(e);
            ExceptionUtils.handleThrowable(e);
            throw new ServletException(sm.getString("filterChain.servlet"), e);
        } finally {
            if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
                lastServicedRequest.set(null);
                lastServicedResponse.set(null);
            }
        }
    }
}

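The pos/n mechanism above is easier to see in isolation. The following is a minimal sketch, not Tomcat's code: `MiniFilter`, `MiniChain` and `named` are hypothetical stand-ins for `Filter`, `ApplicationFilterChain` and a user-written filter, and requests are reduced to plain strings. It only illustrates why each filter's code before `chain.doFilter(...)` runs in declaration order, while the code after it runs in reverse order once the servlet has returned.

```java
import java.util.ArrayList;
import java.util.List;

public class MiniFilterChainDemo {

    /** Hypothetical stand-in for javax.servlet.Filter. */
    interface MiniFilter {
        void doFilter(String request, MiniChain chain);
    }

    /** Hypothetical stand-in for ApplicationFilterChain; keeps only pos and n. */
    static final class MiniChain {
        private final MiniFilter[] filters;
        private final int n;       // total number of filters
        private int pos = 0;       // index of the next filter to invoke
        private final List<String> trace;

        MiniChain(List<String> trace, MiniFilter... filters) {
            this.trace = trace;
            this.filters = filters;
            this.n = filters.length;
        }

        void doFilter(String request) {
            if (pos < n) {
                // Advance pos first, then hand control to that filter.
                MiniFilter next = filters[pos++];
                next.doFilter(request, this);
                return;
            }
            // Fell off the end of the chain: this is where the servlet would run.
            trace.add("servlet.service(" + request + ")");
        }
    }

    /** Builds a filter that records when it is entered and when it is left. */
    static MiniFilter named(String name, List<String> trace) {
        return (request, chain) -> {
            trace.add(name + " before");
            chain.doFilter(request);   // re-enters MiniChain.doFilter with pos already advanced
            trace.add(name + " after");
        };
    }

    /** Runs a two-filter chain once and returns the recorded call order. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        MiniChain chain = new MiniChain(trace, named("A", trace), named("B", trace));
        chain.doFilter("/hello");
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

A filter that never calls `chain.doFilter(...)` stops the request at that point: in this sketch the servlet entry would simply never be recorded, which mirrors how a real filter can short-circuit a request.
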
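Finally, servlet.service(request, response) is called, and the request is handed to the Servlet for the actual processing. For a servlet that extends HttpServlet, the call arrives at the protected service(HttpServletRequest, HttpServletResponse) method below, which dispatches on the HTTP method.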

protected void service(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {

        String method = req.getMethod();

        if (method.equals(METHOD_GET)) {
            long lastModified = getLastModified(req);
            if (lastModified == -1) {
                doGet(req, resp);
            } else {
                long ifModifiedSince;
                try {
                    ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE);
                } catch (IllegalArgumentException iae) {
                    ifModifiedSince = -1;
                }
                if (ifModifiedSince < (lastModified / 1000 * 1000)) {
                    maybeSetLastModified(resp, lastModified);
                    doGet(req, resp);
                } else {
                    resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                }
            }

        } else if (method.equals(METHOD_HEAD)) {
            long lastModified = getLastModified(req);
            maybeSetLastModified(resp, lastModified);
            doHead(req, resp);

        } else if (method.equals(METHOD_POST)) {
            doPost(req, resp);

        } else if (method.equals(METHOD_PUT)) {
            doPut(req, resp);

        } else if (method.equals(METHOD_DELETE)) {
            doDelete(req, resp);

        } else if (method.equals(METHOD_OPTIONS)) {
            doOptions(req,resp);

        } else if (method.equals(METHOD_TRACE)) {
            doTrace(req,resp);

        } else {
        
            String errMsg = lStrings.getString("http.method_not_implemented");
            Object[] errArgs = new Object[1];
            errArgs[0] = method;
            errMsg = MessageFormat.format(errMsg, errArgs);

            resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg);
        }
    }
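
The dispatch logic in `service` can be condensed into a small decision function. This is a sketch under simplifying assumptions rather than the servlet API itself: `ServiceDispatchDemo` and `dispatch` are hypothetical names, the handler is returned as a string instead of being invoked, and both timestamps are passed in directly as milliseconds. It highlights the conditional GET branch: `lastModified` is rounded down to whole seconds before the comparison because HTTP date headers carry no millisecond part.

```java
public class ServiceDispatchDemo {

    /**
     * Returns the name of the handler that the service method would pick.
     *
     * @param method          HTTP method name, e.g. "GET"
     * @param lastModified    resource timestamp in ms since the epoch, -1 if unknown
     * @param ifModifiedSince value of the If-Modified-Since header in ms, -1 if absent
     */
    static String dispatch(String method, long lastModified, long ifModifiedSince) {
        switch (method) {
            case "GET":
                if (lastModified == -1) {
                    return "doGet";              // servlet does not track modification time
                }
                // Round down to whole seconds; an absent header (-1) is always smaller.
                return ifModifiedSince < (lastModified / 1000 * 1000)
                        ? "doGet"
                        : "304 Not Modified";
            case "HEAD":    return "doHead";
            case "POST":    return "doPost";
            case "PUT":     return "doPut";
            case "DELETE":  return "doDelete";
            case "OPTIONS": return "doOptions";
            case "TRACE":   return "doTrace";
            default:        return "501 Not Implemented";
        }
    }

    public static void main(String[] args) {
        long lastModified = 1_673_270_800_355L;  // hypothetical timestamp with a millisecond part
        long echoedHeader = 1_673_270_800_000L;  // the same instant at one-second precision

        System.out.println(dispatch("GET", lastModified, echoedHeader));
        System.out.println(dispatch("GET", lastModified, -1));
        System.out.println(dispatch("GET", lastModified + 1000, echoedHeader));
        System.out.println(dispatch("PATCH", -1, -1));
    }
}
```

Without the rounding step, the first call would compare `...000` against `...355` and conclude that the resource had changed, so an unchanged resource would never be answered with 304.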

At this point the whole request chain is finished, and the handling of one HTTP request is complete.

3. Summary

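Starting from the point where the Connector accepts a request, this article walked through the Tomcat source code in depth, explaining what each part of Tomcat's threading model is for and the complete path an HTTP request takes through the Tomcat server.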

The sequence diagram linked below shows how the components interact when a request is accepted, as an aid to understanding.

https://www.processon.com/view/link/63c2e873e8d4677705c87b56

Author: MRyan


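This article is licensed under the Creative Commons Attribution-ShareAlike 4.0 International License.
When reposting, please credit the source and include a link to the article. Article link: https://www.wormholestack.com/archives/649/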
2024 © MRyan