并发

寒江蓑笠翁大约 63 分钟

并发

Go语言对于并发的支持是纯天然的,这是这门语言的核心所在,其上手难度相对较小,开发人员不太需要关注底层实现就能做出一个相当不错的并发应用,提高了开发人员的下限。


协程

协程(coroutine)是一种轻量级的线程,或者说是用户态的线程,不受操作系统直接调度,由Go语言自身的调度器进行运行时调度,因此上下文切换开销非常小,这也是为什么Go的并发性能很不错的原因之一。协程这一概念并非Go首次提出,Go也不是第一个支持协程的语言,但Go是第一个能够将协程和并发支持的相当简洁和优雅的语言。


在Go中,创建一个协程十分的简单,仅需要一个go关键字,就能够快速开启一个协程,go关键字后面必须是一个函数调用。例子如下

提示

具有返回值的内置函数不允许跟随在go关键字后面,例如下面的错误示范

go make([]int.10) //  go discards result of make([]int, 10) (value of type []int)
func main() {
	go fmt.Println("hello world!")
	go hello()
	go func() {
		fmt.Println("hello world!")
	}()
}

func hello() {
	fmt.Println("hello world!")
}

以上三种开启协程的方式都是可以的,但是其实这个例子执行过后在大部分情况下什么都不会输出,协程是并发执行的,系统创建协程需要时间,而在此之前,主协程早已运行结束,一旦主线程退出,其他子协程也就自然退出了。并且协程的执行顺序也是不确定的,无法预判的,例如下面的例子

func main() {
	fmt.Println("start")
	for i := 0; i < 10; i++ {
		go fmt.Println(i)
	}
	fmt.Println("end")
}

这是一个在循环体中开启协程的例子,永远也无法精准的预判到它到底会输出什么。可能子协程还没开始运行,主协程就已经结束了,情况如下

start
end

又或者只有一部分子协程在主协程退出前成功运行,情况如下

start
0  
1  
5  
3  
4  
6  
7  
end

最简单的做法就是让主协程等一会儿,需要使用到time包下的Sleep函数,可以使当前协程暂停一段时间,例子如下

func main() {
	fmt.Println("start")
	for i := 0; i < 10; i++ {
		go fmt.Println(i)
	}
    // 暂停1ms
	time.Sleep(time.Millisecond)
	fmt.Println("end")
}

再次执行输出如下,可以看到所有的数字都完整输出了,没有遗漏

start
0
1
5
2
3
4
6
8
9
7
end

但是顺序还是乱的,因此让每次循环都稍微的等一下。例子如下

func main() {
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      go fmt.Println(i)
      time.Sleep(time.Millisecond)
   }
   time.Sleep(time.Millisecond)
   fmt.Println("end")
}

现在的输出已经是正常的顺序了

start
0
1
2
3
4
5
6
7
8
9
end

上面的例子中结果输出很完美,那么并发的问题解决了吗,不,一点也没有。对于并发的程序而言,不可控的因素非常多,执行的时机,先后顺序,执行过程的耗时等等,倘若循环中子协程的工作不只是一个简单的输出数字,而是一个非常巨大复杂的任务,耗时的不确定的,那么依旧会重现之前的问题。例如下方代码

func main() {
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      go hello(i)
      time.Sleep(time.Millisecond)
   }
   time.Sleep(time.Millisecond)
   fmt.Println("end")
}

func hello(i int) {
   // 模拟随机耗时
   time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
   fmt.Println(i)
}

这段代码的输出依旧是不确定的,下面是可能的情况之一

start
0
3
4
end

因此time.Sleep并不是一种良好的解决办法,幸运的是Go提供了非常多的并发控制手段,常用的并发控制方法有三种:

  • channel:管道
  • WaitGroup:信号量
  • Context:上下文

三种方法有着不同的适用情况,WaitGroup可以动态的控制一组指定数量的协程,Context更适合子孙协程嵌套层级更深的情况,管道更适合协程间通信。对于较为传统的锁控制,Go也对此提供了支持:

  • Mutex:互斥锁
  • RWMutex :读写互斥锁

管道

channel,译为管道,Go对于管道的作用如下解释:

Do not communicate by sharing memory; instead, share memory by communicating.

即通过消息来进行内存共享,channel就是为此而生,它是一种在协程间通信的解决方案,同时也可以用于并发控制,先来认识下channel的基本语法。Go中通过关键字chan来代表管道类型,同时也必须声明管道的存储类型,来指定其存储的数据是什么类型,下面的例子是一个普通管道的模样。

var ch chan int

这是一个管道的声明语句,此时管道还未初始化,其值为nil,不可以直接使用。


创建

在创建管道时,有且只有一种方法,那就是使用内置函数make,对于管道而言,make函数接收两个参数,第一个是管道的类型,第二个是可选参数为管道的缓冲大小。例子如下

intCh := make(chan int)
// 缓冲区大小为1的管道
strCh := make(chan string, 1)

