Tomcat Source Code Analysis (3): How the Connector Handles Request Connections

How the Connector Handles Request Connections

How does Tomcat parse the HTTP request protocol?

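The previous chapter analyzed how the Connector in Tomcat is initialized and started. That analysis was based on the Tomcat 9 code, which handles I/O connections with NIO by default. NIO carries a certain complexity of its own, which makes it harder to follow how Tomcat parses HTTP requests. This article therefore uses Tomcat 7 to analyze how the connector handles HTTP requests.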

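From the earlier analysis of Tomcat's overall structure we know that the Connector is responsible for receiving requests and the Container is responsible for processing them. The previous article, however, never found the place where the Connector wraps the request and response and passes them to the Container.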

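As a quick review: the Connector listens for and handles requests differently depending on the protocol (HTTP/AJP) and the I/O mode (NIO/BIO/APR). So somewhere there must be a point where these modules are combined.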

As mentioned before, the Tomcat Connector creates a different ProtocolHandler depending on the configured protocol and I/O mode. In Tomcat 7, choosing HTTP/1.1 with BIO gives org.apache.coyote.http11.Http11Protocol by default.

  • Connector#startInternal() calls protocolHandler.start() to start the ProtocolHandler.

  • protocolHandler#start() calls endpoint#start().

  • endpoint#start() calls the corresponding bind() and startInternal().
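The delegation chain in the list above can be sketched roughly as follows. Every class name here (`StartupChainSketch` and its nested `Connector`, `ProtocolHandler`, `Endpoint`) is a hypothetical stand-in rather than Tomcat's real class. The sketch records only the order of the calls and assumes `start()` always calls `bind()` before `startInternal()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that only record the order of the start-up calls.
class StartupChainSketch {

    static class Endpoint {
        private final List<String> trace;
        Endpoint(List<String> trace) { this.trace = trace; }

        void bind() { trace.add("endpoint.bind"); }
        void startInternal() { trace.add("endpoint.startInternal"); }

        // start() first binds the server socket, then starts the threads.
        void start() {
            bind();
            startInternal();
        }
    }

    static class ProtocolHandler {
        private final List<String> trace;
        private final Endpoint endpoint;
        ProtocolHandler(List<String> trace) {
            this.trace = trace;
            this.endpoint = new Endpoint(trace);
        }

        void start() {
            trace.add("protocolHandler.start");
            endpoint.start();
        }
    }

    static class Connector {
        private final List<String> trace;
        private final ProtocolHandler protocolHandler;
        Connector(List<String> trace) {
            this.trace = trace;
            this.protocolHandler = new ProtocolHandler(trace);
        }

        void startInternal() {
            trace.add("connector.startInternal");
            protocolHandler.start();
        }
    }

    // Starts a sketch connector and returns the recorded call order.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        new Connector(trace).startInternal();
        return trace;
    }

    public static void main(String[] args) {
        for (String call : run()) {
            System.out.println(call);
        }
    }
}
```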

The Endpoint that corresponds to BIO is JIoEndpoint.

JIoEndpoint#bind()

 @Override
    public void bind() throws Exception {

        // Initialize the number of acceptor threads
        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1;
        }
        // Initialize the maximum number of connections
        if (getMaxConnections() == 0) {
            // User hasn't set a value - use the default
            setMaxConnections(getMaxThreadsWithExecutor());
        }
        // Choose the server socket factory (SSL or plain)
        if (serverSocketFactory == null) {
            if (isSSLEnabled()) {
                serverSocketFactory =
                    handler.getSslImplementation().getServerSocketFactory(this);
            } else {
                serverSocketFactory = new DefaultServerSocketFactory(this);
            }
        }

        // Create the server socket
        if (serverSocket == null) {
            if (getAddress() == null) {
                serverSocket = serverSocketFactory.createSocket(getPort(),
                        getBacklog());
            } else {
                serverSocket = serverSocketFactory.createSocket(getPort(),
                        getBacklog(), getAddress());
            }
        }

    }

JIoEndpoint#startInternal()

    @Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if (getExecutor() == null) {
                createExecutor();
            }
            // Initialize the connection latch (limits concurrent connections)
            initializeConnectionLatch();
            // Start the acceptor threads
            startAcceptorThreads();

            // Start async timeout thread
            Thread timeoutThread = new Thread(new AsyncTimeout(),
                    getName() + "-AsyncTimeout");
            timeoutThread.setPriority(threadPriority);
            timeoutThread.setDaemon(true);
            timeoutThread.start();
        }
    }

As we can see, bind() is called first to bind the port and IP address, which initializes the serverSocket. Then startInternal() calls startAcceptorThreads() to start the Acceptor threads.

JIoEndpoint#Acceptor#run()

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // While the endpoint is paused (but not stopped), loop and wait
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;


            // If the maximum number of connections has been reached, wait
            countUpOrAwaitConnection();

            Socket socket = null;

            // Block until a connection is accepted
            socket = serverSocketFactory.acceptSocket(serverSocket);

            // Successful accept, reset the error delay
            errorDelay = 0;

            // Configure the socket
            if (running && !paused && setSocketOptions(socket)) {
                // Hand the socket over for processing
                if (!processSocket(socket)) {
                    countDownConnection();
                    // Close socket right away
                    closeSocket(socket);
                }
            } else {
                countDownConnection();
                // Close socket right away
                closeSocket(socket);
            }

        }
        state = AcceptorState.ENDED;
    }

Most of the try-catch blocks have been removed from the code above for brevity.

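At this point the overall flow is fairly clear: JIoEndpoint's start() method starts acceptorThreadCount (1 by default) Acceptor threads, which listen for connections and hand off each accepted Socket for processing.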

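  • The Socket is handled by calling processSocket(socket).
  • processSocket(socket) hands the work to another thread instead of doing it on the Acceptor thread.
  • Before the socket is processed, setSocketOptions() configures it, for example its timeout and linger settings.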
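The split between one accepting thread and a pool of worker threads can be illustrated with a minimal sketch. This is not Tomcat code: `MiniBioEndpoint`, its echo-style `handle()` method and the pool size of 4 are assumptions made for illustration. Only the `java.net` and `java.util.concurrent` classes are real APIs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical miniature of a blocking (BIO) endpoint.
class MiniBioEndpoint {
    private final ServerSocket serverSocket;
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private volatile boolean running;

    MiniBioEndpoint() throws IOException {
        // Plays the role of bind(): port 0 lets the OS pick a free port.
        serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    int getPort() { return serverSocket.getLocalPort(); }

    // Plays the role of startInternal(): start a single acceptor thread.
    void start() {
        running = true;
        Thread acceptor = new Thread(this::acceptLoop, "mini-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    private void acceptLoop() {
        while (running) {
            try {
                Socket socket = serverSocket.accept();   // blocks
                socket.setSoTimeout(5000);               // like setSocketOptions()
                executor.execute(() -> handle(socket));  // like processSocket()
            } catch (IOException e) {
                // accept() fails once the server socket is closed in stop().
                break;
            }
        }
    }

    // Worker-thread logic: read one line and echo it back.
    private void handle(Socket socket) {
        try (Socket s = socket) {
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(s.getInputStream(), StandardCharsets.US_ASCII));
            String line = in.readLine();
            OutputStream out = s.getOutputStream();
            out.write(("echo: " + line + "\n").getBytes(StandardCharsets.US_ASCII));
            out.flush();
        } catch (IOException e) {
            // Ignored in this sketch.
        }
    }

    void stop() throws IOException {
        running = false;
        serverSocket.close();
        executor.shutdown();
    }

    // Starts an endpoint, sends one line as a client and returns the reply.
    static String roundTrip(String message) {
        try {
            MiniBioEndpoint endpoint = new MiniBioEndpoint();
            endpoint.start();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), endpoint.getPort())) {
                client.getOutputStream().write((message + "\n").getBytes(StandardCharsets.US_ASCII));
                client.getOutputStream().flush();
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(client.getInputStream(), StandardCharsets.US_ASCII));
                return in.readLine();
            } finally {
                endpoint.stop();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

The acceptor thread never reads from the connection itself, so a slow client only occupies a worker thread and does not stop new connections from being accepted.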

JIoEndpoint#processSocket()

protected boolean processSocket(Socket socket) {

        SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
        wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
        wrapper.setSecure(isSSLEnabled());
        // During shutdown, executor may be null - avoid NPE
        if (!running) {
            return false;
        }
        getExecutor().execute(new SocketProcessor(wrapper));

        return true;
}

As we can see, processSocket() simply wraps the socket in a SocketProcessor task and submits it to the thread pool.

JIoEndpoint#SocketProcessor#run()

    @Override
    public void run() {
        boolean launch = false;
        synchronized (socket) {
            try {
                SocketState state = SocketState.OPEN;


                 // SSL handshake
                serverSocketFactory.handshake(socket.getSocket());


                if ((state != SocketState.CLOSED)) {
                    if (status == null) {
                        state = handler.process(socket, SocketStatus.OPEN_READ);
                    } else {
                        state = handler.process(socket,status);
                    }
                }
                if (state == SocketState.CLOSED) {
                    // Close socket
                    if (log.isTraceEnabled()) {
                        log.trace("Closing socket:"+socket);
                    }
                    countDownConnection();
                    try {
                        socket.getSocket().close();
                    } catch (IOException e) {
                        // Ignore
                    }
                } else if (state == SocketState.OPEN ||
                        state == SocketState.UPGRADING ||
                        state == SocketState.UPGRADING_TOMCAT  ||
                        state == SocketState.UPGRADED){
                    socket.setKeptAlive(true);
                    socket.access();
                    launch = true;
                } else if (state == SocketState.LONG) {
                    socket.access();
                    waitingRequests.add(socket);
                }
            } // finally block omitted: when launch is true, the socket is resubmitted to the executor
        }
        socket = null;
        // Finish up this request
    }


Most of the try-catch blocks have been removed.

The most important part of the method above is this line:

 state = handler.process(socket, SocketStatus.OPEN_READ);

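For HTTP requests, the concrete Handler comes from Http11Protocol (its inner Http11ConnectionHandler class). Its process() method is part of what actually handles the HTTP socket: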

Http11Protocol#process()

@SuppressWarnings("deprecation") // Old HTTP upgrade method has been deprecated
public SocketState process(SocketWrapper<S> wrapper,
        SocketStatus status) {
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    S socket = wrapper.getSocket();
    if (socket == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    Processor<S> processor = connections.get(socket);
    if (status == SocketStatus.DISCONNECT && processor == null) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }

    wrapper.setAsync(false);
    ContainerThreadMarker.markAsContainerThread();

        if (processor == null) {
            processor = recycledProcessors.poll();
        }
        if (processor == null) {
            processor = createProcessor();
        }

        initSsl(wrapper, processor);

        SocketState state = SocketState.CLOSED;
        do {
            if (status == SocketStatus.DISCONNECT &&
                    !processor.isComet()) {
                // Do nothing here, just wait for it to get recycled
                // Don't do this for Comet we need to generate an end
                // event (see BZ 54022)
            } else if (processor.isAsync() || state == SocketState.ASYNC_END) {
                state = processor.asyncDispatch(status);
                if (state == SocketState.OPEN) {
                    // release() won't get called so in case this request
                    // takes a long time to process, remove the socket from
                    // the waiting requests now else the async timeout will
                    // fire
                    getProtocol().endpoint.removeWaitingRequest(wrapper);
                    // There may be pipe-lined data to read. If the data
                    // isn't processed now, execution will exit this
                    // loop and call release() which will recycle the
                    // processor (and input buffer) deleting any
                    // pipe-lined data. To avoid this, process it now.
                    state = processor.process(wrapper);
                }
            } else if (processor.isComet()) {
                state = processor.event(status);
            } else if (processor.getUpgradeInbound() != null) {
                state = processor.upgradeDispatch();
            } else if (processor.isUpgrade()) {
                state = processor.upgradeDispatch(status);
            } else {
                state = processor.process(wrapper);
            }

            if (state != SocketState.CLOSED && processor.isAsync()) {
                state = processor.asyncPostProcess();
            }

            if (state == SocketState.UPGRADING) {
                // Get the HTTP upgrade handler
                HttpUpgradeHandler httpUpgradeHandler =
                        processor.getHttpUpgradeHandler();
                // Release the Http11 processor to be re-used
                release(wrapper, processor, false, false);
                // Create the upgrade processor
                processor = createUpgradeProcessor(
                        wrapper, httpUpgradeHandler);
                // Mark the connection as upgraded
                wrapper.setUpgraded(true);
                // Associate with the processor with the connection
                connections.put(socket, processor);
                // Initialise the upgrade handler (which may trigger
                // some IO using the new protocol which is why the lines
                // above are necessary)
                // This cast should be safe. If it fails the error
                // handling for the surrounding try/catch will deal with
                // it.
                httpUpgradeHandler.init((WebConnection) processor);
            } else if (state == SocketState.UPGRADING_TOMCAT) {
                // Get the UpgradeInbound handler
                org.apache.coyote.http11.upgrade.UpgradeInbound inbound =
                        processor.getUpgradeInbound();
                // Release the Http11 processor to be re-used
                release(wrapper, processor, false, false);
                // Create the light-weight upgrade processor
                processor = createUpgradeProcessor(wrapper, inbound);
                inbound.onUpgradeComplete();
            }
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + wrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }
        } while (state == SocketState.ASYNC_END ||
                state == SocketState.UPGRADING ||
                state == SocketState.UPGRADING_TOMCAT);

        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            connections.put(socket, processor);
            longPoll(wrapper, processor);
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            connections.remove(socket);
            release(wrapper, processor, false, true);
        } else if (state == SocketState.SENDFILE) {
            // Sendfile in progress. If it fails, the socket will be
            // closed. If it works, the socket either be added to the
            // poller (or equivalent) to await more data or processed
            // if there are any pipe-lined requests remaining.
            connections.put(socket, processor);
        } else if (state == SocketState.UPGRADED) {
            // Need to keep the connection associated with the processor
            connections.put(socket, processor);
            // Don't add sockets back to the poller if this was a
            // non-blocking write otherwise the poller may trigger
            // multiple read events which may lead to thread starvation
            // in the connector. The write() method will add this socket
            // to the poller if necessary.
            if (status != SocketStatus.OPEN_WRITE) {
                longPoll(wrapper, processor);
            }
        } else {
            // Connection closed. OK to recycle the processor. Upgrade
            // processors are not recycled.
            connections.remove(socket);
            if (processor.isUpgrade()) {
                processor.getHttpUpgradeHandler().destroy();
            } else if (processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor) {
                // NO-OP
            } else {
                release(wrapper, processor, true, false);
            }
        }
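        return state;
    // The surrounding try/catch is omitted here. The lines below are only
    // reached after an exception has been caught.
    // Make sure socket/processor is removed from the list of current
    // connections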
    connections.remove(socket);
    // Don't try to add upgrade processors back into the pool
    if (!(processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor)
            && !processor.isUpgrade()) {
        release(wrapper, processor, true, false);
    }
    return SocketState.CLOSED;
}

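Note that the @SuppressWarnings("deprecation") annotation does not mark this method itself as deprecated. It only suppresses compiler warnings about the old, deprecated HTTP upgrade mechanism, which this method still uses in Tomcat 7.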

The method above runs the socket through a series of state checks and branches, for example for HTTP upgrade.

A lot is going on here; we only need to care about the single most important line:

state = processor.process(wrapper);
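Before that line runs, the handler has to decide which processor instance to use. The lookup order in the listing (the processor already bound to the socket, then a recycled one, then a new one) can be sketched as below. `ProcessorPoolSketch` and its nested `Processor` class are hypothetical. Only the field names `connections` and `recycledProcessors` are taken from the listing, and the real release() does more than this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of the processor bookkeeping in the handler.
class ProcessorPoolSketch {

    static class Processor {
        final int id;
        Processor(int id) { this.id = id; }
    }

    // Sockets that are in the middle of a request keep their processor here.
    private final Map<Object, Processor> connections = new ConcurrentHashMap<>();
    // Idle processors waiting to be reused.
    private final ConcurrentLinkedQueue<Processor> recycledProcessors =
            new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();

    // Same lookup order as the listing: bound, then recycled, then new.
    Processor acquire(Object socket) {
        Processor processor = connections.get(socket);
        if (processor == null) {
            processor = recycledProcessors.poll();
        }
        if (processor == null) {
            processor = new Processor(created.incrementAndGet());
        }
        return processor;
    }

    // Request still in progress (the LONG case): keep the association.
    void keep(Object socket, Processor processor) {
        connections.put(socket, processor);
    }

    // Between keep-alive requests or closed: drop the association and recycle.
    void release(Object socket, Processor processor) {
        connections.remove(socket);
        recycledProcessors.offer(processor);
    }

    int createdCount() { return created.get(); }

    public static void main(String[] args) {
        ProcessorPoolSketch pool = new ProcessorPoolSketch();
        Object socketA = new Object();
        Object socketB = new Object();
        Processor first = pool.acquire(socketA);
        pool.release(socketA, first);
        Processor second = pool.acquire(socketB);
        System.out.println("reused: " + (first == second));
    }
}
```

Reusing processors this way avoids allocating new parsing buffers for every connection, which is presumably the reason the listing polls `recycledProcessors` before calling `createProcessor()`.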

Processor#process()

@Override
public SocketState process(SocketWrapper<S> socketWrapper)
    throws IOException {
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    // Setting up the I/O
    setSocketWrapper(socketWrapper);
    getInputBuffer().init(socketWrapper, endpoint);
    getOutputBuffer().init(socketWrapper, endpoint);

    // Flags
    keepAlive = true;
    comet = false;
    openSocket = false;
    sendfileInProgress = false;
    readComplete = true;
    if (endpoint.getUsePolling()) {
        keptAlive = false;
    } else {
        keptAlive = socketWrapper.isKeptAlive();
    }

    if (disableKeepAlive()) {
        socketWrapper.setKeepAliveLeft(0);
    }

    while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
            upgradeInbound == null &&
            httpUpgradeHandler == null && !endpoint.isPaused()) {

        // Parsing the request header
        try {
            setRequestLineReadTimeout();

            if (!getInputBuffer().parseRequestLine(keptAlive)) {
                if (handleIncompleteRequestLineRead()) {
                    break;
                }
            }

            if (endpoint.isPaused()) {
                // 503 - Service unavailable
                response.setStatus(503);
                setErrorState(ErrorState.CLOSE_CLEAN, null);
            } else {
                keptAlive = true;
                // Set this every time in case limit has been changed via JMX
                request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                request.getCookies().setLimit(getMaxCookieCount());
                // Currently only NIO will ever return false here
                if (!getInputBuffer().parseHeaders()) {
                    // We've read part of the request, don't recycle it
                    // instead associate it with the socket
                    openSocket = true;
                    readComplete = false;
                    break;
                }
                if (!disableUploadTimeout) {
                    setSocketTimeout(connectionUploadTimeout);
                }
            }
        } catch (IOException e) {
            if (getLog().isDebugEnabled()) {
                getLog().debug(
                        sm.getString("http11processor.header.parse"), e);
            }
            setErrorState(ErrorState.CLOSE_NOW, e);
            break;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            UserDataHelper.Mode logMode = userDataHelper.getNextMode();
            if (logMode != null) {
                String message = sm.getString(
                        "http11processor.header.parse");
                switch (logMode) {
                    case INFO_THEN_DEBUG:
                        message += sm.getString(
                                "http11processor.fallToDebug");
                        //$FALL-THROUGH$
                    case INFO:
                        getLog().info(message, t);
                        break;
                    case DEBUG:
                        getLog().debug(message, t);
                }
            }
            // 400 - Bad Request
            response.setStatus(400);
            setErrorState(ErrorState.CLOSE_CLEAN, t);
            getAdapter().log(request, response, 0);
        }

        if (!getErrorState().isError()) {
            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (getLog().isDebugEnabled()) {
                    getLog().debug(sm.getString(
                            "http11processor.request.prepare"), t);
                }
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        if (maxKeepAliveRequests == 1) {
            keepAlive = false;
        } else if (maxKeepAliveRequests > 0 &&
                socketWrapper.decrementKeepAlive() <= 0) {
            keepAlive = false;
        }

        // Process the request in the adapter
        if (!getErrorState().isError()) {
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                adapter.service(request, response);
                // Handle when the response was committed before a serious
                // error occurred.  Throwing a ServletException should both
                // set the status to 500 and set the errorException.
                // If we fail here, then the response is likely already
                // committed, so we can't try and set headers.
                if(keepAlive && !getErrorState().isError() && (
                        response.getErrorException() != null ||
                                (!isAsync() &&
                                statusDropsConnection(response.getStatus())))) {
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                }
                setCometTimeouts(socketWrapper);
            } catch (InterruptedIOException e) {
                setErrorState(ErrorState.CLOSE_NOW, e);
            } catch (HeadersTooLargeException e) {
                getLog().error(sm.getString("http11processor.request.process"), e);
                // The response should not have been committed but check it
                // anyway to be safe
                if (response.isCommitted()) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } else {
                    response.reset();
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, e);
                    response.setHeader("Connection", "close"); // TODO: Remove
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                getLog().error(sm.getString("http11processor.request.process"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        // Finish the handling of the request
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);

        if (!isAsync() && !comet) {
            if (getErrorState().isError()) {
                // If we know we are closing the connection, don't drain
                // input. This way uploading a 100GB file doesn't tie up the
                // thread if the servlet has rejected it.
                getInputBuffer().setSwallowInput(false);
            } else {
                // Need to check this again here in case the response was
                // committed before the error that requires the connection
                // to be closed occurred.
                checkExpectationAndResponseStatus();
            }
            endRequest();
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

        // If there was an error, make sure the request is counted as
        // and error, and update the statistics counter
        if (getErrorState().isError()) {
            response.setStatus(500);
        }
        request.updateCounters();

        if (!isAsync() && !comet || getErrorState().isError()) {
            if (getErrorState().isIoAllowed()) {
                getInputBuffer().nextRequest();
                getOutputBuffer().nextRequest();
            }
        }

        if (!disableUploadTimeout) {
            if(endpoint.getSoTimeout() > 0) {
                setSocketTimeout(endpoint.getSoTimeout());
            } else {
                setSocketTimeout(0);
            }
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        if (breakKeepAliveLoop(socketWrapper)) {
            break;
        }
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError() || endpoint.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync() || comet) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else if (getUpgradeInbound() != null) {
        return SocketState.UPGRADING_TOMCAT;
    } else {
        if (sendfileInProgress) {
            return SocketState.SENDFILE;
        } else {
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}

This method is also long. The first part parses the HTTP request (request line and headers); the key line for our purposes is:

 adapter.service(request, response);

At last we have reached our destination: the request is handed to the container for processing.
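The two steps, parsing the request and then handing it to an adapter, can be shown in a heavily simplified sketch. Everything here is hypothetical (`MiniHttpProcessor`, `MiniRequest`, `MiniResponse`, `MiniAdapter`). Tomcat's real parser works on byte buffers and handles many more cases, while this sketch parses a complete request held in a string.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical miniature of "parse the request, then call the adapter".
class MiniHttpProcessor {

    static class MiniRequest {
        String method;
        String uri;
        String protocol;
        final Map<String, String> headers = new LinkedHashMap<>();
    }

    static class MiniResponse {
        int status = 200;
        String body = "";
    }

    // Stand-in for the adapter that bridges the connector and the container.
    interface MiniAdapter {
        void service(MiniRequest request, MiniResponse response);
    }

    static MiniResponse process(String raw, MiniAdapter adapter) {
        MiniRequest request = new MiniRequest();
        MiniResponse response = new MiniResponse();
        try {
            BufferedReader reader = new BufferedReader(new StringReader(raw));

            // Request line: METHOD SP URI SP PROTOCOL
            String requestLine = reader.readLine();
            String[] parts = requestLine == null ? new String[0] : requestLine.split(" ");
            if (parts.length != 3) {
                response.status = 400; // Bad Request
                return response;
            }
            request.method = parts[0];
            request.uri = parts[1];
            request.protocol = parts[2];

            // Headers: "Name: value" lines up to the first empty line.
            String line;
            while ((line = reader.readLine()) != null && !line.isEmpty()) {
                int colon = line.indexOf(':');
                if (colon <= 0) {
                    response.status = 400;
                    return response;
                }
                String name = line.substring(0, colon).trim().toLowerCase(Locale.ROOT);
                request.headers.put(name, line.substring(colon + 1).trim());
            }
        } catch (IOException e) {
            response.status = 400;
            return response;
        }

        // Only a successfully parsed request reaches the adapter.
        adapter.service(request, response);
        return response;
    }

    public static void main(String[] args) {
        String raw = "GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n";
        MiniResponse response = process(raw,
                (req, res) -> res.body = req.method + " " + req.uri);
        System.out.println(response.status + " " + response.body);
    }
}
```

As in the listing, a parse failure sets an error status and the adapter is never called. Only a well-formed request crosses over to the container side.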


We went through many methods above without examining each one in detail, but the overall request handling can be summarized as follows:

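  • First, the Endpoint binds the server socket and starts the Acceptor, which listens on it.
  • After the Acceptor obtains a connected socket through accept(), it submits the socket to the thread pool so that a worker thread processes it.
  • The worker thread (SocketProcessor) calls the Handler to process the received request. For the HTTP protocol, the Handler implementation comes from Http11Protocol.
  • While handling the socket, the Handler calls the Processor's process() method, which parses the request, fills in the request and response objects, and passes them to the container for further processing.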

With that, we have connected Tomcat's Connector to its Container.