资讯详情

Go源码解析——Channel篇

channel、map、slice作为golang核心三剑客的使用golang程序猿作为主语言完成开发是非常重要的。了解其设计和源代码是使用的基础,因此作者将详细介绍和分析这三种数据结构的源代码…(算是大家的导演,加上自己的一点见解),如果有帮助,请表扬关注。

Go源码分析专栏

Go源码解析——Channel篇

Go源码分析——Map篇

Go源码分析——Slice篇

文章目录

  • Go源码分析专栏
    • Go源码解析——Channel篇
    • Go源码分析——Map篇
    • Go源码分析——Slice篇
  • 1.hchan
  • 2.make
    • 2.1 合法性验证
    • 2.2 地址空间空间
  • 3.send
    • 3.1 chansend函数
    • 3.2 send函数
  • 4.recv
    • 4.1 chanrecv函数
    • 4.2 recv函数
  • 5.close
    • 5.1 closechan函数
  • 6.select
    • 6.1 向channel中发送数据
    • 6.2 从channel中接收数据
  • 参考文档
  • 扩展阅读


1.hchan

  • channel底层数据结构是
  • 是读操作堵塞 channel 的 goroutine 列表,sendq 是写作操作堵塞 channel 的 goroutine 列表(双向链表) ,FIFO ,使用双端队列FIFO入队方便)
  • buf使用(环形缓存区)优点包括
  • 适合固定长度队列
  • 可提前分配的数组
  • 允许高效的模式
  • 所有缓存区的操作都是,由于不需要移动元素,它包括消耗元素
  • 本质上是一个固定长度的数组,有头尾指针,实现参考Go数据结构和实现【Ring Buffer】 - 掘金
  • 是等待goroutine数据包装,是的

部分源代码如下:

