golang netpoll 源码分析

在之前讲解goroutine的时候,我们说过,想要实现goroutine,必须解决的一个问题便是系统IO,系统调用不能长时间阻塞M,否则随着M的阻塞与新的M创建,协程会退化为线程导致系统负载过高。

golang选择的解决方案是重构网络库代码,通过NIO的方式是网络IO仅阻塞G,而M不会被阻塞。这里再贴一次流程图:

image-20221012172655517

接下来我们结合流程图看一下golangnetpoll代码。


net.Listen

listener, err := net.Listen("tcp", ":8080")
conn, err := listener.Accept()

当我们编写一个服务端程序时,都会通过上述代码初始化一个listener,并开始监听TCP连接。

对于net.Listen()函数,最终调用的核心方法为netFD.listenStream()

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    var err error
  //设置TCP的一些属性,包括:SOL_SOCKET、SO_REUSEADDR
    if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
        return err
    }
    var lsa syscall.Sockaddr
  //获取监听的地址
    if lsa, err = laddr.sockaddr(fd.family); err != nil {
        return err
    }
  //如果有设置Contrl回调,则回调此方法
  //这个方法一般用在tcp建立完成之后的一些设置。
    if ctrlFn != nil {
        c, err := newRawConn(fd)
        if err != nil {
            return err
        }
        if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
            return err
        }
    }
  //绑定地址和端口
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }

  //开始监听连接
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }

  //fd初始化,主要是将这个fd添加到netpoll轮询中,为后面的accept()函数做准备
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

对于Listen()函数,最核心的是fd.init()方法,fd.init()会将fd注册到netpoll轮询中,这样当我们调用accept()时,便可以直接等待netpoll唤醒即可。

func (pd *pollDesc) init(fd *FD) error {
    //只执行一次 初始化netpoll 主要是初始化kqueue
  //serverInit实现中包含了一个标准的double check代码,以实现延迟初始化
  serverInit.Do(runtime_pollServerInit)
  //将fd注册到netpoll中,为后面的accept()做准备
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

可以看到这里fd.init()主要是先尝试初始化netpoll,然后将listenerfd注册到netpoll中。

这里比较重要的是runtime_pollOpen()的返回值,这是一个比较重要的结构体,其主要是对fd的封装,同时保存了一些属性用在netpoll中:

type pollDesc struct {
  //下一个pollDesc对象,用在缓存和批量分配对象
    link *pollDesc // in pollcache, protected by pollcache.lock

  //锁,保护下面所有的字段
    lock    mutex // protects the following fields
  //句柄
  fd      uintptr
    closing bool
    //记录使用过程中出现的错误
  everr   bool      // marks event scanning error happened
  user    uint32    // user settable cookie
    rseq    uintptr   // protects from stale read timers
    //标记读操作时fd当前状态:pdReady、pdWait、goroutine
  rg      uintptr   // pdReady, pdWait, G waiting for read or nil
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline
    wseq    uintptr   // protects from stale write timers
  //标记写操作时fd当前状态:pdReady、pdWait、goroutine
    wg      uintptr   // pdReady, pdWait, G waiting for write or nil
    wt      timer     // write deadline timer
    wd      int64     // write deadline
    self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

其中比较特殊的字段为:rgwg,其在注册到netpoll中时,这两个字段表示的是当前的goroutine的指针

当将pollDesc注册到netpoll时,会将keventudata字段指向pollDesc,这样当netpoll发现有数据可读时,便可以通过udata复原pollDesc。通过rgwg字段可以找到对应的goroutine,然后重新将其加入到调度队里中。

Listener.Accept()

对于listener.Accept(),最核心方法便是下面的代码:

func (fd *netFD) accept() (netfd *netFD, err error) {
  //获取连接
  //该方法会阻塞g直到有连接建立
    d, rsa, errcall, err := fd.pfd.Accept()
    if err != nil {
        if errcall != "" {
            err = wrapSyscallError(errcall, err)
        }
        return nil, err
    }
  //创建连接使用到的fd
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
  //初始化连接conn
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
  //返回conn
    return netfd, nil
}

继续看pfd.Accept,这是Accept()方法的核心所在,Accept()会一直阻塞g直到有数据准备好。

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    //加锁
  if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()

  //初始化fd的一些属性
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }

    for {
    //调用syscall的accept,
    //由于fd在初始化时已经设置了非阻塞,因此这里不会阻塞
        s, rsa, errcall, err := accept(fd.Sysfd)
    //获取成功,则直接返回
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
    //EINTER错误,直接重试
        case syscall.EINTR:
            continue
    //EAGAIN错误,说明此时没有数据可读,则阻塞  
        case syscall.EAGAIN:
      //如果fd没有关闭,则阻塞直到有数据可读
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
    //如果为ECONNABORTED错误,说明此时接受到的conn已经关闭,继续accpet下一个
        case syscall.ECONNABORTED:
            continue
        }
        return -1, nil, errcall, err
    }
}

  • 注意这里加锁时锁叫readLock(),但是这里的readLock()并不是常见的读写锁,这个锁的功能主要如下:
    • 原子判断锁对应的fd是否已经关闭,若是则返回加锁失败
    • 如果请求的是写锁,并且写锁已经被其他协程持有,则休眠等待被唤醒
    • 如果请求的是读锁,并且读锁已经被其他协程持有,则休眠等待被唤醒
    • 写锁的持有与释放不影响读锁的持有与释放
    • 如果fd执行关闭操作,则会唤醒并释放所有的阻塞的读锁和写锁

    可以看到这个锁是专门为IO设计的,可读,可写,可关闭的锁,其实现非常巧妙,后面我们可以专门分析。

  • Listen()方法中我们已经将fd设置为非阻塞,同时将fd注册到了netpoll中,如果netpoll检测到有数据可读,则会通知g

  • 在函数中,我们首先尝试了一次读取数据,而不是直接休眠等待唤醒,这样可以第一时间读取到数据,因为是非阻塞的函数,因此这里不会阻塞到后面的方法的执行。

  • 如果fd没有数据可读,则调用waitRead()waitRead()会调用runtime_pollWait将当前goroutine挂起,当有数据可读时再将数据唤醒

