0%

Golang的调度模型

Go有四大核心模块,基本全部体现在runtime,有调度系统、GC、goroutine、channel,那么深入理解其中的精髓可以帮助我们理解Go这一门语言!

1、Go调度模型发展历史

  • 单线程调度器 (0.x 版本)
    • 只包含 40 多行代码;
    • 程序中只能存在一个活跃线程,由 G-M 模型组成;
  • 多线程调度器 ·(1.0版本)
    • 允许运行多线程的程序;
    • 全局锁导致竞争严重;
  • 任务窃取调度器 · (1.1版本)
    • 引入了处理器 P,构成了目前的 G-M-P 模型;
    • 在处理器 P 的基础上实现了基于工作窃取的调度器;
    • 在某些情况下,Goroutine 不会让出线程,进而造成饥饿问题;(单个p+空转)
    • 时间过长的垃圾回收(Stop-the-world,STW)会导致程序长时间无法工作;
  • 抢占式调度器 · (1.2版本~ 至今)
    • 基于协作的抢占式调度器 - 1.2 ~ 1.13
      • 通过编译器在函数调用时插入抢占检查指令,在函数调用时检查当前 Goroutine 是否发起了抢占请求,实现基于协作的抢占式调度;
      • Goroutine 可能会因为垃圾回收和循环长时间占用资源导致程序暂停;
    • 基于信号的抢占式调度器 - 1.14 ~ 至今
      • 实现基于信号的真抢占式调度
      • 垃圾回收在扫描栈时会触发抢占调度;
      • 抢占的时间点不够多,还不能覆盖全部的边缘情况;
  • 非均匀存储访问调度器 · 提案
    • 对运行时的各种资源进行分区;
    • 实现非常复杂,到今天还没有提上日程;

上面说到的1.3版本以前历史,其实都是go的非发行版本,所以我们关注与的是go的发行版本,也就是go的gpm模型!

  • G: Goroutine,即我们在 Go 程序中使用 go 关键字创建的执行体;
  • M: Machine,或 worker thread,即传统意义上进程的线程;
  • P: Processor,即一种人为抽象的、用于执行 Go 代码被要求局部资源。只有当 M 与一个 P 关联后才能执行 Go 代码。除非 M 发生阻塞或在进行系统调用时间过长时,没有与之关联的 P。

参考: 调度系统设计精要

2、GM模型

​ P的作用不光光是队列这种抽象,如果理解为队列,那么它的地位只是M的一个子集,GM模型,我们核心关注的是G和M,这也是一般线程池的模型!目标就是高效的调度G在M上!

下面是我用Go语言简单写的一个调度器,大家可以看看设计思路,以及存在的问题!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
"fmt"
"go.uber.org/atomic"
"os"
"os/signal"
"time"
)

func main() {

sig := make(chan os.Signal, 0) // 监听程序的信号
signal.Notify(sig, os.Interrupt, os.Kill)
down := make(chan struct{}, 0) // 程序down机信号

threadNum := 2 // 运行的线程
taskQueue := make(chan func(), 1<<20) // 任务队列大小限制

addTask := func(foo func()) {
select {
case <-down:
case taskQueue <- foo:
}
}

schedule := func() {
for x := 0; x < threadNum; x++ {
go func() {
for {
select {
case <-down:
return
case foo := <-taskQueue:
foo()
}
}
}()
}
}

schedule() // 启动调度器

count := &atomic.Int64{}
for x := 0; x < 1; x++ {
addTask(func() { // 添加一个任务,循环的塞入任务
for {
addTask(func() {
count.Add(1)
})
}
})
}

// 等待程序结束
select {
case <-sig:
close(down)
fmt.Println("程序退出, exec ctrl+c")
case <-time.After(time.Second):
close(down)
fmt.Printf("调用量: %v\n", count.Load())
return
}
}

1、现象

1、测试条件,调度器只启动两个线程,然后一个线程主要是负责循环的添加任务,一个线程循环的去执行任务

1
2
3
4
➜  go-tool git:(master) ✗ bin/app                         
调用量: 5078714
➜ go-tool git:(master) ✗ bin/app
调用量: 5043506

2、测试条件,调度器启动三个线程,然后两个线程去执行任务,一个添加任务

1
2
3
4
➜  go-tool git:(master) ✗ bin/app                         
调用量: 4333959
➜ go-tool git:(master) ✗ bin/app
调用量: 4359804

