Golang 网络 IO 和 epoll 调用
之前看了 linux 内核关于网络收包和 IO 多路复用相关的原理:Linux 网络收包与 IO 多路复用 - tk_sky的博客 (mcyou.cc)
今天来看看 golang 对网络 IO 模型的包装和向上实现。在 linux 平台上,golang 使用 epoll 来实现。在其他平台上,编译时会选择其他的解决方案。
一个标准的 tcp 服务器
先来看看在 golang 中一个标准的 tcp 服务要怎么完成:
// 启动一个 tcp 服务端代码示例
func main(){
// 创建一个 tcp 端口监听器
l,_ := net.Listen("tcp",":8080")
// 主动轮询模型
for{
// 等待 tcp 连接到达
conn,_ := l.Accept()
// 开启一个 goroutine 负责一次客户端请求的处理
go serve(conn)
}
}
// 处理一个 tcp 连接
func serve(conn net.Conn){
defer conn.Close()
var buf []byte
// 读取连接中的数据
_,_ = conn.Read(buf)
// 业务逻辑...
}
可以看出有下面几个比较核心的步骤:
- 创建 tcp 端口监听器(listener)
- 使用 for 循环建立主动轮询模型
- 每轮尝试从 listener 中获取到达的 tcp 连接
- 对取到的每个连接启动一个 goroutine 异步处理连接
- 如果没有连接到达,就阻塞
我们主要从创建 listener 的 net.listen,以及获取连接的 listner.Accept 方法作为入口去看整个流程。
创建端口监听器
要创建 listener,listen 方法会根据 listener 的 config 判断对应的协议,这里是 tcp。接着就是创建 socket 了。golang 做 socket 创建和初始化的代码在 runtime/sock_posix.go:
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// ...
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
在 socket 方法中,要发起两次系统调用:
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 创建一个 socket 套接字
s, err := sysSocket(family, sotype, proto)
// ...
// 绑定、监听端口,并对 socket fd 进行初始化. epoll_create 和 epoll_ctl 执行的执行正是在初始化的流程中.
fd.listenStream(laddr, listenerBacklog(), ctrlFn)
// ...
}
// socketFunc 宏指令,关联执行系统调用 syscall.Socket 创建套接字
var socketFunc func(int, int, int) (int, error) = syscall.Socket
func sysSocket(family, sotype, proto int) (int, error) {
// 通过系统调用创建一个 socket
s, err = socketFunc(family, sotype, proto)
// 通过系统调用将 socket 设置为非阻塞模式
syscall.SetNonblock(s, true)
// ...
}
这里分别使用 syscall.Socket 和 syscall.SetNonblock 创建 socket 并将其设置为非阻塞模式。
接下来就是 netFD.listenStream 方法,将 socket fd 和端口进行监听绑定,然后调用 epoll 指令设定 io 多路复用模式:
// listenFunc 宏指令,关联执行系统调用 syscall.Listen 监听端口
var listenFunc func(int, int) error = syscall.Listen
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...
// 调用 bind 系统调用,将 socket fd 和端口进行绑定
syscall.Bind(fd.pfd.Sysfd, lsa)
// 通过宏指令调用 listen 系统调
listenFunc(fd.pfd.Sysfd, backlog)
// 初始化 socket fd,在其中执行了 epoll 操作
fd.init()
// ...
}
这里的 netFD 是个文件描述符的封装,包含 sysfd 和 pollDesc 两个结构。
在初始化 pollDesc 时,调用 runtime 函数在系统上创建一个 epoll 实例。这里使用 sync.Once,保证了全局只会执行一次 epoll 的初始化。
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
// ...
}
func netpollinit() {
// 执行 epoll_create 执行,开辟一块 epoll 池
epfd = epollcreate1(_EPOLL_CLOEXEC)
// ...
}
获取 tcp 连接
回顾一下,用 c 写的时候,在调用 epoll_ctl 之前,需要先通过 accept 系统调用获取连接,返回对应的 socket 的 fd。在 golang 中,入口是 Listener.Accept:
func (l *TCPListener) Accept() (Conn, error) {
// ...
c, err := l.accept()
// ...
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
// ...
}
func (fd *netFD) accept() (netfd *netFD, err error) {
// 倘若有 tcp 连接到达,则成功取出并返回
// 倘若没有 tcp 连接到达,会 gopark 进入被动阻塞,等待被唤醒
d, rsa, errcall, err := fd.pfd.Accept()
// ...
}
下面就在 fd.Accept 方法中尝试获取tcp连接。我们需要先使用系统调用 syscall.Accept 以非阻塞模式尝试获取一次到达的连接。如果没有就绪连接,走入 pollDesc.waitRead,使用 gopark 操作阻塞当前 goroutine:
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// ...
for {
// 通过 accept 系统调用,尝试接收一个连接
s, rsa, errcall, err := accept(fd.Sysfd)
// ...
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
// ...
}
// ...
}
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
// ...
res := runtime_pollWait(pd.runtimeCtx, mode)
// ...
}
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...
for !netpollblock(pd, int32(mode), false) {
// ...
}
// ...
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// ...
if waitio || netpollcheckerr(pd, mode) == pollNoError {
// 调用 gopark 被动阻塞挂起
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// ...
}
如果 accept 系统调用成功获取到了到达的 tcp 连接,golang 会将其封装成为一个 netFD,通过 epoll_ctl 指令将该 fd 添加到 epoll 里,实现对获取到的连接的监听:
func (fd *netFD) accept() (netfd *netFD, err error) {
// 调用 accept 系统调用,接收 tcp 连接
d, rsa, errcall, err := fd.pfd.Accept()
// 将 connet fd 封装成 netfd
netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
// 对 netfd 进行初始化,底层会调用 epoll_ctl 将其注册到 listener 对应的 epoll 池中
netfd.init()
// ...
}
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
// ...
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
// 将待监听的 socket fd 添加到 epoll 池中,并注册好回调路径
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
tcp 连接读数据
在获取到 tcp 连接后,在缓存区数据还没就绪的时候,用户执行 read 一样会阻塞:
func (c *conn) Read(b []byte) (int, error) {
// ...
n, err := c.fd.Read(b)
// ...
return n, err
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
// ...
}
func (fd *FD) Read(p []byte) (int, error) {
// ...
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err == syscall.EAGAIN && fd.pd.pollable() {
fd.pd.waitRead(fd.isFile)
// ...
}
// ...
}
}
如果数据未就绪,会返回 syscall.EAGAIN 错误,同上一样触发 gopark 把 goroutine 挂起。
那么问题来了,golang runtime 要如何唤醒这些 goroutine 嘞?
在 runtime/proc.go 的 main 函数中,runtime 会单独启动一个 M(GMP 的 M),专门用于执行 sysmon 监控任务。在sysmon 函数中,会每隔 10ms 调用 netpoll 函数,尝试取出 io event 已到达的 goroutine,进行唤醒:
func sysmon() {
// ...
for {
// 每 10 ms 周期性执行一次
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
// 取出就绪的 loop goroutine
list := netpoll(0)
// ...
// 唤醒 list 中的 goruotine
injectglist(&list)
}
// ...
}
}
这个 netpoll() 就会调用非阻塞模式的 epoll_wait 系统调用,获取到就绪事件队列 events,然后遍历事件队列,调用 netpollready 方法把对应的 goroutine 添加到 gList 中返回给上层,然后执行唤醒:
func netpoll(delay int64) gList {
// ...
var events [128]epollevent
retry:
// 非阻塞调用 epoll_wait,接收到就绪的事件列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
// ...
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
// 添加关心的事件模式
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
// 从 event 中获取已就绪的 fd,调用 netpollready 方法将 fd 添加到 gList 中用于返回,在上层进行唤醒和执行
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, pd, mode)
}
}
return toRun
}
对于唤醒,只需要把对应的 goroutine 添加到 toRun 链表就行:
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
// 将 read、write 的就绪 fd 对应的 goroutine ,添加到 toRun 链表中
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
总结
由于事件驱动机制的软中断只能由内核来管理,所以在用户空间是无法完成 epoll 的工作的,必须借助 epoll 系统调用来完成,所以 golang runtime 中的有关逻辑主要是对 epoll 进行封装,在 goroutine 唤醒方面通过轮询非阻塞调用 epoll_wait,然后通过 goroutine 的相关调度逻辑进行唤醒。