Go微服务实战
上QQ阅读APP看书,第一时间看更新

5.3 pipeline

channel可以连接goroutine,如果一个goroutine的输出是另一个goroutine的输入,就叫作pipeline。可以理解为pipeline是虚拟的,用来连接goroutine和channel,并且最终形成一个goroutine的输出成为另一个goroutine的输入,且是使用channel传递数据的。

使用pipeline的好处有三点:首先,在程序中形成一个清晰稳定的数据流,我们在使用的时候不需要过多考虑goroutine和channel的相关通信和状态问题。其次,在一个pipeline内,不需要把数据再保存为变量,节省了内存空间并提高了效率。最后,使用pipeline能够简化程序的复杂度,便于维护。

为了便于理解,还是先来看示例代码:


book/ch05/5.3/main.go
1. package main
2.
3. import (
4.     "fmt"
5.     "math/rand"
6. )
7.
8. var done = false
9. var Mess = make(map[int]bool)
10. func main() {
11.     A := make(chan int)
12.     B := make(chan int)
13.     go sendRan(50,10,A)
14.     go receive(B,A)
15.     sum(B)
16. }
17.
18. func genRandom(max,min int) int  {
19.     return rand.Intn(max-min)+min
20. }
21.
22. func sendRan(max,min int,out chan<- int)  {
23.     for{
24.         if done{
25.             close(out)
26.             return
27.         }
28.         out <- genRandom(max,min)
29.     }
30. }
31.
32. func receive(out chan<- int ,in <-chan int)  {
33.     for r := range in{
34.         fmt.Println(" ",r)
35.         _,ok := Mess[r]
36.         if ok {
37.             fmt.Println("duplicate num is:",r)
38.             done = true
39.         }else {
40.             Mess[r] = true
41.             out <- r
42.         }
43.     }
44.     close(out)
45. }
46.
47. func sum(in <-chan int)  {
48.     var sum int
49.     for r := range in{
50.         sum += r
51.     }
52.     fmt.Println("The sum is:",sum)
53. }

先来说一下整个程序完成的任务:随机生成一些数字写入channel,然后另一个channel读取,如果随机数在以前已经出现过,就关闭生成的channel。有一个函数会计算读取channel里面所有随机数的和并打印,下面还是详细讲解一下代码。

第8行和第9行,定义了两个变量。done是布尔类型,默认为false,一旦发现产生了重复的随机数,则设置为true。map类型的Mess则用来记录随机数,每个随机数r都对应Mess[r]true的值,每次新的r生成以后只需要判断Mess[r]是否为true就可知该数据是否已经生成过。

第18行至第30行,生成随机数,并且写入通道。该操作通过genRandom和sendRan函数来完成。genRandom函数根据传入的两个参数max和min生成一个随机数。第24行是判断done的值,如果其值变为true就关闭通道。不过done的值在本函数中不会修改,它是在后面的receive函数修改的。

第32行至第45行,把数据从通道in写入通道out。其中第35行至第42行用于判断该值以前是否生成过,如果生成过则把done设置为true。这里会影响上面的sendRan函数。最后,关闭通道out。

第47行至第53行,sum函数用于读取通道的数并且求和。注意这个循环,在receive关闭通道以后才会停止读数据。

第10行至第16行是main函数,它调用了两个channel,作为参数传递给sendRan和receive函数,然后用goroutine的方式分别运行这两个函数。第三个函数sum并没有使用goroutine运行,因为这个函数放到这里起阻塞main函数往下执行的作用,channel不关闭,sum的读取循环就不会结束,这样正好可以让主goroutine等待其他两个goroutine的执行结束。

执行程序,会看到如下结果:


1.   11
2.   17
3.   17
4. duplicate num is: 17
5. The sum is: 28

因为重复的数字没有写入通道out,且直接关闭了通道(见第36行至第38行),所以重复的数字不会被sum计算。