3、继续测试,启动十个线程,一个添加任务,九个执行任务

1
2
3
4
➜  go-tool git:(master) ✗ bin/app                         
调用量: 1663691
➜ go-tool git:(master) ✗ bin/app
调用量: 1692096

4、我们添加一些阻塞的任务

1
2
3
4
addTask(func() {
count.Inc()
time.Sleep(time.Second * 2)
})

执行可以看到完全不可用

1
2
➜  go-tool git:(master) ✗ bin/app                            
调用量: 9

2、问题

​ 1、 可以看到随着M的不断的增加,可以发现执行任务的数量也不断的减少,原因是什么呢?有兴趣的同学可以加一个pprof可以看看,其实大量的在等待锁的过程!

​ 2、如果我的M运行了类似于Sleep操作的方法如何解决了,我的调度器还能支撑这个量级的调度吗?

关于pprof如何使用:在代码头部加一个这个代码:

1
2
3
4
5
6
7
8
file, err := os.OpenFile("/Users/fanhaodong/go/code/go-tool/main/prof.pporf", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
panic(err)
}
if err := pprof.StartCPUProfile(file); err != nil {
panic(err)
}
defer pprof.StopCPUProfile()

我们查看一下 go tool pprof main/prof.pporf

1
2
3
4
5
6
7
8
9
10
11
12
Showing top 10 nodes out of 36
flat flat% sum% cum cum%
2.45s 63.80% 63.80% 2.45s 63.80% runtime.usleep
0.40s 10.42% 74.22% 0.40s 10.42% runtime.pthread_cond_wait
0.28s 7.29% 81.51% 0.28s 7.29% runtime.(*waitq).dequeueSudoG
0.25s 6.51% 88.02% 0.25s 6.51% runtime.pthread_cond_signal
0.17s 4.43% 92.45% 0.94s 24.48% main.main.func2.1
0.10s 2.60% 95.05% 0.10s 2.60% runtime.procyield
0.07s 1.82% 96.88% 0.07s 1.82% runtime.(*waitq).dequeue
0.03s 0.78% 97.66% 0.03s 0.78% runtime.madvise
0.02s 0.52% 98.18% 0.99s 25.78% main.main.func3
0.02s 0.52% 98.70% 1.72s 44.79% runtime.selectgo

可以看到真正执行代码的时间只有 0.17s + 0.02s 其他时间都被阻塞掉了!

3、GPM模型

1、GM模型问题

1、GM模型中的所有G都是放入到一个queue,那么导致所有的M取执行任务时都会去竞争锁,我们插入G也会去竞争锁,所以解决这种问题一般就是减少对单一资源的竞争,那就是桶化,其实就是每个线程都分配一个队列

2、GM模型中没有任务状态,只有runnable,假如任务遇到阻塞,完全可以把任务挂起再唤醒

  • 运行队列去存放所有的可以运行的任务,runnable
  • 所有线程执行运行中的任务,running
  • 运行中的任务被阻塞的任务,需要放弃运行权利,挂起到等待队列,blocking

2、GPM如何优化GM的(核心)

1、引入P

​ 这里其实会遇到一个问题,假如要分配很多个线程,那么此时随着线程的增加,也会造成队列的增加,其实也会造成调度器的压力,因为它需要遍历全部线程的队列去分配任务以及后续会讲到的窃取任务!

​ 因为我们知道CPU的最大并行度其实取决于CPU的核数,也就是我们没必要为每个线程都去分配一个队列,因为就算是给他们分配了,他们自己去那执行调度,其实也会出现大量阻塞,原因就是CPU调度不过来这些线程!

​ Go里面是只分配了CPU个数的队列,这里就是P这个概念,你可以理解为P其实是真正的资源分配器,M很轻只是执行程序,所有的资源内存都维护在P上!M只有绑定P才能执行任务(强制的)!

img{512x368}

这样做的好处:

  • 如果线程很轻,那么销毁和创建会变得很简单,也就是后面会讲到的内核阻塞调用创建线程
  • 如果全部资源分配在固定数量的P上,那么可以充分利用CPU并行度

2、GPM调度器

