上篇我们讲了channel的基础操作和hchan的结构,这篇我们就channel的核心发送和接收流程的底层实现原理来展开了解,代码会很多,但是都是方便我们去理解里面流程,会对我们对理解原理有大的提升。
向 channel 中发送数据时,编译器在编译它时,实际调用的是src/runtime/chan.go中的chansend函数。
// ch <- 1 发送代码
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
}
阻塞式发送在调用chansend函数,block=true,代码方式如下:
ch <- 10
// 编译后实际调用函数
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
阻塞式发送在调用chansend函数,block=true,通过select在其阻塞时直接返回,代码方式如下:
select {
case ch <- 10:
...
default
}
//select 编译后实际调用函数
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
通过上面我们知道channel发送的时候分为阻塞式和非阻塞式,但是最终都是调用的chansend函数,那么就一起理一理chansend的主要逻辑
1:在chan为nil 未初始化的情况下,对于select这种非阻塞的发送,直接返回 false;对于阻塞的发送,将 goroutine 挂起
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
2:非阻塞发送时,channel 不为 nil,channel 没有关闭时,并且缓冲区已满,返回false。full()函数是用来判断channel是否会发生阻塞(也就是channel容量是否满了)
if !block && c.closed == 0 && full(c) {
return false
}
func full(c *hchan) bool {
// 如果循环队列大小为0
if c.dataqsiz == 0 {
// c.datasiz == 0 没有接收者
return c.recvq.first == nil
}
// 队列满了
return c.qcount == c.dataqsiz
}
3:下面就开始真正数据的发送流程了当然也分几种情况,首先会对channel加锁,判断channel是否关闭状态(对已关闭的channel发送数据,会panic)。然后从接收等待队列中获取一个接受者sudog,且接受者存在,那么绕过缓冲buf,直接向接受者发送数据,此时的buf一定是空的。向接收者sg发送数据的时,会唤醒等待接收的goroutine。也就是调用goready() 将等待接收的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable。具体的代码我们可以一步步追踪下去
{
...
// 对channel加锁, mutex
lock(&c.lock)
// 判断channel是否关闭
if c.closed != 0 {
//释放锁
unlock(&c.lock)
// paninc
panic(plainError("send on closed channel"))
}
// 从接收队列中获取一个接收者,且接受者不为空
if sg := c.recvq.dequeue(); sg != nil {
// 接收者存在,直接向该接收者发送数据,绕过channel缓冲
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
}
4:缓冲区未满,缓冲区未满的情况是c.qcount < c.datasiz (缓冲区队列数量小于容量),这种发送模式类似异步,需要同时也有接收者等待接收。chanbuf是获取channel的sendx索引元素的指针值 unsafe.Pointer,然后调用typedmemmove()函数进行转换
// 如果缓冲区未满
if c.qcount < c.dataqsiz {
// 找到要发送数据到循环队列buf的索引位置
qp := chanbuf(c, c.sendx)
......
// 数据拷贝到循环队列中
typedmemmove(c.elemtype, qp, ep)
// 将待发送数据索引加1
c.sendx++
if c.sendx == c.dataqsiz {
//因为循环队列,如果到了末尾,从0开始
c.sendx = 0
}
// chan中队列元素个数加1
c.qcount++
// 释放锁
unlock(&c.lock)
return true
}
5:缓冲区已满,对于select这种非阻塞发送会直接返回false,并且recvq没有接收者,此时将会将goroutine挂起,放到sendq中,等待被唤醒。当goroutine唤醒以后,解除阻塞的状态
// 缓冲区已满,对于select这种非阻塞调用直接返回false
if !block {
unlock(&c.lock)
return false
}
// 下面的逻辑是将当前goroutine挂起
// 调用 getg()方法获取当前goroutine的指针,用于绑定给一个 sudog
gp := getg()
// 调用 acquireSudog()方法获取一个 sudog,可能是新建的 sudog,也有可能是从缓存中获取的。设置好sudog要发送的数据和状态。比如发送的 Channel、是否在 select 中和待发送数据的内存地址等等。
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 调用 c.sendq.enqueue 方法将配置好的 sudog 加入待发送的等待队列
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 调用gopark方法挂起当前goroutine,状态为waitReasonChanSend,阻塞等待channel接收者的激活
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 最后,KeepAlive() 确保发送的值保持活动状态,直到接收者将其复制出来
KeepAlive(ep)
是不是感觉整个流程还挺多的,那么结合流程图将加深我们的理解
从channel 中接收数据时,编译器在编译它时,实际调用的是src/runtime/chan.go中的chanrecv函数。selected, received都是 bool类型, selected表示是否能接收到值
// i <- ch
// i, ok <- ch
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
}
1:跟发送的时候一样,在chan为nil 未初始化的情况下,对于select这种非阻塞的发送,直接返回 false;对于阻塞的发送,将 goroutine 挂起
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
2:非阻塞接收,通过empty()判断是无缓冲chan或者是chan中没有数据
if !block && empty(c) {
...
}
3:接下来就是阻塞式接收,对chan加锁,判断chan如果已经关闭,并且chan中没有数据,返回 (true,false),这里的第一个true表示chan关闭后读取的 0 值
// 已关闭, 并且channel中没有数据, 此时返回零值
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
4:发送队列不为空,从发送等待队列中获取一个发送者sudog,当发送者存在不为nil,会唤醒等待发送的goroutine。也就是调用goready() 将等待发送的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable。此时的缓冲区一定是满的,因为有等待的发送者。对应缓冲区满的情况,从队列的头部接收数据,发送者的值添加到队列的末尾
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
5:缓冲区有数据,直接从缓冲区接收数据,然后对recvx的下标进行逻辑处理,这里跟发送的时候很像
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
...
}
6:缓冲区没有数据,如果是select这种非阻塞读取的情况,直接返回(false, false),表示获取不到数据;否则,会获取sudog绑定当前接收者goroutine,调用gopark()挂起当前接收者goroutine,等待chan的其他发送者唤醒
// 如果是select非阻塞读取的情况,直接返回(false, false)
if !block {
unlock(&c.lock)
return false, false
}
// 获取当前 goroutine 的指针,用于绑定给一个 sudog
gp := getg()
mysg := acquireSudog()
...
//挂起goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
channel的关闭close(ch)底层是调用closechan()函数,整个回收流程分为两步:1:判断是否可以关闭, 将channel关闭 2:获取代接收、发送goroutine并唤醒到ready状态
func closechan(c *hchan) {
...
}
1:判断channel是否为nil,如果为nil会panic,然后判断channel是否已经被关闭了,我们知道关闭已经关闭的channel也是会panic的,这两个关于channel的知识点其实就在这里
// 如果c为nil,关闭它会panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 关闭已经关闭的chan会panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
2:关闭channel,将channel的closed的值设置为1,获取所有的recvq的接收者,sendq的写入者到glist,然后将glist中所有的goroutine都唤醒,关闭channel的流程结束
c.closed = 1
// 申明一个存放所有接收者和发送者goroutine的list
var glist gList
//获取recvq的接受者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
...
}
//获取sendq的发送者
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
...
}
// 唤醒glist的goroutine到Grunnable状态
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
Channel是在用有锁队列实现数据在不同协程之间传输数据,数据传输的方式其实就是值传递,使用时需要注意以下口诀
channel操作 | chan为nil | 关闭的chan | 非空、未关闭的chan |
---|---|---|---|
读 <- chan | 阻塞 | 里面的内容读完了,之后获取到的是类型的零值 | 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞 |
写 chan <- | 阻塞 | panic | 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞 |
关闭 close(chan) | panic | panic | 关闭成功 |
关于channel的使用和底层原理就总结完了,希望对大家对加深channel的理解提供实质性帮助!