20 张动图为你演示 go 并发 | go优质外文翻译 | go 技术论坛-金年会app官方网
如果你更喜欢通过视频了解本文,请点击观看我在gophercon上的演讲
go语言最强大的特性之一就是基于 这篇论文实现的内置并发. go在设计时就考虑了并发并允许我们构建复杂的并发管道。那你有没有想过,各种并发模式看起来是怎样的?
你一定想过。 我们多数情况下都会通过想象来思考问题. 如果我问你一个关于“1到100的数字”的问题,你脑子里就会下意识的出现一系列画面。例如,我会把它想象成一条从我开始的直线,从数字1到20然后右转90度一直到1000 。我记得我很小的时候,在我们的幼儿园里,衣帽间里有很多数字,写在墙上,数字20恰好在拐角处。你可能有你自己的关于数字的画面。另一个常见的例子是一年四季的视觉展现——有人将之想象成一个盒子,有人将之想象成一个圈。
无论如何, 我想用go和webgl把我对于常见的并发模式的具象化尝试展现给大家.这多多少少代表了我对于并发程序的理解。如果能听到我和大家脑海中的画面有什么不同,一定会非常有趣。 我特别想知道 rob pike 或者 sameer ajmani 脑子里是怎么描绘并发图像的. 我打赌我会很感兴趣的。
那么,让我们从一个很基础的“hello,concurrent world”例子开始我们今天的主题。
hello, concurrent world
代码很简单——单个通道,单个goroutine,单次写入,单次读取。
package main
func main() {
// 创建一个int类型的通道
ch := make(chan int)
// 开启一个匿名 goroutine
go func() {
// 向通道发送数字42
ch <- 42
}()
// 从通道中读取
<-ch
}
蓝色线代表随时间运行的goroutine. 连接‘main’和‘go #19’的蓝色细线用来标记goroutine的开始和结束同时展示了父子关系,最后,红线代表发送/接收动作. 虽然这是两个独立的动作,我还是尝试用“从 a 发送到 b”的动画将他们表示成一个动作. goroutine 名称中的“#19” 是 goroutine 真实的内部id, 其获取方法参考了 scott mansfield 的 这篇文章。
timers
实际上,你可以通过以下方法构建一个简单的计时器——创建一个通道, 开启一个 goroutine 让其在指定的时间间隔后向通道中写入数据,然后将这个通道返回给调用者。于是调用函数就会在读取通道时阻塞,直到之前设定的时间间隔过去。接下来我们调用24次计时器然后尝试具象化调用过程。
package main
import "time"
func timer(d time.duration) <-chan int {
c := make(chan int)
go func() {
time.sleep(d)
c <- 1
}()
return c
}
func main() {
for i := 0; i < 24; i {
c := timer(1 * time.second)
<-c
}
}
很整洁,对吗? 我们继续。
ping-pong
这个并发例子取自谷歌员工 sameer ajmani 演讲。当然,这个模式不算非常高级,但是对于那些只熟悉go的并发机制的人来说它看起来可能非常新鲜有趣。
这里我们用一个通道代表乒乓球台. 一个整型变量代表球, 然后用两个goroutine代表玩家,玩家通过增加整型变量的值(点击计数器)模拟击球动作。
package main
import "time"
func main() {
var ball int
table := make(chan int)
go player(table)
go player(table)
table <- ball
time.sleep(1 * time.second)
<-table
}
func player(table chan int) {
for {
ball := <-table
ball
time.sleep(100 * time.millisecond)
table <- ball
}
}
这里我建议你点击 进入交互式 webgl 动画操作一下. 你可以放慢或者加速动画,从不同的角度观察。
现在,我们添加三个玩家看看。
go player(table)
go player(table)
go player(table)
我们可以看到每个玩家都按照次序轮流操作,你可能会想为什么会这样。为什么多个玩家(goroutine)会按照严格的顺序接到“球”呢。
答案是 go 运行时环境维护了一个 (存储需要从某一通道上接收数据的goroutine),在我们的例子里,每个玩家在刚发出球后就做好了接球准备。我们来看一下更复杂的情况,加入100个玩家。
for i := 0; i < 100; i {
go player(table)
}
先进先出顺序很明显了,是吧? 我们可以创建一百万个goroutine,因为它们很轻量,但是对于实现我们的目的来说没有必要。我们来想想其他可以玩的。 例如, 常见的消息传递模式。
fan-in
并发世界中流行的模式之一是所谓的 fan-in 模式。这与 fan-out 模式相反,稍后我们将介绍。简而言之,fan-in 是一项功能,可以从多个输入中读取数据并将其全部多路复用到单个通道中。
举例来说:
package main
import (
"fmt"
"time"
)
func producer(ch chan int, d time.duration) {
var i int
for {
ch <- i
i
time.sleep(d)
}
}
func reader(out chan int) {
for x := range out {
fmt.println(x)
}
}
func main() {
ch := make(chan int)
out := make(chan int)
go producer(ch, 100*time.millisecond)
go producer(ch, 250*time.millisecond)
go reader(out)
for i := range ch {
out <- i
}
}
如我们所见,第一个 producer 每100毫秒生成一次值,第二个每250毫秒生成一次值,但是 reader 会立即从这两个生产者那里接受值。实际上,多路复用发生在 main 的range循环中。
workers
与 fan-in 相反的模式是 fan-out 或者worker 模式。多个 goroutine 可以从单个通道读取,从而在cpu内核之间分配大量的工作量,因此是 worker 的名称。在go中,此模式易于实现-只需以通道为参数启动多个goroutine,然后将值发送至该通道-go运行时会自动地进行分配和复用 :)
package main
import (
"fmt"
"sync"
"time"
)
func worker(tasksch <-chan int, wg *sync.waitgroup) {
defer wg.done()
for {
task, ok := <-tasksch
if !ok {
return
}
d := time.duration(task) * time.millisecond
time.sleep(d)
fmt.println("processing task", task)
}
}
func pool(wg *sync.waitgroup, workers, tasks int) {
tasksch := make(chan int)
for i := 0; i < workers; i {
go worker(tasksch, wg)
}
for i := 0; i < tasks; i {
tasksch <- i
}
close(tasksch)
}
func main() {
var wg sync.waitgroup
wg.add(36)
go pool(&wg, 36, 50)
wg.wait()
}
这里值得一提的是:并行性。如您所见,所有goroutine并行’运行‘,等待通道给予它们’工作‘。鉴于上面的动画,很容易发现goroutine几乎立即接连地收到它们的工作。不幸的是,该动画在goroutine确实在处理工作还是仅仅是在等待输入的地方没有用颜色显示出来,但是此动画是在gomaxprocs=4的情况下录制的,因此只有4个goroutine有效地并行运行。我们将很快讨论这个主题。
现在,让我们做一些更复杂的事情,并启动一些有自己workers(subworkers)的workers。
package main
import (
"fmt"
"sync"
"time"
)
const (
workers = 5
subworkers = 3
tasks = 20
subtasks = 10
)
func subworker(subtasks chan int) {
for {
task, ok := <-subtasks
if !ok {
return
}
time.sleep(time.duration(task) * time.millisecond)
fmt.println(task)
}
}
func worker(tasks <-chan int, wg *sync.waitgroup) {
defer wg.done()
for {
task, ok := <-tasks
if !ok {
return
}
subtasks := make(chan int)
for i := 0; i < subworkers; i {
go subworker(subtasks)
}
for i := 0; i < subtasks; i {
task1 := task * i
subtasks <- task1
}
close(subtasks)
}
}
func main() {
var wg sync.waitgroup
wg.add(workers)
tasks := make(chan int)
for i := 0; i < workers; i {
go worker(tasks, &wg)
}
for i := 0; i < tasks; i {
tasks <- i
}
close(tasks)
wg.wait()
}
很好。当然,我们可以将worker和subworker的数量设置为更高的值,但是我试图使动画清晰易懂。
更酷的 fan-out 模式确实存在,例如动态数量的worker/subworker,通过通道发送通道,但是 fan-out 的想法现在应该很清楚了。
服务器
下一个常见的模式类似于扇出,但是会在很短的时间内生成goroutine,只是为了完成某些任务。它通常用于实现服务器-创建侦听器,循环运行accept()并为每个接受的连接启动goroutine。它非常具有表现力,可以实现尽可能简单的服务器处理程序。看一个简单的例子:
package main
import "net"
func handler(c net.conn) {
c.write([]byte("ok"))
c.close()
}
func main() {
l, err := net.listen("tcp", ":5000")
if err != nil {
panic(err)
}
for {
c, err := l.accept()
if err != nil {
continue
}
go handler(c)
}
}
这不是很有趣-似乎并发方面没有发生任何事情。当然,在引擎盖下有很多复杂性,这是我们特意隐藏的。 .
但是,让我们回到并发性并向金年会app官方网的服务器添加一些交互。假设每个处理程序都希望异步写入记录器。在我们的示例中,记录器本身是一个单独的goroutine
,它可以完成此任务。
package main
import (
"fmt"
"net"
"time"
)
func handler(c net.conn, ch chan string) {
ch <- c.remoteaddr().string()
c.write([]byte("ok"))
c.close()
}
func logger(ch chan string) {
for {
fmt.println(<-ch)
}
}
func server(l net.listener, ch chan string) {
for {
c, err := l.accept()
if err != nil {
continue
}
go handler(c, ch)
}
}
func main() {
l, err := net.listen("tcp", ":5000")
if err != nil {
panic(err)
}
ch := make(chan string)
go logger(ch)
go server(l, ch)
time.sleep(10 * time.second)
}
不是吗?但是很容易看到,如果请求数量增加并且日志记录操作花费一些时间(例如,准备和编码数据),我们的* logger * goroutine很快就会成为瓶颈。我们可以使用一个已知的扇出模式。我们开始做吧。
服务器 工作者
带工作程序的服务器示例是记录器的高级版本。它不仅可以完成一些工作,而且还可以通过* results *通道将其工作结果发送回池中。没什么大不了的,但是它将我们的记录器示例扩展到了更实际的示例。
让我们看一下代码和动画:
package main
import (
"net"
"time"
)
func handler(c net.conn, ch chan string) {
addr := c.remoteaddr().string()
ch <- addr
time.sleep(100 * time.millisecond)
c.write([]byte("ok"))
c.close()
}
func logger(wch chan int, results chan int) {
for {
data := <-wch
data
results <- data
}
}
func parse(results chan int) {
for {
<-results
}
}
func pool(ch chan string, n int) {
wch := make(chan int)
results := make(chan int)
for i := 0; i < n; i {
go logger(wch, results)
}
go parse(results)
for {
addr := <-ch
l := len(addr)
wch <- l
}
}
func server(l net.listener, ch chan string) {
for {
c, err := l.accept()
if err != nil {
continue
}
go handler(c, ch)
}
}
func main() {
l, err := net.listen("tcp", ":5000")
if err != nil {
panic(err)
}
ch := make(chan string)
go pool(ch, 4)
go server(l, ch)
time.sleep(10 * time.second)
}
我们在4个goroutine之间分配了工作,有效地提高了记录器的吞吐量,但是从此动画中,我们可以看到记录器仍然可能是问题的根源。成千上万的连接在分配之前会汇聚在一个通道中,这可能导致记录器再次成为瓶颈。但是,当然,它会在更高的负载下发生。
并发素筛(素筛指素数筛法)
足够的扇入/扇出乐趣。让我们看看更复杂的并发算法。我最喜欢的例子之一是concurrent prime sieve,可以在对话中找到。素数筛,或[)是一种古老的算法,用于查找达到给定限制的素数。它通过按顺序消除所有质数的倍数来工作。天真的算法并不是真正有效的算法,尤其是在多核计算机上。
该算法的并发变体使用goroutine过滤数字-每个发现的素数一个goroutine,以及用于将数字从生成器发送到过滤器的通道。找到质数后,它将通过通道发送到* main *以进行输出。当然,该算法也不是很有效,特别是如果您想找到大质数并寻找最低的big o复杂度,但是我发现它非常优雅。
// 并发的主筛
package main
import "fmt"
// 将序列2、3、4,...发送到频道“ ch”。
func generate(ch chan<- int) {
for i := 2; ; i {
ch <- i // send 'i' to channel 'ch'.
}
}
//将值从通道“ in”复制到通道“ out”,
//删除可被“素数”整除的那些。
func filter(in <-chan int, out chan<- int, prime int) {
for {
i := <-in // receive value from 'in'.
if i%prime != 0 {
out <- i // send 'i' to 'out'.
}
}
}
//主筛:菊花链过滤器过程。
func main() {
ch := make(chan int) // create a new channel.
go generate(ch) // launch generate goroutine.
for i := 0; i < 10; i {
prime := <-ch
fmt.println(prime)
ch1 := make(chan int)
go filter(ch, ch1, prime)
ch = ch1
}
}
,请以交互模式随意播放此动画。我喜欢它的说明性-它确实可以帮助您更好地理解该算法。 * generate * goroutine发出从2开始的每个整数,每个新的goroutine仅过滤特定的质数倍数-2、3、5、7 …,将第一个找到的质数发送给* main *。如果旋转它从顶部看,您会看到从goroutine发送到main的所有数字都是质数。漂亮的算法,尤其是在3d中。
gomaxprocs(调整并发的运行性能)
现在,让我们回到我们的工作人员示例。还记得我告诉过它以gomaxprocs = 4运行吗?那是因为所有这些动画都不是艺术品,它们是真实程序的真实痕迹。
让我们回顾一下是什么。
gomaxprocs设置可以同时执行的最大cpu数量。
当然,cpu是指逻辑cpu。我修改了一些示例,以使他们真正地工作(而不仅仅是睡觉)并使用实际的cpu时间。然后,我运行了代码,没有进行任何修改,只是设置了不同的gomaxprocs值。 linux机顶盒有2个cpu,每个cpu具有12个内核,因此有24个内核。
因此,第一次运行演示了该程序在1个内核上运行,而第二次-使用了所有24个内核的功能。
| 这些动画中的时间速度是不同的(我希望所有动画都适合同一时间/ height),因此区别很明显。当gomaxprocs = 1时,下一个工作人员只有在上一个工作完成后才能开始实际工作。在gomaxprocs = 24的情况下,加速非常大,而复用的开销可以忽略不计。
不过,重要的是要了解,增加gomaxprocs并不总是可以提高性能,在某些情况下实际上会使它变得更糟。
goroutines leak
我们可以从go中的并发时间中证明什么呢?我想到的一件事情是goroutine泄漏。例如,如果您,可能会发生泄漏。或者,您只是忘记添加结束条件,而运行了for{}循环。
第一次在代码中遇到goroutine泄漏时,我的脑海中出现了可怕的图像,并且在下个周末我写了 。现在,我可以使用webgl可视化该恐怖图像。
看一看:
仅仅是看到此,我都会感到痛苦:) 所有这些行都浪费了资源,并且是您程序的定时炸弹。
parallelism is not concurrency
我要说明的最后一件事是并行性与并发性之间的区别。这个话题 ,rob pike在这个话题上做了一个。确实是#必须观看的视频之一。
简而言之,
并行是简单的并行运行事物。
并发是一种构造程序的方法。
因此,并发程序可能是并行的,也可能不是并行的,这些概念在某种程度上是正交的。我们在演示 gomaxprocs 设置效果时已经看到了这一点。
我可以重复所有这些链接的文章和谈话,但是一张图片相当于说了一千个字。我在这里能做的是可视化这个差异。因此,这是并行。许多事情并行运行。
这也是并行性:
但这是并发的:
还有这个:
这也是并发的:
how it was made
为了创建这些动画,我编写了两个程序:gotracer 和 gothree.js 库。首先,gotracer执行以下操作:
- 解析go程序的ast树(abstract syntax tree,抽象语法树),并在与并发相关的事件上插入带有输出的特殊命令-启动/停止goroutine,创建通道,向/从通道发送/接收。
- 运行生成的程序
- 分析此特殊输出,并生成带有事件和时间戳描述的json。
生成的json示例:
接下来,gothree.js使用令人惊叹的 库的功能来使用webgl绘制3d线和对象。我做了一些很小的包装使其适合单个html页面-就是这样。
但是,这种方法非常有局限。我必须准确地选择示例,重命名通道和goroutine,以使得或多或少复杂的代码产生正确的跟踪。使用这种方法,如果goroutine具有不同的名称,就没有简便的方法来关联goroutine之间的通道。更不用说通过chan类型的通道发送的通道。时序方面也存在很大的问题-输出到stdout可能比发送值花费更多的时间,因此在某些情况下,我必须放置time.sleep(一些毫秒)以获取正确的动画。
基本上,这就是为什么我还没有开源代码的原因。我正在玩 dmitry vyukov 的 ,它似乎提供了事件的详细信息,但是没有包含发送值得信息。也许有更好的方法可以实现预期的目标。如果您有想法,请在twitter给我写信,或在评论中写给我。如果能将这个为期两周的工具扩展为任何go程序的真正的调试/跟踪工具,那将是非常棒的。
我也很乐意看到此处未列出的更有趣的并发算法和模式。欢迎随时在评论中写一个。
happy coding!
upd: 可在 上使用此工具,并使用go execution tracer和打了补丁的运行时生成跟踪。
另外,我愿意接受新工作,因此,如果您公司/团队对我感兴趣,有难题需要使用go解决,我可以远程工作(或者您在巴塞罗那)并在招聘,请告诉我:)
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 cc 协议,如果我们的工作有侵犯到您的权益,请及时联系金年会app官方网。
原文地址:
大制作
6666666,整个流程大概看了看,很不错
牛批..
看不懂,反正一句牛逼
很不错的,充分说明了
并行
与并发
。点赞谢谢支持啦
虽然暂时看不懂,但是感觉很牛
这个厉害了
虽然看不懂,可还是感觉很屌的样子
赞赞,值得收藏
看不懂 但是很吊的样子
尽管一个都看不懂,但是好像很厉害的样子
哇,视觉感来了。
666 :frog: :frog: