go sync.Cond
提示
本文代码基于 go1.17.13,src/sync/cond.go
# 一、简述
- Cond 实现了一个条件变量,可以让一组阻塞等待的 goroutine 在满足条件的情况下被唤醒;
- WaiteGroup 是主 goroutine 阻塞等待,直到所有子 goroutine 执行完毕;
- 与 WaiteGroup 相反,Cond 可以让子 goroutine 阻塞等待,直到主 goroutine 发出信号才会被唤醒;
- 通常与sync.Mutex结合使用,用于在多个协程之间进行通信和同步;
- 相比于 for 循环实现的忙等待来说,Cond 能够让 goroutine 阻塞等待,让出 CPU 的使用权;
# 二、基本原理
- 通过原子操作记录当前正在执行的 goroutine 的数量和等待的 goroutine 数量,当正在执行的 goroutine 计数不为 0 时,主 goroutine 阻塞等待;当正在执行的 goroutine 计数为 0 时唤醒等待的主 goroutine,并检测该计数是否为 0,以防止在主 goroutine 阻塞等待期间
WaitGroup
被其他 goroutine 复用; - 主 goroutine 调用
Add
来设置要等待的 goroutine 数量,并发执行多个数量的子 goroutine,然后主 goroutine 调用Wait
阻塞等待,直到所有 goroutines 完成,每个子 goroutine 在运行完成后调用Done
让正在执行的 goroutine 数量减一,数量为 0 时阻塞等待完成; - WaitGroup 首次使用后不允许被复制;
# 三、基本用法
点击查看
package main
import (
"fmt"
"sync"
)
var (
count = 4
c = sync.NewCond(&sync.Mutex{})
)
func main() {
for i := 0; i < count; i++ {
go consumer(i)
}
producer()
}
func consumer(id int) {
c.L.Lock()
c.Wait() // 等待生产者的信号
fmt.Printf("Consumer %d: consumed 1 item\n", id)
count--
c.L.Unlock()
}
func producer() {
for count > 0 {
c.L.Lock()
c.Signal() // 唤醒一个消费者
c.L.Unlock()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
- 创建了一个名为 c 的 sync.Cond 对象,并与一个 sync.Mutex 结合使用;
- consumer 函数模拟消费者行为,当条件不满足时,它会调用 c.Wait() 进入等待状态;
- producer 函数负责生产物品,并在生产后通过 c.Signal() 唤醒一个消费者;
- 这样,生产者和消费者就可以在条件成立时进行相互通信,实现协程间的同步和通信;
# 四、源码解读
# 1, Cond
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
1
2
3
4
5
6
2
3
4
5
6
noCopy
是一个空结构体类型,用于禁止复制,会在编译期间检测,可以通过go vet
检测;L
一个实现了Locker
接口的对象,通常是一个sync.Mutex
或sync.RWMutex
,用于在条件变量上等待和发出信号时保证线程安全;notify
一个notifyList
类型的字段用于管理等待在条件变量上的协程列表,并且负责唤醒这些协程;checker
一个copyChecker
类型的字段,用于检测sync.Cond
对象是否被复制,为了防止意外地拷贝sync.Cond
对象,因为sync.Cond
对象在并发环境下应当是不可复制的;
# 2, NewCond
点击查看
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
1
2
3
4
2
3
4
- 根据传入的 Locker 对象创建 Cond 指针;
# 3, Wait
点击查看
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
1
2
3
4
5
6
7
2
3
4
5
6
7
- Wait 将当前 goroutine 添加到条件变量的通知列表中,并原子解锁 c.L,暂停调用的 goroutine 的执行,直到收到通知信号才会继续执行;
- 解锁是能够让其自己在阻塞等待期间,其他 goroutine 能够获得锁继续操作共享资源,当前 goroutine 恢复执行后,Wait 在返回之前锁定 c.L,以便自己能够操作共享资源;
- 除非被
Broadcast
或Signal
唤醒,否则Wait
无法返回;
# 4, Signal
点击查看
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
1
2
3
4
2
3
4
- 通过
notify
字段唤醒一个等待 c 的 goroutine,唤醒的 goroutine 是队列最前面且等待最久的; - 对于调用者在调用期间持有 c.L 是允许的但不是必需的;
# 5, Broadcast
点击查看
// Broadcast 唤醒所有等待 c 的 goroutine,对于调用者在调用期间持有 c.L是允许的但不是必需的。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
1
2
3
4
5
2
3
4
5
- 通过
notify
字段唤醒所有等待的 goroutine;
# 6, copyChecker
点击查看
// copyChecker 将指针保留到自身以检测对象复制。
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
copyChecker
是一个uintptr
类型的别名,用于检测sync.Cond
对象是否被复制,从而保证在并发环境下sync.Cond
对象是不可复制的;
# 6, noCopy
点击查看
// noCopy 可以嵌入到结构中,首次使用后不得复制,详见 https://golang.org/issues/8005#issuecomment-190753527
type noCopy struct{}
// Lock 是 'go vet' 的 -copylocks 检查器使用的 no-op。
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
1
2
3
4
5
6
2
3
4
5
6
- 这段代码定义了一个名为
noCopy
的空结构体和其对应的两个空方法Lock
和Unlock
; - 在Go语言中,如果某个类型实现了锁操作的方法,但是在某些情况下不希望触发 -copylocks 检查器的警告,可以使用类似的方法来规避该检查器的报错;
noCopy
类型本身并无实际属性或行为,只是用于规避特定的静态代码分析工具go vet
的报错,不会对实际的程序逻辑产生影响,只是为了满足静态代码分析的要求;
编辑 (opens new window)
上次更新: 2024/08/12, 19:17:34