云原生科学-Go探索发现: 并发控制
协程A执行过程中需要创建子协程A1,A2,A3…An,协程A创建完子协程后等待子协程退出。针对这种场景Go提供了三种解决方案。
- Channel
- WatiGroup 信号量机制控制子协程
- Context 使用上下文控制子协程。
Channel优点是实现简单,清晰易懂。WaitGroup的优点是子协程个数可以动态调整,Context可以子对协程派生出来的孙子协程进行控制。
channel
channel 一般用于协程之间的通信,不过也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程。
ackage main
import (
"fmt"
"time"
)
func Process(c chan<- int ){
time.Sleep(time.Second)
c<-1
}
func main(){
channels:=make([] chan int,10)
for i:=0;i<10;i++{
channels[i]=make(chan int)
go Process(channels[i])
}
for i,ch:=range channels {
<-ch
fmt.Printf("process %v is done\n",i)
}
}
父协程也可以向管道写入数据通知子协程结束,这时子协程需要定期探测管道中是否有消息实现。
小结
channel控制子协程的优点是实现简单,但需要大量创建协程时就需要相同数量的channel,对子协程继续派生出的协程不方便控制
waitgroup
waitgroup内部维护了一个技数器:
- 启动goroutine前通过Add(2)将技术器设置为待启动的groutine个数;
- 启动goroutine后,使用Wait()方法阻塞自己,等待技术器变0;
- 每个goroutine执行结束后通过Done()方法将技术器减1;
- 技术器变为0后,阻塞的goroutine被唤醒。
例子参考 coding-with-go
信号量
信号量提供一种保护共享资源的机制。 信号量可以理解为数值:
- 当信号量>0.表示资源可用,获取信号量时自动将信号量减1;
- 当信号量==0时,表示资源不可用,获取信号量时,当前线程会进入睡眠,信号量为正时被唤醒。
context
context对于派生goroutine有更强的控制力,它可以控制多级的goroutine.
实现原理
context只定义了接口,凡是实现该接口的类都可称为是一种context,官方包中实现了几个常用的context,分别用于不同的场景。
接口定义
源码包src/context/context.go 定义了接口
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
Deadline()
返回deadline和标识是否设置dealine的bool的值,如果没有设置deadline,则ok==false,此时deadline为一个初始值的time.Time值。
Done()
该方法返回一个channel,需要在select-case语句中使用,如case<-context.Done(). 当context关闭后,Done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;当context还未关闭请求,当context还未关闭时,Done()返回nil;
Err()
该方法描述context关闭的原因,关闭的原因由context实现控制,不要要用户设置。比如Deadline context,关闭可能是因为deadline,也可能是被主动关闭;
- deadline关闭:context deadline exceeded;
- 主动关闭: context canceled.
当context关闭后,Error返回context的关闭原因;当context还未关闭时,Err()返回nil.
Value()
用于在goroutine之间传递信息。 该方法根据key值查询map中的value.
空context
context 包定义了一个空的context,名为emptyCtx,用于conext根结点,空contex只是简单实现了Conext,本身不包含任何值,仅用于其他context的父节点。
emptyCtx实现
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
context实例创建方式
context包提供四种方法创建不同类型的context实例,使用四个方法时如果没有父context,则都需要传入background.即将background作为父节点。
- WithCancel()
- WithDeadline()
- WithTimeout()
- WithhValue()
context包中实现Context接口的struct,除了emptyCtx,还有cancelCtx,timerCtx和valueCtx三种。正是基于这三种context结构体,实现了上述四种类型的context.
以WithCancel()为例: 调用newCancelCtx(parent)创建CancelCtx实例
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
CancelCtx继承参数中的parenet context
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
# var cancelCtxKey int
# key 等于&cancelCtxkey时 返回本身的结构体类型
在 propagateCancel调用时被用来判断parent context是否是*cancelCtx类型 p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
func (c *cancelCtx) Value(key interface{}) interface{} {
if key == &cancelCtxKey {
return c
}
return c.Context.Value(key)
}
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
#一个进程写多个协程读 加锁保证多个协程读的一致性
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
type stringer interface {
String() string
}
func contextName(c Context) string {
if s, ok := c.(stringer); ok {
return s.String()
}
return reflectlite.TypeOf(c).String()
}
func (c *cancelCtx) String() string {
return contextName(c.Context) + ".WithCancel"
}
Context 树的构建是在调用 context.WithCancel() 调用时通过 propagateCancel 进行的。 如果parent context不是context.background()生成的emptyCtx,而是从上层传下来的cancelCtx类型,则对parent context的child字段进行赋值
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// &cancelCtxKey is the key that a cancelCtx returns itself for.
var cancelCtxKey int
// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
// 调用回溯链中第一个实现了 Done() 的实例(第三方Context类/cancelCtx) 判断是否是cancelCtx类型
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}
children字段中记录了由此context派生的所有child,此context被“cancel”时会把其中的所有child都cancel掉。 cancelCtx与deadline和value无关,只需实现Done()和Err()接口即可。 cancelCtx.done值一般经历如下三个阶段:nil->chan struct{}->closed chan.
Err()接口的实现。
Err()接口只需返回一个error告知context被关闭的原因即可,对于cancelCtx来说只需要返回成员变量err即可。 cancelCtx.err默认是nil,在context被“cancel”时指定一个error变量:var canceled=errors.New(“context canceled”)
cancel()接口实现
WithCancel()返回的第二个用于cancel context方法正是cancel() 它关闭自己及其后代,后代存储在children map中。
WithCancel方法做了三件事情:
- 将cancelCtx实例添加到其父节点的children中。
- 初始化一个cancelCtx实例
- 返回cancelCtx实例和cancel()方法。
timerCtx
type timerCtx struct{
cancelctx
timer *time.Timer
deadline time.Time
}
timerCtx在cancelCtx基础上增加了deadline,用于标识自动cancel的最终时间,而timer就是一个触发自动cancel的定时器,由此衍生出withDeadline()和WithTimeout().
- deadline: 指最后期限
- timeout: 指定最长存活时间,比如context在30s后结束。
对于接口来说,timeCtx在cancelCtx的基础上还需要实现Deadline()和cancel()方法,其中cancel()方法是重写的。
ValueCtx
type valueCtx Struct{
Context
key,val interface{}
}
valueCtx只是在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据。 由于valueCtx既不需要cancel,也不需要deadline,只需要实现Value()接口即可。
valueCtx接口实现
func (c *valueCtx)Value(key interface{})interface{}{
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
当前context找不到key时,会向父节点查找,如果查询不到则最终返回interface{}.可以通过子context查到父节点的value.
小结
Context仅仅是接口定义,根据实现的不同,可以衍生出不同类型context; cancelCtx实现了Context,痛过WithCancel()创建cncelCtx实例。 timeerCtx实现了Context, 通过WithDeadline() WithTimeout创建timerCtx实例。 valueCtx实现了Context, 通通过WithValue()创建valueCtx.
Mutex
文档信息
- 本文作者:Beast
- 本文链接:https://beastpu.github.io/2021/08/08/go-05/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)