在使用完一个管道后一定要记得关闭该管道,使用内置函数close来关闭一个管道,该函数签名如下。

func close(c chan<- Type)

一个关闭管道的例子如下

func main() {
	intCh := make(chan int)
	// do something
	close(intCh)
}

有些时候使用defer来关闭管道可能会更好。


读写

对于一个管道而言,Go使用了两种很形象的操作符来表示读写操作:

ch <-:表示对一个管道写入数据

<- ch:表示对一个管道读取数据

<-很生动的表示了数据的流动方向,来看一个对int类型的管道读写的例子

func main() {
    // 如果没有缓冲区则会导致死锁
	intCh := make(chan int, 1)
	defer close(intCh)
    // 写入数据
	intCh <- 114514
    // 读取数据
	fmt.Println(<-intCh)
}

上面的例子中创建了一个缓冲区大小为1的int型管道,对其写入数据114514,然后再读取数据并输出,最后关闭该管道。对于读取操作而言,还有第二个返回值,一个布尔类型的值,用于表示数据是否读取成功

ints, ok := <-intCh

管道中的数据流动方式与队列一样,即先进先出(FIFO),协程对于管道的操作是同步的,在某一个时刻,只有一个协程能够对其写入数据,同时也只有一个协程能够读取管道中的数据。


无缓冲

对于无缓冲管道而言,因为缓冲区容量为0,所以不会临时存放任何数据。正因为无缓冲管道无法存放数据,在向管道写入数据时必须立刻有其他协程来读取数据,否则就会阻塞等待,读取数据时也是同理,这也解释了为什么下面看起来很正常的代码会发生死锁。

func main() {
	// 创建无缓冲管道
	ch := make(chan int)
    defer close(ch)
	// 写入数据
	ch <- 123
	// 读取数据
	n := <-ch
	fmt.Println(n)
}

无缓冲管道不应该同步的使用,正确来说应该开启一个新的协程来发送数据,如下例

func main() {
	// 创建无缓冲管道
	ch := make(chan int)
	defer close(ch)
	go func() {
		// 写入数据
		ch <- 123
	}()
	// 读取数据
	n := <-ch
	fmt.Println(n)
}

有缓冲

当管道有了缓冲区,就像是一个阻塞队列一样,读取空的管道和写入已满的管道都会造成阻塞。无缓冲管道在发送数据时,必须立刻有人接收,否则就会一直阻塞。对于有缓冲管道则不必如此,对于有缓冲管道写入数据时,会先将数据放入缓冲区里,只有当缓冲区容量满了才会阻塞的等待协程来读取管道中的数据。同样的,读取有缓冲管道时,会先从缓冲区中读取数据,直到缓冲区没数据了,才会阻塞的等待协程来向管道中写入数据。因此,无缓冲管道中会造成死锁例子在这里可以顺利运行。

func main() {
   // 创建有缓冲管道
   ch := make(chan int, 1)
   defer close(ch)
   // 写入数据
   ch <- 123
   // 读取数据
   n := <-ch
   fmt.Println(n)
}

尽管可以顺利运行,但这种同步读写的方式是非常危险的,一旦管道缓冲区空了或者满了,将会永远阻塞下去,因为没有其他协程来向管道中写入或读取数据。来看看下面的一个例子

func main() {
	// 创建有缓冲管道
	ch := make(chan int, 5)
	// 创建两个无缓冲管道
	chW := make(chan struct{})
	chR := make(chan struct{})
	defer func() {
		close(ch)
		close(chW)
		close(chR)
	}()
	// 负责写
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
			fmt.Println("写入", i)
		}
		chW <- struct{}{}
	}()
	// 负责读
	go func() {
		for i := 0; i < 10; i++ {
            // 每次读取数据都需要花费1毫秒
			time.Sleep(time.Millisecond)
			fmt.Println("读取", <-ch)
		}
		chR <- struct{}{}
	}()
	fmt.Println("写入完毕", <-chW)
	fmt.Println("读取完毕", <-chR)
}

这里总共创建了3个管道,一个有缓冲管道用于协程间通信,两个无缓冲管道用于同步父子协程的执行顺序。负责读的协程每次读取之前都会等待1毫秒,负责写的协程一口气做多也只能写入5个数据,因为管道缓冲区最大只有5,在没有协程来读取之前,只能阻塞等待。所以该示例输出如下

写入 0
写入 1
写入 2
写入 3
写入 4 // 一下写了5个,缓冲区满了,等其他协程来读
读取 0
写入 5 // 读一个,写一个
读取 1
写入 6
读取 2
写入 7
读取 3
写入 8
写入 9
读取 4
写入完毕 {} // 所有的数据都发送完毕,写协程执行完毕
读取 5
读取 6
读取 7
读取 8
读取 9
读取完毕 {} // 所有的数据都读完了,读协程执行完毕

