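channel, map, and slice are the three core data structures of Go, and anyone who develops with Go as a main language needs to use them well. Understanding their design and source code is the foundation for doing so, so this series walks through the source code of all three in detail (mostly a curated tour of existing material, plus a few observations of my own). If it helps, please like and follow.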
Go Source Code Analysis series
Go Source Code Analysis: Channel
Go Source Code Analysis: Map
Go Source Code Analysis: Slice
Table of contents
- 1.hchan
- 2.make
  - 2.1 Validity checks
  - 2.2 Allocating memory
- 3.send
  - 3.1 The chansend function
  - 3.2 The send function
- 4.recv
  - 4.1 The chanrecv function
  - 4.2 The recv function
- 5.close
  - 5.1 The closechan function
- 6.select
  - 6.1 Sending data to a channel
  - 6.2 Receiving data from a channel
- References
- Further reading
1.hchan
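- The underlying data structure of a channel is hchan.
- recvq is the list of goroutines blocked on a read from the channel, and sendq is the list of goroutines blocked on a write to it. Both are doubly linked lists served in FIFO order; a doubly linked list makes it cheap to enqueue at the tail and dequeue at the head.
- buf is a ring buffer (circular buffer). Its advantages:
  - It suits fixed-length queues.
  - The backing array can be allocated up front.
  - It allows an efficient memory access pattern.
  - Every buffer operation is O(1), including consuming an element, because no elements ever need to be moved.
  - It is essentially a fixed-length array with head and tail indexes; for an implementation, see "Go数据结构和实现【Ring Buffer】" on 掘金 (Juejin).
- sudog wraps a goroutine that is waiting on the channel, and waitq is a doubly linked list of sudog.
Part of the source code is shown below: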
type hchan struct {
	qcount   uint           // number of elements currently in the queue
	dataqsiz uint           // size of buf
	buf      unsafe.Pointer // ring array that stores the data
	elemsize uint16         // size of the channel's element type
	closed   uint32         // whether the channel is closed
	elemtype *_type         // element type
	sendx    uint           // send index into the array
	recvx    uint           // receive index into the array
	recvq    waitq          // goroutines blocked on the channel by a receive (<-ch)
	sendq    waitq          // goroutines blocked on the channel by a send (ch<-)

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}

type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.
	g          *g
	selectdone *uint32 // CAS to 1 to win select race (may point to stack)
	next       *sudog
	prev       *sudog
	elem       unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.
	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}
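To make the roles of buf, qcount, sendx, and recvx more concrete, here is a minimal sketch of a ring buffer in ordinary Go. The ring type and its push and pop methods are hypothetical names invented for this illustration; only the field names are borrowed from hchan, and the real runtime works on raw memory under a lock rather than on an []int.

```go
package main

import "fmt"

// ring is a hypothetical, simplified stand-in for the buffer part of hchan.
// It is an illustration only, not runtime code.
type ring struct {
	buf    []int // preallocated storage, playing the role of hchan.buf
	qcount int   // number of queued elements
	sendx  int   // index of the next write
	recvx  int   // index of the next read
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push stores v at sendx and advances the index, wrapping to 0 at the end.
// It reports false when the buffer is full.
func (r *ring) push(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// pop reads the element at recvx and advances the index the same way.
// It reports false when the buffer is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: the buffer is full
	v, _ := r.pop()
	fmt.Println(v) // 1: elements leave in FIFO order
	r.push(3)      // reuses slot 0 without moving any element
	fmt.Println(r.sendx, r.recvx)
}
```

Neither push nor pop ever shifts elements, which is why every operation stays O(1).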
2.make
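Every channel in Go is created with the built-in make function. The compiler turns the expression make(chan int, 10) into a node of type OMAKE, and during type checking converts the OMAKE node into a node of type OMAKECHAN: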
func typecheck1(n *Node, top int) (res *Node) {
switch n.Op {
case OMAKE:
...
switch t.Etype {
case TCHAN:
l = nil
if i < len(args) {
// buffered (asynchronous) channel
...
n.Left = l
} else {
// unbuffered (synchronous) channel
n.Left = nodintconst(0)
}
n.Op = OMAKECHAN
}
}
}
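The else branch above sets the size to the constant 0 when make receives no second argument. From user code this can be observed with cap. The snippet below is a small illustration; the capacities helper is a name made up for this example.

```go
package main

import "fmt"

// capacities returns the capacity of an unbuffered and of a buffered channel.
func capacities() (int, int) {
	unbuffered := make(chan int)   // size argument omitted: treated as 0
	buffered := make(chan int, 10) // explicit buffer size
	return cap(unbuffered), cap(buffered)
}

func main() {
	u, b := capacities()
	fmt.Println(u, b) // 0 10
}
```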
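This stage checks the buffer size passed to make. If no buffer-size argument is passed, a default of 0 is set, which means the channel has no buffer. Before SSA intermediate code generation, every OMAKECHAN node is converted into a call to runtime.makechan or runtime.makechan64: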
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OMAKECHAN:
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
}
}
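runtime.makechan and runtime.makechan64 create a new channel structure from the element type and the buffer size. The latter handles buffer sizes above 2 to the 32nd power (more precisely, as the walkexpr branch above shows, it is chosen when the type of the size argument can hold values larger than a uint). Because that is rare for channels, we focus on runtime.makechan: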
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
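runtime.makechan has two main parts: validity checks and memory allocation.
2.1 Validity checks
- Element type size: throws if the size is 1<<16 or larger.
- Memory alignment (alignment reduces the number of memory accesses): throws if hchanSize is not a multiple of maxAlign or if the element's alignment exceeds maxAlign (8 bytes).
- Requested size: panics if elem.size*size overflows, if the result exceeds the largest heap allocation (maxAlloc-hchanSize), or if size is negative.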
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
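The first two checks call throw and are normally ruled out by the compiler, but the third one is an ordinary panic that user code can trigger and recover from. The following sketch shows this with a negative size known only at run time; tryMake is a hypothetical helper written for this example.

```go
package main

import "fmt"

// tryMake calls make with a size that is only known at run time.
// It returns the channel's capacity and the recovered panic message,
// which is empty when the channel was created successfully.
func tryMake(size int) (capacity int, msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, size)
	return cap(ch), ""
}

func main() {
	fmt.Println(tryMake(4))  // a valid size: capacity 4, empty message
	fmt.Println(tryMake(-1)) // rejected by the size check in makechan
}
```

The size is passed as a function parameter on purpose: a negative constant would already be rejected at compile time and never reach the runtime check.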
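2.2 Allocating memory
runtime.hchan and the buffer are initialized according to the channel's element type and buffer size:
- If the channel has no buffer (or the element size is zero), only the hchan struct is allocated. This is the unbuffered channel case.
- If the element type contains no pointers, hchan and buf are allocated together as one contiguous block.
- Otherwise the elements contain pointers, and hchan and buf are allocated separately.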
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
Finally, the hchan fields elemsize, elemtype, and dataqsiz are set and the lock is initialized:
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
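The three-way switch can be summarized with a small decision function. This is only a sketch: allocPlan is a hypothetical helper that names the branch makechan would take, it allocates nothing, and the element sizes in main assume a 64-bit platform where int and pointers are 8 bytes.

```go
package main

import "fmt"

// allocPlan is a hypothetical helper mirroring the switch in makechan.
// It only names the branch that would be taken.
func allocPlan(elemSize, bufSize uintptr, elemHasPointers bool) string {
	mem := elemSize * bufSize // the runtime additionally checks this for overflow
	switch {
	case mem == 0:
		return "hchan only"
	case !elemHasPointers:
		return "hchan and buf in one allocation"
	default:
		return "hchan and buf allocated separately"
	}
}

func main() {
	fmt.Println(allocPlan(8, 0, false))  // like make(chan int)
	fmt.Println(allocPlan(0, 16, false)) // like make(chan struct{}, 16)
	fmt.Println(allocPlan(8, 16, false)) // like make(chan int, 16)
	fmt.Println(allocPlan(8, 16, true))  // like make(chan *int, 16)
}
```

Note that a buffered channel of zero-size elements, such as chan struct{}, lands in the first branch too, because the product of element size and buffer size is zero.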
3.send
To send data to a channel we write a ch <- i statement. The compiler parses it into an OSEND node and, in cmd/compile/internal/gc.walkexpr, converts it into a call to runtime.chansend1:
case OSEND:
n1 := n.Right
n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
n1 = walkexpr(n1, init)
n1 = nod(OADDR, n1, nil)
n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)
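runtime.chansend1 simply calls runtime.chansend, passing the channel and the data to send. runtime.chansend is the function every send to a channel goes through, and it contains the entire send logic. When it is called with the block parameter set to true, the send is blocking:
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc()) // blocking send
}
- ch <- x is a blocking send
- x := <-ch is a blocking receive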
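For contrast with the blocking forms, a send inside a select that has a default case does not block; in the runtime this form goes through selectnbsend, which calls chansend with block set to false. A minimal sketch, where trySend is a helper name chosen for this example:

```go
package main

import "fmt"

// trySend attempts a non-blocking send using select with a default case.
// It reports whether the value was accepted.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer has room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full, a plain ch <- 2 would block here
}
```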
3.1 The chansend function
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// a send on a nil channel blocks forever
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // park the goroutine
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) {
// full(c) is true in two cases: 1) unbuffered channel with an empty recvq; 2) buffered channel whose buffer is full
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
// check again, now under the lock, whether the channel is closed; sending on a closed channel panics
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// dequeue returns the first waiting sudog that has not already been claimed by a select
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
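chansend can be summarized as two up-front checks followed by three cases:
An uninitialized channel is nil. A send on a nil channel blocks forever, and so does a receive from a nil channel.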
if c == nil {
// a send on a nil channel blocks forever
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // park the goroutine
throw("unreachable")
}
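The !block branch above is what makes a nil channel harmless inside a select with a default case: the operation is simply reported as not ready. The sketch below demonstrates this for both directions; trySend and tryRecv are helper names invented for this example.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// tryRecv attempts a non-blocking receive and reports whether it succeeded.
func tryRecv(ch chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	var ch chan int // nil: declared but never created with make
	fmt.Println(ch == nil)      // true
	fmt.Println(trySend(ch, 1)) // false: the non-blocking send returns immediately
	_, ok := tryRecv(ch)
	fmt.Println(ok) // false
}
```

Without the default case, either operation on the nil channel would park the goroutine forever.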
Sending on a channel that is already closed panics:
if c.closed != 0 {
// check again, now under the lock, whether the channel is closed; sending on a closed channel panics
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
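This panic can be observed and recovered in user code. A short sketch, where sendRecover is a hypothetical helper for this example:

```go
package main

import "fmt"

// sendRecover sends v on ch and returns the recovered panic message,
// or "" if the send succeeded.
func sendRecover(ch chan int, v int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch <- v
	return ""
}

func main() {
	ch := make(chan int, 1)
	close(ch)
	fmt.Println(sendRecover(ch, 1)) // send on closed channel
}
```

The panic happens even though the buffer still has room, because the closed check runs before the buffer is considered.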
If a receiver is waiting, that is, recvq holds a sudog, the send function hands the data directly to that waiting receiver:
if sg := c.recvq.dequeue(); sg != nil {
// dequeue returns the first waiting sudog that has not already been claimed by a select
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
If the buffer has free space, the data is written into the buffer:
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // address of the buffer slot at index sendx
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep) // copy the data into the buffer
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
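From user code, qcount is visible through len(ch), so the buffered path can be watched directly. The sketch below fills a channel without any receiver and then drains it; fillAndDrain is a helper name made up for this illustration.

```go
package main

import "fmt"

// fillAndDrain sends n values into a buffered channel of capacity n,
// records len(ch) after each send, then receives everything back.
func fillAndDrain(n int) (lens []int, out []int) {
	ch := make(chan int, n)
	for i := 0; i < n; i++ {
		ch <- i // the buffer has room, so the send returns immediately
		lens = append(lens, len(ch))
	}
	for i := 0; i < n; i++ {
		out = append(out, <-ch)
	}
	return lens, out
}

func main() {
	lens, out := fillAndDrain(3)
	fmt.Println(lens) // [1 2 3]
	fmt.Println(out)  // [0 1 2]
}
```

Each send increments the count by one, and the values come back in the order they were written, which matches the sendx and qcount bookkeeping in the excerpt above.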
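If the buffer is full, or the channel is unbuffered and no receiver is waiting, a blocking send parks the current goroutine:
- getg gets the goroutine that is sending.
- acquireSudog gets a sudog structure.
- The initialized sudog is added to the send wait queue (sendq) and stored in the current goroutine's waiting field, indicating that the goroutine is waiting for this sudog to become ready.
- gopark puts the current goroutine to sleep.
- After the scheduler wakes it up, some fields are reset to zero and the runtime.sudog structure is released.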
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
	mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
// The sudog describing this goroutine is now queued and waits to be woken once the condition is met.
atomic.Store8(&gp.parkingOnChan, 1)
// Switch the goroutine out and give up the CPU.
gopark(chanparkcommit, unsafe.Pointer(&c.lock),