广告
返回顶部
首页 > 资讯 > 精选 >Go调度器学习之goroutine调度怎么创建
  • 352
分享到

Go调度器学习之goroutine调度怎么创建

2023-07-05 18:07:15 352人浏览 八月长安
摘要

今天小编给大家分享一下Go调度器学习之goroutine调度怎么创建的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。1. 协程

今天小编给大家分享一下Go调度器学习之goroutine调度怎么创建的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

    1. 协程调度发生的时机

    在以下情形中,goroutine可能会发生调度:

    情形说明
    go func(){}使用go关键字创建一个新的goroutine,调度器会考虑调度
    GC由于GC也需要在系统线程M上执行,且其中需要所有的goroutine都停止运行,所以也会发生调度
    系统调用发生系统的调用时,会阻塞M,所以它会被调度走,同时新的goroutine也会被调度上来
    同步内存访问mutex、channel等操作会使得goroutine阻塞,因此会被调度走,等条件满足后,还会被调度上来继续运行

    2. 创建协程时的调度

    其中,使用go关键字创建协程时的调度分析,上篇博客做了初步的分析,特别是有关调度循环的分析,但是我们没有具体分析,当创建协程时,系统是怎么发生调度的。

    func newproc(fn *funcval) {   gp := getg()   pc := getcallerpc()   systemstack(func() {      newg := newproc1(fn, gp, pc)      _p_ := getg().m.p.ptr()      runqput(_p_, newg, true)      if mainStarted {         wakep()      }   })}

    我们还记得,go关键字在创建协程时,Go的编译器会将其转换为runtime.newproc函数,上篇我们详细分析了main goroutine的创建过程,在runtime.main函数中,全局变量mainStarted会被置为true,之后普通协程的创建,则会调用runtime.wakep函数尝试唤醒空闲的P。

    func wakep() {   if atomic.Load(&sched.npidle) == 0 {      return   }   // be conservative about spinning threads   if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {      return   }   startm(nil, true)}

    wakep函数首先确认是否有其他线程正在处于spinning状态,即M是否在找工作,如果没有的话,则调用startm函数创建一个新的、或者唤醒一个处于睡眠状态的工作线程出来工作。

    func startm(_p_ *p, spinning bool) {   // Disable preemption.   //   // Every owned P must have an owner that will eventually stop it in the   // event of a GC stop request. startm takes transient ownership of a P   // (either from argument or pidleget below) and transfers ownership to   // a started M, which will be responsible for perfORMing the stop.   //   // Preemption must be disabled during this transient ownership,   // otherwise the P this is running on may enter GC stop while still   // holding the transient P, leaving that P in limbo and deadlocking the   // STW.   //   // Callers passing a non-nil P must already be in non-preemptible   // context, otherwise such preemption could occur on function entry to   // startm. Callers passing a nil P may be preemptible, so we must   // disable preemption before acquiring a P from pidleget below.   mp := acquirem()  // 保证在此期间不会发生栈扩展   lock(&sched.lock)   if _p_ == nil {   // 没有指定p,那么需要从空闲队列中取一个p      _p_ = pidleget()      if _p_ == nil {// 如果没有空闲的p,直接返回         unlock(&sched.lock)         if spinning {            // The caller incremented nmspinning, but there are no idle Ps,            // so it's okay to just undo the increment and give up.            if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {               throw("startm: negative nmspinning")            }         }         releasem(mp)         return      }   }   nmp := mget()  // 如果有空闲的p,那么取出一个空闲的m   if nmp == nil {// 如果没有空闲的m,那么调用newm创建一个,然后返回      // No M is available, we must drop sched.lock and call newm.      // However, we already own a P to assign to the M.      //      // Once sched.lock is released, another G (e.g., in a syscall),      // could find no idle P while checkdead finds a runnable G but      // no running M's because this new M hasn't started yet, thus      // throwing in an apparent deadlock.      //      // Avoid this situation by pre-allocating the ID for the new M,      // thus marking it as 'running' before we drop sched.lock. This      // new M will eventually run the scheduler to execute any      // queued G's.      id := mReserveID()      unlock(&sched.lock)      var fn func()      if spinning {         // The caller incremented nmspinning, so set m.spinning in the new M.         fn = mspinning      }      newm(fn, _p_, id)      // Ownership transfer of _p_ committed by start in newm.      // Preemption is now safe.      releasem(mp)      return   }   unlock(&sched.lock)   if nmp.spinning {      throw("startm: m is spinning")   }   if nmp.nextp != 0 {      throw("startm: m has p")   }   if spinning && !runqempty(_p_) {      throw("startm: p has runnable gs")   }   // The caller incremented nmspinning, so set m.spinning in the new M.   nmp.spinning = spinning   nmp.nextp.set(_p_)   notewakeup(&nmp.park) // 如果有空闲的m,则唤醒这个m   // Ownership transfer of _p_ committed by wakeup. Preemption is now   // safe.   releasem(mp)}

    startm函数首先判断是否有空闲的P,如果没有则直接返回;如果有,则判断是否有空闲的M,如果没有,则新建一个;如果有空闲的M,则唤醒这个M。说白了,wakep函数就是为了更大程度的利用P,利用CPU资源。

    说到这里,我们就需要重温一下上篇博客讲到的,调度中获取goroutine的规则是:

    • 每调度61次就需要从全局队列中获取goroutine

    • 其次优先从本P所在队列中获取goroutine

    • 如果还没有获取到,则从其他P的运行队列中窃取goroutine

    其中,从其他P队列中窃取goroutine,调用的是findrunnable函数,这个函数很长,为了简化说明,我们删除一些不是很重要的代码:

    func findrunnable() (gp *g, inheritTime bool) {   _g_ := getg()top:   _p_ := _g_.m.p.ptr()   ...   // local runq   // 再从本地队列找找   if gp, inheritTime := runqget(_p_); gp != nil {      return gp, inheritTime   }   // global runq   // 再看看全局队列   if sched.runqsize != 0 {      lock(&sched.lock)      gp := globrunqget(_p_, 0)      unlock(&sched.lock)      if gp != nil {         return gp, false      }   }   ...   // Spinning Ms: steal work from other Ps.   //   // Limit the number of spinning Ms to half the number of busy Ps.   // This is necessary to prevent excessive CPU consumption when   // GOMAXPROCS>>1 but the program parallelism is low.   procs := uint32(gomaxprocs)   if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {      if !_g_.m.spinning {         _g_.m.spinning = true         atomic.Xadd(&sched.nmspinning, 1)      }      gp, inheritTime, tnow, w, newWork := stealWork(now) // 调用stealWork盗取goroutine      now = tnow      if gp != nil {         // Successfully stole.         return gp, inheritTime      }      if newWork {         // There may be new timer or GC work; restart to         // discover.         goto top      }      if w != 0 && (pollUntil == 0 || w < pollUntil) {         // Earlier timer to wait for.         pollUntil = w      }   }   ...   // return P and block   // 上面的窃取没有成功,那么解除m和p的绑定,摒弃娥江p放到空闲队列,然后去休眠   lock(&sched.lock)   if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {      unlock(&sched.lock)      goto top   }   if sched.runqsize != 0 {      gp := globrunqget(_p_, 0)      unlock(&sched.lock)      return gp, false   }   if releasep() != _p_ {      throw("findrunnable: wrong p")   }   pidleput(_p_)   unlock(&sched.lock)   ...      _g_.m.spinning = false // m即将睡眠,状态不再是spinning      if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {         throw("findrunnable: negative nmspinning")      }   ...   stopm() // 休眠   goto top}

    从上面的代码可以看出,工作线程会反复尝试寻找运行的goroutine,实在找不到的情况下才会进入到睡眠。需要注意的是,工作线程M从其他P的本地队列中盗取goroutine时的状态称之为自旋(spinning)状态,而前面讲到wakep调用startm函数,也是优先从自旋状态的M中选取,实在没有才去唤醒休眠的M,再没有就创建新的M。

    窃取算法stealWork我们就不分析了,有兴趣的同学可以看看。下面具体分析下stopm是怎么实现线程睡眠的。

    func stopm() {   _g_ := getg()   if _g_.m.locks != 0 {      throw("stopm holding locks")   }   if _g_.m.p != 0 {      throw("stopm holding p")   }   if _g_.m.spinning {      throw("stopm spinning")   }   lock(&sched.lock)   mput(_g_.m)         // 把m放到sched.midle空闲队列   unlock(&sched.lock)   mPark()   acquirep(_g_.m.nextp.ptr()) // 绑定这个m和其下一个p,这里没有看懂为啥这么操作   _g_.m.nextp = 0}func mPark() {   gp := getg()   notesleep(&gp.m.park) // 进入睡眠状态   noteclear(&gp.m.park)}

    可以看出,stopm主要是将m对象放到调度器的空闲线程队列,然后通过notesleep进入睡眠状态。notego runtime实现的一次性睡眠和唤醒机制,通过notesleep进入睡眠状态,然后另一个线程可以通过notewakeup唤醒这个线程。

    小结

    上面巴拉巴拉讲了那么多,看的人有点头晕,我们接下来讲一个很小的例子梳理一下以上的逻辑(主线程的创建和执行在上一篇博客中详细叙述过,这里不再赘述),主线程创建了一个goroutine,这时候会触发wakep,接下来可能会唤醒空闲的工作线程(如果是第一个非main goroutine,就没有空闲的工作线程),或者创建一个新的工作线程,或者什么都不做。

    如果是创建一个新的工作线程,那么其开启执行的点也是mstart函数(注意区分mstartstartm),然后在schedule函数中会尝试去获取goroutine,如果全局和本地的goroutine队列都没有,则会去其他的P上窃取goroutine,如果窃取不成功,则会休眠。

    如果是去唤醒工作协程,唤醒后会在休眠的地方开始,重新进行窃取。

    窃取到工作协程后,就会去执行,然后就会因为各种原因重新开始调度循环。

    Go调度器学习之goroutine调度怎么创建

    3. 主动挂起

    Go中,有很多种情形会导致goroutine阻塞,即其主动挂起,然后被调度走,等满足其运行条件时,还会被调度上来继续运行。比如channel的读写,我们以通道的阻塞读为例,来介绍goroutine的主动挂起的调度方式。

    3.1 协程挂起

    和前面介绍的Map一样,channel的读也有以下两种读取方式:

    v := <- chv, ok := <- ch

    分别对应以下chanrecv1chanrecv2函数:

    //go:nosplitfunc chanrecv1(c *hchan, elem unsafe.Pointer) {   chanrecv(c, elem, true)}//go:nosplitfunc chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {   _, received = chanrecv(c, elem, true)   return}

    无论是哪个函数,最终调用的都是chanrecv函数:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {   ...      c.recvq.enqueue(mysg) // 将这个goroutine放到channel的recv的queue中      atomic.Store8(&gp.parkingOnChan, 1)   // 挂起这个goroutine   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)   ...}

    chanrecv会先判断channel是否有数据可读,如果有则直接读取并返回,如果没有则将这个goroutine放到channelrecvqueue中,然后调用gopark函数将当前goroutine挂起并阻塞。

    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {   if reason != waitReasonSleep {      checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy   }   mp := acquirem()   gp := mp.curg   status := readgstatus(gp)   if status != _Grunning && status != _Gscanrunning {      throw("gopark: bad g status")   }   mp.waitlock = lock   mp.waitunlockf = unlockf   gp.waitreason = reason   mp.waittraceev = traceEv   mp.waittraceskip = traceskip   releasem(mp)   // can't do anything that might move the G between Ms here.   mcall(park_m)}

    gopark函数则使用mcall函数(前面分析过,主要作用是保存当前goroutine现场,然后切换到g0栈去调用作为参数传入的函数)取执行park_m函数:

    // park continuation on g0.func park_m(gp *g) {   _g_ := getg()   if trace.enabled {      traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)   }   casgstatus(gp, _Grunning, _Gwaiting)   dropg()   if fn := _g_.m.waitunlockf; fn != nil {      ok := fn(gp, _g_.m.waitlock)      _g_.m.waitunlockf = nil      _g_.m.waitlock = nil      if !ok {         if trace.enabled {            traceGoUnpark(gp, 2)         }         casgstatus(gp, _Gwaiting, _Grunnable)         execute(gp, true) // Schedule it back, never returns.      }   }   schedule()}

    park_m首先把当前goroutine的状态设置为_Gwaiting(因为它正在等待其它goroutinechannel里面写数据),然后调用dropg函数解除gm之间的关系,最后通过调用schedule函数进入调度循环。

    至此,一个goroutine就被主动挂起了。

    3.2 协程唤醒

    我们继续以上例子,当另一个goroutine对这个channel发送数据的时候

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {   ...   if sg := c.recvq.dequeue(); sg != nil {      // Found a waiting receiver. We pass the value we want to send      // directly to the receiver, bypassing the channel buffer (if any).      send(c, sg, ep, func() { unlock(&c.lock) }, 3)      return true   }   ...}func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {   ...   goready(gp, skip+1)}

    channel的发送流程和读取类似,当检查到接收队列中有等待着时,会调用send函数然后调用goready唤醒协程:

    func goready(gp *g, traceskip int) {   systemstack(func() {      ready(gp, traceskip, true)   })}func ready(gp *g, traceskip int, next bool) {   if trace.enabled {      traceGoUnpark(gp, traceskip)   }   status := readgstatus(gp)   // Mark runnable.   _g_ := getg()   mp := acquirem() // disable preemption because it can be holding p in a local var   if status&^_Gscan != _Gwaiting {      dumpgstatus(gp)      throw("bad g->status in ready")   }   // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq   casgstatus(gp, _Gwaiting, _Grunnable)   runqput(_g_.m.p.ptr(), gp, next)   wakep()   releasem(mp)}

    这里发现,ready函数和创建协程时一样,会触发wakep来检查是否需要唤醒空闲P来执行。而在此之前,这个被唤醒的goroutine会放到P的本地队列的下一个执行goroutine,以提升时效性。

    到这里,一个被挂起的协程也就被唤醒了。

    以上就是“Go调度器学习之goroutine调度怎么创建”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注编程网精选频道。

    --结束END--

    本文标题: Go调度器学习之goroutine调度怎么创建

    本文链接: https://www.lsjlt.com/news/354512.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    本篇文章演示代码以及资料文档资料下载

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档
    猜你喜欢
    • Go调度器学习之goroutine调度怎么创建
      今天小编给大家分享一下Go调度器学习之goroutine调度怎么创建的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。1. 协程...
      99+
      2023-07-05
    • Go调度器学习之系统调用详解
      目录0. 简介1. 系统调用1.1 场景1.2 陷入系统调用1.3 从系统调用恢复2. 小结0. 简介 上篇博客,我们分析了Go调度器中的抢占策略,这篇,我们将分析一下,在系统调用时...
      99+
      2023-05-14
      Go调度器 系统调用 Go 系统调用 Go调度器
    • Go调度器学习之系统调用的方法是什么
      本篇内容主要讲解“Go调度器学习之系统调用的方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Go调度器学习之系统调用的方法是什么”吧!1. 系统调用下面,我们将以一个简单的文件打开的系统...
      99+
      2023-07-05
    • Go调度器学习之协作与抢占详解
      目录0. 简介1. 用户主动让出CPU:runtime.Gosched函数2. 基于协作的抢占式调度2.1 场景2.2 栈扩张与抢占标记2.3 栈扩张怎么触发重新调度2.4 何时设置...
      99+
      2023-05-14
      Go调度器协作 抢占 Go调度器协作 Go调度器抢占 Go调度器
    • Golang并发编程之main goroutine的创建与调度的方法是什么
      今天小编给大家分享一下Golang并发编程之main goroutine的创建与调度的方法是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获...
      99+
      2023-07-05
    • springboot怎么动态创建任务调度
      在Spring Boot中动态创建任务调度可以使用Spring的TaskScheduler接口来实现。TaskScheduler接口...
      99+
      2023-10-20
      springboot
    • boost.asio框架系列之调度器io_service怎么使用
      本篇内容介绍了“boost.asio框架系列之调度器io_service怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!IO模型io_...
      99+
      2023-07-02
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作