可以看到负责写的协程刚开始就一口气发送了5个数据,缓冲区满了以后就开始阻塞等待读协程来读取,后面就是每当读协程1毫秒读取一个数据,缓冲区有空位了,写协程就写入一个数据,直到所有数据发送完毕,写协程执行结束,随后当读协程将缓冲区所有数据读取完毕后,读协程也执行结束,最后主协程退出。

提示

通过内置函数len可以访问管道缓冲区中数据的个数,通过cap可以访问管道缓冲区的大小。

func main() {
   ch := make(chan int, 5)
   ch <- 1
   ch <- 2
   ch <- 3
   fmt.Println(len(ch), cap(ch))
}

输出

3 5

利用管道的阻塞条件,可以很轻易的写出一个主协程等待子协程执行完毕的例子

func main() {
   // 创建一个无缓冲管道
   ch := make(chan struct{})
   defer close(ch)
   go func() {
      fmt.Println(2)
      // 写入
      ch <- struct{}{}
   }()
   // 阻塞等待读取
   <-ch
   fmt.Println(1)
}

输出

2
1

通过有缓冲管道还可以实现一个简单的互斥锁,看下面的例子

var count = 0

// 缓冲区大小为1的管道
var lock = make(chan struct{}, 1)

func Add() {
    // 加锁
	lock <- struct{}{}
	fmt.Println("当前计数为", count, "执行加法")
	count += 1
    // 解锁
	<-lock
}

func Sub() {
    // 加锁
	lock <- struct{}{}
	fmt.Println("当前计数为", count, "执行减法")
	count -= 1
    // 解锁
	<-lock
}

由于管道的缓冲区大小为1,最多只有一个数据存放在缓冲区中。AddSub函数在每次操作前都会尝试向管道中发送数据,由于缓冲区大小为1,倘若有其他协程已经写入了数据,缓冲区已经满了,当前协程就必须阻塞等待,直到缓冲区空出位置来,如此一来,在某一个时刻,最多只能有一个协程对变量count进行修改,这样就实现了一个简单的互斥锁。


注意点

下面是一些总结,以下几种情况使用不当会导致管道阻塞:

读写无缓冲管道

当对一个无缓冲管道直接进行同步读写操作都会导致该协程阻塞

func main() {
   // 创建了一个无缓冲管道
   intCh := make(chan int)
   defer close(intCh)
   // 发送数据
   intCh <- 1
   // 读取数据
   ints, ok := <-intCh
   fmt.Println(ints, ok)
}

读取空缓冲区的管道

当读取一个缓冲区为空的管道时,会导致该协程阻塞

func main() {
   // 创建的有缓冲管道
   intCh := make(chan int, 1)
   defer close(intCh)
   // 缓冲区为空,阻塞等待其他协程写入数据
   ints, ok := <-intCh
   fmt.Println(ints, ok)
}

写入满缓冲区的管道

当管道的缓冲区已满,对其写入数据会导致该协程阻塞

func main() {
	// 创建的有缓冲管道
	intCh := make(chan int, 1)
	defer close(intCh)
	
	intCh <- 1
    // 满了,阻塞等待其他协程来读取数据
	intCh <- 1
}

管道为nil

当管道为nil时,无论怎样读写都会导致当前协程阻塞

func main() {
	var intCh chan int
    // 写
	intCh <- 1
}
func main() {
	var intCh chan int
    // 读
	fmt.Println(<-intCh)
}

关于管道阻塞的条件需要好好掌握和熟悉,大多数情况下这些问题隐藏的十分隐蔽,并不会像例子中那样直观。


以下几种情况还会导致panic

关闭一个nil管道

