Go微服务实战
上QQ阅读APP看书,第一时间看更新

8.1 竞态与并发模式

8.1.1 数据竞态

第5章介绍了并发编程的goroutine和channel,通过这两者,读者可以进行并发编程。本节要介绍的则是并发中另一个非常重要的知识点—竞态。什么是竞态呢?第7章的综合案例当中其实已经使用过,就是在tcp_server.go的accept和remove方法中使用的sync.mutex锁,这里再把accept和remove方法的代码列出来,仔细分析一下:


type TcpServer struct {
    listener net.Listener
    clients []*client
    mutex *sync.Mutex
}

func (s *TcpServer) accept(conn net.Conn) *client  {
    log.Printf("Accepting connection from %v,total clients:%v",conn.
        RemoteAddr().String(),len(s.clients)+1)

    s.mutex.Lock()
    defer s.mutex.Unlock()

    client := &client{
        conn:conn,
        writer:protocol.NewWriter(conn),
    }

    s.clients = append(s.clients,client)
    return client
}

func (s *TcpServer) remove(client *client)  {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    for i,check := range s.clients{
        if check == client {
            s.clients = append(s.clients[:i],s.clients[i+1:]...)
        }
    }
    log.Printf("Closing connection from %v",client.conn.RemoteAddr().String())
    client.conn.Close()
}

在分析这个问题前,可以先做一个假设,如果不使用锁,代码会是如下形式:


func (s *TcpServer) accept(conn net.Conn) *client  {
    log.Printf("Accepting connection from %v,total clients:%v",conn.
        RemoteAddr().String(),len(s.clients)+1)

    client := &client{
        conn:conn,
        writer:protocol.NewWriter(conn),
    }

    s.clients = append(s.clients,client)
    return client
}

func (s *TcpServer) remove(client *client)  {

    for i,check := range s.clients{
        if check == client {
            s.clients = append(s.clients[:i],s.clients[i+1:]...)
        }
    }
    log.Printf("Closing connection from %v",client.conn.RemoteAddr().String())
    client.conn.Close()
}

上述代码如果仅有一个client发起请求肯定是没有问题的,可是如果同时有多个client发起请求,也就是accept并发执行,就有可能出现问题了。

假设有A和B发起请求,A执行到accept方法的倒数第二行的append语句之前,正好B的整个accept方法执行完成,而且B马上开始执行remove方法,在B的remove方法内,其append语句和A的accept方法的append语句就很可能产生冲突,因为同时有两个goroutine对同一个变量进行操作,比如remove内的for循环在执行时,通过accept方法使clients切片内的元素增多,可能会导致删除语句删掉不该删除的元素。

试想以下情景:

1)clients:A,A accept阻塞在append。

2)clients:B,B accept执行成功。

3)clients:B,A的accept和B的remove并发执行。

执行第三步的时候,有可能被删掉的是A,甚至A和B都会在clients内被清空。不过这都是猜测,此处笔者要提醒读者,不可靠感觉推测并发的结果,考虑到编译器和机器的原因,并发极有可能出现不可预料的结果。

上述情况就是竞态的一种,这属于数据竞态(data race),这种情况发生在多个goroutine并发读写同一个变量,并且至少有一个goroutine写入时。

这个概念有些抽象,读起来可能有些困难,此处准备了一个简单的例子来帮助大家更直观地理解这个问题,示例代码如下:


func main() {
    fmt.Println(getNumber())
}

func getNumber() int {
    var i int
    go func() {
        i = 5
    }()

    return i
}

代码非常简单,这里在getNumber方法内声明了一个int型变量i,并且给i赋值5,然后返回这个变量i。可是,如果执行上面的代码,打印出来的结果很可能不是5,而是0。到底结果是0还是5,取决于执行return语句的时候goroutine有没有执行完成。

当程序执行是图8-1所示的情形时,结果是0。

图8-1 goroutine执行示意图1

可以看到,出现这种情况是因为执行return的时候,goroutine对于i变量的赋值还没有执行完,所以返回了0。

如果执行return的时候goroutine已经执行完成,那么返回的值就是5,如图8-2所示。

图8-2 goroutine执行示意图2

这个例子可以很好地帮助我们理解数据竞态。

注意

在写代码时千万别认为由数据竞态引起的错误是小概率事件,并且错误地认为这种问题可以接受。我们要避免所有的数据竞态。

那么,我们在写代码时如何避免发生数据竞态问题呢?

数据竞态的产生是因为多个goroutine同时访问同一个变量,而且至少有一个goroutine会修改变量。所以可以根据数据竞态发生的条件来寻找解决竞态问题的方法。

