并发
并发
Go语言对于并发的支持是纯天然的,这是这门语言的核心所在,其上手难度相对较小,开发人员不太需要关注底层实现就能做出一个相当不错的并发应用,提高了开发人员的下限。
协程
协程(coroutine)是一种轻量级的线程,或者说是用户态的线程,不受操作系统直接调度,由Go语言自身的调度器进行运行时调度,因此上下文切换开销非常小,这也是为什么Go的并发性能很不错的原因之一。协程这一概念并非Go首次提出,Go也不是第一个支持协程的语言,但Go是第一个能够将协程和并发支持的相当简洁和优雅的语言。
在Go中,创建一个协程十分的简单,仅需要一个go关键字,就能够快速开启一个协程,go关键字后面必须是一个函数调用。例子如下
提示
具有返回值的内置函数不允许跟随在go关键字后面,例如下面的错误示范
go make([]int.10) // go discards result of make([]int, 10) (value of type []int)
func main() {
go fmt.Println("hello world!")
go hello()
go func() {
fmt.Println("hello world!")
}()
}
func hello() {
fmt.Println("hello world!")
}
以上三种开启协程的方式都是可以的,但是其实这个例子执行过后在大部分情况下什么都不会输出,协程是并发执行的,系统创建协程需要时间,而在此之前,主协程早已运行结束,一旦主线程退出,其他子协程也就自然退出了。并且协程的执行顺序也是不确定的,无法预判的,例如下面的例子
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go fmt.Println(i)
}
fmt.Println("end")
}
这是一个在循环体中开启协程的例子,永远也无法精准的预判到它到底会输出什么。可能子协程还没开始运行,主协程就已经结束了,情况如下
start
end
又或者只有一部分子协程在主协程退出前成功运行,情况如下
start
0
1
5
3
4
6
7
end
最简单的做法就是让主协程等一会儿,需要使用到time包下的Sleep函数,可以使当前协程暂停一段时间,例子如下
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go fmt.Println(i)
}
// 暂停1ms
time.Sleep(time.Millisecond)
fmt.Println("end")
}
再次执行输出如下,可以看到所有的数字都完整输出了,没有遗漏
start
0
1
5
2
3
4
6
8
9
7
end
但是顺序还是乱的,因此让每次循环都稍微的等一下。例子如下
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go fmt.Println(i)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
fmt.Println("end")
}
现在的输出已经是正常的顺序了
start
0
1
2
3
4
5
6
7
8
9
end
上面的例子中结果输出很完美,那么并发的问题解决了吗,不,一点也没有。对于并发的程序而言,不可控的因素非常多,执行的时机,先后顺序,执行过程的耗时等等,倘若循环中子协程的工作不只是一个简单的输出数字,而是一个非常巨大复杂的任务,耗时的不确定的,那么依旧会重现之前的问题。例如下方代码
func main() {
fmt.Println("start")
for i := 0; i < 10; i++ {
go hello(i)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
fmt.Println("end")
}
func hello(i int) {
// 模拟随机耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
fmt.Println(i)
}
这段代码的输出依旧是不确定的,下面是可能的情况之一
start
0
3
4
end
因此time.Sleep并不是一种良好的解决办法,幸运的是Go提供了非常多的并发控制手段,常用的并发控制方法有三种:
channel:管道WaitGroup:信号量Context:上下文
三种方法有着不同的适用情况,WaitGroup可以动态的控制一组指定数量的协程,Context更适合子孙协程嵌套层级更深的情况,管道更适合协程间通信。对于较为传统的锁控制,Go也对此提供了支持:
Mutex:互斥锁RWMutex:读写互斥锁
管道
channel,译为管道,Go对于管道的作用如下解释:
Do not communicate by sharing memory; instead, share memory by communicating.
即通过消息来进行内存共享,channel就是为此而生,它是一种在协程间通信的解决方案,同时也可以用于并发控制,先来认识下channel的基本语法。Go中通过关键字chan来代表管道类型,同时也必须声明管道的存储类型,来指定其存储的数据是什么类型,下面的例子是一个普通管道的模样。
var ch chan int
这是一个管道的声明语句,此时管道还未初始化,其值为nil,不可以直接使用。
创建
在创建管道时,有且只有一种方法,那就是使用内置函数make,对于管道而言,make函数接收两个参数,第一个是管道的类型,第二个是可选参数为管道的缓冲大小。例子如下
intCh := make(chan int)
// 缓冲区大小为1的管道
strCh := make(chan string, 1)
在使用完一个管道后一定要记得关闭该管道,使用内置函数close来关闭一个管道,该函数签名如下。
func close(c chan<- Type)
一个关闭管道的例子如下
func main() {
intCh := make(chan int)
// do something
close(intCh)
}
有些时候使用defer来关闭管道可能会更好。
读写
对于一个管道而言,Go使用了两种很形象的操作符来表示读写操作:
ch <-:表示对一个管道写入数据
<- ch:表示对一个管道读取数据
<-很生动的表示了数据的流动方向,来看一个对int类型的管道读写的例子
func main() {
// 如果没有缓冲区则会导致死锁
intCh := make(chan int, 1)
defer close(intCh)
// 写入数据
intCh <- 114514
// 读取数据
fmt.Println(<-intCh)
}
上面的例子中创建了一个缓冲区大小为1的int型管道,对其写入数据114514,然后再读取数据并输出,最后关闭该管道。对于读取操作而言,还有第二个返回值,一个布尔类型的值,用于表示数据是否读取成功
ints, ok := <-intCh
管道中的数据流动方式与队列一样,即先进先出(FIFO),协程对于管道的操作是同步的,在某一个时刻,只有一个协程能够对其写入数据,同时也只有一个协程能够读取管道中的数据。
无缓冲
对于无缓冲管道而言,因为缓冲区容量为0,所以不会临时存放任何数据。正因为无缓冲管道无法存放数据,在向管道写入数据时必须立刻有其他协程来读取数据,否则就会阻塞等待,读取数据时也是同理,这也解释了为什么下面看起来很正常的代码会发生死锁。
func main() {
// 创建无缓冲管道
ch := make(chan int)
defer close(ch)
// 写入数据
ch <- 123
// 读取数据
n := <-ch
fmt.Println(n)
}
无缓冲管道不应该同步的使用,正确来说应该开启一个新的协程来发送数据,如下例
func main() {
// 创建无缓冲管道
ch := make(chan int)
defer close(ch)
go func() {
// 写入数据
ch <- 123
}()
// 读取数据
n := <-ch
fmt.Println(n)
}
有缓冲
当管道有了缓冲区,就像是一个阻塞队列一样,读取空的管道和写入已满的管道都会造成阻塞。无缓冲管道在发送数据时,必须立刻有人接收,否则就会一直阻塞。对于有缓冲管道则不必如此,对于有缓冲管道写入数据时,会先将数据放入缓冲区里,只有当缓冲区容量满了才会阻塞的等待协程来读取管道中的数据。同样的,读取有缓冲管道时,会先从缓冲区中读取数据,直到缓冲区没数据了,才会阻塞的等待协程来向管道中写入数据。因此,无缓冲管道中会造成死锁例子在这里可以顺利运行。
func main() {
// 创建有缓冲管道
ch := make(chan int, 1)
defer close(ch)
// 写入数据
ch <- 123
// 读取数据
n := <-ch
fmt.Println(n)
}
尽管可以顺利运行,但这种同步读写的方式是非常危险的,一旦管道缓冲区空了或者满了,将会永远阻塞下去,因为没有其他协程来向管道中写入或读取数据。来看看下面的一个例子
func main() {
// 创建有缓冲管道
ch := make(chan int, 5)
// 创建两个无缓冲管道
chW := make(chan struct{})
chR := make(chan struct{})
defer func() {
close(ch)
close(chW)
close(chR)
}()
// 负责写
go func() {
for i := 0; i < 10; i++ {
ch <- i
fmt.Println("写入", i)
}
chW <- struct{}{}
}()
// 负责读
go func() {
for i := 0; i < 10; i++ {
// 每次读取数据都需要花费1毫秒
time.Sleep(time.Millisecond)
fmt.Println("读取", <-ch)
}
chR <- struct{}{}
}()
fmt.Println("写入完毕", <-chW)
fmt.Println("读取完毕", <-chR)
}
这里总共创建了3个管道,一个有缓冲管道用于协程间通信,两个无缓冲管道用于同步父子协程的执行顺序。负责读的协程每次读取之前都会等待1毫秒,负责写的协程一口气做多也只能写入5个数据,因为管道缓冲区最大只有5,在没有协程来读取之前,只能阻塞等待。所以该示例输出如下
写入 0
写入 1
写入 2
写入 3
写入 4 // 一下写了5个,缓冲区满了,等其他协程来读
读取 0
写入 5 // 读一个,写一个
读取 1
写入 6
读取 2
写入 7
读取 3
写入 8
写入 9
读取 4
写入完毕 {} // 所有的数据都发送完毕,写协程执行完毕
读取 5
读取 6
读取 7
读取 8
读取 9
读取完毕 {} // 所有的数据都读完了,读协程执行完毕
可以看到负责写的协程刚开始就一口气发送了5个数据,缓冲区满了以后就开始阻塞等待读协程来读取,后面就是每当读协程1毫秒读取一个数据,缓冲区有空位了,写协程就写入一个数据,直到所有数据发送完毕,写协程执行结束,随后当读协程将缓冲区所有数据读取完毕后,读协程也执行结束,最后主协程退出。
提示
通过内置函数len可以访问管道缓冲区中数据的个数,通过cap可以访问管道缓冲区的大小。
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 2
ch <- 3
fmt.Println(len(ch), cap(ch))
}
输出
3 5
利用管道的阻塞条件,可以很轻易的写出一个主协程等待子协程执行完毕的例子
func main() {
// 创建一个无缓冲管道
ch := make(chan struct{})
defer close(ch)
go func() {
fmt.Println(2)
// 写入
ch <- struct{}{}
}()
// 阻塞等待读取
<-ch
fmt.Println(1)
}
输出
2
1
通过有缓冲管道还可以实现一个简单的互斥锁,看下面的例子
var count = 0
// 缓冲区大小为1的管道
var lock = make(chan struct{}, 1)
func Add() {
// 加锁
lock <- struct{}{}
fmt.Println("当前计数为", count, "执行加法")
count += 1
// 解锁
<-lock
}
func Sub() {
// 加锁
lock <- struct{}{}
fmt.Println("当前计数为", count, "执行减法")
count -= 1
// 解锁
<-lock
}
由于管道的缓冲区大小为1,最多只有一个数据存放在缓冲区中。Add和Sub函数在每次操作前都会尝试向管道中发送数据,由于缓冲区大小为1,倘若有其他协程已经写入了数据,缓冲区已经满了,当前协程就必须阻塞等待,直到缓冲区空出位置来,如此一来,在某一个时刻,最多只能有一个协程对变量count进行修改,这样就实现了一个简单的互斥锁。
注意点
下面是一些总结,以下几种情况使用不当会导致管道阻塞:
读写无缓冲管道
当对一个无缓冲管道直接进行同步读写操作都会导致该协程阻塞
func main() {
// 创建了一个无缓冲管道
intCh := make(chan int)
defer close(intCh)
// 发送数据
intCh <- 1
// 读取数据
ints, ok := <-intCh
fmt.Println(ints, ok)
}
读取空缓冲区的管道
当读取一个缓冲区为空的管道时,会导致该协程阻塞
func main() {
// 创建的有缓冲管道
intCh := make(chan int, 1)
defer close(intCh)
// 缓冲区为空,阻塞等待其他协程写入数据
ints, ok := <-intCh
fmt.Println(ints, ok)
}
写入满缓冲区的管道
当管道的缓冲区已满,对其写入数据会导致该协程阻塞
func main() {
// 创建的有缓冲管道
intCh := make(chan int, 1)
defer close(intCh)
intCh <- 1
// 满了,阻塞等待其他协程来读取数据
intCh <- 1
}
管道为nil
当管道为nil时,无论怎样读写都会导致当前协程阻塞
func main() {
var intCh chan int
// 写
intCh <- 1
}
func main() {
var intCh chan int
// 读
fmt.Println(<-intCh)
}
关于管道阻塞的条件需要好好掌握和熟悉,大多数情况下这些问题隐藏的十分隐蔽,并不会像例子中那样直观。
以下几种情况还会导致panic:
关闭一个nil管道
当管道为nil时,使用close函数对其进行关闭操作会导致panic`
func main() {
var intCh chan int
close(intCh)
}
写入已关闭的管道
对一个已关闭的管道写入数据会导致panic
func main() {
intCh := make(chan int, 1)
close(intCh)
intCh <- 1
}
关闭已关闭的管道
在一些情况中,管道可能经过层层传递,调用者或许也不知道到底该由谁来关闭管道,如此一来,可能会发生关闭一个已经关闭了的管道,就会发生panic。
func main() {
ch := make(chan int, 1)
defer close(ch)
go write(ch)
fmt.Println(<-ch)
}
func write(ch chan<- int) {
// 只能对管道发送数据
ch <- 1
close(ch)
}
单向管道
双向管道指的是既可以写,也可以读,即可以在管道两边进行操作。单向管道指的是只读或只写的管道,即只能在管道的一边进行操作。手动创建的一个只读或只写的管道没有什么太大的意义,因为不能对管道读写就失去了其存在的作用。单向管道通常是用来限制通道的行为,一般会在函数的形参和返回值中出现,例如用于关闭通道的内置函数close的函数签名就用到了单向通道。
func close(c chan<- Type)
又或者说常用到的time包下的After函数
func After(d Duration) <-chan Time
close函数的形参是一个只写通道,After函数的返回值是一个只读通道,所以单向通道的语法如下:
- 箭头符号
<-在前,就是只读通道,如<-chan int - 箭头符号
<-在后,就是只写通道,如chan<- string
当尝试对只读的管道写入数据时,将会无法通过编译
func main() {
timeCh := time.After(time.Second)
timeCh <- time.Now()
}
报错如下,意思非常明确
invalid operation: cannot send to receive-only channel timeCh (variable of type <-chan time.Time)
对只写的管道读取数据也是同理。
双向管道可以转换为单向管道,反过来则不可以。通常情况下,将双向管道传给某个协程或函数并且不希望它读取/发送数据,就可以用到单向管道来限制另一方的行为。
func main() {
ch := make(chan int, 1)
go write(ch)
fmt.Println(<-ch)
}
func write(ch chan<- int) {
// 只能对管道发送数据
ch <- 1
}
只读管道也是一样的道理
提示
chan是引用类型,即便Go的函数参数是值传递,但其引用依旧是同一个,这一点会在后续的管道原理中说明。
for range
通过for range语句,可以遍历读取缓冲管道中的数据,如下例
func main() {
ch := make(chan int, 10)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
}()
for n := range ch {
fmt.Println(n)
}
}
通常来说,for range遍历其他可迭代数据结构时,会有两个返回值,第一个是索引,第二个元素值,但是对于管道而言,有且仅有一个返回值,就是缓冲区的元素值,for range会遍历读取管道缓冲区中的元素,当管道缓冲区为空时,就会阻塞等待,直到有其他协程向管道中写入数据才会继续读取数据,所以输出如下
0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!
可以看到上面的代码发生了死锁,因为子协程已经执行完毕了,而主协程还在阻塞等待其他协程来向管道中写入数据,所以应该管道在写入完毕后将其关闭。修改为如下代码
func main() {
ch := make(chan int, 10)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
// 关闭管道
close(ch)
}()
for n := range ch {
fmt.Println(n)
}
}
写完后关闭管道,上述代码便不再会发生死锁。前面提到过读取管道是有两个返回值的,for range遍历管道时,当无法成功读取数据时,便会退出循环。第二个返回值指的是能否成功读取数据,而不是管道是否已经关闭,即便管道已经关闭,对于有缓冲管道而言,依旧可以读取数据,并且第二个返回值仍然为true。看下面的一个例子
func main() {
ch := make(chan int, 10)
for i := 0; i < 5; i++ {
ch <- i
}
// 关闭管道
close(ch)
// 再读取数据
for i := 0; i < 6; i++ {
n, ok := <-ch
fmt.Println(n, ok)
}
}
输出结果
0 true
1 true
2 true
3 true
4 true
0 false
由于管道已经关闭了,即便缓冲区为空,再读取数据也不会导致当前协程阻塞,可以看到在第六次遍历的时候读取的是零值,并且ok为false。
提示
关于管道关闭的时机,应该尽量在向管道发送数据的那一方关闭管道,而不要在接收方关闭管道,因为大多数情况下接收方只知道接收数据,并不知道该在什么时候关闭管道。
select
select在Linux系统中,是一种IO多路复用的解决方案,类似的,在Go中,select是一种管道多路复用的控制结构。什么是多路复用,简单的用一句话概括:在某一时刻,同时监测多个元素是否可用,被监测的可以是网络请求,文件IO等。在Go中的select监测的元素就是管道,且只能是管道。select的语法与switch语句类似,下面看看一个select语句长什么样
func main() {
// 创建三个管道
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
select {
case n, ok := <-chA:
fmt.Println(n, ok)
case n, ok := <-chB:
fmt.Println(n, ok)
case n, ok := <-chC:
fmt.Println(n, ok)
default:
fmt.Println("所有管道都不可用")
}
}
与switch类似,select由多个case和一个default组成,default分支可以省略。每一个case只能操作一个管道,且只能进行一种操作,要么读要么写,当有多个case可用时,select会伪随机的选择一个case来执行。如果所有case都不可用,就会执行default分支,倘若没有default分支,将会阻塞等待,直到至少有一个case可用。由于上例中没有对管道写入数据,自然所有的case都不可用,所以最终输出为default分支的执行结果。稍微修改下后如下:
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
// 开启一个新的协程
go func() {
// 向A管道写入数据
chA <- 1
}()
select {
case n, ok := <-chA:
fmt.Println(n, ok)
case n, ok := <-chB:
fmt.Println(n, ok)
case n, ok := <-chC:
fmt.Println(n, ok)
}
}
上例开启了一个新的协程来向管道A写入数据,select由于没有默认分支,所以会一直阻塞等待直到有case可用。当管道A可用时,执行完对应分支后主协程就直接退出了。要想一直监测管道,可以配合for循环使用,如下。
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
go Send(chA)
go Send(chB)
go Send(chC)
// for循环
for {
select {
case n, ok := <-chA:
fmt.Println("A", n, ok)
case n, ok := <-chB:
fmt.Println("B", n, ok)
case n, ok := <-chC:
fmt.Println("B", n, ok)
}
}
}
func Send(ch chan<- int) {
for i := 0; i < 3; i++ {
time.Sleep(time.Millisecond)
ch <- i
}
}
这样确实三个管道都能用上了,但是死循环+select会导致主协程永久阻塞,所以可以将其单独放到新协程中,并且加上一些其他的逻辑。
func main() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
l := make(chan struct{})
go Send(chA)
go Send(chB)
go Send(chC)
go func() {
Loop:
for {
select {
case n, ok := <-chA:
fmt.Println("A", n, ok)
case n, ok := <-chB:
fmt.Println("B", n, ok)
case n, ok := <-chC:
fmt.Println("B", n, ok)
case <-time.After(time.Second): // 设置1秒的超时时间
break Loop // 退出循环
}
}
l <- struct{}{} // 告诉主协程可以退出了
}()
<-l
}
func Send(ch chan<- int) {
for i := 0; i < 3; i++ {
time.Sleep(time.Millisecond)
ch <- i
}
}
上例中通过for循环配合select来一直监测三个管道是否可以用,并且第四个case是一个超时管道,超时过后便会退出循环,结束子协程。最终输出如下
B 0 true
B 0 true
A 0 true
A 1 true
B 1 true
B 1 true
B 2 true
A 2 true
B 2 true
超时
上一个例子用到了time.After函数,其返回值是一个只读的管道,该函数配合select使用可以非常简单的实现超时机制,例子如下
func main() {
chA := make(chan int)
defer close(chA)
go func() {
time.Sleep(time.Second * 2)
chA <- 1
}()
select {
case n := <-chA:
fmt.Println(n)
case <-time.After(time.Second):
fmt.Println("超时")
}
}
永久阻塞
当select语句中什么都没有时,就会永久阻塞,例如
func main() {
fmt.Println("start")
select {}
fmt.Println("end")
}
end永远也不会输出,主协程会一直阻塞,这种情况一般是有特殊用途。
提示
在select的case中对值为nil的管道进行操作的话,并不会导致阻塞,该case则会被忽略,永远也不会被执行。例如下方代码无论执行多少次都只会输出timeout。
func main() {
var nilCh chan int
select {
case <-nilCh:
fmt.Println("read")
case nilCh <- 1:
fmt.Println("write")
case <-time.After(time.Second):
fmt.Println("timeout")
}
}
WaitGroup
sync.WaitGroup是sync包下提供的一个结构体,WaitGroup即等待执行,使用它可以很轻易的实现等待一组协程的效果。该结构体只对外暴露三个方法。
Add方法用于指明要等待的协程的数量
func (wg *WaitGroup) Add(delta int)
Done方法表示当前协程已经执行完毕
func (wg *WaitGroup) Done()
Wait方法等待子协程结束,否则就阻塞
func (wg *WaitGroup) Wait()
WaitGroup使用起来十分简单,属于开箱即用。其内部的实现是计数器+信号量,程序开始时调用Add初始化计数,每当一个协程执行完毕时调用Done,计数就-1,直到减为0,而在此期间,主协程调用Wait 会一直阻塞直到全部计数减为0,然后才会被唤醒。看一个简单的使用例子
func main() {
var wait sync.WaitGroup
// 指定子协程的数量
wait.Add(1)
go func() {
fmt.Println(1)
// 执行完毕
wait.Done()
}()
// 等待子协程
wait.Wait()
fmt.Println(2)
}
这段代码永远都是先输出1再输出2,主协程会等待子协程执行完毕后再退出。
1
2
针对协程介绍中最开始的例子,可以做出如下修改
func main() {
var mainWait sync.WaitGroup
var wait sync.WaitGroup
// 计数10
mainWait.Add(10)
fmt.Println("start")
for i := 0; i < 10; i++ {
// 循环内计数1
wait.Add(1)
go func() {
fmt.Println(i)
// 两个计数-1
wait.Done()
mainWait.Done()
}()
// 等待当前循环的协程执行完毕
wait.Wait()
}
// 等待所有的协程执行完毕
mainWait.Wait()
fmt.Println("end")
}
这里使用了sync.WaitGroup替代了原先的time.Sleep,协程并发执行的的顺序更加可控,不管执行多少次,输出都如下
start
0
1
2
3
4
5
6
7
8
9
end
WaitGroup通常适用于可动态调整协程数量的时候,例如事先知晓协程的数量,又或者在运行过程中需要动态调整。WaitGroup的值不应该被复制,复制后的值也不应该继续使用,尤其是将其作为函数参数传递时,因该传递指针而不是值。倘若使用复制的值,计数完全无法作用到真正的WaitGroup上,这可能会导致主协程一直阻塞等待,程序将无法正常运行。例如下方的代码
func main() {
var mainWait sync.WaitGroup
mainWait.Add(1)
hello(mainWait)
mainWait.Wait()
fmt.Println("end")
}
func hello(wait sync.WaitGroup) {
fmt.Println("hello")
wait.Done()
}
错误提示所有的协程都已经退出,但主协程依旧在等待,这就形成了死锁,因为hello函数内部对一个形参WaitGroup调用Done并不会作用到原来的mainWait上,所以应该使用指针来进行传递。
hello
fatal error: all goroutines are asleep - deadlock!
提示
当计数变为负数,或者计数数量大于子协程数量时,将会引发panic。
Context
Context译为上下文,是Go提供的一种并发控制的解决方案,相比于管道和WaitGroup,它可以更好的控制子孙协程以及层级更深的协程。Context本身是一个接口,只要实现了该接口都可以称之为上下文例如著名Web框架Gin中的gin.Context。context标准库也提供了几个实现,分别是:
emptyCtxcancelCtxtimerCtxvalueCtx
Context
先来看看Context接口的定义,再去了解它的具体实现。
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key any) any
}
Deadline
该方法具有两个返回值,deadline是截止时间,即上下文应该取消的时间。第二个值是是否设置deadline,如果没有设置则一直为false。
Deadline() (deadline time.Time, ok bool)
Done
其返回值是一个空结构体类型的只读管道,该管道仅仅起到通知作用,不传递任何数据。当上下文所做的工作应该取消时,该通道就会被关闭,对于一些不支持取消的上下文,可能会返回nil。
Done() <-chan struct{}
Err
该方法返回一个error,用于表示上下关闭的原因。当Done管道没有关闭时,返回nil,如果关闭过后,会返回一个err来解释为什么会关闭。
Err() error
Value
该方法返回对应的键值,如果key不存在,或者不支持该方法,就会返回nil。
Value(key any) any
emptyCtx
顾名思义,emptyCtx就是空的上下文,context包下所有的实现都是不对外暴露的,但是提供了对应的函数来创建上下文。emptyCtx就可以通过context.Background和context.TODO来进行创建。两个函数如下
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
可以看到仅仅只是返回了emptyCtx指针。emptyCtx的底层类型实际上是一个int,之所以不使用空结构体,是因为emptyCtx的实例必须要有不同的内存地址,它没法被取消,没有deadline,也不能取值,实现的方法都是返回零值。
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key any) any {
return nil
}
emptyCtx通常是用来当作最顶层的上下文,在创建其他三种上下文时作为父上下文传入。context包中的各个实现关系如下图所示

valueCtx
valueCtx实现比较简单,其内部只包含一对键值对,和一个内嵌的Context类型的字段。
type valueCtx struct {
Context
key, val any
}
其本身只实现了Value方法,逻辑也很简单,当前上下文找不到就去父上下文找。
func (c *valueCtx) Value(key any) any {
if c.key == key {
return c.val
}
return value(c.Context, key)
}
下面看一个valueCtx的简单使用案例
var waitGroup sync.WaitGroup
func main() {
waitGroup.Add(1)
// 传入上下文
go Do(context.WithValue(context.Background(), 1, 2))
waitGroup.Wait()
}
func Do(ctx context.Context) {
// 新建定时器
ticker := time.NewTimer(time.Second)
defer waitGroup.Done()
for {
select {
case <-ctx.Done(): // 永远也不会执行
case <-ticker.C:
fmt.Println("timeout")
return
default:
fmt.Println(ctx.Value(1))
}
time.Sleep(time.Millisecond * 100)
}
}
valueCtx多用于在多级协程中传递一些数据,无法被取消,因此ctx.Done永远会返回nil,select会忽略掉nil管道。最后输出如下
2
2
2
2
2
2
2
2
2
2
timeout
cancelCtx
cancelCtx以及timerCtx都实现了canceler接口,接口类型如下
type canceler interface {
// removeFromParent 表示是否从父上下文中删除自身
// err 表示取消的原因
cancel(removeFromParent bool, err error)
// Done 返回一个管道,用于通知取消的原因
Done() <-chan struct{}
}
cancel方法不对外暴露,在创建上下文时通过闭包将其包装为返回值以供外界调用,就如context.WithCancel源代码中所示
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
// 尝试将自身添加进父级的children中
propagateCancel(parent, &c)
// 返回context和一个函数
return &c, func() { c.cancel(true, Canceled) }
}
cancelCtx译为可取消的上下文,创建时,如果父级实现了canceler,就会将自身添加进父级的children中,否则就一直向上查找。如果所有的父级都没有实现canceler,就会启动一个协程等待父级取消,然后当父级结束时取消当前上下文。当调用cancelFunc时,Done通道将会关闭,该上下文的任何子级也会随之取消,最后会将自身从父级中删除。下面是一个简单的示例:
var waitGroup sync.WaitGroup
func main() {
bkg := context.Background()
// 返回了一个cancelCtx和cancel函数
cancelCtx, cancel := context.WithCancel(bkg)
waitGroup.Add(1)
go func(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
default:
fmt.Println("等待取消中...")
}
time.Sleep(time.Millisecond * 200)
}
}(cancelCtx)
time.Sleep(time.Second)
cancel()
waitGroup.Wait()
}
输出如下
等待取消中...
等待取消中...
等待取消中...
等待取消中...
等待取消中...
context canceled
再来一个层级嵌套深一点的示例
var waitGroup sync.WaitGroup
func main() {
waitGroup.Add(3)
ctx, cancelFunc := context.WithCancel(context.Background())
go HttpHandler(ctx)
time.Sleep(time.Second)
cancelFunc()
waitGroup.Wait()
}
func HttpHandler(ctx context.Context) {
cancelCtxAuth, cancelAuth := context.WithCancel(ctx)
cancelCtxMail, cancelMail := context.WithCancel(ctx)
defer cancelAuth()
defer cancelMail()
defer waitGroup.Done()
go AuthService(cancelCtxAuth)
go MailService(cancelCtxMail)
for {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
default:
fmt.Println("正在处理http请求...")
}
time.Sleep(time.Millisecond * 200)
}
}
func AuthService(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println("auth 父级取消", ctx.Err())
return
default:
fmt.Println("auth...")
}
time.Sleep(time.Millisecond * 200)
}
}
func MailService(ctx context.Context) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
fmt.Println("mail 父级取消", ctx.Err())
return
default:
fmt.Println("mail...")
}
time.Sleep(time.Millisecond * 200)
}
}
例子中创建了3个cancelCtx,尽管父级cancelCtx在取消的同时会取消它的子上下文,但是保险起见,如果创建了一个cancelCtx,在相应的流程结束后就应该调用cancel函数。输出如下
正在处理http请求...
auth...
mail...
mail...
auth...
正在处理http请求...
auth...
mail...
正在处理http请求...
正在处理http请求...
auth...
mail...
auth...
正在处理http请求...
mail...
context canceled
auth 父级取消 context canceled
mail 父级取消 context canceled
timerCtx
timerCtx在cancelCtx 的基础之上增加了超时机制,context包下提供了两种创建的函数,分别是WithDeadline和WithTimeout,两者功能类似,前者是指定一个具体的超时时间,比如指定一个具体时间2023/3/20 16:32:00,后者是指定一个超时的时间间隔,比如5分钟后。两个函数的签名如下
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
timerCtx会在时间到期后自动取消当前上下文,取消的流程除了要额外的关闭timer之外,基本与cancelCtx一致。下面是一个简单的timerCtx的使用示例
var wait sync.WaitGroup
func main() {
deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
defer cancel()
wait.Add(1)
go func(ctx context.Context) {
defer wait.Done()
for {
select {
case <-ctx.Done():
fmt.Println("上下文取消", ctx.Err())
return
default:
fmt.Println("等待取消中...")
}
time.Sleep(time.Millisecond * 200)
}
}(deadline)
wait.Wait()
}
尽管上下文到期会自动取消,但是为了保险起见,在相关流程结束后,最好手动取消上下文。输出如下
等待取消中...
等待取消中...
等待取消中...
等待取消中...
等待取消中...
上下文取消 context deadline exceeded
WithTimeout其实与WithDealine非常相似,它的实现也只是稍微封装了一下并调用WithDeadline,和上面例子中的WithDeadline用法一样,如下
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
提示
就跟内存分配后不回收会造成内存泄漏一样,上下文也是一种资源,如果创建了但从来不取消,一样会造成上下文泄露,所以最好避免此种情况的发生。
锁
先来看看的一个例子
var wait sync.WaitGroup
var count = 0
func main() {
wait.Add(10)
for i := 0; i < 10; i++ {
go func(data *int) {
// 模拟访问耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
// 访问数据
temp := *data
// 模拟计算耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
ans := 1
// 修改数据
*data = temp + ans
fmt.Println(*data)
wait.Done()
}(&count)
}
wait.Wait()
fmt.Println("最终结果", count)
}
对于上面的例子,开启了十个协程来对count进行+1操作,并且使用了time.Sleep来模拟不同的耗时,根据直觉来讲,10个协程执行10个+1操作,最终结果一定是10,正确结果也确实是10,但事实并非如此,上面的例子执行结果如下:
1
2
3
3
2
2
3
3
3
4
最终结果 4
可以看到最终结果为4,而这只是众多可能的结果中的一种。由于每个协程访问和计算所需的时间不同,A协程访问数据耗费500毫秒,此时访问到的count值为1,随后又花费了400毫秒计算,但在这400毫秒内,B协程已经完成了访问和计算并成功更新了count的值,A协程在计算完毕后,A协程最初访问到的值已经过时了,但A协程并不知道这件事,依旧在原先访问到的值基础上加一,并赋值给count,这样一来,B协程的执行结果被覆盖了。多个协程读取和访问一个共享数据时,尤其会发生这样的问题,为此就需要用到锁。
Go中sync包下的Mutex与RWMutex提供了互斥锁与读写锁两种实现,且提供了非常简单易用的API,加锁只需要Lock(),解锁也只需要Unlock()。需要注意的是,Go所提供的锁都是非递归锁,也就是不可重入锁,所以重复加锁或重复解锁都会导致死锁。
互斥锁
sync.Mutex是Go提供的互斥锁实现,其实现了sync.Locker接口
type Locker interface {
// 加锁
Lock()
// 解锁
Unlock()
}
使用互斥锁可以非常完美的解决上述问题,例子如下
var wait sync.WaitGroup
var count = 0
var lock sync.Mutex
func main() {
wait.Add(10)
for i := 0; i < 10; i++ {
go func(data *int) {
// 加锁
lock.Lock()
// 模拟访问耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
// 访问数据
temp := *data
// 模拟计算耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
ans := 1
// 修改数据
*data = temp + ans
// 解锁
lock.Unlock()
fmt.Println(*data)
wait.Done()
}(&count)
}
wait.Wait()
fmt.Println("最终结果", count)
}
每一个协程在访问数据前,都先上锁,更新完成后再解锁,其他协程想要访问就必须要先获得锁,否则就阻塞等待。如此一来,就不存在上述问题了,所以输出如下
1
2
3
4
5
6
7
8
9
10
最终结果 10
读写锁
互斥锁适合读操作与写操作频率都差不多的情况,对于一些读多写少的数据,如果使用互斥锁,会造成大量的不必要的协程竞争锁,这会消耗很多的系统资源,这时候就需要用到读写锁,即读写互斥锁,对于一个协程而言:
- 如果获得了读锁,其他协程进行写操作时会阻塞,其他协程进行读操作时不会阻塞
- 如果获得了写锁,其他协程进行写操作时会阻塞,其他协程进行读操作时会阻塞
Go中读写互斥锁的实现是sync.RWMutex,它也同样实现了Locker接口,但它提供了更多可用的方法,如下:
// 加读锁
func (rw *RWMutex) RLock()
// 尝试加读锁
func (rw *RWMutex) TryRLock() bool
// 解读锁
func (rw *RWMutex) RUnlock()
// 加写锁
func (rw *RWMutex) Lock()
// 尝试加写锁
func (rw *RWMutex) TryLock() bool
// 解写锁
func (rw *RWMutex) Unlock()
其中TryRlock与TryLock两个尝试加锁的操作是非阻塞式的,成功加锁会返回true,无法获得锁时并不会阻塞而是返回false。读写互斥锁内部实现依旧是互斥锁,并且从始至终都只有一个锁,并不是说分读锁和写锁就有两个锁。