当管道为nil时,使用close函数对其进行关闭操作会导致panic`

func main() {
	var intCh chan int
	close(intCh)
}

写入已关闭的管道

对一个已关闭的管道写入数据会导致panic

func main() {
	intCh := make(chan int, 1)
	close(intCh)
	intCh <- 1
}

关闭已关闭的管道

在一些情况中,管道可能经过层层传递,调用者或许也不知道到底该由谁来关闭管道,如此一来,可能会发生关闭一个已经关闭了的管道,就会发生panic

func main() {
	ch := make(chan int, 1)
	defer close(ch)
	go write(ch)
	fmt.Println(<-ch)
}

func write(ch chan<- int) {
	// 只能对管道发送数据
	ch <- 1
	close(ch)
}

单向管道

双向管道指的是既可以写,也可以读,即可以在管道两边进行操作。单向管道指的是只读或只写的管道,即只能在管道的一边进行操作。手动创建的一个只读或只写的管道没有什么太大的意义,因为不能对管道读写就失去了其存在的作用。单向管道通常是用来限制通道的行为,一般会在函数的形参和返回值中出现,例如用于关闭通道的内置函数close的函数签名就用到了单向通道。

func close(c chan<- Type)

又或者说常用到的time包下的After函数

func After(d Duration) <-chan Time 

close函数的形参是一个只写通道,After函数的返回值是一个只读通道,所以单向通道的语法如下:

  • 箭头符号<-在前,就是只读通道,如<-chan int
  • 箭头符号<-在后,就是只写通道,如chan<- string

当尝试对只读的管道写入数据时,将会无法通过编译

func main() {
	timeCh := time.After(time.Second)
	timeCh <- time.Now()
}

报错如下,意思非常明确

invalid operation: cannot send to receive-only channel timeCh (variable of type <-chan time.Time)

对只写的管道读取数据也是同理。


双向管道可以转换为单向管道,反过来则不可以。通常情况下,将双向管道传给某个协程或函数并且不希望它读取/发送数据,就可以用到单向管道来限制另一方的行为。

func main() {
   ch := make(chan int, 1)
   go write(ch)
   fmt.Println(<-ch)
}

func write(ch chan<- int) {
   // 只能对管道发送数据
   ch <- 1
}

只读管道也是一样的道理

提示

chan是引用类型,即便Go的函数参数是值传递,但其引用依旧是同一个,这一点会在后续的管道原理中说明。

for range

通过for range语句,可以遍历读取缓冲管道中的数据,如下例

func main() {
	ch := make(chan int, 10)
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
	}()
	for n := range ch {
		fmt.Println(n)
	}
}

通常来说,for range遍历其他可迭代数据结构时,会有两个返回值,第一个是索引,第二个元素值,但是对于管道而言,有且仅有一个返回值,就是缓冲区的元素值,for range会遍历读取管道缓冲区中的元素,当管道缓冲区为空时,就会阻塞等待,直到有其他协程向管道中写入数据才会继续读取数据,所以输出如下

0
1                                                           
2                                                           
3                                                           
4                                                           
5                                                           
6                                                           
7                                                           
8                                                           
9                                                           
fatal error: all goroutines are asleep - deadlock!

可以看到上面的代码发生了死锁,因为子协程已经执行完毕了,而主协程还在阻塞等待其他协程来向管道中写入数据,所以应该管道在写入完毕后将其关闭。修改为如下代码

func main() {
   ch := make(chan int, 10)
   go func() {
      for i := 0; i < 10; i++ {
         ch <- i
      }
      // 关闭管道
      close(ch)
   }()
   for n := range ch {
      fmt.Println(n)
   }
}

写完后关闭管道,上述代码便不再会发生死锁。前面提到过读取管道是有两个返回值的,for range遍历管道时,当无法成功读取数据时,便会退出循环。第二个返回值指的是能否成功读取数据,而不是管道是否已经关闭,即便管道已经关闭,对于有缓冲管道而言,依旧可以读取数据,并且第二个返回值仍然为true。看下面的一个例子

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 5; i++ {
		ch <- i
	}
    // 关闭管道
	close(ch)
    // 再读取数据
	for i := 0; i < 6; i++ {
		n, ok := <-ch
		fmt.Println(n, ok)
	}
}

输出结果

0 true
1 true 
2 true 
3 true 
4 true 
0 false

由于管道已经关闭了,即便缓冲区为空,再读取数据也不会导致当前协程阻塞,可以看到在第六次遍历的时候读取的是零值,并且okfalse

提示

关于管道关闭的时机,应该尽量在向管道发送数据的那一方关闭管道,而不要在接收方关闭管道,因为大多数情况下接收方只知道接收数据,并不知道该在什么时候关闭管道。


select

select在Linux系统中,是一种IO多路复用的解决方案,类似的,在Go中,select是一种管道多路复用的控制结构。什么是多路复用,简单的用一句话概括:在某一时刻,同时监测多个元素是否可用,被监测的可以是网络请求,文件IO等。在Go中的select监测的元素就是管道,且只能是管道。select的语法与switch语句类似,下面看看一个select语句长什么样

func main() {
    // 创建三个管道
	chA := make(chan int)
	chB := make(chan int)
	chC := make(chan int)
    defer func() {
		close(chA)
		close(chB)
		close(chC)
	}()
	select {
	case n, ok := <-chA:
		fmt.Println(n, ok)
	case n, ok := <-chB:
		fmt.Println(n, ok)
	case n, ok := <-chC:
		fmt.Println(n, ok)
	default:
		fmt.Println("所有管道都不可用")
	}
}

switch类似,select由多个case和一个default组成,default分支可以省略。每一个case只能操作一个管道,且只能进行一种操作,要么读要么写,当有多个case可用时,select会伪随机的选择一个case来执行。如果所有case都不可用,就会执行default分支,倘若没有default分支,将会阻塞等待,直到至少有一个case可用。由于上例中没有对管道写入数据,自然所有的case都不可用,所以最终输出为default分支的执行结果。稍微修改下后如下:

func main() {
   chA := make(chan int)
   chB := make(chan int)
   chC := make(chan int)
   defer func() {
      close(chA)
      close(chB)
      close(chC)
   }()
   // 开启一个新的协程
   go func() {
      // 向A管道写入数据
      chA <- 1
   }()
   select {
   case n, ok := <-chA:
      fmt.Println(n, ok)
   case n, ok := <-chB:
      fmt.Println(n, ok)
   case n, ok := <-chC:
      fmt.Println(n, ok)
   }
}

上例开启了一个新的协程来向管道A写入数据,select由于没有默认分支,所以会一直阻塞等待直到有case可用。当管道A可用时,执行完对应分支后主协程就直接退出了。要想一直监测管道,可以配合for循环使用,如下。

func main() {
	chA := make(chan int)
	chB := make(chan int)
	chC := make(chan int)
	defer func() {
		close(chA)
		close(chB)
		close(chC)
	}()
	go Send(chA)
	go Send(chB)
	go Send(chC)
    // for循环
	for {
		select {
		case n, ok := <-chA:
			fmt.Println("A", n, ok)
		case n, ok := <-chB:
			fmt.Println("B", n, ok)
		case n, ok := <-chC:
			fmt.Println("B", n, ok)
		}
	}
}

func Send(ch chan<- int) {
	for i := 0; i < 3; i++ {
		time.Sleep(time.Millisecond)
		ch <- i
	}
}

这样确实三个管道都能用上了,但是死循环+select会导致主协程永久阻塞,所以可以将其单独放到新协程中,并且加上一些其他的逻辑。

func main() {
   chA := make(chan int)
   chB := make(chan int)
   chC := make(chan int)
   defer func() {
      close(chA)
      close(chB)
      close(chC)
   }()

   l := make(chan struct{})

   go Send(chA)
   go Send(chB)
   go Send(chC)
    
   go func() {
   Loop:
      for {
         select {
         case n, ok := <-chA:
            fmt.Println("A", n, ok)
         case n, ok := <-chB:
            fmt.Println("B", n, ok)
         case n, ok := <-chC:
            fmt.Println("B", n, ok)
         case <-time.After(time.Second): // 设置1秒的超时时间
            break Loop // 退出循环
         }
      }
      l <- struct{}{} // 告诉主协程可以退出了
   }()
    
   <-l
}

func Send(ch chan<- int) {
   for i := 0; i < 3; i++ {
      time.Sleep(time.Millisecond)
      ch <- i
   }
}

上例中通过for循环配合select来一直监测三个管道是否可以用,并且第四个case是一个超时管道,超时过后便会退出循环,结束子协程。最终输出如下

B 0 true
B 0 true
A 0 true
A 1 true
B 1 true
B 1 true
B 2 true
A 2 true
B 2 true

超时

上一个例子用到了time.After函数,其返回值是一个只读的管道,该函数配合select使用可以非常简单的实现超时机制,例子如下

func main() {
	chA := make(chan int)
	defer close(chA)
	go func() {
		time.Sleep(time.Second * 2)
		chA <- 1
	}()
	select {
	case n := <-chA:
		fmt.Println(n)
	case <-time.After(time.Second):
		fmt.Println("超时")
	}
}

永久阻塞

select语句中什么都没有时,就会永久阻塞,例如

func main() {
	fmt.Println("start")
	select {}
	fmt.Println("end")
}

end永远也不会输出,主协程会一直阻塞,这种情况一般是有特殊用途。

提示

selectcase中对值为nil的管道进行操作的话,并不会导致阻塞,该case则会被忽略,永远也不会被执行。例如下方代码无论执行多少次都只会输出timeout。

func main() {
   var nilCh chan int
   select {
   case <-nilCh:
      fmt.Println("read")
   case nilCh <- 1:
      fmt.Println("write")
   case <-time.After(time.Second):
      fmt.Println("timeout")
   }
}

WaitGroup

sync.WaitGroupsync包下提供的一个结构体,WaitGroup即等待执行,使用它可以很轻易的实现等待一组协程的效果。该结构体只对外暴露三个方法。

Add方法用于指明要等待的协程的数量

func (wg *WaitGroup) Add(delta int)

Done方法表示当前协程已经执行完毕

func (wg *WaitGroup) Done()

Wait方法等待子协程结束,否则就阻塞

func (wg *WaitGroup) Wait()

WaitGroup使用起来十分简单,属于开箱即用。其内部的实现是计数器+信号量,程序开始时调用Add初始化计数,每当一个协程执行完毕时调用Done,计数就-1,直到减为0,而在此期间,主协程调用Wait 会一直阻塞直到全部计数减为0,然后才会被唤醒。看一个简单的使用例子

func main() {
	var wait sync.WaitGroup
	// 指定子协程的数量
	wait.Add(1)
	go func() {
		fmt.Println(1)
		// 执行完毕
		wait.Done()
	}()
	// 等待子协程
	wait.Wait()
	fmt.Println(2)
}

这段代码永远都是先输出1再输出2,主协程会等待子协程执行完毕后再退出。

1
2

针对协程介绍中最开始的例子,可以做出如下修改

func main() {
   var mainWait sync.WaitGroup
   var wait sync.WaitGroup
   // 计数10
   mainWait.Add(10)
   fmt.Println("start")
   for i := 0; i < 10; i++ {
      // 循环内计数1
      wait.Add(1)
      go func() {
         fmt.Println(i)
         // 两个计数-1
         wait.Done()
         mainWait.Done()
      }()
      // 等待当前循环的协程执行完毕
      wait.Wait()
   }
   // 等待所有的协程执行完毕
   mainWait.Wait()
   fmt.Println("end")
}

这里使用了sync.WaitGroup替代了原先的time.Sleep,协程并发执行的的顺序更加可控,不管执行多少次,输出都如下

start
0  
1  
2  
3  
4  
5  
6  
7  
8  
9  
end

WaitGroup通常适用于可动态调整协程数量的时候,例如事先知晓协程的数量,又或者在运行过程中需要动态调整。WaitGroup的值不应该被复制,复制后的值也不应该继续使用,尤其是将其作为函数参数传递时,因该传递指针而不是值。倘若使用复制的值,计数完全无法作用到真正的WaitGroup上,这可能会导致主协程一直阻塞等待,程序将无法正常运行。例如下方的代码

func main() {
	var mainWait sync.WaitGroup
	mainWait.Add(1)
	hello(mainWait)
	mainWait.Wait()
	fmt.Println("end")
}
func hello(wait sync.WaitGroup) {
	fmt.Println("hello")
	wait.Done()
}

错误提示所有的协程都已经退出,但主协程依旧在等待,这就形成了死锁,因为hello函数内部对一个形参WaitGroup调用Done并不会作用到原来的mainWait上,所以应该使用指针来进行传递。

hello
fatal error: all goroutines are asleep - deadlock!  

提示

当计数变为负数,或者计数数量大于子协程数量时,将会引发panic

Context

Context译为上下文,是Go提供的一种并发控制的解决方案,相比于管道和WaitGroup,它可以更好的控制子孙协程以及层级更深的协程。Context本身是一个接口,只要实现了该接口都可以称之为上下文例如著名Web框架Gin中的gin.Contextcontext标准库也提供了几个实现,分别是:

  • emptyCtx
  • cancelCtx
  • timerCtx
  • valueCtx

Context

先来看看Context接口的定义,再去了解它的具体实现。

type Context interface {
   
   Deadline() (deadline time.Time, ok bool)

   Done() <-chan struct{}

   Err() error

   Value(key any) any
}

Deadline

该方法具有两个返回值,deadline是截止时间,即上下文应该取消的时间。第二个值是是否设置deadline,如果没有设置则一直为false

Deadline() (deadline time.Time, ok bool)

Done

其返回值是一个空结构体类型的只读管道,该管道仅仅起到通知作用,不传递任何数据。当上下文所做的工作应该取消时,该通道就会被关闭,对于一些不支持取消的上下文,可能会返回nil

Done() <-chan struct{}

Err

该方法返回一个error,用于表示上下关闭的原因。当Done管道没有关闭时,返回nil,如果关闭过后,会返回一个err来解释为什么会关闭。

Err() error

Value

该方法返回对应的键值,如果key不存在,或者不支持该方法,就会返回nil

Value(key any) any

emptyCtx

顾名思义,emptyCtx就是空的上下文,context包下所有的实现都是不对外暴露的,但是提供了对应的函数来创建上下文。emptyCtx就可以通过context.Backgroundcontext.TODO来进行创建。两个函数如下

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {
	return background
}

func TODO() Context {
	return todo
}

可以看到仅仅只是返回了emptyCtx指针。emptyCtx的底层类型实际上是一个int,之所以不使用空结构体,是因为emptyCtx的实例必须要有不同的内存地址,它没法被取消,没有deadline,也不能取值,实现的方法都是返回零值。

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
   return
}

func (*emptyCtx) Done() <-chan struct{} {
   return nil
}

func (*emptyCtx) Err() error {
   return nil
}

func (*emptyCtx) Value(key any) any {
   return nil
}

emptyCtx通常是用来当作最顶层的上下文,在创建其他三种上下文时作为父上下文传入。context包中的各个实现关系如下图所示

valueCtx

valueCtx实现比较简单,其内部只包含一对键值对,和一个内嵌的Context类型的字段。

type valueCtx struct {
   Context
   key, val any
}

其本身只实现了Value方法,逻辑也很简单,当前上下文找不到就去父上下文找。

func (c *valueCtx) Value(key any) any {
   if c.key == key {
      return c.val
   }
   return value(c.Context, key)
}

下面看一个valueCtx的简单使用案例

var waitGroup sync.WaitGroup

func main() {
	waitGroup.Add(1)
    // 传入上下文
	go Do(context.WithValue(context.Background(), 1, 2))
	waitGroup.Wait()
}

func Do(ctx context.Context) {
    // 新建定时器
	ticker := time.NewTimer(time.Second)
	defer waitGroup.Done()
	for {
		select {
		case <-ctx.Done(): // 永远也不会执行
		case <-ticker.C:
			fmt.Println("timeout")
			return
		default:
			fmt.Println(ctx.Value(1))
		}
		time.Sleep(time.Millisecond * 100)
	}
}

valueCtx多用于在多级协程中传递一些数据,无法被取消,因此ctx.Done永远会返回nilselect会忽略掉nil管道。最后输出如下

2
2
2
2
2
2
2
2
2
2
timeout

cancelCtx

cancelCtx以及timerCtx都实现了canceler接口,接口类型如下

type canceler interface {
    // removeFromParent 表示是否从父上下文中删除自身
    // err 表示取消的原因
	cancel(removeFromParent bool, err error)
    // Done 返回一个管道,用于通知取消的原因
	Done() <-chan struct{}
}

cancel方法不对外暴露,在创建上下文时通过闭包将其包装为返回值以供外界调用,就如context.WithCancel源代码中所示

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
   if parent == nil {
      panic("cannot create context from nil parent")
   }
   c := newCancelCtx(parent)
   // 尝试将自身添加进父级的children中
   propagateCancel(parent, &c)
   // 返回context和一个函数
   return &c, func() { c.cancel(true, Canceled) }
}

cancelCtx译为可取消的上下文,创建时,如果父级实现了canceler,就会将自身添加进父级的children中,否则就一直向上查找。如果所有的父级都没有实现canceler,就会启动一个协程等待父级取消,然后当父级结束时取消当前上下文。当调用cancelFunc时,Done通道将会关闭,该上下文的任何子级也会随之取消,最后会将自身从父级中删除。下面是一个简单的示例:

var waitGroup sync.WaitGroup

func main() {
	bkg := context.Background()
    // 返回了一个cancelCtx和cancel函数
	cancelCtx, cancel := context.WithCancel(bkg)
	waitGroup.Add(1)
	go func(ctx context.Context) {
		defer waitGroup.Done()
		for {
			select {
			case <-ctx.Done():
				fmt.Println(ctx.Err())
				return
			default:
				fmt.Println("等待取消中...")
			}
			time.Sleep(time.Millisecond * 200)
		}

	}(cancelCtx)
	time.Sleep(time.Second)
	cancel()
	waitGroup.Wait()
}

输出如下

等待取消中...
等待取消中...
等待取消中...
等待取消中...
等待取消中...
context canceled

再来一个层级嵌套深一点的示例

var waitGroup sync.WaitGroup

func main() {
   waitGroup.Add(3)
   ctx, cancelFunc := context.WithCancel(context.Background())
   go HttpHandler(ctx)
   time.Sleep(time.Second)
   cancelFunc()
   waitGroup.Wait()
}

func HttpHandler(ctx context.Context) {
   cancelCtxAuth, cancelAuth := context.WithCancel(ctx)
   cancelCtxMail, cancelMail := context.WithCancel(ctx)

   defer cancelAuth()
   defer cancelMail()
   defer waitGroup.Done()

   go AuthService(cancelCtxAuth)
   go MailService(cancelCtxMail)

   for {
      select {
      case <-ctx.Done():
         fmt.Println(ctx.Err())
         return
      default:
         fmt.Println("正在处理http请求...")
      }
      time.Sleep(time.Millisecond * 200)
   }

}

func AuthService(ctx context.Context) {
   defer waitGroup.Done()
   for {
      select {
      case <-ctx.Done():
         fmt.Println("auth 父级取消", ctx.Err())
         return
      default:
         fmt.Println("auth...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

func MailService(ctx context.Context) {
   defer waitGroup.Done()
   for {
      select {
      case <-ctx.Done():
         fmt.Println("mail 父级取消", ctx.Err())
         return
      default:
         fmt.Println("mail...")
      }
      time.Sleep(time.Millisecond * 200)
   }
}

例子中创建了3个cancelCtx,尽管父级cancelCtx在取消的同时会取消它的子上下文,但是保险起见,如果创建了一个cancelCtx,在相应的流程结束后就应该调用cancel函数。输出如下

正在处理http请求...
auth...
mail...
mail...
auth...
正在处理http请求...
auth...
mail...
正在处理http请求...
正在处理http请求...
auth...
mail...
auth...
正在处理http请求...
mail...
context canceled
auth 父级取消 context canceled
mail 父级取消 context canceled

timerCtx

timerCtxcancelCtx 的基础之上增加了超时机制,context包下提供了两种创建的函数,分别是WithDeadlineWithTimeout,两者功能类似,前者是指定一个具体的超时时间,比如指定一个具体时间2023/3/20 16:32:00,后者是指定一个超时的时间间隔,比如5分钟后。两个函数的签名如下

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

timerCtx会在时间到期后自动取消当前上下文,取消的流程除了要额外的关闭timer之外,基本与cancelCtx一致。下面是一个简单的timerCtx的使用示例

var wait sync.WaitGroup

func main() {
	deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
	defer cancel()
	wait.Add(1)
	go func(ctx context.Context) {
		defer wait.Done()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("上下文取消", ctx.Err())
				return
			default:
				fmt.Println("等待取消中...")
			}
			time.Sleep(time.Millisecond * 200)
		}
	}(deadline)
	wait.Wait()
}

尽管上下文到期会自动取消,但是为了保险起见,在相关流程结束后,最好手动取消上下文。输出如下

等待取消中...
等待取消中...
等待取消中...
等待取消中...
等待取消中...
上下文取消 context deadline exceeded

WithTimeout其实与WithDealine非常相似,它的实现也只是稍微封装了一下并调用WithDeadline,和上面例子中的WithDeadline用法一样,如下

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
   return WithDeadline(parent, time.Now().Add(timeout))
}

提示

就跟内存分配后不回收会造成内存泄漏一样,上下文也是一种资源,如果创建了但从来不取消,一样会造成上下文泄露,所以最好避免此种情况的发生。


先来看看的一个例子

var wait sync.WaitGroup
var count = 0

func main() {
   wait.Add(10)
   for i := 0; i < 10; i++ {
      go func(data *int) {
         // 模拟访问耗时
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         // 访问数据
         temp := *data
         // 模拟计算耗时
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         ans := 1
         // 修改数据
         *data = temp + ans
         fmt.Println(*data)
         wait.Done()
      }(&count)
   }
   wait.Wait()
   fmt.Println("最终结果", count)
}

对于上面的例子,开启了十个协程来对count进行+1操作,并且使用了time.Sleep来模拟不同的耗时,根据直觉来讲,10个协程执行10个+1操作,最终结果一定是10,正确结果也确实是10,但事实并非如此,上面的例子执行结果如下:

1
2
3
3
2
2
3
3
3
4
最终结果 4

可以看到最终结果为4,而这只是众多可能的结果中的一种。由于每个协程访问和计算所需的时间不同,A协程访问数据耗费500毫秒,此时访问到的count值为1,随后又花费了400毫秒计算,但在这400毫秒内,B协程已经完成了访问和计算并成功更新了count的值,A协程在计算完毕后,A协程最初访问到的值已经过时了,但A协程并不知道这件事,依旧在原先访问到的值基础上加一,并赋值给count,这样一来,B协程的执行结果被覆盖了。多个协程读取和访问一个共享数据时,尤其会发生这样的问题,为此就需要用到锁。

Go中sync包下的MutexRWMutex提供了互斥锁与读写锁两种实现,且提供了非常简单易用的API,加锁只需要Lock(),解锁也只需要Unlock()。需要注意的是,Go所提供的锁都是非递归锁,也就是不可重入锁,所以重复加锁或重复解锁都会导致死锁。


互斥锁

sync.Mutex是Go提供的互斥锁实现,其实现了sync.Locker接口

type Locker interface {
   // 加锁
   Lock()
   // 解锁
   Unlock()
}

使用互斥锁可以非常完美的解决上述问题,例子如下

var wait sync.WaitGroup
var count = 0

var lock sync.Mutex

func main() {
	wait.Add(10)
	for i := 0; i < 10; i++ {
		go func(data *int) {
			// 加锁
			lock.Lock()
			// 模拟访问耗时
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
			// 访问数据
			temp := *data
			// 模拟计算耗时
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
			ans := 1
			// 修改数据
			*data = temp + ans
			// 解锁
			lock.Unlock()
			fmt.Println(*data)
			wait.Done()
		}(&count)
	}
	wait.Wait()
	fmt.Println("最终结果", count)
}

每一个协程在访问数据前,都先上锁,更新完成后再解锁,其他协程想要访问就必须要先获得锁,否则就阻塞等待。如此一来,就不存在上述问题了,所以输出如下

1
2
3
4
5
6
7
8
9
10
最终结果 10

读写锁

互斥锁适合读操作与写操作频率都差不多的情况,对于一些读多写少的数据,如果使用互斥锁,会造成大量的不必要的协程竞争锁,这会消耗很多的系统资源,这时候就需要用到读写锁,即读写互斥锁,对于一个协程而言:

  • 如果获得了读锁,其他协程进行写操作时会阻塞,其他协程进行读操作时不会阻塞
  • 如果获得了写锁,其他协程进行写操作时会阻塞,其他协程进行读操作时会阻塞

Go中读写互斥锁的实现是sync.RWMutex,它也同样实现了Locker接口,但它提供了更多可用的方法,如下:

// 加读锁
func (rw *RWMutex) RLock() 

// 尝试加读锁
func (rw *RWMutex) TryRLock() bool 

// 解读锁
func (rw *RWMutex) RUnlock() 

// 加写锁
func (rw *RWMutex) Lock()

// 尝试加写锁
func (rw *RWMutex) TryLock() bool 

// 解写锁
func (rw *RWMutex) Unlock()

其中TryRlockTryLock两个尝试加锁的操作是非阻塞式的,成功加锁会返回true,无法获得锁时并不会阻塞而是返回false。读写互斥锁内部实现依旧是互斥锁,并且从始至终都只有一个锁,并不是说分读锁和写锁就有两个锁。

条件变量

同步

原子操作