type hchan struct { 
             qcount   uint           // 队列中的数据数量     dataqsiz uint           // buf 大小     buf      unsafe.Pointer // 环形数组存储数据     elemsize uint16         // channel 中数据类型的大小     closed   uint32         // 表示 channel 是否关闭     elemtype *_type // 元素数据类型     sendx    uint   // send 的数组索引     recvx    uint   // recv 的数组索引     recvq    waitq  // 由 recv 行为(即 <-ch)阻塞在 channel 上的 goroutine 队列     sendq    waitq  // 由 send 行为 (也就是 ch<-) 阻塞在 channel 上的 goroutine 队列      // lock protects all fields in hchan, as well as several     // fields in sudogs blocked on this channel.     //     // Do not change another G's status while holding this lock     // (in particular, do not ready a G), as this can deadlock     // with stack shrinking.     lock mutex } type waitq struct { 
             first *sudog     last  *sudog } type sudog struct { 
             // The following fields are protected by the hchan.lock of the     // channel this sudog is blocking on. shrinkstack depends on     // this for sudogs involved in channel ops.      g          *g     selectdone *uint32 // CAS to 1 to win select race (may poin to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

2.make

Go 语言中所有 Channel 的创建都会使用 make 关键字。编译器会将 make(chan int, 10) 表达式转换成 类型的节点,并在类型检查阶段将 OMAKE 类型的节点转换成 类型:

func typecheck1(n *Node, top int) (res *Node) { 
        
        switch n.Op { 
        
        case OMAKE:
                ...
                switch t.Etype { 
        
                case TCHAN:
                        l = nil
                        if i < len(args) { 
         // 带缓冲区的异步 Channel
                                ...
                                n.Left = l
                        } else { 
         // 不带缓冲区的同步 Channel
                                n.Left = nodintconst(0)
                        }
                        n.Op = OMAKECHAN
                }
        }
}

这一阶段会对传入 make 关键字的进行检查,如果我们不向 make 传递表示缓冲区大小的参数,那么就会设置一个默认值 0,也就是当前的 Channel 不存在缓冲区。 OMAKECHAN 类型的节点最终都会在 SSA 中间代码生成阶段之前被转换成调用 runtime. 或者 runtime. 的函数:

func walkexpr(n *Node, init *Nodes) *Node { 
        
        switch n.Op { 
        
        case OMAKECHAN:
                size := n.Left
                fnname := "makechan64"
                argtype := types.Types[TINT64]

                if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 { 
        
                        fnname = "makechan"
                        argtype = types.Types[TINT]
                }
                n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
        }
}

runtime.makechan 和 runtime.makechan64 会根据传入的参数类型和缓冲区大小创建一个新的 Channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况,因为这在 Channel 中并不常见,所以我们重点关注

func makechan(t *chantype, size int) *hchan { 
        
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 { 
        
      throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign { 
        
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 { 
        
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch { 
        
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan { 
        
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

runtime.makechan主要分为两个部分:

2.1 合法性验证

  • 数据类型大小,大于1<<16时异常​
  • 内存对齐(降低寻址次数),大于最大的内存8字节数时异常​
  • 传入的size大小,大于堆可分配的最大内存时异常
if elem.size >= 1<<16 { 
        ​
      throw("makechan: invalid channel element type")​
}​

if hchanSize%maxAlign != 0 || elem.align > maxAlign { 
        ​
      throw("makechan: bad alignment")​
}​
​
mem, overflow := math.MulUintptr(elem.size, uintptr(size))​
if overflow || mem > maxAlloc-hchanSize || size < 0 { 
        ​
      panic(plainError("makechan: size out of range"))​
}

2.2 分配地址空间

根据 channel 中收发元素的类型和缓冲区的大小初始化 runtime.hchan 和缓冲区

  • 如果 channel 不存在,分配 hchan 结构体空间,即无缓存 channel
  • 如果 channel 存储的类型不是类型,分配连续地址空间,包括
  • 默认情况包括指针,为 hchan 和 buf 单独分配数据地址空间
var c *hchan
switch { 
        
case mem == 0:
   // Queue or element size is zero.
   c = (*hchan)(mallocgc(hchanSize, nil, true))
   // Race detector uses this location for synchronization.
   c.buf = c.raceaddr()
case elem.ptrdata == 0:
   // Elements do not contain pointers.
   // Allocate hchan and buf in one call.
   c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
   c.buf = add(unsafe.Pointer(c), hchanSize)
default:
   // Elements contain pointers.
   c = new(hchan)
   c.buf = mallocgc(mem, elem, true)
}

更新 hchan 结构体的数据,包括 elemsize elemtype 和 dataqsiz

c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)

3.send

当我们想要向 Channel 发送数据时,就需要使用 ch <- i 语句,编译器会将它解析成 OSEND 节点并在 cmd/compile/internal/gc.walkexpr 中转换成 runtime.

case OSEND:
   n1 := n.Right
   n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
   n1 = walkexpr(n1, init)
   n1 = nod(OADDR, n1, nil)
   n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)

runtime.chansend1 只是调用了 runtime. 并传入 Channel 和需要发送的数据runtime.chansend 是向 Channel 中发送数据时一定会调用的函数,该函数包含了发送数据的全部逻辑,如果我们在调用时将 参数设置成 ,那么表示当前发送操作是的:

func chansend1(c *hchan, elem unsafe.Pointer) { 
        
   chansend(c, elem, true, getcallerpc())//阻塞发送
}
  • ch <- x时阻塞发送
  • x := <- ch时阻塞接收

3.1 chansend函数

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { 
        
   if c == nil { 
        //向nil chan发送数据会发生阻塞
      if !block { 
        
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)//休眠
      throw("unreachable")
   }

   if debugChan { 
        
      print("chansend: chan=", c, "\n")
   }

   if raceenabled { 
        
      racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
   }

   // Fast path: check for failed non-blocking operation without acquiring the lock.
   //
   // After observing that the channel is not closed, we observe that the channel is
   // not ready for sending. Each of these observations is a single word-sized read
   // (first c.closed and second full()).
   // Because a closed channel cannot transition from 'ready for sending' to
   // 'not ready for sending', even if the channel is closed between the two observations,
   // they imply a moment between the two when the channel was both not yet closed
   // and not ready for sending. We behave as if we observed the channel at that moment,
   // and report that the send cannot proceed.
   //
   // It is okay if the reads are reordered here: if we observe that the channel is not
   // ready for sending and then observe that it is not closed, that implies that the
   // channel wasn't closed during the first observation. However, nothing here
   // guarantees forward progress. We rely on the side effects of lock release in
   // chanrecv() and closechan() to update this thread's view of c.closed and full().
   if !block && c.closed == 0 && full(c) { 
        //full为ture的两种情况1)无缓存通道,recvq为空2)缓存通道,但是buffer已满
      return false
   }

   var t0 int64
   if blockprofilerate > 0 { 
        
      t0 = cputicks()
   }

   lock(&c.lock)

   if c.closed != 0 { 
        //再次检查channel是否关闭,向已关闭的chan发送元素会引起panic
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   if sg := c.recvq.dequeue(); sg != nil { 
        //取出第一个非空并且未被选择过的的sudog
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { 
         unlock(&c.lock) }, 3)
      return true
   }

   if c.qcount < c.dataqsiz { 
        
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled { 
        
         racenotify(c, c.sendx, nil)
      }
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz { 
        
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }

   if !block { 
        
      unlock(&c.lock)
      return false
   }

   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 { 
        
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   if mysg != gp.waiting { 
        
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 { 
        
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   if closed { 
        
      if c.closed == 0 { 
        
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   return true
}

chansend函数主要可以归纳为三部分:​

未初始化时为nil,向nil channel发送数据会阻塞 从nil channel读取数据同样会阻塞

if c == nil { 
        //向nil chan发送数据会发生阻塞
   if !block { 
        
      return false
   }
   gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)//休眠
   throw("unreachable")
}

向已经关闭的channel发送数据会引起panic

 if c.closed != 0 { 
        //再次检查channel是否关闭,向已关闭的chan发送元素会引起panic
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

如果有等待的接受者,也就是recvq队列中有waitq,通过send方法直接将数据发送给等待的接受者

if sg := c.recvq.dequeue(); sg != nil { 
        //取出第一个非空并且未被选择过的的sudog​
      // Found a waiting receiver. We pass the value we want to send​
      // directly to the receiver, bypassing the channel buffer (if any).​
      send(c, sg, ep, func() { 
         unlock(&c.lock) }, 3)​
      return true​
   }

如果缓存区存在空余空间,写入buffer

if c.qcount < c.dataqsiz { 
        
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)//获取缓存区index地址
      if raceenabled { 
        
         racenotify(c, c.sendx, nil)
      }
      typedmemmove(c.elemtype, qp, ep)//数据写入buffer
      c.sendx++
      if c.sendx == c.dataqsiz { 
        
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }

缓存区已满或无缓存channel,

  • 获取发送数据的 Goroutine
  • 获取 sudog 结构
  • 将创建并初始化的 sudog 加入,并设置到当前 Goroutine 的 上,表示 Goroutine 正在等待该 sudog 准备就绪
  • gopark 将当前的 Goroutine
  • 被调度器后会将一些属性置零并且释放 runtime.sudog 结构体
// Block on the channel. Some receiver will complete our operation for us.​
   gp := getg()​
   mysg := acquireSudog()​
   mysg.releasetime = 0​
   if t0 != 0 { 
        ​
      mysg.releasetime = -1​
   }​
   // No stack splits between assigning elem and enqueuing mysg​
   // on gp.waiting where copystack can find it.​
   mysg.elem = ep​
   mysg.waitlink = nil​
   mysg.g = gp​
   mysg.isSelect = false​
   mysg.c = c​
   gp.waiting = mysg​
   gp.param = nil​
   c.sendq.enqueue(mysg)​
   // Signal to anyone trying to shrink our stack that we're about​
   // to park on a channel. The window between when this G's status​
   // changes and when we set gp.activeStackChans is not safe for​
   // stack shrinking.​
   
   // 把 goroutine 相关的线索结构入队,等待条件满足的唤醒;
   atomic.Store8(&gp.parkingOnChan, 1)​
   // goroutine 切走,让出 cpu 执行权限;
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), 

标签: 方头静态扭矩传感器pt124b

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台