在之前讲解goroutine的时候,我们说过,想要实现goroutine
,必须解决的一个问题便是系统IO
,系统调用不能长时间阻塞M
,否则随着M
的阻塞与新的M
创建,协程会退化为线程导致系统负载过高。
golang
选择的解决方案是重构网络库代码,通过NIO
的方式是网络IO
仅阻塞G
,而M
不会被阻塞。这里再贴一次流程图:
接下来我们结合流程图看一下golang
的netpoll
代码。
net.Listen
listener, err := net.Listen("tcp", ":8080")
conn, err := listener.Accept()
当我们编写一个服务端程序时,都会通过上述代码初始化一个listener
,并开始监听TCP
连接。
对于net.Listen()
函数,最终调用的核心方法为netFD.listenStream()
:
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
//设置TCP的一些属性,包括:SOL_SOCKET、SO_REUSEADDR
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
//获取监听的地址
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
//如果有设置Contrl回调,则回调此方法
//这个方法一般用在tcp建立完成之后的一些设置。
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
//绑定地址和端口
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
//开始监听连接
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
//fd初始化,主要是将这个fd添加到netpoll轮询中,为后面的accept()函数做准备
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
对于Listen()
函数,最核心的是fd.init()
方法,fd.init()
会将fd
注册到netpoll
轮询中,这样当我们调用accept()
时,便可以直接等待netpoll
唤醒即可。
func (pd *pollDesc) init(fd *FD) error {
//只执行一次 初始化netpoll 主要是初始化kqueue
//serverInit实现中包含了一个标准的double check代码,以实现延迟初始化
serverInit.Do(runtime_pollServerInit)
//将fd注册到netpoll中,为后面的accept()做准备
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
可以看到这里fd.init()
主要是先尝试初始化netpoll
,然后将listener
的fd
注册到netpoll
中。
这里比较重要的是runtime_pollOpen()
的返回值,这是一个比较重要的结构体,其主要是对fd
的封装,同时保存了一些属性用在netpoll
中:
type pollDesc struct {
//下一个pollDesc对象,用在缓存和批量分配对象
link *pollDesc // in pollcache, protected by pollcache.lock
//锁,保护下面所有的字段
lock mutex // protects the following fields
//句柄
fd uintptr
closing bool
//记录使用过程中出现的错误
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
//标记读操作时fd当前状态:pdReady、pdWait、goroutine
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
//标记写操作时fd当前状态:pdReady、pdWait、goroutine
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
其中比较特殊的字段为:rg
和wg
,其在注册到netpoll
中时,这两个字段表示的是当前的goroutine
的指针
当将pollDesc
注册到netpoll
时,会将kevent
的udata
字段指向pollDesc
,这样当netpoll
发现有数据可读时,便可以通过udata
复原pollDesc
。通过rg
和wg
字段可以找到对应的goroutine
,然后重新将其加入到调度队里中。
Listener.Accept()
对于listener.Accept()
,最核心方法便是下面的代码:
func (fd *netFD) accept() (netfd *netFD, err error) {
//获取连接
//该方法会阻塞g直到有连接建立
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
//创建连接使用到的fd
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
//初始化连接conn
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
//返回conn
return netfd, nil
}
继续看pfd.Accept
,这是Accept()
方法的核心所在,Accept()
会一直阻塞g
直到有数据准备好。
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
//加锁
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
//初始化fd的一些属性
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
//调用syscall的accept,
//由于fd在初始化时已经设置了非阻塞,因此这里不会阻塞
s, rsa, errcall, err := accept(fd.Sysfd)
//获取成功,则直接返回
if err == nil {
return s, rsa, "", err
}
switch err {
//EINTER错误,直接重试
case syscall.EINTR:
continue
//EAGAIN错误,说明此时没有数据可读,则阻塞
case syscall.EAGAIN:
//如果fd没有关闭,则阻塞直到有数据可读
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
//如果为ECONNABORTED错误,说明此时接受到的conn已经关闭,继续accpet下一个
case syscall.ECONNABORTED:
continue
}
return -1, nil, errcall, err
}
}
- 注意这里加锁时锁叫
readLock()
,但是这里的readLock()
并不是常见的读写锁,这个锁的功能主要如下:- 原子判断锁对应的
fd
是否已经关闭,若是则返回加锁失败 - 如果请求的是写锁,并且写锁已经被其他协程持有,则休眠等待被唤醒
- 如果请求的是读锁,并且读锁已经被其他协程持有,则休眠等待被唤醒
- 写锁的持有与释放不影响读锁的持有与释放
- 如果
fd
执行关闭操作,则会唤醒并释放所有的阻塞的读锁和写锁
可以看到这个锁是专门为
IO
设计的,可读,可写,可关闭的锁,其实现非常巧妙,后面我们可以专门分析。 - 原子判断锁对应的
-
在
Listen()
方法中我们已经将fd
设置为非阻塞,同时将fd
注册到了netpoll
中,如果netpoll
检测到有数据可读,则会通知g
-
在函数中,我们首先尝试了一次读取数据,而不是直接休眠等待唤醒,这样可以第一时间读取到数据,因为是非阻塞的函数,因此这里不会阻塞到后面的方法的执行。
-
如果
fd
没有数据可读,则调用waitRead()
,waitRead()
会调用runtime_pollWait
将当前goroutine
挂起,当有数据可读时再将数据唤醒
接下来继续看runtime_pollWait
: runtime_pollWait()
具体实现在runtime/netpoll.go
中。
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
//检查netpoll参数
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
//循环等待直到IO为ready状态
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 检查当前fd的状态,如果已经是`ready`了,那就不用再休眠
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
//修改状态为pdWait
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
//调用gopark()使得当前g休眠
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
//被唤醒后初始化rg/wg的状态
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
//返回当前被唤醒时状态是否为ready
return old == pdReady
}
对于gopark()
,我们在前面已经简单的介绍过,其主要作用便是切换到g0
栈将当前g
与p
解绑,然后继续调用schedule()
函数调度其他g
从这里我们可以看到Accept()
的总体流程。总体来说便是:初始化注册到netpoll
->尝试读取数据->读取失败休眠g
->等待唤醒->读取成功返回数据。
这里我们只看到了注册netpoll
,而没有看到什么时候去轮询netpoll()
.其实在前面讲解M
的状态流转时说过,在schedule()
中查找可运行的g
时,会查看netpoll
是否已经初始化,如果已经初始化则将会查看是否有有数据可读写的g
,如果有,则将其加入到调度队列中。其具体代码在findrunable()
函数调用中:如果在本地队列和全局队列中都没有找到可执行的g
,则执行下面的代码:
//如果netpoll已经初始化,并且等待数据的g的数量大于0同时sched.lastpoll不为零
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
//非阻塞调用netpoll,查找有数据可读写的g
//netpoll会将返回的pollDesc的rg/wg 置为pdReady状态
if list := netpoll(0); !list.empty() { // non-blocking
//取出第一个g
gp := list.pop()
//将剩余的g放入全局队列中,等待调度
injectglist(&list)
//修改当前g的状态为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
//返回g使其在m上运行
return gp, false
}
}
从findrunable()
运行逻辑可以看出来,findrunable()
会先查找本地是否有g
需要运行,再查找全局队列是否有g
需要运行。如果两个都查找失败,才会去检查netpoll()
,那如果系统中存在大量g
的,导致findrunable()
一直没有查看netpoll()
怎么办?此时还有系统监控函数sysmon()
兜底检查,其每间隔20us-10ms
运行一次,每次也会检查netpoll()
。
TCPConn# Read()
最后再来看一下TCPConn.Read()
函数:
当我们执行read()
方法,从一个连接中读取数据的时候,一般都是调用Read()
方法。
// net.conn # Read
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
//状态检查
if !c.ok() {
return 0, syscall.EINVAL
}
//调用封装的fd的read()方法
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
可以看到主要是调用了fd.Read()
方法
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
//加锁,防止多线程读取数据导致乱序
//如果err!=nil ,说明这个fd已经关闭
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
//如果p为空,则直接返回
//这里为什么要加加锁之后才判断?毕竟这个操作和锁无关
//因为锁还有个功能是判断fd是否关闭,如果fd已经关闭,则即使len(p)==0,也应该返回err
if len(p) == 0 {
return 0, nil
}
//初始化fd字段的一些信息
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
//对于UDP类型,需要限制最大包大小
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
//循环读并且忽略EINTR错误
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
//如果读到了EAGAIN错误,则说明没有数据可读了
if err == syscall.EAGAIN && fd.pd.pollable() {
//阻塞,直到被唤醒
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
//包装EOF错误
err = fd.eofError(n, err)
return n, err
}
}
这里逻辑其实比较简单:
- 首先,加锁然后执行一些准备操作
- 接下来先尝试读取数据,如果读取成功,则直接返回
- 否则,调用
waitRead()
方法阻塞直到被唤醒后继续读取
可以看到,核心逻辑便是 读取->阻塞->等待唤醒->继续读取,对于阻塞等待唤醒,我们又看见了熟悉的waitRead()
函数,因此这里不再赘述。
对于read()
和accept()
最大区别在于:read()
方法需要通过for
循环读够传入的数组的长度的数据或者报错才会返回,而accpet()
方法只要有数据可读即可返回。
总结
从golang
的netpoll
我们可以看到,golang
通过将NIO
与协程结合起来,解决了协程执行系统调用时会导致线程阻塞的问题;同时,通过NIO
与协程的结合,使得开发者既能享受非阻塞开发的简便性,又能享受到NIO
的高性能,这也是为什么golang
非常适合做基础网络服务的原因。
但是对于golang
来说,其SDK
只暴露了阻塞IO
的接口,这也使得对于类似websocket
长连接业务的场景,阻塞会导致每个连接都占用至少两个goroutine
,那么对于大量用户的场景会导致大量goroutine
依然会比较浪费内存。同时,过多的goroutine
也会导致runtime
调度会有一定的延迟,因此对于想要达到极致的性能要求时,也可以尝试放弃阻塞IO
的便利性,通过使用原生的NIO API
实现非阻塞IO
。