Connector 处理请求连接分析
Tomcat 是如何解析
Http
请求协议的?
在上一章中,分析了Tomcat
中的connector
的初始化与启动。但是由于使用的是Tomcat 9
的代码,代码中默认使用的是NIO
处理的IO
连接,而NIO
本身便具有一定的复杂度,这无疑增加了我们分析Tomcat
解析Http
请求的难度,因此,本篇文章根据Tomcat 7
分析Tomcat
连接器处理Http
请求。
通过之前我们分析Tocmat
整体结构可以知道,Tomcat
中,负责接收请求的是连接器(Connector
),负责处理请求的是容器(Container
),但是在上一篇文章中一直没有找到Connector
如何将request
和response
包装然后传给Container
?
这里再复习一下,Connector
中,会根据不同的协议(Http
/Ajp
)以及不同的处理器(NIO/BIO/APR
)的方式监听和处理请求。也就是这里应该都会有模块的结合。
前面说过,Tomcat
#Connector
会根据配置的的协议以及IO
方式生成不同的ProtocoHandler
,在Tomcat 7
中,如果选择HTTP/1.1
以及BIO
的方式的话,默认使用的是org.apache.coyote.http11.Http11Protocol
- 通过
Connector
#startInternal()
方法会执行protocolHandler.start();
启动protocoHandler
. -
protocolHandler#start()
中,会执行endPoint#start()
方法。 -
endPoint#start()
会调用对应的bind()
以及startInternal()
BIO
对应的Endpoint
为JIoEndpoint
JIoEndpoint#bind()
@Override
public void bind() throws Exception {
//初始化接收请求线程数
if (acceptorThreadCount == 0) {
acceptorThreadCount = 1;
}
// 初始化最大连接数
if (getMaxConnections() == 0) {
// User hasn't set a value - use the default
setMaxConnections(getMaxThreadsWithExecutor());
}
//设置ssl
if (serverSocketFactory == null) {
if (isSSLEnabled()) {
serverSocketFactory =
handler.getSslImplementation().getServerSocketFactory(this);
} else {
serverSocketFactory = new DefaultServerSocketFactory(this);
}
}
//创建socket
if (serverSocket == null) {
if (getAddress() == null) {
serverSocket = serverSocketFactory.createSocket(getPort(),
getBacklog());
} else {
serverSocket = serverSocketFactory.createSocket(getPort(),
getBacklog(), getAddress());
}
}
}
JIoEndpoint # startInternal()
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// Create worker collection
if (getExecutor() == null) {
createExecutor();
}
//初始化连接拦截器
initializeConnectionLatch();
//启动接收请求线程
startAcceptorThreads();
// Start async timeout thread
Thread timeoutThread = new Thread(new AsyncTimeout(),
getName() + "-AsyncTimeout");
timeoutThread.setPriority(threadPriority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}
}
可以发现,首先调用的bind()
方法绑定了端口和IP
,相当于初始化了serverSocket
,接着startInternal()
方法中,调用了startAcceptorThreads()
方法,启动Acceptor
线程
JIoEndpoint #Acceptor#run()
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
//如果系统没有关闭并且处于暂停状态,则一直循环等待
while (paused && running) {
state = AcceptorState.PAUSED;
Thread.sleep(50);
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
//如果已经到达了最大连接数,则等待
countUpOrAwaitConnection();
Socket socket = null;
//阻塞,并接受请求
socket = serverSocketFactory.acceptSocket(serverSocket);
// Successful accept, reset the error delay
errorDelay = 0;
//配置socket
if (running && !paused && setSocketOptions(socket)) {
//处理socket
if (!processSocket(socket)) {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} else {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
}
state = AcceptorState.ENDED;
}
}
代码中删除了try-catch 块
到这里,其实就能大概明白整体流程了,在JIoEndpoint
的start()
方法中,会启动acceptorThreadCount
(默认为1)个Acceptor
进行监听并处理Socket
。
- 处理
Socket
是调用processSocket(socket)
处理的socket
processSocket(socket)
方法会启动另外的线程进行处理- 在处理
socket
之前会使用setSocketOptions()
方法配置socket
,比如timeout
,linger
等
JIoEndpoint#processSocket()
protected boolean processSocket(Socket socket) {
SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
wrapper.setSecure(isSSLEnabled());
// During shutdown, executor may be null - avoid NPE
if (!running) {
return false;
}
getExecutor().execute(new SocketProcessor(wrapper));
return true;
}
可以看到,
porcessSocket()
方法其实就是简单的使用线程池执行SocketProcessor
线程而已
JIoEndpoint#SocketProcess#run()
@Override
public void run() {
boolean launch = false;
synchronized (socket) {
try {
SocketState state = SocketState.OPEN;
// SSL handshake
serverSocketFactory.handshake(socket.getSocket());
if ((state != SocketState.CLOSED)) {
if (status == null) {
state = handler.process(socket, SocketStatus.OPEN_READ);
} else {
state = handler.process(socket,status);
}
}
if (state == SocketState.CLOSED) {
// Close socket
if (log.isTraceEnabled()) {
log.trace("Closing socket:"+socket);
}
countDownConnection();
try {
socket.getSocket().close();
} catch (IOException e) {
// Ignore
}
} else if (state == SocketState.OPEN ||
state == SocketState.UPGRADING ||
state == SocketState.UPGRADING_TOMCAT ||
state == SocketState.UPGRADED){
socket.setKeptAlive(true);
socket.access();
launch = true;
} else if (state == SocketState.LONG) {
socket.access();
waitingRequests.add(socket);
}
}
}
socket = null;
// Finish up this request
}
}
删除了大部分try-catch
上面的方法中,其实最主要的便是下面这行代码:
state = handler.process(socket, SocketStatus.OPEN_READ);
对于HTTP
请求,Handler
具体的的实现类为Http11Protocol
,handler
类便是真正用来处理Http
中Socket
的请求的一部分:
Http11Protocol # process()
@SuppressWarnings("deprecation") // Old HTTP upgrade method has been deprecated
public SocketState process(SocketWrapper<S> wrapper,
SocketStatus status) {
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
if (socket == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
Processor<S> processor = connections.get(socket);
if (status == SocketStatus.DISCONNECT && processor == null) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
wrapper.setAsync(false);
ContainerThreadMarker.markAsContainerThread();
if (processor == null) {
processor = recycledProcessors.poll();
}
if (processor == null) {
processor = createProcessor();
}
initSsl(wrapper, processor);
SocketState state = SocketState.CLOSED;
do {
if (status == SocketStatus.DISCONNECT &&
!processor.isComet()) {
// Do nothing here, just wait for it to get recycled
// Don't do this for Comet we need to generate an end
// event (see BZ 54022)
} else if (processor.isAsync() || state == SocketState.ASYNC_END) {
state = processor.asyncDispatch(status);
if (state == SocketState.OPEN) {
// release() won't get called so in case this request
// takes a long time to process, remove the socket from
// the waiting requests now else the async timeout will
// fire
getProtocol().endpoint.removeWaitingRequest(wrapper);
// There may be pipe-lined data to read. If the data
// isn't processed now, execution will exit this
// loop and call release() which will recycle the
// processor (and input buffer) deleting any
// pipe-lined data. To avoid this, process it now.
state = processor.process(wrapper);
}
} else if (processor.isComet()) {
state = processor.event(status);
} else if (processor.getUpgradeInbound() != null) {
state = processor.upgradeDispatch();
} else if (processor.isUpgrade()) {
state = processor.upgradeDispatch(status);
} else {
state = processor.process(wrapper);
}
if (state != SocketState.CLOSED && processor.isAsync()) {
state = processor.asyncPostProcess();
}
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
HttpUpgradeHandler httpUpgradeHandler =
processor.getHttpUpgradeHandler();
// Release the Http11 processor to be re-used
release(wrapper, processor, false, false);
// Create the upgrade processor
processor = createUpgradeProcessor(
wrapper, httpUpgradeHandler);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
connections.put(socket, processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal with
// it.
httpUpgradeHandler.init((WebConnection) processor);
} else if (state == SocketState.UPGRADING_TOMCAT) {
// Get the UpgradeInbound handler
org.apache.coyote.http11.upgrade.UpgradeInbound inbound =
processor.getUpgradeInbound();
// Release the Http11 processor to be re-used
release(wrapper, processor, false, false);
// Create the light-weight upgrade processor
processor = createUpgradeProcessor(wrapper, inbound);
inbound.onUpgradeComplete();
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + wrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
}
} while (state == SocketState.ASYNC_END ||
state == SocketState.UPGRADING ||
state == SocketState.UPGRADING_TOMCAT);
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
connections.put(socket, processor);
longPoll(wrapper, processor);
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
connections.remove(socket);
release(wrapper, processor, false, true);
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket either be added to the
// poller (or equivalent) to await more data or processed
// if there are any pipe-lined requests remaining.
connections.put(socket, processor);
} else if (state == SocketState.UPGRADED) {
// Need to keep the connection associated with the processor
connections.put(socket, processor);
// Don't add sockets back to the poller if this was a
// non-blocking write otherwise the poller may trigger
// multiple read events which may lead to thread starvation
// in the connector. The write() method will add this socket
// to the poller if necessary.
if (status != SocketStatus.OPEN_WRITE) {
longPoll(wrapper, processor);
}
} else {
// Connection closed. OK to recycle the processor. Upgrade
// processors are not recycled.
connections.remove(socket);
if (processor.isUpgrade()) {
processor.getHttpUpgradeHandler().destroy();
} else if (processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor) {
// NO-OP
} else {
release(wrapper, processor, true, false);
}
}
return state;
// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
// Don't try to add upgrade processors back into the pool
if (!(processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor)
&& !processor.isUpgrade()) {
release(wrapper, processor, true, false);
}
return SocketState.CLOSED;
}
这个方法上面被标记了
deprecation
,但是在Tomcat 7
中,这个方法还是会被继续使用
上面socket
会经过各种状态的检查以及处理,比如Http
升级等
涉及到的东西比较多,这里我们只用关心其中最关键的一行:
state = processor.process(wrapper);
Processor # process()
@Override
public SocketState process(SocketWrapper<S> socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
getInputBuffer().init(socketWrapper, endpoint);
getOutputBuffer().init(socketWrapper, endpoint);
// Flags
keepAlive = true;
comet = false;
openSocket = false;
sendfileInProgress = false;
readComplete = true;
if (endpoint.getUsePolling()) {
keptAlive = false;
} else {
keptAlive = socketWrapper.isKeptAlive();
}
if (disableKeepAlive()) {
socketWrapper.setKeepAliveLeft(0);
}
while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
upgradeInbound == null &&
httpUpgradeHandler == null && !endpoint.isPaused()) {
// Parsing the request header
try {
setRequestLineReadTimeout();
if (!getInputBuffer().parseRequestLine(keptAlive)) {
if (handleIncompleteRequestLineRead()) {
break;
}
}
if (endpoint.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
request.getCookies().setLimit(getMaxCookieCount());
// Currently only NIO will ever return false here
if (!getInputBuffer().parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
setSocketTimeout(connectionUploadTimeout);
}
}
} catch (IOException e) {
if (getLog().isDebugEnabled()) {
getLog().debug(
sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_NOW, e);
break;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
UserDataHelper.Mode logMode = userDataHelper.getNextMode();
if (logMode != null) {
String message = sm.getString(
"http11processor.header.parse");
switch (logMode) {
case INFO_THEN_DEBUG:
message += sm.getString(
"http11processor.fallToDebug");
//FALL-THROUGH
case INFO:
getLog().info(message, t);
break;
case DEBUG:
getLog().debug(message, t);
}
}
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
if (!getErrorState().isError()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// Process the request in the adapter
if (!getErrorState().isError()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
adapter.service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() && (
response.getErrorException() != null ||
(!isAsync() &&
statusDropsConnection(response.getStatus())))) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
setCometTimeouts(socketWrapper);
} catch (InterruptedIOException e) {
setErrorState(ErrorState.CLOSE_NOW, e);
} catch (HeadersTooLargeException e) {
getLog().error(sm.getString("http11processor.request.process"), e);
// The response should not have been committed but check it
// anyway to be safe
if (response.isCommitted()) {
setErrorState(ErrorState.CLOSE_NOW, e);
} else {
response.reset();
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, e);
response.setHeader("Connection", "close"); // TODO: Remove
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
getLog().error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync() && !comet) {
if (getErrorState().isError()) {
// If we know we are closing the connection, don't drain
// input. This way uploading a 100GB file doesn't tie up the
// thread if the servlet has rejected it.
getInputBuffer().setSwallowInput(false);
} else {
// Need to check this again here in case the response was
// committed before the error that requires the connection
// to be closed occurred.
checkExpectationAndResponseStatus();
}
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}
request.updateCounters();
if (!isAsync() && !comet || getErrorState().isError()) {
if (getErrorState().isIoAllowed()) {
getInputBuffer().nextRequest();
getOutputBuffer().nextRequest();
}
}
if (!disableUploadTimeout) {
if(endpoint.getSoTimeout() > 0) {
setSocketTimeout(endpoint.getSoTimeout());
} else {
setSocketTimeout(0);
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
if (breakKeepAliveLoop(socketWrapper)) {
break;
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync() || comet) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else if (getUpgradeInbound() != null) {
return SocketState.UPGRADING_TOMCAT;
} else {
if (sendfileInProgress) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
这个方法内容也比较多,前面的内容是解析HTTP
协议的方法,我们需要的最关键的一行如下:
adapter.service(request, response);
看到这里,终于到达了我们的目的地:将请求交给容器处理
前面的方法比较多,这里没有一一仔细梳理,不过我们可以从大体上来看整个请求的处理:
- 首先,通过EndPoint执行
Socket
绑定,并启动Acceptor
执行socket
监听 - Acceptor在通过
socket.accept()
获取连接的socket
后,启动一个新的线程处理该socket
Acceptor
会调用Handler来处理接收到的请求,对于HTTP
协议来说,Handler
的实现类为Http11Protocol
Handler
在处理socket
的时候,会调用**Processor **的process()
方法生成request
和response
并将请求转交给容器进行后续的处理
到这里,我们就将Tomcat
的连接器与容器连接起来了。