Tomcat 源码剖析(七)Response.getOutputStream() 是如何缓存数据流的?

【问题】

Tomcat 8以后,默认都是使用的NIO,并且内部有一定的缓存,那么 Tomcat是如何缓存RequestResponse的输入输出流的?

【思路】

这个感觉比较简单,NIO的输入单位是ByteBuffer,那么直接在Response中返回一个包装流,写入字符串的时候就添加到Response中即可,当写入的字节达到一定数量后,再真正写入到NIO管道中。

【Tomcat】

Tomcat大体上是这样做的,不过由于涉及到业务逻辑上的问题,比如ResponseTomcat中还包括其他属性和方法,但是这些属性和方法不能给Tomcat用户使用,再比如ResponseRequest在相应之前还会有过滤器的逻辑等,所以Tomcat并不是简单的做了一下转换。

想要了解Tomcat的上述问题,比如清楚以下几点:

  • Response

    用户在使用Servlet中的service方法所获取参数service(Response res,Request req)对应的类为:

    org.apache.catalina.connector.Response,这个Response对象实现了Servlet规范所需要的Response对象,而用户所获取到的Response参数,是此Response的外观类ResponseFacade

    Tomcat内部所使用的是org.apache.coyot.Response它实现了NIO对应的org.apache.catalina.connector.Response类的底层功能

    因此一般数据流向应该是先由用户从org.apache.catalina.connector.Response获取OutputStream,然后再流向org.apache.coyot.Response

  • OutputBuffer

    OutputBuffer作为OutputStream的包装类,不仅仅存在于org.apache.catalina.connector.Response类中,在整个数据流程中,会遇到很多OutputBuffer,虽然名字叫OutputBuffer,但是它的作用不仅仅是缓存数据,还包含很多业务逻辑上的处理。

【代码】

首先我们一般需要输入输出数据的时候,都是使用如下代码:

OutputStream outputStream = response.getOutputStream();

因此,首先我们找到Response的外观类ResponseFacade

ResponseFacade ## getOutputStream()

    @Override
    public ServletOutputStream getOutputStream()
        throws IOException {

        ServletOutputStream sos = response.getOutputStream();
        if (isFinished()) {
            //这里的response就是真正的`Response`类
            response.setSuspended(true);
        }
        return sos;
    }

Response##getOutputStream()

@Override
public ServletOutputStream getOutputStream()
    throws IOException {

    if (usingWriter) {
        throw new IllegalStateException
            (sm.getString("coyoteResponse.getOutputStream.ise"));
    }

    usingOutputStream = true;
    if (outputStream == null) {
        outputStream = new CoyoteOutputStream(outputBuffer);
    }
    return outputStream;

}

可以看到,上面便是通过outputBuffer生成了一个CoyoteOutputStream

  • CoyoteOutputStream

    CoyoteOutputStream是对Outputbuffer的包装,使用CoyoteOutputStream和直接使用OutputBuffer差别不到,不过CoyoteOutputStream涉及到一些业务逻辑,比如

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      boolean nonBlocking = checkNonBlockingWrite();
      ob.write(b, off, len);
      if (nonBlocking) {
          checkRegisterForWrite();
      }
    }
    

    可以发现虽然写都是通过outputBuffer写的,但是剩下的逻辑还检查了一下缓冲区是否还有遗留数据。

  • outputBuffer这是我们遇到的第一个outputBuffer

    它继承于Writer类,里面包含了两个比较重要的属性:

    • private Response coyoteResponse;

    前面说过,这个对象是用来真真处理数据流向的类,因此当这个buffer超过预定的值后,会将数据写入到这个类中

    • private ByteBuffer bb;NIO中的ByteBuffer,因为里面的方法比较繁琐,因此这里Tomcat简单包装了下,使得更加易用

在平时,我们需要写数据的时候,我们都是写的如下代码:

outputStream.write(i);

对应到outputBuffer中的代码如下:

OutputBuffer##writeByte()

private void writeBytes(byte b[], int off, int len) throws IOException {

    if (closed) {
        return;
    }

    append(b, off, len);
    bytesWritten += len;

    // if called from within flush(), then immediately flush
    // remaining bytes
    //可以先忽略这部分,因为貌似没有找到有其他地方设置doFlush为true
    if (doFlush) {
        flushByteBuffer();
    }
}

OutputBuffer##append()

public void append(byte src[], int off, int len) throws IOException {
    //如果缓存没有剩下的空间,则使用appendByteArray 先发送一部分到底层
    if (bb.remaining() == 0) {
        appendByteArray(src, off, len);
    } else {
        //先放一部分到缓存中
        int n = transfer(src, off, len, bb);
        len = len - n;
        off = off + n;
        //如果缓存满了
        if (isFull(bb)) {
            //将缓存的数据发送到底层
            flushByteBuffer();
            //将剩下的数据再放入缓存中
            appendByteArray(src, off, len);
        }
    }
}

OutputBuffer##appendByteBuffer()

private void appendByteArray(byte src[], int off, int len) throws IOException {
    if (len == 0) {
        return;
    }

   int limit = bb.capacity();
    //一直将数据发送到底层,直到缓冲能装下剩余的数据
    while (len >= limit) {
        realWriteBytes(ByteBuffer.wrap(src, off, limit));
        len = len - limit;
        off = off + limit;
    }

    if (len > 0) {
        transfer(src, off, len, bb);
    }
}

OutputBuffer##flushByteBuffer()

private void flushByteBuffer() throws IOException {
    realWriteBytes(bb.slice());
    clear(bb);
}

可以看到,最终调用的都是realWriteBytes,而这个方法看名字就知道是真正写入数据

OutputBuffer##realWriteBytes()

