并发控制

2021/08/08 go探索发现 共 7313 字,约 21 分钟

云原生科学-Go探索发现: 并发控制

协程A执行过程中需要创建子协程A1,A2,A3…An,协程A创建完子协程后等待子协程退出。针对这种场景Go提供了三种解决方案。

  • Channel
  • WatiGroup 信号量机制控制子协程
  • Context 使用上下文控制子协程。

Channel优点是实现简单,清晰易懂。WaitGroup的优点是子协程个数可以动态调整,Context可以子对协程派生出来的孙子协程进行控制。

channel

channel 一般用于协程之间的通信,不过也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程。

ackage main

import (
	"fmt"
	"time"
)

func Process(c chan<- int ){
	time.Sleep(time.Second)
	c<-1
}
func main(){
	channels:=make([] chan int,10)
	for i:=0;i<10;i++{
		channels[i]=make(chan int)
		go Process(channels[i])
	}
	for i,ch:=range channels {
		<-ch
		fmt.Printf("process %v is done\n",i)
	}

}

父协程也可以向管道写入数据通知子协程结束,这时子协程需要定期探测管道中是否有消息实现。

小结

channel控制子协程的优点是实现简单,但需要大量创建协程时就需要相同数量的channel,对子协程继续派生出的协程不方便控制

waitgroup

waitgroup内部维护了一个技数器:

  • 启动goroutine前通过Add(2)将技术器设置为待启动的groutine个数;
  • 启动goroutine后,使用Wait()方法阻塞自己,等待技术器变0;
  • 每个goroutine执行结束后通过Done()方法将技术器减1;
  • 技术器变为0后,阻塞的goroutine被唤醒。

例子参考 coding-with-go

信号量

信号量提供一种保护共享资源的机制。 信号量可以理解为数值:

  • 当信号量>0.表示资源可用,获取信号量时自动将信号量减1;
  • 当信号量==0时,表示资源不可用,获取信号量时,当前线程会进入睡眠,信号量为正时被唤醒。

context

context对于派生goroutine有更强的控制力,它可以控制多级的goroutine.

实现原理

context只定义了接口,凡是实现该接口的类都可称为是一种context,官方包中实现了几个常用的context,分别用于不同的场景。

接口定义

源码包src/context/context.go 定义了接口

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

Deadline()

返回deadline和标识是否设置dealine的bool的值,如果没有设置deadline,则ok==false,此时deadline为一个初始值的time.Time值。

Done()

该方法返回一个channel,需要在select-case语句中使用,如case<-context.Done(). 当context关闭后,Done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;当context还未关闭请求,当context还未关闭时,Done()返回nil;

Err()

该方法描述context关闭的原因,关闭的原因由context实现控制,不要要用户设置。比如Deadline context,关闭可能是因为deadline,也可能是被主动关闭;

  • deadline关闭:context deadline exceeded;
  • 主动关闭: context canceled.

当context关闭后,Error返回context的关闭原因;当context还未关闭时,Err()返回nil.

Value()

用于在goroutine之间传递信息。 该方法根据key值查询map中的value.

空context

context 包定义了一个空的context,名为emptyCtx,用于conext根结点,空contex只是简单实现了Conext,本身不包含任何值,仅用于其他context的父节点。

emptyCtx实现

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
	return background
}

context实例创建方式

context包提供四种方法创建不同类型的context实例,使用四个方法时如果没有父context,则都需要传入background.即将background作为父节点。

  • WithCancel()
  • WithDeadline()
  • WithTimeout()
  • WithhValue()

context包中实现Context接口的struct,除了emptyCtx,还有cancelCtx,timerCtx和valueCtx三种。正是基于这三种context结构体,实现了上述四种类型的context.

以WithCancel()为例: 调用newCancelCtx(parent)创建CancelCtx实例

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

CancelCtx继承参数中的parenet context

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}
# var cancelCtxKey int
# key 等于&cancelCtxkey时 返回本身的结构体类型
在 propagateCancel调用时被用来判断parent context是否是*cancelCtx类型 p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
func (c *cancelCtx) Value(key interface{}) interface{} {
	if key == &cancelCtxKey {
		return c
	}
	return c.Context.Value(key)
}

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}
#一个进程写多个协程读 加锁保证多个协程读的一致性
func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

type stringer interface {
	String() string
}

func contextName(c Context) string {
	if s, ok := c.(stringer); ok {
		return s.String()
	}
	return reflectlite.TypeOf(c).String()
}

func (c *cancelCtx) String() string {
	return contextName(c.Context) + ".WithCancel"
}

Context 树的构建是在调用 context.WithCancel() 调用时通过 propagateCancel 进行的。 如果parent context不是context.background()生成的emptyCtx,而是从上层传下来的cancelCtx类型,则对parent context的child字段进行赋值

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		atomic.AddInt32(&goroutines, +1)
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

// &cancelCtxKey is the key that a cancelCtx returns itself for.
var cancelCtxKey int

// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
// 调用回溯链中第一个实现了 Done() 的实例(第三方Context类/cancelCtx) 判断是否是cancelCtx类型
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	if done == closedchan || done == nil {
		return nil, false
	}
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
	if !ok {
		return nil, false
	}
	pdone, _ := p.done.Load().(chan struct{})
	if pdone != done {
		return nil, false
	}
	return p, true
}

children字段中记录了由此context派生的所有child,此context被“cancel”时会把其中的所有child都cancel掉。 cancelCtx与deadline和value无关,只需实现Done()和Err()接口即可。 cancelCtx.done值一般经历如下三个阶段:nil->chan struct{}->closed chan.

Err()接口的实现。

Err()接口只需返回一个error告知context被关闭的原因即可,对于cancelCtx来说只需要返回成员变量err即可。 cancelCtx.err默认是nil,在context被“cancel”时指定一个error变量:var canceled=errors.New(“context canceled”)

cancel()接口实现

WithCancel()返回的第二个用于cancel context方法正是cancel() 它关闭自己及其后代,后代存储在children map中。

WithCancel方法做了三件事情:

  • 将cancelCtx实例添加到其父节点的children中。
  • 初始化一个cancelCtx实例
  • 返回cancelCtx实例和cancel()方法。

timerCtx

type timerCtx struct{
	cancelctx
	timer *time.Timer
	deadline time.Time
}

timerCtx在cancelCtx基础上增加了deadline,用于标识自动cancel的最终时间,而timer就是一个触发自动cancel的定时器,由此衍生出withDeadline()和WithTimeout().

  • deadline: 指最后期限
  • timeout: 指定最长存活时间,比如context在30s后结束。

对于接口来说,timeCtx在cancelCtx的基础上还需要实现Deadline()和cancel()方法,其中cancel()方法是重写的。

ValueCtx

type valueCtx Struct{
    Context
    key,val interface{}
}

valueCtx只是在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据。 由于valueCtx既不需要cancel,也不需要deadline,只需要实现Value()接口即可。

valueCtx接口实现

func (c *valueCtx)Value(key interface{})interface{}{
if c.key == key {
   return c.val
}
return c.Context.Value(key)
}

当前context找不到key时,会向父节点查找,如果查询不到则最终返回interface{}.可以通过子context查到父节点的value.

小结

Context仅仅是接口定义,根据实现的不同,可以衍生出不同类型context; cancelCtx实现了Context,痛过WithCancel()创建cncelCtx实例。 timeerCtx实现了Context, 通过WithDeadline() WithTimeout创建timerCtx实例。 valueCtx实现了Context, 通通过WithValue()创建valueCtx.

Mutex

文档信息

Search

    Table of Contents