接下来继续看runtime_pollWait: runtime_pollWait()具体实现在runtime/netpoll.go中。

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
  //检查netpoll参数
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }

  //循环等待直到IO为ready状态
    for !netpollblock(pd, int32(mode), false) {
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return pollNoError
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // 检查当前fd的状态,如果已经是`ready`了,那就不用再休眠
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
    //修改状态为pdWait
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

  //调用gopark()使得当前g休眠
    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    //被唤醒后初始化rg/wg的状态
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
  //返回当前被唤醒时状态是否为ready
    return old == pdReady
}

对于gopark(),我们在前面已经简单的介绍过,其主要作用便是切换到g0栈将当前gp解绑,然后继续调用schedule()函数调度其他g

从这里我们可以看到Accept()的总体流程。总体来说便是:初始化注册到netpoll->尝试读取数据->读取失败休眠g->等待唤醒->读取成功返回数据。

这里我们只看到了注册netpoll,而没有看到什么时候去轮询netpoll().其实在前面讲解M的状态流转时说过,在schedule()中查找可运行的g时,会查看netpoll是否已经初始化,如果已经初始化则将会查看是否有有数据可读写的g,如果有,则将其加入到调度队列中。其具体代码在findrunable()函数调用中:如果在本地队列和全局队列中都没有找到可执行的g,则执行下面的代码:

//如果netpoll已经初始化,并且等待数据的g的数量大于0同时sched.lastpoll不为零
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
     //非阻塞调用netpoll,查找有数据可读写的g  
   //netpoll会将返回的pollDesc的rg/wg 置为pdReady状态
   if list := netpoll(0); !list.empty() { // non-blocking
            //取出第一个g
      gp := list.pop()
            //将剩余的g放入全局队列中,等待调度
      injectglist(&list)
      //修改当前g的状态为_Grunnable
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
      //返回g使其在m上运行
            return gp, false
        }
    }

findrunable()运行逻辑可以看出来,findrunable()会先查找本地是否有g需要运行,再查找全局队列是否有g需要运行。如果两个都查找失败,才会去检查netpoll(),那如果系统中存在大量g的,导致findrunable()一直没有查看netpoll()怎么办?此时还有系统监控函数sysmon()兜底检查,其每间隔20us-10ms运行一次,每次也会检查netpoll()

TCPConn# Read()

最后再来看一下TCPConn.Read()函数:

当我们执行read()方法,从一个连接中读取数据的时候,一般都是调用Read()方法。

// net.conn # Read
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    //状态检查
  if !c.ok() {
        return 0, syscall.EINVAL
    }
  //调用封装的fd的read()方法
    n, err := c.fd.Read(b)
    if err != nil && err != io.EOF {
        err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}

可以看到主要是调用了fd.Read()方法

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
  //加锁,防止多线程读取数据导致乱序
  //如果err!=nil ,说明这个fd已经关闭
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
  //如果p为空,则直接返回
  //这里为什么要加加锁之后才判断?毕竟这个操作和锁无关
  //因为锁还有个功能是判断fd是否关闭,如果fd已经关闭,则即使len(p)==0,也应该返回err
    if len(p) == 0 {
        return 0, nil
    }
  //初始化fd字段的一些信息
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
  //对于UDP类型,需要限制最大包大小
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
    //循环读并且忽略EINTR错误
        n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
        if err != nil {
            n = 0
      //如果读到了EAGAIN错误,则说明没有数据可读了
            if err == syscall.EAGAIN && fd.pd.pollable() {
        //阻塞,直到被唤醒
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        }
    //包装EOF错误
        err = fd.eofError(n, err)
        return n, err
    }
}

这里逻辑其实比较简单:

  • 首先,加锁然后执行一些准备操作
  • 接下来先尝试读取数据,如果读取成功,则直接返回
  • 否则,调用waitRead()方法阻塞直到被唤醒后继续读取

可以看到,核心逻辑便是 读取->阻塞->等待唤醒->继续读取,对于阻塞等待唤醒,我们又看见了熟悉的waitRead()函数,因此这里不再赘述。

对于read()accept()最大区别在于:read()方法需要通过for循环读够传入的数组的长度的数据或者报错才会返回,而accpet()方法只要有数据可读即可返回。

总结

golangnetpoll我们可以看到,golang通过将NIO与协程结合起来,解决了协程执行系统调用时会导致线程阻塞的问题;同时,通过NIO与协程的结合,使得开发者既能享受非阻塞开发的简便性,又能享受到NIO的高性能,这也是为什么golang非常适合做基础网络服务的原因。

但是对于golang来说,其SDK只暴露了阻塞IO的接口,这也使得对于类似websocket长连接业务的场景,阻塞会导致每个连接都占用至少两个goroutine,那么对于大量用户的场景会导致大量goroutine依然会比较浪费内存。同时,过多的goroutine也会导致runtime调度会有一定的延迟,因此对于想要达到极致的性能要求时,也可以尝试放弃阻塞IO的便利性,通过使用原生的NIO API实现非阻塞IO