Don’t Communicate by sharing memory, share memory by communicating.
不要通过共享内存来通信;相反,通过通信来共享内存。
Generator 指返回一个 chan 的函数。这是一种十分常见的使用 goroutine +
chan 的方式,可以说是一种标准用法了。
采用这种方式使用 chan 十分的安全,不会出现一些 chan 误用导致的错误(例如向已经关闭的 chan 写入数据等)。
例如下面的代码,会开一个 goroutine 递归遍历指定目录,并将目录下的所有
json 文件通过 chan 吐出去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| func walkJsonFiles(dir string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
err := filepath.WalkDir(dir,
func(path string, info fs.DirEntry, err error) error {
if err != nil {
log.Printf("can't access path: %q: %v\n", path, err)
return err
}
if !info.Type().IsDir() && filepath.Ext(path) == ".json" {
out <- path
}
return nil
})
if err != nil {
log.Printf("error walking the path %q: %v\n", dir, err)
return
}
}()
return out
}
|
下面这张图可以形象的表达 Fan-In 的概念:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| func fanIn(cs ...<-chan string) <-chan string {
out := make(chan string)
var wg sync.WaitGroup
// 注意:这里不能直接 wg.Wait(),需要开一个 goroutine 来 Wait
defer func() {
go func() {
wg.Wait()
close(out)
}()
}()
collect := func(in <-chan string) {
defer wg.Done()
for n := range in {
out <- n
}
}
wg.Add(len(cs))
// Fan-In
for _, c := range cs {
go collect(c)
}
return out
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| func fanInUsingSelect(input1, input2 <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for {
select {
case x, ok := <-input1:
if !ok {
input1 = nil
} else {
out <- x
}
case x, ok := <-input2:
if !ok {
input2 = nil
} else {
out <- x
}
}
if input1 == nil && input2 == nil {
break
}
}
}()
return out
}
|
Fan-Out 刚好和 Fan-In 相反,一般和 Fan-In 配合起来一起使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| func processJsonFiles(ch <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for s := range ch {
// do something useful ...
out <- s + " done"
}
}()
return out
}
func fanOut(in <-chan string) <-chan string {
// 同时开 n 个 goroutine 来处理这些 json files
n := 20
cs := make([]<-chan string, n)
for i := 0; i < n; i++ {
cs[i] = processJsonFiles(in)
}
out := fanIn(cs...)
return out
}
func main() {
ch := walkJsonFiles("./")
out := fanOut(ch)
for s := range out {
fmt.Println(s)
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| package main
import (
"fmt"
"time"
)
// 为每次 select 设置超时
// 每次 select 都会重置超时
func selectTimeout(in <-chan string) {
for {
select {
case s := <-in:
fmt.Println("tick", s)
case <- time.After(1 * time.Second):
fmt.Println("You're too slow.")
return
}
}
}
// 为整个 for 循环设置一个超时,时间结束即退出
func selectTimeout2(in <-chan string) {
timeout := time.After(1 * time.Second)
for {
select {
case s := <-in:
fmt.Println("tock", s)
case <- timeout:
fmt.Println("timed out")
return
}
}
}
func main() {
in := make(chan string)
go func() {
for i := 0; i < 20; i++ {
time.Sleep(time.Duration(i * 200) * time.Millisecond)
in <- fmt.Sprintf("%d", i)
}
}()
go selectTimeout2(in)
selectTimeout(in)
}
|
输出如下:
1
2
3
4
5
6
7
| tick 0
tock 1
tick 2
timed out
tick 3
tick 4
You're too slow.
|
由于 chan 可以进行双向通信(round-trip communication),因此,可以很方便的实现退出前的清理工作。
例如:
- A 通知 B 退出
- B 收到退出指令时,执行清理动作,完成后再通知 A
- A 收到通知后,结束完整的退出流程
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
| import (
"fmt"
"io/fs"
"path/filepath"
"errors"
)
func walkAndGracefulQuit(dir string, quit chan string) (<-chan string, <-chan bool) {
out := make(chan string)
done := make(chan bool)
go func() {
defer func() {
close(out)
cleanup()
done <- true
}()
filepath.WalkDir(dir,
func(path string, info fs.DirEntry, err error) error {
if err != nil {
fmt.Printf("can't access path: %q %v\n", path, err)
return err
}
if !info.Type().IsDir() && filepath.Ext(path) == ".md" {
select {
case out <- path:
// do nothing
case <-quit:
return errors.New("quit")
}
}
return nil
})
}()
return out, done
}
func cleanup() {
}
func main() {
quit := make(chan string)
c, done := walkAndGracefulQuit(".", quit)
i := 0
for name := range c {
fmt.Println(name)
// 找到第一个文件后,立即退出
if i++; i == 1 {
// 这里要用 goroutine 在后台发射退出信号,以防止目录下只有一个 md 文
// 件时,由于没有机会读 quit chan 而出现死锁现象。另一种做法是,利用
// select 来发射,但没有用 goroutine 这么简单和稳妥。
go func() {
quit <- "bye"
}()
}
}
// 等待清理动作完毕,正式结束程序
<-done
}
|
Go talks 有一个很经典的案例,这里我们学习一下。
假设 Google Search 有三个后端搜索服务:
- Web Search
- Image Search
- Video Search
这三个分别负责搜索网页,图片和视频。现在要整合这三个服务,对用户提供完整的搜索服务。
最简单形态,轮流访问后端服务,无并发实现:
1
2
3
4
5
6
| func Google(query string) (results []Result) {
results = append(results, Web(query))
results = append(results, Image(query))
results = append(results, Video(query))
return
}
|
无需锁、条件变量、回调,即可实现并发搜索:
1
2
3
4
5
6
7
8
9
10
11
12
| func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() {c <- Video(query) } ()
for i := 0; i < 3; i++ {
result := <-c
results = append(results, result)
}
return
}
|
设置搜索服务的超时时间,避免被后端较慢的(或者出故障的)服务拖垮:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| func Google(query string) (results []Result) {
c := make(chan Result)
go func(){ c <- Web(query) }()
go func(){ c <- Image(query) }()
go func(){ c <- Video(query) }()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
}
|
通过后端服务多开(replicated),降低尾部延迟(tail latency):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- First(query, Web1, Web2) } ()
go func() { c <- First(query, Image1, Image2) } ()
go func() { c <- First(query, Video1, Video2) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
}
|