Golang 网络 IO 和 epoll 调用

Golang 网络 IO 和 epoll 调用

tk_sky 131 2024-02-22

Golang 网络 IO 和 epoll 调用

之前看了 linux 内核关于网络收包和 IO 多路复用相关的原理:Linux 网络收包与 IO 多路复用 - tk_sky的博客 (mcyou.cc)

今天来看看 golang 对网络 IO 模型的包装和向上实现。在 linux 平台上,golang 使用 epoll 来实现。在其他平台上,编译时会选择其他的解决方案。

一个标准的 tcp 服务器

先来看看在 golang 中一个标准的 tcp 服务要怎么完成:

// 启动一个 tcp 服务端代码示例
func main(){
   // 创建一个 tcp 端口监听器
   l,_ := net.Listen("tcp",":8080")
   // 主动轮询模型
   for{
       // 等待 tcp 连接到达
       conn,_ := l.Accept()     
       // 开启一个 goroutine 负责一次客户端请求的处理
       go serve(conn)
   }
}

// 处理一个 tcp 连接
func serve(conn net.Conn){
    defer conn.Close()
    var buf []byte
    // 读取连接中的数据
    _,_ = conn.Read(buf)    
    // 业务逻辑...
}

可以看出有下面几个比较核心的步骤:

  • 创建 tcp 端口监听器(listener)
  • 使用 for 循环建立主动轮询模型
  • 每轮尝试从 listener 中获取到达的 tcp 连接
  • 对取到的每个连接启动一个 goroutine 异步处理连接
  • 如果没有连接到达,就阻塞

我们主要从创建 listener 的 net.listen,以及获取连接的 listner.Accept 方法作为入口去看整个流程。

创建端口监听器

要创建 listener,listen 方法会根据 listener 的 config 判断对应的协议,这里是 tcp。接着就是创建 socket 了。golang 做 socket 创建和初始化的代码在 runtime/sock_posix.go:

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // ...
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

在 socket 方法中,要发起两次系统调用:

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // 创建一个 socket 套接字
    s, err := sysSocket(family, sotype, proto)   
    // ...
    // 绑定、监听端口,并对 socket fd 进行初始化. epoll_create 和 epoll_ctl 执行的执行正是在初始化的流程中.
    fd.listenStream(laddr, listenerBacklog(), ctrlFn)
    // ...
}

// socketFunc 宏指令,关联执行系统调用 syscall.Socket 创建套接字
var socketFunc        func(int, int, int) (int, error)  = syscall.Socket

func sysSocket(family, sotype, proto int) (int, error) {
    // 通过系统调用创建一个 socket
    s, err = socketFunc(family, sotype, proto)
    // 通过系统调用将 socket 设置为非阻塞模式
    syscall.SetNonblock(s, true)
    // ...
}

这里分别使用 syscall.Socket 和 syscall.SetNonblock 创建 socket 并将其设置为非阻塞模式。

接下来就是 netFD.listenStream 方法,将 socket fd 和端口进行监听绑定,然后调用 epoll 指令设定 io 多路复用模式:

// listenFunc 宏指令,关联执行系统调用 syscall.Listen 监听端口
var listenFunc        func(int, int) error              = syscall.Listen

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    // ...
    // 调用 bind 系统调用,将 socket fd 和端口进行绑定
    syscall.Bind(fd.pfd.Sysfd, lsa)
    // 通过宏指令调用 listen 系统调
    listenFunc(fd.pfd.Sysfd, backlog)
    // 初始化 socket fd,在其中执行了 epoll 操作
    fd.init()
    // ...
}

这里的 netFD 是个文件描述符的封装,包含 sysfd 和 pollDesc 两个结构。

在初始化 pollDesc 时,调用 runtime 函数在系统上创建一个 epoll 实例。这里使用 sync.Once,保证了全局只会执行一次 epoll 的初始化。

var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    // ...
}

func netpollinit() {
    // 执行 epoll_create 执行,开辟一块 epoll 池
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    // ...
}

获取 tcp 连接

回顾一下,用 c 写的时候,在调用 epoll_ctl 之前,需要先通过 accept 系统调用获取连接,返回对应的 socket 的 fd。在 golang 中,入口是 Listener.Accept:

func (l *TCPListener) Accept() (Conn, error) {
    // ...
    c, err := l.accept()
    // ...
}

func (ln *TCPListener) accept() (*TCPConn, error) {
    fd, err := ln.fd.accept()
    // ...
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 倘若有 tcp 连接到达,则成功取出并返回
    // 倘若没有 tcp 连接到达,会 gopark 进入被动阻塞,等待被唤醒
    d, rsa, errcall, err := fd.pfd.Accept()
    // ...
}

下面就在 fd.Accept 方法中尝试获取tcp连接。我们需要先使用系统调用 syscall.Accept 以非阻塞模式尝试获取一次到达的连接。如果没有就绪连接,走入 pollDesc.waitRead,使用 gopark 操作阻塞当前 goroutine:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    // ...
    for {
        // 通过 accept 系统调用,尝试接收一个连接
        s, rsa, errcall, err := accept(fd.Sysfd)
        // ...
        switch err {    
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        // ...
        }
        // ...
    }
}

func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    // ...
    res := runtime_pollWait(pd.runtimeCtx, mode)
    // ...
}

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // ...
    for !netpollblock(pd, int32(mode), false) {
        // ...
    }
    // ...
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // ...
    if waitio || netpollcheckerr(pd, mode) == pollNoError {
        // 调用 gopark 被动阻塞挂起
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // ...
}

如果 accept 系统调用成功获取到了到达的 tcp 连接,golang 会将其封装成为一个 netFD,通过 epoll_ctl 指令将该 fd 添加到 epoll 里,实现对获取到的连接的监听:

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 调用 accept 系统调用,接收 tcp 连接
    d, rsa, errcall, err := fd.pfd.Accept()
    // 将 connet fd 封装成 netfd
    netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
    // 对 netfd 进行初始化,底层会调用 epoll_ctl 将其注册到 listener 对应的 epoll 池中 
    netfd.init()
    // ...
}

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    // ...
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    // 将待监听的 socket fd 添加到 epoll 池中,并注册好回调路径
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd    
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

tcp 连接读数据

在获取到 tcp 连接后,在缓存区数据还没就绪的时候,用户执行 read 一样会阻塞:

func (c *conn) Read(b []byte) (int, error) {
    // ...
    n, err := c.fd.Read(b)
    // ...
    return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
    n, err = fd.pfd.Read(p)
    // ...
}

func (fd *FD) Read(p []byte) (int, error) {
    // ...
    for {
        n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
        
        if err == syscall.EAGAIN && fd.pd.pollable() {
            fd.pd.waitRead(fd.isFile)
            // ...
        }
        // ...
    }
}

如果数据未就绪,会返回 syscall.EAGAIN 错误,同上一样触发 gopark 把 goroutine 挂起。

那么问题来了,golang runtime 要如何唤醒这些 goroutine 嘞?

在 runtime/proc.go 的 main 函数中,runtime 会单独启动一个 M(GMP 的 M),专门用于执行 sysmon 监控任务。在sysmon 函数中,会每隔 10ms 调用 netpoll 函数,尝试取出 io event 已到达的 goroutine,进行唤醒:

func sysmon() {
    // ...
    for {        
        // 每 10 ms 周期性执行一次
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            // 取出就绪的 loop goroutine
            list := netpoll(0) 
            // ...
            // 唤醒 list 中的 goruotine
            injectglist(&list)
        }
        // ...
    }
}

这个 netpoll() 就会调用非阻塞模式的 epoll_wait 系统调用,获取到就绪事件队列 events,然后遍历事件队列,调用 netpollready 方法把对应的 goroutine 添加到 gList 中返回给上层,然后执行唤醒:

func netpoll(delay int64) gList {
    // ...
    var events [128]epollevent
retry:
    // 非阻塞调用 epoll_wait,接收到就绪的事件列表
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    // ...
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        // 添加关心的事件模式
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        // 从 event 中获取已就绪的 fd,调用 netpollready 方法将 fd 添加到 gList 中用于返回,在上层进行唤醒和执行
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.setEventErr(ev.events == _EPOLLERR)
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

对于唤醒,只需要把对应的 goroutine 添加到 toRun 链表就行:

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // 将 read、write 的就绪 fd 对应的 goroutine ,添加到 toRun 链表中
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

总结

由于事件驱动机制的软中断只能由内核来管理,所以在用户空间是无法完成 epoll 的工作的,必须借助 epoll 系统调用来完成,所以 golang runtime 中的有关逻辑主要是对 epoll 进行封装,在 goroutine 唤醒方面通过轮询非阻塞调用 epoll_wait,然后通过 goroutine 的相关调度逻辑进行唤醒。