在上一篇文章《深入理解 go chan》中,我们讲解了 chan 相关的一些概念、原理等东西, 今天让我们再深入一下,读一下它的源码,看看底层实际上是怎么实现的。
整体设计
我们可以从以下三个角度看 chan 的设计(源码位于 runtime/chan.go,结构体 hchan 就是 chan 的底层数据结构):
- 存储:chan 里面的数据是通过一个环形队列来存储的(实际上是一个数组,但是我们视作环形队列来操作。无缓冲 chan 不用存储,会直接从 sender 复制到 receiver)
- 发送:数据发送到 chan 的时候,如果 chan 满了,则会将发送数据的协程挂起,将其放入一个协程队列中,chan 空闲的时候会唤醒这个协程队列。如果 chan 没满,则发送队列为空。
- 接收:从 chan 中接收数据的时候,如果 chan 是空的,则会将接收数据的协程挂起,将其放入一个协程队列中,当 chan 有数据的时候会唤醒这个协程队列。如果 chan 有数据,则接收队列为空。
文中一些比较关键的名词解释:
- sender: 表示尝试写入 chan 的 goroutine。
- receiver: 表示尝试从 chan 读取数据的 goroutine。
- sendq 是一个队列,存储那些尝试写入 channel 但被阻塞的 goroutine。
- recvq 是一个队列,存储那些尝试读取 channel 但被阻塞的 goroutine。
- g 表示一个协程。
- gopark 是将协程挂起的函数,协程状态:_Grunning => _Gwaiting。
- goready 是将协程改为可运行状态的函数,协程状态: _Gwaiting => _Grunnable。
现在,假设我们有下面这样的一段代码,通过这段代码,我们可以大概看一下 chan 的总体设计:
package main
func main() {
// 创建一个缓冲区大小为 9 的 chan
ch := make(chan int, 9)
// 往 chan 写入 [1,2,3,4,5,6,7]
for i := 0; i < 7; i++ {
ch <- i + 1
}
// 将 1 从缓冲区移出来
<-ch
}
现在,我们的 chan 大概长得像下面这个样子,后面会详细展开将这个图中的所有元素:
上图为了说明而在 recvq 和 sendq 都画了 3 个 G,但实际上 recvq 和 sendq 至少有一个为空。因为不可能有协程正在等待接收数据的时候,还有协程的数据因为发不出去数据而阻塞。
数据结构
在底层,go 是使用 hchan 这个结构体来表示 chan 的,下面是结构体的定义:
type hchan struct {
qcount uint // 缓冲区(环形队列)元素个数
dataqsiz uint // 缓冲区的大小(最多可容纳的元素个数)
buf unsafe.Pointer // 指向缓冲区入口的指针(从 buf 开始 qcount * elemsize 大小的内存就是缓冲区所用的内存)
elemsize uint16 // chan 对应类型元素的大小(主要用以计算第 i 个元素的内存地址)
closed uint32 // chan 是否已经关闭(0-未关闭,1-已关闭)
elemtype *_type // chan 的元素类型
sendx uint // chan 发送操作处理到的位置
recvx uint // chan 接收操作处理到的位置
recvq waitq // 等待接收数据的协程队列(双向链表)
sendq waitq // 等待发送数据的协程队列(双向链表)
// 锁
lock mutex
}
waitq 的数据结构如下:
type waitq struct {
first *sudog
last *sudog
}
waitq 用来保存阻塞在等待或接收数据的协程列表(是一个双向链表),在解除阻塞的时候,需要唤醒这两个队列中的数据。
对应上图各字段详细说明
hchan,对于 hchan 这个结构体,我们知道,在 go 里面,结构体字段是存储在一段连续的内存上的(可以看看《深入理解 go unsafe》),所以图中用了连续的一段单元格表示。
下面是各字段说明:
- qcount: 写入 chan 缓冲区元素个数。我们的代码往 chan 中存入了 7 个数,然后从中取出了一个数,最终还剩 6 个,因此 qcount 是 6。
- dataqsiz: hchan 缓冲区的长度。它在内存中是连续的一段内存,是一个数组,是通过 make 创建的时候传入的,是 9。
- buf:hchan 缓冲区指针。指向了一个数组,这个数组就是用来保存发送到 chan 的数据的。
- sendx、recvx:写、读操作的下标。指向了 buf 指向的数组中的下标,sendx 是下一个发送操作保存的下标,recvx 是下一个接收操作的下标。
- recvq、sendq: 阻塞在 chan 读写上的协程列表。底层是双向链表,链表的元素是 sudog(sudog 是一个对 g 的封装),我们可以简单地理解为 recvq 和 sendq 的元素就是 g(协程)。
g 和 sudog 是什么?
上面提到了 g 和 sudog,g 是底层用来表示协程的结构体,而 sudog 是对 g 的封装,记录了一些额外的信息,比如关联的 hchan。
在 go 里面,协程调度的模型是 GMP 模型,G 代表协程、M 代表线程、P 表示协程调度器。我上图里面的 G 就是代表协程(当然,实际上是 sudog)。
还有一个下面会提到的就是 g0,g0 表示 P 上启动的第一个协程。
GMP 模型是另外一个庞大的话题了,大家可以自行去了解一下,对理解本文也很有好处。因为在 chan 阻塞的时候实际上也是一个协程调度的过程。
具体来说,就是从 g 的栈切换到 g0 的栈,然后重新进行协程调度。这个时候 g 因为从运行状态修改为了等待状态,所以在协程调度中不会将它调度来执行,
而是会去找其他可执行的协程来执行。
创建chan
我们的 make(chan int, 9) 最终会调用 makechan 方法:
// chantype 是 chan 元素类型,size 是缓冲区大小
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 检查元素个数是否合法(不能超过 1<<16 个)
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 判断内存是否对齐
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// mem 是 chan 缓冲区(环形队列)所需要的内存大小
// mem = 元素大小 * 元素个数
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 定义 hchan
var c *hchan
switch {
case mem == 0:
// 队列或者元素大小是 0(比如 make(chan int, 0))
// 只需要分配 hchan 所需要的内存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// ...
case elem.ptrdata == 0:
// elem 类型里面不包含指针
// 分配的内存 = hchan 所需内存 + 缓冲区内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 分配的是连续的一段内存,缓冲区内存在 hchan 后面
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素类型里面包含指针
c = new(hchan)
// buf 需要另外分配内存
c.buf = mallocgc(mem, elem, true)
}
// 单个元素的大小
c.elemsize = uint16(elem.size)
// 元素类型
c.elemtype = elem
// 缓冲区大小
c.dataqsiz = uint(size)
// ...
}
创建 chan 的过程主要就是给 hchan 分配内存的过程:
- 非缓冲 chan,只需要分配 hchan 结构体所需要的内存,无需分配环形队列内存(数据会直接从 sender 复制到 receiver)
- 缓冲 chan(不包含指针),分配 hchan 所需要的内存和环形队列所需要的内存,其中 buf 会紧挨着 hchan
- 缓冲 chan(含指针),hchan 和环形队列所需要的内存单独进行分配
对应到文章开头的图就是,底下的 hchan 和 buf 那两段内存。
发送数据
<- 语法糖
在《深入理解 go chan》中,我们说也过,<- 这个操作符号是一种语法糖, 实际上,<- 会被编译成一个函数调用,对于发送操作而言,c <- x 会编译为对下面的函数的调用:
// elem 是被发送到 chan 的数据的指针。
// 对于 ch <- x,ch 对应参数中的 c,unsafe.Pointer(&x) 对应参数中的 elem。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
另外,对于 select 里面的调用,chansend 会返回一个布尔值给 select 用来判断是否是要选中当前 case 分支。
如果 chan 发送成功,则返回 true,则 select 的那个分支得以执行。(select…case 本质上是 if…else,返回 false 表示判断失败。)
chansend 第二个参数的含义
chansend 第二个参数 true 表示是一个阻塞调用,另外一种是在 select 里面的发送操作,在 select 中的操作是非阻塞的。
package main
func main() {
ch := make(chan int, 2)
ch <- 1 // 如果 ch 满了,会阻塞
select {
case ch <- 3: // 非阻塞
}
}
在 select 中对 chan 的读写是非阻塞的,不会导致当前协程阻塞,如果是因为 chan 满或者空无法发送或接收, 则不会导致阻塞在 case 的某一个分支上,还可以继续判断其他 case 分支。
select 中的 send 实现:
// go 代码:
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// 实际效果:
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
// select 里面往 chan 发送数据的分支,返回的 selected 表示当前的分支是否被选中
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
chansend 发送实现
- 1.发送到 nil chan(select 中发送不阻塞,其他情况阻塞)
如果是在 select 的 case 里面发送,则不会阻塞,其他情况会导致当前 goroutine 挂起,永远阻塞:
示例代码
// 下面的代码运行会报错:
var ch chan int
// 发送到 nil chan 会永久阻塞
ch <- 1
select {
// 这个发送失败,但是不会阻塞,可继续判断其他分支。
case ch <- 3:
}
- 2.发送到满了的 chan(select 中发送不阻塞,其他情况阻塞)
对于无缓冲而且又没有 receiver,或者是有缓冲但是缓冲满了的情况,发送也会阻塞(我们称其为 full,也就是满了,满了的 chan 是放不下任何数据了的,所以就无法再往 chan 发送数据了):
receiver 表示等待从 chan 接收数据的协程。
对于满了的 chan,什么时候可以再次发送呢?那就是有 receiver 接收数据的时候。chan 之所以会满就是因为没有 receiver,也就是没有从 chan 接收数据的协程。
A. 对于无缓冲的 chan,在满了的情况下,当有 receiver 来读取数据的时候,数据会直接从 sender 复制到 receiver 中:
B. 对于有缓冲,但是缓冲满了的情况(图中 chan 满了,并且有两个 g 正在等待写入 chan):
这个发送过程大概如下:
- receiver 从 chan 中获取到 chan 队头元素,然后 chan 的队头元素出队。
- 发送队列 sendq 对头元素出队,将其要发送的数据写入到 chan 缓冲中。最后,sendq 只剩下一个等待写入 chan 的 g
示例代码
package main
// 注意:以下代码可能不能正常执行,只是为了描述问题。
func main() {
// 情况 2.A.
var ch1 = make(chan int) // 无缓冲的 chan
ch1 <- 1 // 阻塞
select {
// 不阻塞,但是不会执行这个分支
case ch1 <- 1:
}
// 情况 2.B.
var ch2 = make(chan int, 1) // 有缓冲,缓冲区容量为 1
ch2 <- 1 // 1 写入之后,ch2 的缓冲区满了
go func() {
ch2 <- 2 // 阻塞,调用 gopark 挂起
}()
go func() {
ch2 <- 3 // 阻塞
}()
select {
// 不阻塞,但是不会执行这个分支
case ch2 <- 4:
}
}
- 3.发送到有缓冲,但是缓冲还没满的 chan(不阻塞,发送成功)
这种情况比较简单,就是将 sender 要发送的数据写入到 chan 缓冲区:
示例代码
var ch = make(chan int, 1)
// 不阻塞,1 写入 chan 缓冲区
ch <- 1
chansend 源码解读
阻塞模式下,在发送的过程中,如果遇到无法发送成功的情况,会调用 gopark 来将协程挂起,然后当前协程陷入阻塞状态。
非阻塞模式下(select),在发送过程中,任何无法发送的情况,都会直接返回 false,表示发送失败。
// 参数说明:
// c 表示 hchan 实例
// ep 表示要发送的数据所在的地址
// block 是否是阻塞模式(select 语句的 case 里面的发送是非阻塞模式,其他情况是阻塞模式)
// 非阻塞模式下,遇到无法发送的情况,会返回 false。阻塞模式下,遇到无法发送的情况,协程会挂起。
// 返回值:表示是否发送成功。false 的时候,如果是 select 的 case,则表示没有选中这个 case。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 情况 1:nil chan
if c == nil {
// select 语句里面发送数据到 chan 的操作失败,直接返回 false,表示当前的 case 没有被选中。
if !block {
// select 分支没有被选中
return false
}
// 阻塞模式,协程挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ... 其他代码...
// 不获取锁的情况下快速失败。select 中 chan 满了的时候无法发送成功,直接返回 false,协程无需挂起。
// 场景:非阻塞模式、chan 未关闭、chan 已满(无缓冲且没有接收数据的协程、或者有缓冲但是缓冲区满)
if !block && c.closed == 0 && full(c) {
return false
}
// ... 其他代码...
// 获取锁
lock(&c.lock)
// 如果 chan 已经关闭,则释放锁并 panic,不能往一个已经关闭的 chan 发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 情况 2.A,又或者是有缓冲但是缓冲区空,有一个正在等待接收数据的 receiver。
// 如果有协程在等待接收数据(说明 chan 缓冲区空、或者 chan 是无缓冲的)
// 则直接将元素传递给这个接收数据的协程,这样就避免了 sender -> chan -> receiver 这个数据复制的过程,效率更高。
// 返回 true 表示 select 的分支可以执行(发送成功)
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 情况 3,发送到缓冲 chan,且 chan 未满
// 没有协程在等待接收数据。
// 缓冲区还有空余,则将数据写入到 chan 的缓冲区
if c.qcount < c.dataqsiz {
// 获取写入的地址
qp := chanbuf(c, c.sendx)
// 通过内存复制的方式写入
typedmemmove(c.elemtype, qp, ep)
// 写入的下标指向下一个位置
c.sendx++
// 如果到超出环形队列尾了,则指向第一个位置
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// chan 里面的元素个数加上 1
c.qcount++
// 释放锁
unlock(&c.lock)
// 发送成功,返回 true
return true
}
// 没有协程在接收数据,而且缓冲区满了。
// 如果是 select 语句里面的发送,则释放锁,返回 false
if !block {
unlock(&c.lock)
return false
}
// 发不出去,当前协程阻塞。
// 阻塞模式下,缓冲区满了,需要将当前协程挂起。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // chan 要操作的元素指针
mysg.waitlink = nil
mysg.g = gp // sudog 上的 g 属性
mysg.isSelect = false // 如果是 select,上面已经返回了,因此这里是 false
mysg.c = c // sudog 上的 c 属性
gp.waiting = mysg // g 正在等待的 sudog
gp.param = nil // 当通道操作唤醒被阻塞的 goroutine 时,它将 param 设置为指向已完成的阻塞操作的 sudog
c.sendq.enqueue(mysg) // 将 sudog 放入发送队列
// 在 chan 读写上阻塞的标志
gp.parkingOnChan.Store(true)
// 最关键的一步:将当前协程挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 保证 ep 指向的地址不被垃圾回收器回收
KeepAlive(ep)
// ...被唤醒了之后的一些收尾操作...
return true
}
// 参数说明:c 是 chan 实例,sg 是等待接收数据的 g,ep 是被发送进 chan 的数据,unlockf 是释放锁的函数。
// 空 chan 上发送,会直接发送给等待接收数据的协程。
// ep 指向的值会被复制到 sg 中(ep -> sg,ep 是被发送的值,sg 是要接收数据的 g)。
// 接收数据的协程会被唤醒。
// 通道 c 必须是空的并且获取了锁。send 会通过 unlockf 来释放锁。
// sg 必须已从 c 中退出队列(从 recvq 这个接收队列中移除)。
// ep 必须不能为 nil,同时指向堆或者调用者的栈。
// sg 是接收队列上的 g。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ...其他代码...
// 如果没有忽略返回值,将值直接从 ep 复制到 sg 中
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
// 释放锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 最关键的一步:唤醒等待队列中的那个接收到数据的 g
//(也就是之前因为接收不到数据而被阻塞的那个 g)
goready(gp, skip+1)
}
// 参数:t 是 chan 的元素类型,sg 是接收数据的 g(协程),src 是被发送的数据的指针。
// 场景:无缓冲 chan、有缓冲但是缓冲区没数据。
// 作用:将数据直接从发送数据的协程复制到接收数据的协程。
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// 将 ep 的值直接复制到 sg 中
memmove(dst, src, t.size)
}
// full 报告 c 上的发送是否会阻塞(即通道已满)。
func full(c *hchan) bool {
// c.dataqsiz 是不可变的(创建 chan 后不会再去修改)
// 因此在 chan 操作期间的任何时间读取都是安全的。
if c.dataqsiz == 0 {
// 如果是非缓冲 chan,则看接收队列有没有数据,有则表明满了(没有正在发送的 g)
return c.recvq.first == nil
}
// 如果是缓冲 chan,只需要比较实际元素总数跟缓冲区容量即可
return c.qcount == c.dataqsiz
}
接收数据
<- 语法糖
在发送数据的那一节我们提到了,ch <- x 编译之后,实际上是对 chansend1 的函数调用。同样的,在接收数据的时候, <- 这个操作符也会根据不同情况编译成不同的函数调用:
// elem 是用来保存从 c 中接收到的值的地址的指针
// <- c 编译器处理之后实际上就是下面的这个函数调用。(从通道接收,但是忽略接收到的值)
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// received 表示是否是从 chan 中接收到的(如果 chan 关闭,则接收到的是零值,received 是 false)
// v, ok := <-c 编译之后的函数(从通道接收,第一个 v 对应 elem,第二个 ok 对应 received)
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// select 里面的接收操作:
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// 实际 go 实现
//
// if selected, ok = selectnbrecv(&v, c); selected {
// ... foo
// } else {
// ... bar
// }
//
// select 里面从 chan 接收数据的分支,返回的 selected 表示当前的分支是否被选中,received 表示是否有数据被接收到
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
还需要再提醒一下的是:chan 关闭之后,并且 chan 缓冲区所有数据被接收完之后,received 才会是 false,并不是一关闭 received 马上返回 false
chanrecv 函数 block 参数的含义
跟 chansend 中的 block 参数的作用一样,用来判断是否是 select 模式下的接收操作,如果是,则在需要阻塞的时候不会阻塞,取而代之的是直接返回。
chanrecv 接收数据实现
- 1.从 nil chan 接收(select 中接收不阻塞,其他情况阻塞)
从 nil chan 中读取的时候,如果是阻塞模式,会调用 gopark 将协程阻塞起来。
示例代码
var ch chan int
<-ch
- 2.从空 chan 接收(select 中接收不阻塞,其他情况阻塞)
判断空的条件为:无缓冲并且没有等待发送数据的 g,或者有缓冲但是缓冲区无数据。
示例代码
package main
// 注意:以下代码执行不了,只是展示一下实际中对应的代码
func main() {
// 情况 1,无缓冲的 chan,空的
var ch1 = make(chan int)
<-ch1 // 阻塞
select {
// 不阻塞,但是该分支不会执行
case <-ch1:
}
// 情况 2,有缓冲的 chan,空的
var ch2 = make(chan int, 1)
<-ch2 // 阻塞
select {
// 不阻塞,但是该分支不会执行
case <-ch2:
}
}
3.从缓冲区满的 chan 接收(不会阻塞,这个时候 sendq 一定不为空)
4.从缓冲区不满的 chan 接收(不会阻塞)
示例代码
package main
func main() {
var ch = make(chan int, 2)
ch <- 1
// 从缓冲区没满的 chan 接收
<-ch
}
chanrecv 源码解读
chanrecv 函数:
- 参数:c 是 chan 实例,ep 是用来接收数据的指针,block 表示是否是阻塞模式。
- 返回值:selected 表示 select 语句的 case 是否被选中,received 表示接收到的值是否有效。
- 功能:从 c 这个通道接收数据,同时将接收到的数据写入到 ep 里。
概览:
- ep 可能是 nil,这意味着接收到的值被忽略了(对应 <-c 这种形式的接收)。
- 如果是非阻塞模式,并且通道无数据,返回 (false, false),也就是 select 语句中的 case 不会被选中。
- 否则,如果 c 关闭了,会对 ep 指向的地址设置零值,然后返回 (true, false)。如果是 select 语句,意味被选中,
- 但是 received 为 false 表明返回的数不是通道关闭之前发送的。
- 否则,将从通道中获取到的值写入 ep 指向的地址,并且返回 (true, true)
- 一个非 nil 的 ep 必须指向堆或者调用者的栈。
// 从 c 读取数据,写入到 ep 指向的地址。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// c 是 nil chan
if c == nil {
// select 里面的 case 不会被选中
if !block {
return
}
// 阻塞模式时,协程挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
// 在实际执行的时候,如果其他协程都执行完了,只剩下这一个协程(又或者全部协程都是睡眠状态,并且无法被唤醒的那种),那么会报错:
// "fatal error: all goroutines are asleep - deadlock!"
throw("unreachable")
}
// 如果是非阻塞模式(select),并且 c 是空的
if !block && empty(c) {
// chan 未关闭,并且是空的,返回 false,false
if atomic.Load(&c.closed) == 0 {
return
}
// chan 已经关闭,并且 chan 是空的
if empty(c) {
// ...
// 返回一个零值
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// select 分支被选中,但是返回值是无效的,是一个零值
return true, false
}
}
// ...
// 获取锁
lock(&c.lock)
// chan 已关闭
if c.closed != 0 {
// chan 已经关闭,同时也没有数据
if c.qcount == 0 {
// ...
// 释放锁
unlock(&c.lock)
if ep != nil {
// 设置零值
typedmemclr(c.elemtype, ep)
}
// select 的分支被选中,但是返回值无效
return true, false
}
} else {
// chan 未关闭,并且有一个等待发送的元素(对应情况:chan 是满的或者无缓冲而且没有 receiver)
// 如果无缓冲:则将元素直接从 sender 复制到 receiver 中。
// 否则:意味着 c 的缓冲区满了,从环形队列中接收值,将 sg 需要发送的值添加到环形队列尾,
// 实际上这个时候,队列头和队列尾都是同一个位置,因为队列满了。
// 只不过,队列头和队列尾指向的位置会发生变化(都加 1,然后对缓冲区长度取模)。
if sg := c.sendq.dequeue(); sg != nil {
// 找到一个 sender。
// 如果无缓冲,直接从 sender 复制到 receiver
// 否则,环形队列对头元素复制给 receiver,sender 要发送的元素复制进环形队列队尾。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// select 分支被选中,接收成功,并且接收的值是有效的。
return true, true
}
}
// 缓冲区有数据,并且缓冲区没满
if c.qcount > 0 {
// qp 是被接收元素的地址
qp := chanbuf(c, c.recvx)
// ...
// 将 qp 指向的值复制到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清空队列中 ep 的空间(设置为零值)
typedmemclr(c.elemtype, qp)
// 被接收的下标指向下一个元素
c.recvx++
// 环形队列,回到开头
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓冲区长度减 1
c.qcount--
// 释放锁
unlock(&c.lock)
// select 分支被选中,并且接收的值是有效的。
return true, true
}
// 缓冲区空的,并且是非阻塞(select)
if !block {
// 释放锁
unlock(&c.lock)
// 返回 false,false
return false, false
}
// 缓冲区空,并且是阻塞模式,同时没有等待发送的 g
// 没有 sender,阻塞
gp := getg()
mysg := acquireSudog()
// ...
// c 的 recvq,也就是等待接收的队列,在队尾添加当前的 g
c.recvq.enqueue(mysg)
// ...
// g 挂起,等待下一个发送数据的协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// ... 被唤醒后的操作 ...
return true, success
}
// recv 处理缓冲区已满的 chan 的接收操作(或者无缓冲,这个函数处理这两种情况)。
// 有两部分:
// 1. 等待发送数据的协程(sender),会将其要发送的数据放入 chan 中,然后这个协程会被唤醒
// 2. 被接收协程接收的值会写入到 ep 中
//
// 对于同步 chan(无缓冲 chan),两个值是同一个。
// 对于异步 chan,接收者从 chan 的缓冲区获取数据,发送方的输入放入 chan 缓冲区。
// 通道 c 必须已满并锁定。recv 会使用 unlockf 来解锁 c。
// sg 必须已经从 c 中移除(准确来说是 c.sendq)。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果无缓冲区
if c.dataqsiz == 0 {
// ...
// 直接将 sender 的要发送的值复制到 ep
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
// 有缓冲区,但是缓冲区满了。
// 从队列头获取元素,将要发送的值放入队列尾。(实际上操作的是同一个位置,因为环形队列满了)
// 需要获取的值的指针地址
qp := chanbuf(c, c.recvx)
// ...
// 如果需要接收值,则将 qp 复制到 ep(没有忽略返回值)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将要发送的值写入到 qp(sendq 对头元素要发送的值写入到 qp,也就是 chan 刚刚空出来的位置)
typedmemmove(c.elemtype, qp, sg.elem)
// 队列头、尾指针移动
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 释放锁
unlockf()
// ...
// 唤醒协程(这个被唤醒的协程是之前因为发送不出去被阻塞的协程)
goready(gp, skip+1)
}
// 将数据直接从 sender 复制到 receiver
// 场景:发送到无缓冲的 chan
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// dst 是 receiver 栈里保存接收值的地址,src 是 sender 栈里要被发送的值的地址
memmove(dst, src, t.size)
}
关闭chan
chan 关闭的过程比较简单,修改 closed 为 1,然后唤醒发送队列和接收队列里面的 g,如果发送队列有 g,被唤醒之后会 panic,因为不能往一个已经关闭的 chan 发送数据。
// 关闭 chan
func closechan(c *hchan) {
// 不能关闭 nil chan
if c == nil {
panic(plainError("close of nil channel"))
}
// 开启锁
lock(&c.lock)
if c.closed != 0 {
// chan 已经关闭,panic,不能重复关闭。释放锁
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
// ...
// 设置 closed 标志
c.closed = 1
// gList 用来保存阻塞在 chan 上的 g(链表,包括了 sender 和 receiver)
var glist gList
// 释放所有等待读取 chan 的协程(解除阻塞状态)
for {
// recvq 队头元素出队
sg := c.recvq.dequeue()
if sg == nil {
// sendq 已经没有元素了
break
}
// 关闭之后,从 chan 接收到的都是零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
// ...
glist.push(gp)
}
// 释放所有正在等待写入 chan 的协程(解除阻塞状态,这些协程会 panic)
for {
// sendq 队头元素出队
sg := c.sendq.dequeue()
if sg == nil {
// sendq 已经没有元素了
break
}
// ...
glist.push(gp)
}
// 释放锁
unlock(&c.lock)
// 将所有等待的协程修改为就绪态
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// g 状态修改为可运行状态
goready(gp, 3)
}
}
对于实际开发的作用
在上一篇文章和本文中,花了很大的篇幅来讲述 chan 的设计、实现与使用,这么多东西对我们有什么用呢?
其中非常重要的一个作用是,清楚地了解 chan 的工作机制,便于我们对程序实际运行情况进行分析,
尤其是一些非常隐晦的读写 chan 场景,毕竟稍有不慎就会导致协程泄漏,这对进程影响可能是非常大的。
比如下面的这种代码:
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
for i := 0; i < 10000; i++ {
time.Sleep(time.Second)
go func() {
// 永远阻塞,协程泄漏
var ch chan int
ch <- 1
}()
// 我们会看到协程数量逐渐增长。
// 但是这部分挂起的协程永远不会被调度。
fmt.Printf("goroutine count: %d\n", runtime.NumGoroutine())
}
time.Sleep(time.Hour)
}
tips:在 chan 读写的地方需要注意自己的写法会不会让 goroutine 永远陷入阻塞,或者长时间阻塞。
总结
chan 底层是 hchan 结构体。
go 语法里面的 <- 不过是语法糖,在编译的时候,会编译成 hchan 相关的方法调用。最终都会调用 chansend 或者 chanrecv。select…case 里面的 chan 读写最终也会编译为对 chansend 或 chanrecv 的调用。
chan 总体设计:维护了三个队列:
- hchan.buf: chan 中暂存 sender 发送数据的队列(在有 receiver 读取的时候会从这个队列中复制到 receiver 中)
- hchan.recvq: 接收队列,存储那些尝试读取 channel 但被阻塞的 goroutine。
- hchan.sendq: 发送队列,存储那些尝试写入 channel 但被阻塞的 goroutine。
读写 chan 的协程阻塞是通过 gopark 实现的,而从阻塞态转换为可运行状态是通过 goready 实现的。
在 chan 读写操作阻塞的时候,如果是在 select 语句中,则会直接返回(表示当前的分支没有被选中),否则,会调用 gopark 挂起当前协程。
在关闭 chan 的时候,会调用 goready 唤醒阻塞在发送或者接收操作上的 g(协程)。
无缓冲 chan 的操作有点特殊,对于无缓冲 chan,必须同时有 sender 和 receiver 才能发送和接收成功,否则另一边都会陷入阻塞(当然,select 不会阻塞)。