5.3 pipeline
channel可以连接goroutine,如果一个goroutine的输出是另一个goroutine的输入,就叫作pipeline。可以理解为pipeline是虚拟的,用来连接goroutine和channel,并且最终形成一个goroutine的输出成为另一个goroutine的输入,且是使用channel传递数据的。
使用pipeline的好处有三点:首先,在程序中形成一个清晰稳定的数据流,我们在使用的时候不需要过多考虑goroutine和channel的相关通信和状态问题。其次,在一个pipeline内,不需要把数据再保存为变量,节省了内存空间并提高了效率。最后,使用pipeline能够简化程序的复杂度,便于维护。
为了便于理解,还是先来看示例代码:
book/ch05/5.3/main.go
1. package main
2.
3. import (
4. "fmt"
5. "math/rand"
6. )
7.
8. var done = false
9. var Mess = make(map[int]bool)
10. func main() {
11. A := make(chan int)
12. B := make(chan int)
13. go sendRan(50,10,A)
14. go receive(B,A)
15. sum(B)
16. }
17.
18. func genRandom(max,min int) int {
19. return rand.Intn(max-min)+min
20. }
21.
22. func sendRan(max,min int,out chan<- int) {
23. for{
24. if done{
25. close(out)
26. return
27. }
28. out <- genRandom(max,min)
29. }
30. }
31.
32. func receive(out chan<- int ,in <-chan int) {
33. for r := range in{
34. fmt.Println(" ",r)
35. _,ok := Mess[r]
36. if ok {
37. fmt.Println("duplicate num is:",r)
38. done = true
39. }else {
40. Mess[r] = true
41. out <- r
42. }
43. }
44. close(out)
45. }
46.
47. func sum(in <-chan int) {
48. var sum int
49. for r := range in{
50. sum += r
51. }
52. fmt.Println("The sum is:",sum)
53. }
先来说一下整个程序完成的任务:随机生成一些数字写入channel,然后另一个channel读取,如果随机数在以前已经出现过,就关闭生成的channel。有一个函数会计算读取channel里面所有随机数的和并打印,下面还是详细讲解一下代码。
第8行和第9行,定义了两个变量。done是布尔类型,默认为false,一旦发现产生了重复的随机数,则设置为true。map类型的Mess则用来记录随机数,每个随机数r都对应Mess[r]true的值,每次新的r生成以后只需要判断Mess[r]是否为true就可知该数据是否已经生成过。
第18行至第30行,生成随机数,并且写入通道。该操作通过genRandom和sendRan函数来完成。genRandom函数根据传入的两个参数max和min生成一个随机数。第24行是判断done的值,如果其值变为true就关闭通道。不过done的值在本函数中不会修改,它是在后面的receive函数修改的。
第32行至第45行,把数据从通道in写入通道out。其中第35行至第42行用于判断该值以前是否生成过,如果生成过则把done设置为true。这里会影响上面的sendRan函数。最后,关闭通道out。
第47行至第53行,sum函数用于读取通道的数并且求和。注意这个循环,在receive关闭通道以后才会停止读数据。
第10行至第16行是main函数,它调用了两个channel,作为参数传递给sendRan和receive函数,然后用goroutine的方式分别运行这两个函数。第三个函数sum并没有使用goroutine运行,因为这个函数放到这里起阻塞main函数往下执行的作用,channel不关闭,sum的读取循环就不会结束,这样正好可以让主goroutine等待其他两个goroutine的执行结束。
执行程序,会看到如下结果:
1. 11 2. 17 3. 17 4. duplicate num is: 17 5. The sum is: 28
因为重复的数字没有写入通道out,且直接关闭了通道(见第36行至第38行),所以重复的数字不会被sum计算。