Go BIO/NIO探讨(5):net库的非阻塞支持
2023-2-17 15:2:34 Author: Go语言中文网(查看原文) 阅读量:34 收藏

在涉及到Accept/Read/Write之类的操作时,Go net库默认使用了非阻塞的方式去实现,这样提高了性能,但给编程增加了额外的复杂度。本文我们将探讨下net库中如何用非阻塞的方式实现读写。为了兼顾底层细节(I/O概念)、同时了解上层如何应用(net库实现),分为五个部分:

  1. I/O模型概述:了解缓冲区的概念

  2. 阻塞vs非阻塞的区别

  3. net.Listener & net.Conn

  4. net.TCPListener & net.TCPConn

  5. net.conn上的非阻塞读写

I/O模型概述

在Linux下我们可以使用5种I/O模型:

  • Blocking I/O (阻塞式I/O)

  • NonBlocking I/O (非阻塞式 I/O)

  • I/O Multiplexing(I/O多路复用): select、poll、epoll

  • Signal Driven I/O (信号驱动式I/O)

  • AIO (posix异步I/O)

为了方便理解这五种模式的区别,首先需要理解进程和tcp/ip网络栈的交互模式。从套接字read数据或向套接字write数据时,应用进程与套接字的缓冲区进行交互。应用进程运行在用户态,套接字缓冲区在内核态,所以数据的拷贝是必不可少的。流程图如下:

我们把这个流程分为两个阶段:

  1. 第一阶段:read数据可用/write缓冲区可用之前,等待的过程

  2. 第二阶段:read数据可用/write缓冲区可用之后,数据拷贝的过程

基于这样的拆分方式,以read为例,上面的5种I/O模型工作方式如下:

Go net/http库使用了非阻塞I/O,无论是在监听套戒指上通过accept获取新的tcp连接,还是在已连接套接字上进行read/write,走的都是上图中第二种模式。我们看一下细节。

阻塞vs非阻塞

对于阻塞/非阻塞的套接字, 执行accept系统调用时:

  1. 阻塞的sockfd: 调用方会一直被阻塞,直到有一个ESTABLISHED的tcp connection(已完成三次握手)

  2. 非阻塞的sockfd: 函数accept会返回 EAGAIN 或 EWOULDBLOCK 的错误

执行read/recvfrom/recvmsg系统调用时:

  1. 阻塞的sockfd: 调用方会一直被阻塞,直到数据到达套接字缓冲区或发生错误才返回

  2. 非阻塞的sockfd: 进程缓冲区没有数据时,函数会返回 EAGAIN 或 EWOULDBLOCK

执行write/sendto/sendmsg系统调用时:

  1. 阻塞的sockfd: 调用方一直被阻塞,直到数据全部写入套接字缓冲区

  2. 非阻塞的sockfd: 如果缓冲区空间不够,会返回EAGAIN或EWOULDBLOCK,如果被其他信号打断,则返回EINTER

Go net/http库走的是per goroutine per tcp connection模式,不过网络模式是NonBlocking I/O。在用户态校验accept/read/write系统调用的错误码,并通过waitRead/waitWrite操作(底层是runtime_pollWait)函数进行polling。

polling 翻译成中文是“轮询”, polling 有两种:

  1. busy wait(忙等待): 占用一个cpu,持续地查询一个I/O设备的状态

  2. interrupt-driven(中断驱动): polling的进程/线程进入休眠状态,等待中断信号的唤醒

显然,net库使用的是后者。在开始聊net库的polling之前,先回顾下net库里的常见的结构:net.Listener和net.Conn

net.Listener & net.Conn

回顾一下之前聊过的概念,HTTP是应用层协议,其下层是传输层传输层,传输层协议有TCP/UDP。Server端有两类套接字

一类是通过socket系统调用创建的监听套接字(LISTEN状态),只有一个。这个套接字默认状态是CLOSED,通过bind绑定IP:port,通过listen启动监听;
另一类是通过accept系统调用获取的已连接套接字(ESTABLISHED状态);

在Go net库中,net.Lister interface 定义了监听套接字上的行为;net.Conn定义了已连接套接字的行为。

net.Listener interface 的具体实现有 TCPListener和UnixListener,分别对应TCP/Unix Server的实现。通过调用Accept方法,可以获取新的net.Conn,对应的实现是TCPConn/UnixConn:

// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (Conn, error)

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error

// Addr returns the listener's network address.
Addr() Addr
}

net.Conn interface定义了在一个已连接套接字上的行为,比如我们要聊的Read/Write。

// Conn is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn interface {
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
Read(b []byte) (n int, err error)

// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
Write(b []byte) (n int, err error)

// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
Close() error

// LocalAddr returns the local network address, if known.
LocalAddr() Addr

// RemoteAddr returns the remote network address, if known.
RemoteAddr() Addr

// ... 省略部分代码
}

在具体实现中,net库定义了一个conn struct,作为TCPConn/UnixConn/IPConn/UDPConn的基础,嵌入到这些结构中。然而,这些XXXConn并不直接实现net.Conn Read/Write方法,而是都依赖 conn struct,自己只封装独有的操作。

type conn struct {
fd *netFD
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}

// UnixConn is an implementation of the Conn interface for connections
// to Unix domain sockets.
type UnixConn struct {
conn
}

为了更好理解这个过程,我们看看net.TCPListener和net.TCPConn的实现。

net.TCPListener & net.TCPConn

net.TCPListener有两个成员变量:

  • fd是对监听套接字的封装,socket系统调用的返回值是int值,net.netFD是面向对象方式的封装

  • lc是监听的配置,比如ipv6支持,keepalive配置

accept方法依赖net.netFD通过NonBlock的方式获取新的TCPConn,它是对已连接套接字的封装。代码如下:

// net/tcpsock_posix.go

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

// net/tcpsock.go
func newTCPConn(fd *netFD) *TCPConn {
c := &TCPConn{conn{fd}}
setNoDelay(c.fd, true)
return c
}

通过Accept得到的TCPConn实例,它的表面类型是net.Conn,底层类型是*TCPConn。在net/http库里,通过server.newConn将其封装成一个net/http.conn对象。tcp conn上的数据读写和HTTP协议的实现都被封装在net/http.conn struct里。

func (srv *Server) Serve(l net.Listener) error {
for {
// rw 表面类型是net.Conn, 底层类型是*net.TCPConn
rw, err := l.Accept()
// ...省略部分逻辑

// c 的类型是 *http.conn
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return

// http处理连接上的数据
go c.serve(connCtx)
}

// Create new connection from rwc.
func (srv *Server) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc: rwc,
}
if debugServerConnections {
c.rwc = newLoggingConn("server", c.rwc)
}
return c
}

从套接字读取数据,数据可用后,操作上是把数据从套接字缓冲区(内核态)拷贝到应用缓冲区(用户态)。http.conn struct的实现是:通过bufio.NewReader分配一个大小为4096的缓冲区c.bufr,后续应用层只需要读取自己的缓冲区。

// net/http.conn serve方法
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)

读取数据以后的处理涉及到HTTP协议和Web框架,我们这里进行深入探讨。Mozilla MDN网站上有对HTTP协议的详细解释,net/http库对这个标准进行了实现,有兴趣的话可以结合那个文档一起看。

net.conn上非阻塞读写

Go net/http库中,net.conn上的Read/Write是通过非阻塞的方式实现的。当数据可用时,就是读取缓冲区;数据不可用时,具体的方式就是polling。

polling操作会通过 gopark 函数休眠当前goroutine。runtime_pollWait函数的实现被链接到poll_runtime_pollWait函数。在linux系统下,它会循环调用netpollblock函数,以支持超时重试。涉及polling逻辑的代码调用是:

  1. net/net.go: conn struct, 封装了基于流的网络连接,更关心之上的数据读写;

  2. net/fd_posix.go: netFD struct, 对操作套接字socket的面向对象封装, 提供accept/connect/read/write/close等操作语义;

  3. internal/poll/fd_unix.go: FD struct, 对文件描述符的面向对象封装,它可以是socket或操作系统文件描述符;

  4. internal/poll/fd_poll_runtime.go: pollDesc struct, 提供polling的能力,方法有waitRead/waitWrite等;

  5. internal/poll/fd_poll_runtime.go: func runtime_pollWait 支持polling的底层方法。

由此我们也可以发现,polling不仅仅是对于网络文件描述符(套接字/socket),对于普通文件描述符也同样适用。

以Read为例子,函数调用链路的代码如下:

// 位置: net/net.go
type conn struct {
fd *netFD
}
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// 位置: net/fd_posix.go
// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}

// 位置: internal/poll/fd_unix.go
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc

// ...省略部分代码
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
// ...省略部分代码
// 从Sysfd缓冲区读取数据写入goroutine缓冲区p
// 忽略中断信号 EINTER
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

// 位置: internal/poll/fd_poll_runtime.go
type pollDesc struct {
runtimeCtx uintptr
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}

func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}

通过面向对象的封装,对代码逻辑进行了分层,每层的struct有自己的职能。I/O polling 涉及到 runtime 的职能范围,所以通过 go:linkname 链接到runtime库的函数。上面提到的internal/poll.runtime_pollWait的代码实现是 runtime.poll_runtime_pollWait,下面是具体的代码实现:

// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...省略部分代码

for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}

poll_runtime_pollWait 通过一个for循环反复调用 netpollblock 函数,直到其返回pollNoError。 在netpollblock 函数内部的工作流程如下:

  1. 获取当前goroutine的polling状态,可以是pdReady或pdWait

  2. 将其置为pdWait状态

  3. 通过gopark休眠当前goroutine

  4. (一段时间后) 当前goroutine被唤醒,其polling状态必然是pdReady,做最后的校验

  5. (polling状态是ready,进行后续的read/write)

关键流程的代码如下:

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// poller持有的goroutine
// rg 是read goroutine
// wg 是write goroutine
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// 把goroutine置为等待状态
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) {
return true
}
if gpp.CompareAndSwap(0, pdWait) {
break
}

// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}

// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 当前goroutine被唤醒
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

在Go net/http库中,accept每成功获取一个新的套接字(等价于tcp conn),都会启动一个新的goroutine对这个套接字进行polling。现代的大型web服务通常支持非常高的qps,流量高峰到来时,会导致goroutine数量暴增,副作用主要有两方面:gc性能衰减和Goroutine的调度性能恶化。从软件层面,runtime设置了goroutine的上限是10000个(sched.maxmcount), linux能支持的文件描述符数量也有一个上限。具体到多少个goroutine服务器的性能会出现严重衰减,与业务逻辑有很大的相关性。

对于Java等其他语言,往往通过操作系统线程去处理一个tcp conn,线程上下文切换的代价比goroutine高很多。为了解决这类问题,有了I/O多路复用和异步I/O模型。

篇幅有限,下篇文章接着说。


推荐阅读

福利
我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。


文章来源: http://mp.weixin.qq.com/s?__biz=MzAxMTA4Njc0OQ==&mid=2651454090&idx=1&sn=c47afb90d513ab5faacd5cd833cefedc&chksm=80bb2478b7ccad6ea54914e69fc73dec9c2f320d0acf48852ebeae72723fbea2d8e3b15dca0a#rd
如有侵权请联系:admin#unsafe.sh