第一种方案是不修改变量。这种不修改并非是只有一次赋值,因为即便是仅有一次赋值也可能出现竞态,就像上面的例子所示。所以,如果仅仅是给变量一次初始化就需要放到init函数内,甚至只使用常量,这在实际项目中是不可能的,程序中肯定要用到变量,而且要修改变量的值。所以,想通过不修改变量的方式来达到避免竞态的效果是不可行的,或者说这种解决方案可能只有极少数场景可行,比如数据库连接。

第二种避免数据竞态的方式是避免让多个goroutine读取一个变量。这种方法的思路是只让一个goroutine访问需要的变量,比如上面getNumber例子中的变量i,只让一个goroutine访问i,完成对i值的修改。那么,其他goroutine如何访问i呢?通过通道。可以通过对通道的读取或写入来完成对i的更新,请看下面的例子:


book/ch08/race/datarace.go
1. package singlegoroutine
2.
3. var realNum = make(chan int)//设置数字值
4. var delta = make(chan int)//设置的增减额
5.
6. func SetNumber(n int)  {
7.     realNum <- n
8. }
9. func ChangeByDelta(d int)  {
10.     delta <- d
11. }
12. func GetNumber() int{
13.     return <- realNum
14. }
15. func moitor()  {
16.     var i int //把数值限定在方法内,goroutine运行后仅在goroutine内可见
17.     for {
18.         select{
19.         case i = <- realNum:
20.         case d := <- delta:
21.             i += d
22.         case realNum <- i:
23.         }
24.     }
25. }
26. func init()  {
27.     go monitor()
28. }

第3行和第4行,定义两个int型通道,realNum通道用于读写i当前的值,要么是把i的值读入realNum,要么是把realNum的值写入i。

第6行至第8行,用SetNumber方法将参数n写入通道realNum。

第9行至第11行的ChangeByDelta方法是向delta通道写入参数d的值。

第12行至第14行的GetNumber方法是读取现在的i值,可以看到是从realNum通道读取的。

第15行至第25行是重点,该方法是通过select对realNum和delta两个通道的状态进行判断,进而完成对i的操作。第19行是realNum可读时则把realNum的值写入i,而realNum可读是在SetNumber方法执行完成之后,通过通道完成了对i的操作。同理,delta通道也是如此。

第26行至第28行是在init方法内运行一个goroutine来执行monitor方法,也就是在初始化的时候goroutine就开始执行。

这里开启一个goroutine专门负责通道与变量之间的映射操作,而对外提供的方法都是对通道的操作,这样可以避免多个goroutine同时操作一个变量的情况出现。

注意

Go语言的多线程模式所提倡的就是:不要通过共享内存进行通信,而是要通过通信来共享内存。通过上面的例子,读者应该能理解这句话的深刻含义。

前面第二种方法有着较为广泛的应用场景,虽然看上去每个变量都只有一个goroutine访问,但这个goroutine要存在于整个项目的生命周期,这是非常麻烦的。对此,我们可以进行方法的变通,即让变量在一个goroutine中只使用方法,这样就可以达到类似的效果。具体做法是借助通道把共享变量的地址从一个方法传递到下一步的方法上,从而让整个变量变成串行使用。即便是多个goroutine之间,共享变量也可以通过共享变量地址这种方式来实现。当然,要求在每一步操作完成以后就不再操作了。

来看一个简单的示例,如下:


type Meat struct{
    status string
}
func cook(Ball chan<- *Meat){
    for{
        ball := new(Meat)
        ball.status = "meat ball"
        Ball <- ball //后续不再访问ball变量
    }
}

func serve(service chan<- *Meat,balls <- chan *Meat){
    for ball := range balls{
        ball.status = "serve ball"
        service <- ball //后续不再访问ball
    }
}

这是一个肉丸子(balls)的例子,先定义肉(Meat),在cook方法内一块肉只用一次,做成丸子后,丸子内所使用的肉cook方法就不会继续使用了,因为直接使用丸子(ball)就可以,这就相当于有一次传递。接下来,通过serve方法上菜,对于已经成为丸子的肉也只访问一次。这样对于同一个Meat来说,每次只有一个方法在访问它,或者说Meat每一个时刻只受限于一个方法,可以以此类推,使用多个方法串行访问一个变量。

注意

对于上例这种受限方式,有个专有名词:串行受限。

第三种避免数据竞态的方法是传统的互斥访问方式,也属于共享内存式并发编程。这种互斥方式就是在第7章案例中所使用的方式。将互斥的方式细分,可以划分为多种,这部分内容放到8.2节进行详细介绍。