1、首先调度程序其实就是调度不同状态的任务,go里面为Go标记了不同的状态,其实大概就是分为:runnable,running,block等,所以如何充分调度不同状态的G成了问题,那么关于阻塞的G如何解决,其实可以很好的解决G调度的问题!

  • channel上发送和接收
  • 网络I/O操作
  • 阻塞的系统调用
  • 使用定时器或者Sleep
  • 使用互斥锁 sync.Mutex

上面这些情况其实就分为:

  • 用户态阻塞
  • 内核态阻塞但是不会挂起当前线程
  • 内核态阻塞会挂起当前线程

2、用户态阻塞,一般Go里面依靠gopark 函数去实现,大体的代码逻辑基本上和go的调度绑定死了

源码在:https://golang.org/src/runtime/proc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()

if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule() // 调度程序
}
  • 他会标记当前的g的状态从 running -> waiting 状态
  • 然后开始执行调度: schedule() 方法
    • 首先就是看看有没有其他特殊情况:比如GC或者trace
    • 其次就是可能先去获取全局队列的(代码里写的偶尔,根据随机性去执行的)
    • 获取当前g绑定的p队列里获取g
    • 从其他地方获取可以运行的g (优先级是:从本地队列->全局队列->网络轮巡器->窃取其他P的G,具体可以看 findrunnable 方法)
    • 最后执行那个唤醒的G
  • 最后就是unpark,就是将当前goroutine置于等待状态并解锁锁。 可以通过调用goready方法使goroutine再次运行。

3、其实对于netpool 这种nio模型,其实内核调用是非阻塞的,所以go开辟了一个网络轮训器队列,来存放这些被阻塞的g,等待内核被唤醒!那么什么时候会被唤醒了,其实就是需要等待调度器去调度了!

1
2
3
4
5
6
7
8
9
10
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil { // 这里会等待读,其实就是挂起了当前g,也就是主动让出了m
continue
}
}
// 。。。。。。。
}

4、如果是内核态阻塞了(内核态阻塞一般都会将线程挂起,线程需要等待被唤醒),我们此时P只能放弃此线程的权利,然后再找一个新的线程去运行P!

关于着新线程:找有没有idle的线程,没有就会创建一个新的线程!

关于当内核被唤醒后的操作:因为GPM模型所以需要找到个P绑定,所以G会去尝试找一个可用的P,如果没有可用的P,G会标记为runnable放到全局队列中!

​ 关于内核唤醒后G如何执行的代码我没有找到,不好意思,逻辑没有看太清晰!其实疑问点:如何找到可用的P,所以固定数量的P的好处就是查询时间比较可控!为何不找到上一次绑定的P呢?为何切换上下文了!

5、其实了解上面大致其实就了解了Go的基本调度模型

  • G运行的优先级是啥
  • P和M啥时候会接触绑定
  • P啥时候会窃取G

答案文章里慢慢品味!

3、如何防止G长时间占用P

如果某个 G 执行时间过长,其他的 G 如何才能被正常的调度? 这便涉及到有关调度的两个理念:协作式调度与抢占式调度。协作式和抢占式这两个理念解释起来很简单: 协作式调度依靠被调度方主动弃权;抢占式调度则依靠调度器强制将被调度方被动中断。

1、空转代码

例如下面的代码,我本地的版本是go1.13.5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"runtime"
)

func main() {
go func() {
for {
{
}
}
}()
runtime.Gosched() // 表示主线程让出当前线程,可以理解为先让go执行
fmt.Println("close")
}

2、存在的问题

执行: GOMAXPROCS=1 配置全局只能有一个P

1
2
3
4
5
6
7
go-tool git:(master) ✗ GODEBUG=schedtrace=1 GOMAXPROCS=1 bin/app
SCHED 0ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=0 [2]
SCHED 1ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 2ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 4ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 7ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 13ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]

可以看到main函数无法执行!也就是那个go 空转抢占了整个程序

备注:

  • GODEBUG=schedtrace=1 表示1ms打印一次
1
2
3
4
5
6
7
8
9
SCHED:调试信息输出标志字符串,代表本行是goroutine scheduler的输出;
1ms:即从程序启动到输出这行日志的时间;
gomaxprocs: P的数量;
idleprocs: 处于idle状态的P的数量;通过gomaxprocs和idleprocs的差值,我们就可知道执行go代码的P的数量;
threads: os threads的数量,包含scheduler使用的m数量,加上runtime自用的类似sysmon这样的thread的数量;
spinningthreads: 处于自旋状态的os thread数量;
idlethread: 处于idle状态的os thread的数量;
runqueue=1go scheduler全局队列中G的数量;
[3 4 0 10]: 分别为4个P的local queue中的G的数量。

