Scheduling In Go : Part III – Concurrency
介绍 Introduction
当我在解决一个问题时,特别是当它是一个新问题时,我最初不会去想并发性是否适合。我首先寻找一个顺序的解决方案,并确保它是可行的。然后在可读性和技术审查之后,我会开始询问并发性是否合理和实用。有时很明显,并发性是一个很好的选择,而其他时候就不那么清楚了。
在这个系列的第一部分,我解释了操作系统调度器的机制和语义,我认为如果你打算编写多线程代码,这些机制和语义是很重要的。在第二部分中,我解释了Go调度器的语义,我认为这对于理解如何在Go中编写并发代码很重要。在这篇文章中,我将开始把操作系统和Go调度器的机制和语义结合起来,以便更深入地了解什么是并发,什么不是。
这篇文章的目标是:
- 提供关于你必须考虑的语义的指导,以确定工作负载是否适合使用并发性。
- 向你展示不同类型的工作负载如何改变语义,从而改变你要做出的工程决策。
什么是并发性 What is Concurrency
并发意味着 “不按顺序 “执行。以一组本来会按顺序执行的指令,找到一种方法来不按顺序执行它们,仍然产生相同的结果。对于你面对的问题,不按顺序执行必须是明显的,会增加价值。当我说价值时,我的意思是为复杂的成本增加足够的性能增益。根据你的问题,不按顺序执行也有可能是不能的甚至是没有意义的。
同样重要的是要明白,并发性与并行性是不同的(concurrency is not the same as parallelism)。并行性意味着同时执行两条或更多的指令。这是与并发性不同的概念。只有当你有至少2个操作系统(OS)和硬件线程可用,并且你有至少2个Goroutine,每个Goroutine在每个OS/硬件线程上独立执行指令时,并行性才有可能。
Figure 1 : Concurrency vs Parallelism
![[Pasted image 20220720103852.png]]
In figure 1, 你看到的是两个逻辑处理器(P)的图,每个处理器都有独立的操作系统线程(M),连接到机器上的独立硬件线程(Core)。你可以看到两个Goroutine(G1和G2)正在并行执行,同时在各自的操作系统/硬件线程上执行它们的指令。在每个逻辑处理器中,三个Goroutine轮流分享各自的操作系统线程。所有这些Goroutine都在并发运行,不按特定顺序执行它们的指令,并在操作系统线程上分享时间。
这里有一个问题,有时利用并发性而不利用并行性,实际上会减慢你的吞吐量。同样有趣的是,有时利用并行的并发性并不能给你带来比你可能认为的更大的性能增益。
工作负载 Workloads
你怎么知道什么时候可以不按顺序执行或有意义?了解你的问题所处理的工作负载的类型是一个很好的开始。在考虑并发性问题时,有两种类型的工作负载是需要了解的。
- CPU-Bound: 这是一个从未创造出Goroutines自然进入和离开等待状态的情况的工作负荷。这是一个不断进行计算的工作。一个计算Pi到第N位的线程将是CPU密集的。
- IO-Bound: 这是一个导致Goroutines自然进入等待状态的工作负荷。这种工作包括通过网络请求访问资源,或向操作系统进行系统调用,或等待事件的发生。一个需要读取文件的Goroutine将是IO-Bound。我将把导致Goroutine等待的同步事件(mutexes, atomic)也归入这一类别。
对于CPU密集的工作负载,你需要并行性来利用并发性。一个处理多个Goroutine的操作系统/硬件线程并不高效,因为这些Goroutine并没有作为其工作负载的一部分进出等待状态。如果Goroutines的数量超过了操作系统/硬件线程的数量,就会降低工作负载的执行速度,因为将Goroutines移入和移出操作系统线程的延迟成本(需要的时间)。上下文切换为你的工作负载创造了一个 “STW “的事件,因为你的工作负载在切换期间没有被执行,而它本来可以被执行。
对于IO-密集工作负载,你不需要并行性来使用并发性。一个操作系统/硬件线程可以高效地处理多个Goroutines,因为Goroutines作为其工作负载的一部分,自然会在等待状态中移动。拥有比操作系统/硬件线程更多的Goroutines可以加速工作负载的执行,因为将Goroutines移入和移出操作系统线程的延迟成本不会产生一个 “停止世界 “事件。你的工作负载自然会停止,这使得不同的Goroutine可以有效地利用同一个操作系统/硬件线程,而不是让操作系统/硬件线程闲置。
你怎么知道每个硬件线程有多少个Goroutines能提供最好的吞吐量?太少的Goroutines,你会有更多的空闲时间。太多的Goroutines,你会有更多的上下文切换延迟时间。这是你要考虑的问题,但超出了这篇特定文章的范围。
目前,重要的是审查一些代码,以巩固你的能力,识别工作负载何时可以利用并发性,何时不能,以及是否需要并行性。
Adding Numbers
我们不需要复杂的代码来可视化和理解这些语义。看看下面这个名为add
的函数,它为一个整数集合求和。
Listing 1
https://play.golang.org/p/r9LdqUsEzEz
36 func add(numbers []int) int {
37 var v int
38 for _, n := range numbers {
39 v += n
40 }
41 return v
42 }
在列表1的第36行,声明了一个名为add的函数,它接收一个整数集合并返回该集合的总和。它在第37行开始声明v变量,以包含总和。然后在第38行,该函数线性地遍历集合,在第39行将每个数字加入到当前的总和中。最后在第41行,该函数将最后的总和返回给调用者。
Question: add函数是一个适合于失序执行的工作负载吗?我相信答案是肯定的。整数的集合可以被分解成更小的列表,这些列表可以被同时处理。一旦所有的小列表被加起来,这组总和可以被加在一起,产生与顺序版本相同的答案。
然而,还有一个问题浮现在脑海中。应该创建多少个较小的列表并独立处理以获得最佳吞吐量?要回答这个问题,你必须知道add在执行什么样的工作负载。add函数执行的是CPU-Bound工作负载,因为该算法执行的是纯数学运算,它所做的任何事情都不会导致goroutine进入自然等待状态。这意味着每个操作系统/硬件线程使用一个Goroutine就可以获得良好的吞吐量。
下面的Listing 2 是我的 add 的并发版本。
Note: 在编写add的并发版本时,有几种方法和选项可供选择。此时不要纠结于我的特定实现。如果你有一个更可读的版本,执行相同或更好,我希望你分享它。
Listing 2
https://play.golang.org/p/r9LdqUsEzEz
44 func addConcurrent(goroutines int, numbers []int) int {
45 var v int64
46 totalNumbers := len(numbers)
47 lastGoroutine := goroutines - 1
48 stride := totalNumbers / goroutines
49
50 var wg sync.WaitGroup
51 wg.Add(goroutines)
52
53 for g := 0; g < goroutines; g++ {
54 go func(g int) {
55 start := g * stride
56 end := start + stride
57 if g == lastGoroutine {
58 end = totalNumbers
59 }
60
61 var lv int
62 for _, n := range numbers[start:end] {
63 lv += n
64 }
65
66 atomic.AddInt64(&v, int64(lv))
67 wg.Done()
68 }(g)
69 }
70
71 wg.Wait()
72
73 return int(v)
74 }
In Listing 2, 提出了 addConcurrent 函数,它是 add 函数的并发版本。并发版本使用 26 行代码,而非并发版本使用 5 行代码。代码很多,所以我只会强调要理解的重要行。
Line 48: 每个Goroutine都有自己独特但更小的数字列表,列表的大小是用集合的大小除以Goroutine的数量来计算的。
Line 53: 创建Goroutines池来执行累加工作。
Line 57-59: 最后一个Goroutine将添加剩余的可能比其他Goroutine更大的数字列表。
Line 66: 较小列表的总和被加在一起成为最终总和。
并发版本肯定比顺序版本更复杂,但这种复杂性值得吗?回答这个问题的最好方法是创建一个基准。对于这些基准测试,我在垃圾收集器关闭的情况下使用了 1000 万个数字的集合。有使用 add
函数的顺序版本和使用 addConcurrent
函数的并发版本。
Listing 3
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
add(numbers)
}
}
func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
addConcurrent(runtime.NumCPU(), numbers)
}
}
Listing 3 显示了基准测试函数。以下是所有 Goroutine 只能使用一个操作系统/硬件线程时的结果。顺序版本使用 1 个 Goroutine,并发版本使用 runtime.NumCPU 或我机器上的 8 个 Goroutine。在这种情况下,并发版本利用了没有并行性的并发性。
Listing 4
10 Million Numbers using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential 1000 5720764 ns/op : ~10% Faster
BenchmarkConcurrent 1000 6387344 ns/op
BenchmarkSequentialAgain 1000 5614666 ns/op : ~13% Faster
BenchmarkConcurrentAgain 1000 6482612 ns/op
Note: 在本地机器上运行基准测试很复杂。有很多因素会导致您的基准测试不准确。确保您的机器尽可能空闲并运行几次基准测试。您要确保看到结果的一致性。让测试工具运行两次基准测试可以为该基准测试提供最一致的结果。
在本地机器上运行基准测试很复杂。有很多因素会导致您的基准测试不准确。确保您的机器尽可能空闲并运行几次基准测试。您要确保看到结果的一致性。让测试工具运行两次基准测试可以为该基准测试提供最一致的结果。
以下是每个 Goroutine 都有一个单独的操作系统/硬件线程时的结果。顺序版本使用 1 个 Goroutine,并发版本使用 runtime.NumCPU
或我机器上的 8 个 Goroutine。在这种情况下,并发版本正在利用并行性。
Listing 5
10 Million Numbers using 8 goroutines with 8 cores
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential-8 1000 5910799 ns/op
BenchmarkConcurrent-8 2000 3362643 ns/op : ~43% Faster
BenchmarkSequentialAgain-8 1000 5933444 ns/op
BenchmarkConcurrentAgain-8 2000 3477253 ns/op : ~41% Faster
清单 5 中的基准测试表明,当每个 Goroutine 都有单独的操作系统/硬件线程可用时,并发版本比顺序版本快大约 41% 到 43%。这是我所期望的,因为所有的 Goroutine 现在都在并行运行,八个 Goroutine 同时执行它们的并发工作。
Sorting
重要的是要了解并非所有受 CPU 限制的工作负载都适合并发。当分解工作和/或合并所有结果非常昂贵时,这主要是正确的。使用称为冒泡排序的排序算法可以看到一个例子。看看下面在 Go 中实现冒泡排序的代码。
Listing 6
https://play.golang.org/p/S0Us1wYBqG6
01 package main
02
03 import "fmt"
04
05 func bubbleSort(numbers []int) {
06 n := len(numbers)
07 for i := 0; i < n; i++ {
08 if !sweep(numbers, i) {
09 return
10 }
11 }
12 }
13
14 func sweep(numbers []int, currentPass int) bool {
15 var idx int
16 idxNext := idx + 1
17 n := len(numbers)
18 var swap bool
19
20 for idxNext < (n - currentPass) {
21 a := numbers[idx]
22 b := numbers[idxNext]
23 if a > b {
24 numbers[idx] = b
25 numbers[idxNext] = a
26 swap = true
27 }
28 idx++
29 idxNext = idx + 1
30 }
31 return swap
32 }
33
34 func main() {
35 org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0}
36 fmt.Println(org)
37
38 bubbleSort(org)
39 fmt.Println(org)
40 }
在清单 6 中,有一个用 Go 编写的冒泡排序示例。这种排序算法会扫描整数集合,每次通过时交换值。根据列表的顺序,在对所有内容进行排序之前,可能需要多次遍历集合。
问题:bubbleSort 函数是适合乱序执行的工作负载吗?我相信答案是否定的。整数的集合可以分解成更小的列表,这些列表可以同时排序。但是,在完成所有并发工作之后,没有有效的方法将较小的列表排序在一起。这是冒泡排序的并发版本的示例。
Listing 8
01 func bubbleSortConcurrent(goroutines int, numbers []int) {
02 totalNumbers := len(numbers)
03 lastGoroutine := goroutines - 1
04 stride := totalNumbers / goroutines
05
06 var wg sync.WaitGroup
07 wg.Add(goroutines)
08
09 for g := 0; g < goroutines; g++ {
10 go func(g int) {
11 start := g * stride
12 end := start + stride
13 if g == lastGoroutine {
14 end = totalNumbers
15 }
16
17 bubbleSort(numbers[start:end])
18 wg.Done()
19 }(g)
20 }
21
22 wg.Wait()
23
24 // Ugh, we have to sort the entire list again.
25 bubbleSort(numbers)
26 }
在清单 8 中,显示了 bubbleSortConcurrent 函数,它是 bubbleSort 函数的并发版本。它使用多个 Goroutine 同时对列表的各个部分进行排序。但是,您剩下的是按块排序的值列表。给定一个包含 36 个数字的列表,分成 12 个一组,如果整个列表没有在第 25 行再次排序,这将是结果列表。
Listing 9
Before:
25 51 15 57 87 10 10 85 90 32 98 53
91 82 84 97 67 37 71 94 26 2 81 79
66 70 93 86 19 81 52 75 85 10 87 49
After:
10 10 15 25 32 51 53 57 85 87 90 98
2 26 37 67 71 79 81 82 84 91 94 97
10 19 49 52 66 70 75 81 85 86 87 93
由于冒泡排序的本质是遍历列表,因此在第 25 行调用气泡排序将否定使用并发的任何潜在收益。使用冒泡排序,使用并发不会提高性能。
Reading Files
已经介绍了两个 CPU-Bound 工作负载,但是 IO-Bound 工作负载呢?当 Goroutines 自然地进入和退出等待状态时,语义是否不同?查看一个读取文件并执行文本搜索的 IO-Bound 工作负载。
第一个版本是一个名为 find
的函数的顺序版本。
Listing 10
https://play.golang.org/p/8gFe5F8zweN
func find(topic string, docs []string) int {
var found int
for _, doc := range docs {
items, err := read(doc)
if err != nil {
continue
}
for _, item := range items {
if strings.Contains(item.Description, topic) {
found++
}
}
}
return found
}
在清单 10 中,您可以看到 find 函数的顺序版本。在第 43 行,声明了一个名为 found 的变量,以维护在给定文档中找到指定主题的次数的计数。然后在第 44 行,对文档进行迭代,并在第 45 行使用 read 函数读取每个文档。最后在第 49-53 行,使用 strings 包中的 Contains 函数检查是否可以在从文档读取的项目集合中找到主题。如果找到主题,则找到的变量加一。
这是 find 调用的 read 函数的实现。
Listing 11
https://play.golang.org/p/8gFe5F8zweN
func read(doc string) ([]item, error) {
time.Sleep(time.Millisecond) // Simulate blocking disk read.
var d document
if err := xml.Unmarshal([]byte(file), &d); err != nil {
return nil, err
}
return d.Channel.Items, nil
}
清单 11 中的 read
函数以 time.Sleep
调用开始,持续一毫秒。如果我们执行实际的系统调用以从磁盘读取文档,则此调用用于模拟可能产生的延迟。这种延迟的一致性对于准确测量 find
的顺序版本与并发版本的性能非常重要。然后在第 35-39 行,将存储在全局变量文件中的模拟 xml 文档解组为一个结构值进行处理。最后,在第 39 行将一组项目返回给调用者。
有了顺序版本,这里是并发版本。
Note: 在编写并发版本的find时,可以采用几种方法和选项。 此时不要纠结于我的特定实现。 如果你有一个更可读的版本,执行相同或更好,我希望你分享它。
Listing 12
https://play.golang.org/p/8gFe5F8zweN
58 func findConcurrent(goroutines int, topic string, docs []string) int {
59 var found int64
60
61 ch := make(chan string, len(docs))
62 for _, doc := range docs {
63 ch <- doc
64 }
65 close(ch)
66
67 var wg sync.WaitGroup
68 wg.Add(goroutines)
69
70 for g := 0; g < goroutines; g++ {
71 go func() {
72 var lFound int64
73 for doc := range ch {
74 items, err := read(doc)
75 if err != nil {
76 continue
77 }
78 for _, item := range items {
79 if strings.Contains(item.Description, topic) {
80 lFound++
81 }
82 }
83 }
84 atomic.AddInt64(&found, lFound)
85 wg.Done()
86 }()
87 }
88
89 wg.Wait()
90
91 return int(found)
92 }
在清单 12 中,显示了 findConcurrent
函数,它是 find
函数的并发版本。并发版本使用 30 行代码,而非并发版本使用 13 行代码。我实现并发版本的目标是控制用于处理未知数量文档的 Goroutine 的数量。我选择了一种池化模式,其中一个通道用于为 Goroutines 池提供数据。
代码很多,所以我只会强调要理解的重要行。
Lines 61-64: 创建一个通道并填充所有要处理的文档。
Line 65: 通道是关闭的,所以当所有文档都处理完后,Goroutines 池自然会终止。
Line 70: Goroutines 池被创建。
Line 73-83: 池中的每个 Goroutine 从通道接收文档,将文档读入内存并检查主题的内容。当存在匹配时,本地lFound
变量会增加。
Line 84: 各个 Goroutine 计数的总和被加在一起形成最终计数。
并发版本肯定比顺序版本更复杂,但这种复杂性值得吗?再次回答这个问题的最好方法是创建一个基准。对于这些基准测试,我使用了 1000 个文档的集合,并关闭了垃圾收集器。有使用 find 函数的顺序版本和使用 findConcurrent 函数的并发版本。
Listing 13
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
find("test", docs)
}
}
func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
findConcurrent(runtime.NumCPU(), "test", docs)
}
}
清单 13 显示了基准函数。以下是所有 Goroutine 只能使用一个操作系统/硬件线程时的结果。顺序使用 1 个 Goroutine,并发版本使用 runtime.NumCPU 或我机器上的 8 个 Goroutine。在这种情况下,并发版本利用了没有并行性的并发性。
Listing 14
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential 3 1483458120 ns/op
BenchmarkConcurrent 20 188941855 ns/op : ~87% Faster
BenchmarkSequentialAgain 2 1502682536 ns/op
BenchmarkConcurrentAgain 20 184037843 ns/op : ~88% Faster
清单 14 中的基准测试表明,当所有 Goroutine 都只有一个操作系统/硬件线程可用时,并发版本比顺序版本快大约 87% 到 88%。这是我所期望的,因为所有 Goroutine 都有效地共享单个操作系统/硬件线程。在 read 调用上每个 Goroutine 发生的自然上下文切换允许随着时间的推移在单个操作系统/硬件线程上完成更多工作。
这是使用并发与并行性时的基准测试。
Listing 15
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential-8 3 1490947198 ns/op
BenchmarkConcurrent-8 20 187382200 ns/op : ~88% Faster
BenchmarkSequentialAgain-8 3 1416126029 ns/op
BenchmarkConcurrentAgain-8 20 185965460 ns/op : ~87% Faster
清单 15 中的基准测试表明,引入额外的操作系统/硬件线程并不能提供任何更好的性能。
Conclusion
这篇文章的目的是提供关于确定工作负载是否适合使用并发性时必须考虑的语义的指导。 我试图提供不同类型的算法和工作负载的示例,以便您能够看到语义上的差异以及需要考虑的不同工程决策。
您可以清楚地看到,对于io密集的工作负载,不需要并行性来大幅提高性能。 这与cpu密集的工作正好相反。 对于冒泡排序这样的算法,使用并发会增加复杂性,但不会带来任何实际的性能好处。 重要的是要确定您的工作负载是否适合并发,然后确定必须使用正确语义的工作负载类型。
发表回复