并发
并发
Go语言对于并发的支持是纯天然的,这是这门语言的核心所在,其上手难度相对较小,开发人员不太需要关注底层实现就能做出一个相当不错的并发应用,提高了开发人员的下限。
协程
协程(coroutine)是一种轻量级的线程,或者说是用户态的线程,不受操作系统直接调度,由Go语言自身的调度器进行运行时调度,因此上下文切换开销非常小,这也是为什么Go的并发性能很不错的原因之一。协程这一概念并非Go首次提出,Go也不是第一个支持协程的语言,但Go是第一个能够将协程和并发支持的相当简洁和优雅的语言。
在Go中,创建一个协程十分的简单,仅需要一个go
关键字,就能够快速开启一个协程,go
关键字后面必须是一个函数调用。例子如下
提示
具有返回值的内置函数不允许跟随在go
关键字后面,例如下面的错误示范
go make([]int.10) // go discards result of make([]int, 10) (value of type []int)
func main() {
go fmt.Println("hello world!")
go hello()
go func() {
fmt.Println("hello world!")
}()
}
func hello() {
fmt.Println("hello world!")
}
以上三种开启协程的方式都是可以的,但是其实这个例子执行过后在大部分情况下什么都不会输出,协程是并发执行的,系统创建协程需要时间,而在此之前,主协程早已运行结束,一旦主线程退出,其他子协程也就自然退出了。并且协程的执行顺序也是不确定的,无法预判的,例如下面的例子
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go fmt.Println(i)
}
fmt.Println("end")
}
这是一个在循环体中开启协程的例子,永远也无法精准的预判到它到底会输出什么。可能子协程还没开始运行,主协程就已经结束了,情况如下
start
end
又或者只有一部分子协程在主协程退出前成功运行,情况如下
start
0
1
5
3
4
6
7
end
最简单的做法就是让主协程等一会儿,需要使用到time
包下的Sleep
函数,可以使当前协程暂停一段时间,例子如下
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go fmt.Println(i)
}
// 暂停1ms
time.Sleep(time.Millisecond)
fmt.Println("end")
}
再次执行输出如下,可以看到所有的数字都完整输出了,没有遗漏
start
0
1
5
2
3
4
6
8
9
7
end
但是顺序还是乱的,因此让每次循环都稍微的等一下。例子如下
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go fmt.Println(i)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
fmt.Println("end")
}
现在的输出已经是正常的顺序了
start
0
1
2
3
4
5
6
7
8
9
end
上面的例子中结果输出很完美,那么并发的问题解决了吗,不,一点也没有。对于并发的程序而言,不可控的因素非常多,执行的时机,先后顺序,执行过程的耗时等等,倘若循环中子协程的工作不只是一个简单的输出数字,而是一个非常巨大复杂的任务,耗时的不确定的,那么依旧会重现之前的问题。例如下方代码
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go hello(i)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
fmt.Println("end")
}
func hello(i int) {
// 模拟随机耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
fmt.Println(i)
}
这段代码的输出依旧是不确定的,下面是可能的情况之一
start
0
3
4
end
因此time.Sleep
并不是一种良好的解决办法,幸运的是Go提供了非常多的并发控制手段,常用的并发控制方法有三种:
channel
:管道WaitGroup
:信号量Context
:上下文
三种方法有着不同的适用情况,WaitGroup
可以动态的控制一组指定数量的协程,Context
更适合子孙协程嵌套层级更深的情况,管道更适合协程间通信。对于较为传统的锁控制,Go也对此提供了支持:
Mutex
:互斥锁RWMutex
:读写互斥锁
管道
channel
,译为管道,Go对于管道的作用如下解释:
Do not communicate by sharing memory; instead, share memory by communicating.
即通过消息来进行内存共享,channel
就是为此而生,它是一种在协程间通信的解决方案,同时也可以用于并发控制,先来认识下channel
的基本语法。Go中通过关键字chan
来代表管道类型,同时也必须声明管道的存储类型,来指定其存储的数据是什么类型,下面的例子是一个普通管道的模样。
var ch chan int
这是一个管道的声明语句,此时管道还未初始化,其值为nil
,不可以直接使用。
创建
在创建管道时,有且只有一种方法,那就是使用内置函数make
,对于管道而言,make
函数接收两个参数,第一个是管道的类型,第二个是可选参数为管道的缓冲大小。例子如下
intCh := make(chan int)
// 缓冲区大小为1的管道
strCh := make(chan string, 1)
在使用完一个管道后一定要记得关闭该管道,使用内置函数close
来关闭一个管道,该函数签名如下。
func close(c chan<- Type)
一个关闭管道的例子如下
func main() {
intCh := make(chan int)
// do something
close(intCh)
}
有些时候使用defer
来关闭管道可能会更好。
读写
对于一个管道而言,Go使用了两种很形象的操作符来表示读写操作:
ch <-
:表示对一个管道写入数据
<- ch
:表示对一个管道读取数据
<-
很生动的表示了数据的流动方向,来看一个对int
类型的管道读写的例子
func main() {
// 如果没有缓冲区则会导致死锁
intCh := make(chan int, 1)
defer close(intCh)
// 写入数据
intCh <- 114514
// 读取数据
fmt.Println(<-intCh)
}
上面的例子中创建了一个缓冲区大小为1的int
型管道,对其写入数据114514
,然后再读取数据并输出,最后关闭该管道。对于读取操作而言,还有第二个返回值,一个布尔类型的值,用于表示数据是否读取成功
ints, ok := <-intCh
管道中的数据流动方式与队列一样,即先进先出(FIFO),协程对于管道的操作是同步的,在某一个时刻,只有一个协程能够对其写入数据,同时也只有一个协程能够读取管道中的数据。
无缓冲
对于无缓冲管道而言,因为缓冲区容量为0,所以不会临时存放任何数据。正因为无缓冲管道无法存放数据,在向管道写入数据时必须立刻有其他协程来读取数据,否则就会阻塞等待,读取数据时也是同理,这也解释了为什么下面看起来很正常的代码会发生死锁。
func main() {
// 创建无缓冲管道
ch := make(chan int)
defer close(ch)
// 写入数据
ch <- 123
// 读取数据
n := <-ch
fmt.Println(n)
}
无缓冲管道不应该同步的使用,正确来说应该开启一个新的协程来发送数据,如下例
func main() {
// 创建无缓冲管道
ch := make(chan int)
defer close(ch)
go func() {
// 写入数据
ch <- 123
}()
// 读取数据
n := <-ch
fmt.Println(n)
}
有缓冲
当管道有了缓冲区,就像是一个阻塞队列一样,读取空的管道和写入已满的管道都会造成阻塞。无缓冲管道在发送数据时,必须立刻有人接收,否则就会一直阻塞。对于有缓冲管道则不必如此,对于有缓冲管道写入数据时,会先将数据放入缓冲区里,只有当缓冲区容量满了才会阻塞的等待协程来读取管道中的数据。同样的,读取有缓冲管道时,会先从缓冲区中读取数据,直到缓冲区没数据了,才会阻塞的等待协程来向管道中写入数据。因此,无缓冲管道中会造成死锁例子在这里可以顺利运行。
func main() {
// 创建有缓冲管道
ch := make(chan int, 1)
defer close(ch)
// 写入数据
ch <- 123
// 读取数据
n := <-ch
fmt.Println(n)
}
尽管可以顺利运行,但这种同步读写的方式是非常危险的,一旦管道缓冲区空了或者满了,将会永远阻塞下去,因为没有其他协程来向管道中写入或读取数据。来看看下面的一个例子
func main() {
// 创建有缓冲管道
ch := make(chan int, 5)
// 创建两个无缓冲管道
chW := make(chan struct{})
chR := make(chan struct{})
defer func() {
close(ch)
close(chW)
close(chR)
}()
// 负责写
go func() {
for i := 0; i < 10; i++ {
ch <- i
fmt.Println("写入", i)
}
chW <- struct{}{}
}()
// 负责读
go func() {
for i := 0; i < 10; i++ {
// 每次读取数据都需要花费1毫秒
time.Sleep(time.Millisecond)
fmt.Println("读取", <-ch)
}
chR <- struct{}{}
}()
fmt.Println("写入完毕", <-chW)
fmt.Println("读取完毕", <-chR)
}
这里总共创建了3个管道,一个有缓冲管道用于协程间通信,两个无缓冲管道用于同步父子协程的执行顺序。负责读的协程每次读取之前都会等待1毫秒,负责写的协程一口气做多也只能写入5个数据,因为管道缓冲区最大只有5,在没有协程来读取之前,只能阻塞等待。所以该示例输出如下
写入 0
写入 1
写入 2
写入 3
写入 4 // 一下写了5个,缓冲区满了,等其他协程来读
读取 0
写入 5 // 读一个,写一个
读取 1
写入 6
读取 2
写入 7
读取 3
写入 8
写入 9
读取 4
写入完毕 {} // 所有的数据都发送完毕,写协程执行完毕
读取 5
读取 6
读取 7
读取 8
读取 9
读取完毕 {} // 所有的数据都读完了,读协程执行完毕
可以看到负责写的协程刚开始就一口气发送了5个数据,缓冲区满了以后就开始阻塞等待读协程来读取,后面就是每当读协程1毫秒读取一个数据,缓冲区有空位了,写协程就写入一个数据,直到所有数据发送完毕,写协程执行结束,随后当读协程将缓冲区所有数据读取完毕后,读协程也执行结束,最后主协程退出。
提示
通过内置函数len
可以访问管道缓冲区中数据的个数,通过cap
可以访问管道缓冲区的大小。
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 2
ch <- 3
fmt.Println(len(ch), cap(ch))
}
输出
3 5
利用管道的阻塞条件,可以很轻易的写出一个主协程等待子协程执行完毕的例子
func main() {
// 创建一个无缓冲管道
ch := make(chan struct{})
defer close(ch)
go func() {
fmt.Println(2)
// 写入
ch <- struct{}{}
}()
// 阻塞等待读取
<-ch
fmt.Println(1)
}
输出
2
1
通过有缓冲管道还可以实现一个简单的互斥锁,看下面的例子
var count = 0
// 缓冲区大小为1的管道
var lock = make(chan struct{}, 1)
func Add() {
// 加锁
lock <- struct{}{}
fmt.Println("当前计数为", count, "执行加法")
count += 1
// 解锁
<-lock
}
func Sub() {
// 加锁
lock <- struct{}{}
fmt.Println("当前计数为", count, "执行减法")
count -= 1
// 解锁
<-lock
}
由于管道的缓冲区大小为1,最多只有一个数据存放在缓冲区中。Add
和Sub
函数在每次操作前都会尝试向管道中发送数据,由于缓冲区大小为1,倘若有其他协程已经写入了数据,缓冲区已经满了,当前协程就必须阻塞等待,直到缓冲区空出位置来,如此一来,在某一个时刻,最多只能有一个协程对变量count
进行修改,这样就实现了一个简单的互斥锁。
注意点
下面是一些总结,以下几种情况使用不当会导致管道阻塞:
读写无缓冲管道
当对一个无缓冲管道直接进行同步读写操作都会导致该协程阻塞
func main() {
// 创建了一个无缓冲管道
intCh := make(chan int)
defer close(intCh)
// 发送数据
intCh <- 1
// 读取数据
ints, ok := <-intCh
fmt.Println(ints, ok)
}
读取空缓冲区的管道
当读取一个缓冲区为空的管道时,会导致该协程阻塞
func main() {
// 创建的有缓冲管道
intCh := make(chan int, 1)
defer close(intCh)
// 缓冲区为空,阻塞等待其他协程写入数据
ints, ok := <-intCh
fmt.Println(ints, ok)
}
写入满缓冲区的管道
当管道的缓冲区已满,对其写入数据会导致该协程阻塞
func main() {
// 创建的有缓冲管道
intCh := make(chan int, 1)
defer close(intCh)
intCh <- 1
// 满了,阻塞等待其他协程来读取数据
intCh <- 1
}
管道为nil
当管道为nil
时,无论怎样读写都会导致当前协程阻塞
func main() {
var intCh chan int
// 写
intCh <- 1
}
func main() {
var intCh chan int
// 读
fmt.Println(<-intCh)
}
关于管道阻塞的条件需要好好掌握和熟悉,大多数情况下这些问题隐藏的十分隐蔽,并不会像例子中那样直观。
以下几种情况还会导致panic
:
关闭一个nil
管道
当管道为nil
时,使用close
函数对其进行关闭操作会导致panic`
func main() {
var intCh chan int
close(intCh)
}
写入已关闭的管道
对一个已关闭的管道写入数据会导致panic
func main() {
intCh := make(chan int, 1)
close(intCh)
intCh <- 1
}
关闭已关闭的管道
在一些情况中,管道可能经过层层传递,调用者或许也不知道到底该由谁来关闭管道,如此一来,可能会发生关闭一个已经关闭了的管道,就会发生panic
。
func main() {
ch := make(chan int, 1)
defer close(ch)
go write(ch)
fmt.Println(<-ch)
}
func write(ch chan<- int) {
// 只能对管道发送数据
ch <- 1
close(ch)
}
单向管道
双向管道指的是既可以写,也可以读,即可以在管道两边进行操作。单向管道指的是只读或只写的管道,即只能在管道的一边进行操作。手动创建的一个只读或只写的管道没有什么太大的意义,因为不能对管道读写就失去了其存在的作用。单向管道通常是用来限制通道的行为,一般会在函数的形参和返回值中出现,例如用于关闭通道的内置函数close
的函数签名就用到了单向通道。
func close(c chan<- Type)
又或者说常用到的time
包下的After
函数
func After(d Duration) <-chan Time
close
函数的形参是一个只写通道,After
函数的返回值是一个只读通道,所以单向通道的语法如下:
- 箭头符号
<-
在前,就是只读通道,如<-chan int
- 箭头符号
<-
在后,就是只写通道,如chan<- string
当尝试对只读的管道写入数据时,将会无法通过编译
func main() {
timeCh := time.After(time.Second)
timeCh <- time.Now()
}
报错如下,意思非常明确
invalid operation: cannot send to receive-only channel timeCh (variable of type <-chan time.Time)
对只写的管道读取数据也是同理。
双向管道可以转换为单向管道,反过来则不可以。通常情况下,将双向管道传给某个协程或函数并且不希望它读取/发送数据,就可以用到单向管道来限制另一方的行为。
func main() {
ch := make(chan int, 1)
go write(ch)
fmt.Println(<-ch)
}
func write(ch chan<- int) {
// 只能对管道发送数据
ch <- 1
}
只读管道也是一样的道理
提示
chan
是引用类型,即便Go的函数参数是值传递,但其引用依旧是同一个,这一点会在后续的管道原理中说明。
for range
通过for range
语句,可以遍历读取缓冲管道中的数据,如下例
func main() {
ch := make(chan int, 10)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
}()
for n := range ch {
fmt.Println(n)
}
}
通常来说,for range
遍历其他可迭代数据结构时,会有两个返回值,第一个是索引,第二个元素值,但是对于管道而言,有且仅有一个返回值,就是缓冲区的元素值,for range
会遍历读取管道缓冲区中的元素,当管道缓冲区为空时,就会阻塞等待,直到有其他协程向管道中写入数据才会继续读取数据,所以输出如下
0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!
可以看到上面的代码发生了死锁,因为子协程已经执行完毕了,而主协程还在阻塞等待其他协程来向管道中写入数据,所以应该管道在写入完毕后将其关闭。修改为如下代码
func main() {
ch := make(chan int, 10)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
// 关闭管道
close(ch)
}()
for n := range ch {
fmt.Println(n)
}
}
写完后关闭管道,上述代码便不再会发生死锁。前面提到过读取管道是有两个返回值的,for range
遍历管道时,当无法成功读取数据时,便会退出循环。第二个返回值指的是能否成功读取数据,而不是管道是否已经关闭,即便管道已经关闭,对于有缓冲管道而言,依旧可以读取数据,并且第二个返回值仍然为true
。看下面的一个例子
func main() {
ch := make(chan int, 10)
for i := 0; i < 5; i++ {
ch <- i
}
// 关闭管道
close(ch)
// 再读取数据
for i := 0; i < 6; i++ {
n, ok := <-ch
fmt.Println(n, ok)
}
}
输出结果
0 true
1 true
2 true
3 true
4 true
0 false
由于管道已经关闭了,即便缓冲区为空,再读取数据也不会导致当前协程阻塞,可以看到在第六次遍历的时候读取的是零值,并且ok
为false
。
提示
关于管道关闭的时机,应该尽量在向管道发送数据的那一方关闭管道,而不要在接收方关闭管道,因为大多数情况下接收方只知道接收数据,并不知道该在什么时候关闭管道。
select
select
在Linux系统中,是一种IO多路复用的解决方案,类似的,在Go中,select
是一种管道多路复用的控制结构。什么是多路复用,简单的用一句话概括:在某一时刻,同时监测多个元素是否可用,被监测的可以是网络请求,文件IO等。在Go中的select
监测的元素就是管道,且只能是管道。select
的语法与switch
语句类似,下面看看一个select
语句长什么样
func main() {
// 创建三个管道
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
select {
case n, ok := <-chA:
fmt.Println(n, ok)
case n, ok := <-chB:
fmt.Println(n, ok)
case n, ok := <-chC:
fmt.Println(n, ok)
default:
fmt.Println("所有管道都不可用")
}
}
与switch
类似,select
由多个case
和一个default
组成,default
分支可以省略。每一个case
只能操作一个管道,且只能进行一种操作,要么读要么写,当有多个case
可用时,select
会伪随机的选择一个case
来执行。如果所有case
都不可用,就会执行default
分支,倘若没有default
分支,将会阻塞等待,直到至少有一个case
可用。由于上例中没有对管道写入数据,自然所有的case
都不可用,所以最终输出为default
分支的执行结果。稍微修改下后如下:
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
// 开启一个新的协程
go func() {
// 向A管道写入数据
chA <- 1
}()
select {
case n, ok := <-chA:
fmt.Println(n, ok)
case n, ok := <-chB:
fmt.Println(n, ok)
case n, ok := <-chC:
fmt.Println(n, ok)
}
}
上例开启了一个新的协程来向管道A写入数据,select
由于没有默认分支,所以会一直阻塞等待直到有case
可用。当管道A可用时,执行完对应分支后主协程就直接退出了。要想一直监测管道,可以配合for
循环使用,如下。
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
go Send(chA)
go Send(chB)
go Send(chC)
// for循环
for {
select {
case n, ok := <-chA:
fmt.Println("A", n, ok)
case n, ok := <-chB:
fmt.Println("B", n, ok)
case n, ok := <-chC:
fmt.Println("B", n, ok)
}
}
}
func Send(ch chan<- int) {
for i := 0; i < 3; i++ {
time.Sleep(time.Millisecond)
ch <- i
}
}
这样确实三个管道都能用上了,但是死循环+select
会导致主协程永久阻塞,所以可以将其单独放到新协程中,并且加上一些其他的逻辑。
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
l := make(chan struct{})
go Send(chA)
go Send(chB)
go Send(chC)
go func() {
Loop:
for {
select {
case n, ok := <-chA:
fmt.Println("A", n, ok)
case n, ok := <-chB:
fmt.Println("B", n, ok)
case n, ok := <-chC:
fmt.Println("B", n, ok)
case <-time.After(time.Second): // 设置1秒的超时时间
break Loop // 退出循环
}
}
l <- struct{}{} // 告诉主协程可以退出了
}()
<-l
}
func Send(ch chan<- int) {
for i := 0; i < 3; i++ {
time.Sleep(time.Millisecond)
ch <- i
}
}
上例中通过for
循环配合select
来一直监测三个管道是否可以用,并且第四个case
是一个超时管道,超时过后便会退出循环,结束子协程。最终输出如下
B 0 true
B 0 true
A 0 true
A 1 true
B 1 true
B 1 true
B 2 true
A 2 true
B 2 true
超时
上一个例子用到了time.After
函数,其返回值是一个只读的管道,该函数配合select
使用可以非常简单的实现超时机制,例子如下
func main() {
chA := make(chan int)
defer close(chA)
go func() {
time.Sleep(time.Second * 2)
chA <- 1
}()
select {
case n := <-chA:
fmt.Println(n)
case <-time.After(time.Second):
fmt.Println("超时")
}
}
永久阻塞
当select
语句中什么都没有时,就会永久阻塞,例如
func main() {
fmt.Println("start")
select {}
fmt.Println("end")
}
end
永远也不会输出,主协程会一直阻塞,这种情况一般是有特殊用途。
提示
在select
的case
中对值为nil
的管道进行操作的话,并不会导致阻塞,该case
则会被忽略,永远也不会被执行。例如下方代码无论执行多少次都只会输出timeout。
func main() {
var nilCh chan int
select {
case <-nilCh:
fmt.Println("read")
case nilCh <- 1:
fmt.Println("write")
case <-time.After(time.Second):
fmt.Println("timeout")
}
}
WaitGroup
sync.WaitGroup
是sync
包下提供的一个结构体,WaitGroup
即等待执行,使用它可以很轻易的实现等待一组协程的效果。该结构体只对外暴露三个方法。
Add
方法用于指明要等待的协程的数量
func (wg *WaitGroup) Add(delta int)
Done
方法表示当前协程已经执行完毕
func (wg *WaitGroup) Done()
Wait
方法等待子协程结束,否则就阻塞
func (wg *WaitGroup) Wait()
WaitGroup
使用起来十分简单,属于开箱即用。其内部的实现是计数器+信号量,程序开始时调用Add
初始化计数,每当一个协程执行完毕时调用Done
,计数就-1,直到减为0,而在此期间,主协程调用Wait
会一直阻塞直到全部计数减为0,然后才会被唤醒。看一个简单的使用例子
func main() {
var wait sync.WaitGroup
// 指定子协程的数量
wait.Add(1)
go func() {
fmt.Println(1)
// 执行完毕
wait.Done()
}()
// 等待子协程
wait.Wait()
fmt.Println(2)
}
这段代码永远都是先输出1再输出2,主协程会等待子协程执行完毕后再退出。
1
2
针对协程介绍中最开始的例子,可以做出如下修改
func main() {
var mainWait sync.WaitGroup
var wait sync.WaitGroup
// 计数10
mainWait.Add(10)
fmt.Println("start")
for i := 0; i < 10; i++ {
// 循环内计数1
wait.Add(1)
go func() {
fmt.Println(i)
// 两个计数-1
wait.Done()
mainWait.Done()
}()
// 等待当前循环的协程执行完毕
wait.Wait()
}
// 等待所有的协程执行完毕
mainWait.Wait()
fmt.Println("end")
}
这里使用了sync.WaitGroup
替代了原先的time.Sleep
,协程并发执行的的顺序更加可控,不管执行多少次,输出都如下
start
0
1
2
3
4
5
6
7
8
9
end
WaitGroup
通常适用于可动态调整协程数量的时候,例如事先知晓协程的数量,又或者在运行过程中需要动态调整。WaitGroup
的值不应该被复制,复制后的值也不应该继续使用,尤其是将其作为函数参数传递时,因该传递指针而不是值。倘若使用复制的值,计数完全无法作用到真正的WaitGroup
上,这可能会导致主协程一直阻塞等待,程序将无法正常运行。例如下方的代码
func main() {
var mainWait sync.WaitGroup
mainWait.Add(1)
hello(mainWait)
mainWait.Wait()
fmt.Println("end")
}
func hello(wait sync.WaitGroup) {
fmt.Println("hello")
wait.Done()
}
错误提示所有的协程都已经退出,但主协程依旧在等待,这就形成了死锁,因为hello
函数内部对一个形参WaitGroup
调用Done
并不会作用到原来的mainWait
上,所以应该使用指针来进行传递。
hello
fatal error: all goroutines are asleep - deadlock!
提示
当计数变为负数,或者计数数量大于子协程数量时,将会引发panic
。
Context
Context
译为上下文,是Go提供的一种并发控制的解决方案,相比于管道和WaitGroup
,它可以更好的控制子孙协程以及层级更深的协程。Context
本身是一个接口,只要实现了该接口都可以称之为上下文例如著名Web框架Gin
中的gin.Context
。context
标准库也提供了几个实现,分别是:
emptyCtx
cancelCtx
timerCtx
valueCtx
Context
先来看看Context
接口的定义,再去了解它的具体实现。
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key any) any
}
Deadline
该方法具有两个返回值,deadline
是截止时间,即上下文应该取消的时间。第二个值是是否设置deadline
,如果没有设置则一直为false
。
Deadline() (deadline time.Time, ok bool)
Done
其返回值是一个空结构体类型的只读管道,该管道仅仅起到通知作用,不传递任何数据。当上下文所做的工作应该取消时,该通道就会被关闭,对于一些不支持取消的上下文,可能会返回nil
。
Done() <-chan struct{}
Err
该方法返回一个error
,用于表示上下关闭的原因。当Done
管道没有关闭时,返回nil
,如果关闭过后,会返回一个err
来解释为什么会关闭。
Err() error
Value
该方法返回对应的键值,如果key
不存在,或者不支持该方法,就会返回nil
。
Value(key any) any
emptyCtx
顾名思义,emptyCtx
就是空的上下文,context
包下所有的实现都是不对外暴露的,但是提供了对应的函数来创建上下文。emptyCtx
就可以通过context.Background
和context.TODO
来进行创建。两个函数如下
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
可以看到仅仅只是返回了emptyCtx
指针。emptyCtx
的底层类型实际上是一个int
,之所以不使用空结构体,是因为emptyCtx
的实例必须要有不同的内存地址,它没法被取消,没有deadline
,也不能取值,实现的方法都是返回零值。
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key any) any {
return nil
}
emptyCtx
通常是用来当作最顶层的上下文,在创建其他三种上下文时作为父上下文传入。context包中的各个实现关系如下图所示
valueCtx
valueCtx
实现比较简单,其内部只包含一对键值对,和一个内嵌的Context
类型的字段。
type valueCtx struct {
Context
key, val any
}
其本身只实现了Value
方法,逻辑也很简单,当前上下文找不到就去父上下文找。
func (c *valueCtx) Value(key any) any {
if c.key == key {
return c.val
}
return value(c.Context, key)
}
下面看一个valueCtx
的简单使用案例
var waitGroup sync.WaitGroup
func main() {
waitGroup.Add(1)
// 传入上下文
go Do(context.WithValue(context.Background(), 1, 2))
waitGroup.Wait()
}
func Do(ctx context.Context) {
// 新建定时器
ticker := time.NewTimer(time.Second)
defer waitGroup.Done()
for {
select {
case <-ctx.Done(): // 永远也不会执行
case <-ticker.C:
fmt.Println("timeout")
return
default:
fmt.Println(ctx.Value(1))
}
time.Sleep(time.Millisecond * 100)
}
}
valueCtx
多用于在多级协程中传递一些数据,无法被取消,因此ctx.Done
永远会返回nil
,select
会忽略掉nil
管道。最后输出如下
2
2
2
2
2
2
2
2
2
2
timeout
cancelCtx
cancelCtx
以及timerCtx
都实现了canceler
接口,接口类型如下
type canceler interface {
// removeFromParent 表示是否从父上下文中删除自身
// err 表示取消的原因
cancel(removeFromParent bool, err error)
// Done 返回一个管道,用于通知取消的原因
Done() <-chan struct{}
}
cancel
方法不对外暴露,在创建上下文时通过闭包将其包装为返回值以供外界调用,就如context.WithCancel
源代码中所示
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
// 尝试将自身添加进父级的children中
propagateCancel(parent, &c)
// 返回context和一个函数
return &c, func() { c.cancel(true, Canceled) }
}
cancelCtx
译为可取消的上下文,创建时,如果父级实现了canceler
,就会将自身添加进父级的children
中,否则就一直向上查找。如果所有的父级都没有实现canceler
,就会启动一个协程等待父级取消,然后当父级结束时取消当前上下文。当调用cancelFunc
时,Done
通道将会关闭,该上下文的任何子级也会随之取消,最后会将自身从父级中删除。下面是一个简单的示例:
var waitGroup sync.WaitGroup
func main() {
bkg := context.Background()
// 返回了一个cancelCtx和cancel函数
cancelCtx, cancel := context.WithCancel(bkg)
waitGroup.Add(1)
go func(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
default:
fmt.Println("等待取消中...")
}
time.Sleep(time.Millisecond * 200)
}
}(cancelCtx)
time.Sleep(time.Second)
cancel()
waitGroup.Wait()
}
输出如下
等待取消中...
等待取消中...
等待取消中...
等待取消中...
等待取消中...
context canceled
再来一个层级嵌套深一点的示例
var waitGroup sync.WaitGroup
func main() {
waitGroup.Add(3)
ctx, cancelFunc := context.WithCancel(context.Background())
go HttpHandler(ctx)
time.Sleep(time.Second)
cancelFunc()
waitGroup.Wait()
}
func HttpHandler(ctx context.Context) {
cancelCtxAuth, cancelAuth := context.WithCancel(ctx)
cancelCtxMail, cancelMail := context.WithCancel(ctx)
defer cancelAuth()
defer cancelMail()
defer waitGroup.Done()
go AuthService(cancelCtxAuth)
go MailService(cancelCtxMail)
for {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
default:
fmt.Println("正在处理http请求...")
}
time.Sleep(time.Millisecond * 200)
}
}
func AuthService(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println("auth 父级取消", ctx.Err())
return
default:
fmt.Println("auth...")
}
time.Sleep(time.Millisecond * 200)
}
}
func MailService(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println("mail 父级取消", ctx.Err())
return
default:
fmt.Println("mail...")
}
time.Sleep(time.Millisecond * 200)
}
}
例子中创建了3个cancelCtx
,尽管父级cancelCtx
在取消的同时会取消它的子上下文,但是保险起见,如果创建了一个cancelCtx
,在相应的流程结束后就应该调用cancel
函数。输出如下
正在处理http请求...
auth...
mail...
mail...
auth...
正在处理http请求...
auth...
mail...
正在处理http请求...
正在处理http请求...
auth...
mail...
auth...
正在处理http请求...
mail...
context canceled
auth 父级取消 context canceled
mail 父级取消 context canceled
timerCtx
timerCtx
在cancelCtx
的基础之上增加了超时机制,context
包下提供了两种创建的函数,分别是WithDeadline
和WithTimeout
,两者功能类似,前者是指定一个具体的超时时间,比如指定一个具体时间2023/3/20 16:32:00
,后者是指定一个超时的时间间隔,比如5分钟后。两个函数的签名如下
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
timerCtx
会在时间到期后自动取消当前上下文,取消的流程除了要额外的关闭timer
之外,基本与cancelCtx
一致。下面是一个简单的timerCtx
的使用示例
var wait sync.WaitGroup
func main() {
deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
defer cancel()
wait.Add(1)
go func(ctx context.Context) {
defer wait.Done()
for {
select {
case <-ctx.Done():
fmt.Println("上下文取消", ctx.Err())
return
default:
fmt.Println("等待取消中...")
}
time.Sleep(time.Millisecond * 200)
}
}(deadline)
wait.Wait()
}
尽管上下文到期会自动取消,但是为了保险起见,在相关流程结束后,最好手动取消上下文。输出如下
等待取消中...
等待取消中...
等待取消中...
等待取消中...
等待取消中...
上下文取消 context deadline exceeded
WithTimeout
其实与WithDealine
非常相似,它的实现也只是稍微封装了一下并调用WithDeadline
,和上面例子中的WithDeadline
用法一样,如下
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
提示
就跟内存分配后不回收会造成内存泄漏一样,上下文也是一种资源,如果创建了但从来不取消,一样会造成上下文泄露,所以最好避免此种情况的发生。
锁
先来看看的一个例子
var wait sync.WaitGroup
var count = 0
func main() {
wait.Add(10)
for i := 0; i < 10; i++ {
go func(data *int) {
// 模拟访问耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
// 访问数据
temp := *data
// 模拟计算耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
ans := 1
// 修改数据
*data = temp + ans
fmt.Println(*data)
wait.Done()
}(&count)
}
wait.Wait()
fmt.Println("最终结果", count)
}
对于上面的例子,开启了十个协程来对count
进行+1
操作,并且使用了time.Sleep
来模拟不同的耗时,根据直觉来讲,10个协程执行10个+1
操作,最终结果一定是10
,正确结果也确实是10
,但事实并非如此,上面的例子执行结果如下:
1
2
3
3
2
2
3
3
3
4
最终结果 4
可以看到最终结果为4,而这只是众多可能的结果中的一种。由于每个协程访问和计算所需的时间不同,A协程访问数据耗费500毫秒,此时访问到的count
值为1,随后又花费了400毫秒计算,但在这400毫秒内,B协程已经完成了访问和计算并成功更新了count
的值,A协程在计算完毕后,A协程最初访问到的值已经过时了,但A协程并不知道这件事,依旧在原先访问到的值基础上加一,并赋值给count
,这样一来,B协程的执行结果被覆盖了。多个协程读取和访问一个共享数据时,尤其会发生这样的问题,为此就需要用到锁。
Go中sync
包下的Mutex
与RWMutex
提供了互斥锁与读写锁两种实现,且提供了非常简单易用的API,加锁只需要Lock()
,解锁也只需要Unlock()
。需要注意的是,Go所提供的锁都是非递归锁,也就是不可重入锁,所以重复加锁或重复解锁都会导致死锁。
互斥锁
sync.Mutex
是Go提供的互斥锁实现,其实现了sync.Locker
接口
type Locker interface {
// 加锁
Lock()
// 解锁
Unlock()
}
使用互斥锁可以非常完美的解决上述问题,例子如下
var wait sync.WaitGroup
var count = 0
var lock sync.Mutex
func main() {
wait.Add(10)
for i := 0; i < 10; i++ {
go func(data *int) {
// 加锁
lock.Lock()
// 模拟访问耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
// 访问数据
temp := *data
// 模拟计算耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
ans := 1
// 修改数据
*data = temp + ans
// 解锁
lock.Unlock()
fmt.Println(*data)
wait.Done()
}(&count)
}
wait.Wait()
fmt.Println("最终结果", count)
}
每一个协程在访问数据前,都先上锁,更新完成后再解锁,其他协程想要访问就必须要先获得锁,否则就阻塞等待。如此一来,就不存在上述问题了,所以输出如下
1
2
3
4
5
6
7
8
9
10
最终结果 10
读写锁
互斥锁适合读操作与写操作频率都差不多的情况,对于一些读多写少的数据,如果使用互斥锁,会造成大量的不必要的协程竞争锁,这会消耗很多的系统资源,这时候就需要用到读写锁,即读写互斥锁,对于一个协程而言:
- 如果获得了读锁,其他协程进行写操作时会阻塞,其他协程进行读操作时不会阻塞
- 如果获得了写锁,其他协程进行写操作时会阻塞,其他协程进行读操作时会阻塞
Go中读写互斥锁的实现是sync.RWMutex
,它也同样实现了Locker
接口,但它提供了更多可用的方法,如下:
// 加读锁
func (rw *RWMutex) RLock()
// 尝试加读锁
func (rw *RWMutex) TryRLock() bool
// 解读锁
func (rw *RWMutex) RUnlock()
// 加写锁
func (rw *RWMutex) Lock()
// 尝试加写锁
func (rw *RWMutex) TryLock() bool
// 解写锁
func (rw *RWMutex) Unlock()
其中TryRlock
与TryLock
两个尝试加锁的操作是非阻塞式的,成功加锁会返回true
,无法获得锁时并不会阻塞而是返回false
。读写互斥锁内部实现依旧是互斥锁,并且从始至终都只有一个锁,并不是说分读锁和写锁就有两个锁。