3、升级1.14+版本解决

但是假如我换为用 1.14+版本执行,有兴趣的话可以使用我的docker镜像,直接可以拉取: fanhaodong/golang:1.15.11fanhaodong/golang:1.13.5

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@647af84b3319 code]# GODEBUG=schedtrace=1 GOMAXPROCS=1 bin/app
SCHED 0ms: gomaxprocs=1 idleprocs=0 threads=2 spinningthreads=0 idlethreads=0 runqueue=0 [0]
SCHED 1ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=0 [0]
SCHED 2ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=0 [0]
SCHED 3ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 4ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 5ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 6ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 7ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 8ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 10ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
SCHED 13ms: gomaxprocs=1 idleprocs=0 threads=3 spinningthreads=0 idlethreads=1 runqueue=1 [1]
close

4、关于G上内存分配的问题

首先我们知道G/M/P,G可能和M也可能和P解除绑定,那么关于数据变量放在哪哇!其实这个就是逃逸分析!

1、什么情况下会逃逸

1
2
3
4
5
6
7
8
9
10
11
12
13
package main
type Demo struct {
Main string
}
func main() {
go func() {
demo := Demo{}
go func() {
test(demo)
}()
}()
}
func test(demo Demo) {}

输出可以看到其实没有发生逃逸,那是因为 demo被拷贝它自己的栈空间内

1
2
3
4
5
[root@647af84b3319 code]#  go build -gcflags "-N -l -m" -o bin/app deno/main2.go
# command-line-arguments
deno/main2.go:13:11: demo does not escape
deno/main2.go:6:5: func literal escapes to heap
deno/main2.go:8:6: func literal escapes to heap

备注:

-gcflags "-N -l -m" 其中 -N 禁用优化 -l禁止内联优化,-m打印逃逸信息

那么继续改成这个

1
2
3
4
5
6
7
8
9
10
11
12
13
package main
type Demo struct {
Main string
}
func main() {
go func() {
demo := Demo{}
go func() {
test(&demo)
}()
}()
}
func test(demo *Demo) {}

可以看到发现 demo对象其实被逃逸到了堆上!这就是不会出现类似于G如果被别的M执行,其实不会出现内存分配位置的问题!

1
2
3
4
5
6
[root@647af84b3319 code]#  go build -gcflags "-N -l -m" -o bin/app deno/main2.go
# command-line-arguments
deno/main2.go:13:11: demo does not escape
deno/main2.go:7:3: moved to heap: demo
deno/main2.go:6:5: func literal escapes to heap
deno/main2.go:8:6: func literal escapes to heap

2、for循环中申明g内存引用问题

所以可以看到demo其实是copy到了堆上!这就是g逃逸的问题,和for循环一样的

1
2
3
4
5
6
7
func main() {
for x := 0; x < 10; x++ {
go func() {
fmt.Println(x)
}()
}
}

执行可以发现,其实x已经逃逸到了堆上,所以你所有的g都引用的一个对象,如何解决了

1
2
3
4
5
6
7
go-tool git:(master) ✗ go build -gcflags "-l -m" -o bin/app deno/main2.go
# command-line-arguments
deno/main2.go:10:6: moved to heap: x
deno/main2.go:11:6: func literal escapes to heap
deno/main2.go:12:15: main.func1 ... argument does not escape
deno/main2.go:12:15: x escapes to heap
deno/main2.go:16:11: test demo does not escape

如何解决了,其实很简单

  • 直接copy一份数据
1
2
3
4
5
6
for x := 0; x < 10; x++ {
x := x // clone
go func() {
fmt.Println(x)
}()
}
  • 通过参数传递(推荐)
1
2
3
4
5
for x := 0; x < 10; x++ {
go func(x int) {
fmt.Println(x)
}(x)
}

参考文章

也谈goroutine调度器

图解Go运行时调度器

Go语言回顾:从Go 1.0到Go 1.13

Go语言原本

调度系统设计精要

Scalable Go Scheduler Design Doc

本人坚持原创技术分享,如果你觉得文章对您有用,请随意打赏! 如果有需要咨询的请发送到我的邮箱!