public void realWriteBytes(ByteBuffer buf) throws IOException {

    if (closed) {
        return;
    }
    if (coyoteResponse == null) {
        return;
    }

    // If we really have something to write
    if (buf.remaining() > 0) {
        // real write to the adapter
        try {
            coyoteResponse.doWrite(buf);
        } catch (IOException e) {
            // An IOException on a write is almost always due to
            // the remote client aborting the request. Wrap this
            // so that it can be handled better by the error dispatcher.
            throw new ClientAbortException(e);
        }
    }

}

可以看到,关键就一行代码coyoteResponse.doWrite(buf);.


到这里,OutputBuffer的职责就完成了,大体总结下就是:

  • 通过CoyoteOutputStream包装成一个数据流
  • OutputBuffer在写数据的时候,先存在本地的ByteBuffer中,当ByteBuffer缓存满了,再将数据写入到coyoteResponse中。
  • OutputBUffer中,默认大小为DEFAULT_BUFFER_SIZE = 8 * 1024;

Coyote\Response

coyote\Response中,也包含一个OutputBuffer,而这个Outputbuffer和上面的OutputBuffer完全不一样,这里的Outputbuffer涉及到一些业务逻辑上的问题,比如对于Http协议,它会根据一些Http版本以及设置等进行不同的处理,对于Ajp协议,它的处理方式也有不同。同时,对于不同的输出方式,也有不同的方法,比如NIOBIO

Coyote\Response##doWrite()

    @Override
    public int doWrite(ByteBuffer chunk) throws IOException {
        //如果请求头还没有发送,则先发送请求头
        //为什么要先发送请求头?因为涉及到客户端如何处理接收到的消息,比如Encode
        if (!response.isCommitted()) {
            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeaders) and
            // set the filters accordingly.
            response.action(ActionCode.COMMIT, null);
        }
        //如果所有过滤器都处理完,则使用outputStreamOutputBuffer 写出数据
        if (lastActiveFilter == -1) {
            return outputStreamOutputBuffer.doWrite(chunk);
        } else {
            return activeFilters[lastActiveFilter].doWrite(chunk);
        }
    }

接下来我们只看outputStreamOutputBuffer.doWrite()

org.apache.coyote.http11#Http11OutputBuffer#SocketOutputBuffer#doWrite()

@Override
public int doWrite(ByteBuffer chunk) throws IOException {
    try {
        int len = chunk.remaining();
        socketWrapper.write(isBlocking(), chunk);
        len -= chunk.remaining();
        byteCount += len;
        return len;
    } catch (IOException ioe) {
        response.action(ActionCode.CLOSE_NOW, ioe);
        // Re-throw
        throw ioe;
    }
}

可以看到,这里主要是通过socketWrapper写入数据

最后看看SocketWrapper

NioSocketWrapper##doWrite()

@Override
protected void doWrite(boolean block, ByteBuffer from) throws IOException {
    long writeTimeout = getWriteTimeout();
    Selector selector = null;
    try {
        selector = pool.get();
    } catch (IOException x) {
        // Ignore
    }
    try {
        pool.write(from, getSocket(), selector, writeTimeout, block);
        if (block) {
            // Make sure we are flushed
            do {
                if (getSocket().flush(true, selector, writeTimeout)) {
                    break;
                }
            } while (true);
        }
        updateLastWrite();
    } finally {
        if (selector != null) {
            pool.put(selector);
        }
    }
    // If there is data left in the buffer the socket will be registered for
    // write further up the stack. This is to ensure the socket is only
    // registered for write once as both container and user code can trigger
    // write registration.
}

到这里,数据就到了底层的NIO,通过NIO,数据就通过Socket发送到客户端了


流程基本捋明白了,这里简单总结一下:

  • 首先,用户获得的输入输出是通过Response返回的CoyoteOutputStream,而这个类里面包含了OutputBuffer
  • 用户实际的输入输出都是OutputBuffer的输入输出,平时少量的输入会一直存在OutputBuffer中,当字节到达了一定程度,OutputBuffer才会调用realWriteBytes()写入
  • realWriteBytes()底层是通过coyote/Response写入,coyote/Response中包含另外一个outputBuffer
  • 对于Http1.1来说outputBuffer对应的是Http11OutputBuffer,涉及到对Http1.1的特殊处理,比如分块发送等
  • Http11OutputBuffer再底层便是NIO的输出

最后,再带着几个问题:

  • 通常情况下,是用户写满缓存就会发送一次数据,那么没有写满怎么办?

    关键在于close()方法,如果没有写满,会一直等待用户调用close()方法,close()方法会把缓存中的数据真正发送出去:

  • 在《Head First Servlets》中提到过,对于Http Header,应该在最前面设置,其次再写Body的内容,因为有些时候,Header可能会被提前发送,这个提前是什么时候?

    答案在Http11OutputBuffer## doWrite()

      @Override
      public int doWrite(ByteBuffer chunk) throws IOException {
    
          if (!response.isCommitted()) {
              // Send the connector a request for commit. The connector should
              // then validate the headers, send them (using sendHeaders) and
              // set the filters accordingly.
              response.action(ActionCode.COMMIT, null);
          }
    
          if (lastActiveFilter == -1) {
              return outputStreamOutputBuffer.doWrite(chunk);
          } else {
              return activeFilters[lastActiveFilter].doWrite(chunk);
          }
      }
    

    可以看到,在调用这个方法的时候,首先会判断是否Committed,如果没有,就Committed,这个Committed干了啥呢?简单来说就是生成一些默认的和已知的Http Header发送出去。

    而这个方法,刚开始就分析过了,要是缓存满了,就会被调用。因此如果先使用outputStream发送body,那么Header就无法修改了