golang并发控制之WaitGroup与Context

golang并发控制之WaitGroup与Context,第1张

文章目录 channelWaitGroupWaitGroup结构AddWaitDone 示例 Contextcontext接口context类型cancelCtxtimerCtxvalueCtx 示例
在go中有三种方式实现并发控制(主协程等待子协程完成):

Channel:通过channel控制子协程;WaitGroup:使用信号量控制子协程;子协程数动态可调整;Context:使用上下文控制子协程;可进一步控制孙子协程; channel

通过channel控制(参见《golang通道channel与定时器简介》)子协程;实现简单,清晰易懂;但每个子协程都要有一个channel用于跟父协程通信,父协程等待所有子协程结束;且对子协程的派生协程控制不方便;

WaitGroup

WaitGroup是Golang应用开发过程中经常使用的并发控制技术。

信号量:

当信号量>0时,表示资源可用;获取信号量时信号量减一;当信号量==0时,表示资源不可用;获取信号量时,当前线程进入睡眠,当信号量为正时被唤醒。 WaitGroup结构

包含state计数器和一个信号量:

counter:还未执行完成的协程数量;waiter count:多少个等待者;semaphore:信号量;

对外提供接口

Add(delta int):counter增加delta(delta可以为负);Wait():waiter递增一,并阻塞等待信号量;Done():counter减一,并按照waiter数释放相应次数信号量;

Add *** 作必须早于Wait,Add设置的总值要与实际协程数量一致。

Add

Add做了两件事:

把delta累加到counter中;若counter值变为0,根据waiter数量释放等量的信号量;若counter为负数,则panic。 Wait

Wait也做了两件事:

累加waiter;阻塞等待信号量; Done

Done只做一件事,把counter减一;实际只是调用Add(-1);

示例

WaitGroup可在初始化时就设定等待数量,也可在启动goroutine前(一定要保证在Wait运行前已添加)通过Add增加:

func father(wg *sync.WaitGroup) {
	defer wg.Done()

	fmt.Printf("father\n")
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go child(wg, i)
	}
}

func child(wg *sync.WaitGroup, id int) {
	defer wg.Done()
	fmt.Printf("child [%d]\n", id)
}

func doWait() {
	var wg sync.WaitGroup
	wg.Add(1)
	go father(&wg)

	wg.Wait()
	fmt.Println("father and all children exit")
}
Context

相比WaitGroup,Context对于派生协程有更强的控制力,可控制多级协程。

context接口

Context是一个接口,凡是实现此接口的类都可称为是一种context:

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}

各接口说明:

Deadline:返回当前 Context 被取消的时间,也就是完成工作的截止时间(deadline);若未设置过,ok返回false,且deadline为Time初始值;Done:返回channel(context已关闭时,返回关闭的channel;未关闭时,返回nil),需要在select-case中使用,如case <-context.Done():;Err:描述context关闭的原因(contex还未关闭时,返回nil);Value:从 Context 中返回键对应的值,对于同一个上下文来说,多次调用 Value 并传入相同的 Key 会返回相同的结果,该方法仅用于传递跨 API 和进程间跟请求域的数据。

典型使用:

for {
	select {
	case <-ctx.Done():
		fmt.Println("Contex has done:", ctx.Err())
		return
	default:
		fmt.Println("Is running")
		time.Sleep(2 * time.Second)
	}
}
context类型

context包中定义了四种类型的context:

context包中有一个公用的emptyCtx全局变量,可通过context.Background()获取后做为其他类型的父context;

Context仅仅是一个接口定义,跟据实现的不同,可以衍生出不同的context类型;cancelCtx实现了Context接口,通过WithCancel()创建cancelCtx实例;timerCtx实现了Context接口,通过WithDeadline()和WithTimeout()创建timerCtx实例;valueCtx实现了Context接口,通过WithValue()创建valueCtx实例;三种context实例可互为父节点,从而可以组合成不同的应用形式; cancelCtx

可取消上下文环境:

通过ctx, cancel := context.WithCancel(context.Background())创建;通过cancel()取消;
type cancelCtx struct {
	Context

	mu sync.Mutex 		// protects following fields
	done chan struct{} 	// created lazily, closed by first cancel call
	children map[canceler]struct{} 	// set to nil by the first cancel call
	err error 	// set to non-nil by the first cancel call
}

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	c.mu.Lock()

	c.err = err //设置一个error, 说明关闭原因
	close(c.done) //将channel关闭, 以此通知派生的context

	for child := range c.children { //遍历所有children, 逐个调用cancel方法
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent { // 正常情况下,需要将自己从parent删除
		removeChild(c.Context, c)
	}
}

children记录了所有由此context派生的所有child(此context被cancel时,所有child也会被cancel掉)。

timerCtx

定时器上下文,可通过WithDeadline()或WithTimeout()来创建:

定时器到期时,会自动cancel掉;err设为”context deadline exceeded”;也可手动cancel掉;err设为"context canceled";
type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.
	deadline time.Time	// 自动cancel的最终时间
}
valueCtx

传递数据的上下文:

type valueCtx struct {
	Context
	key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}
示例

超时上下文可以主动调用cancel来取消,也可等待时间到后自动取消:

func contextWorker(index int, ctx context.Context, wg *sync.WaitGroup)  {
	defer wg.Done()
	
	for{
		select {
		case <-ctx.Done():
			fmt.Println(index, "Context done:", ctx.Err())
			return
		default:
			fmt.Println("worker:", index)
			time.Sleep(1000*time.Millisecond)
		}

	}
}

func doContext()  {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	//defer cancel()

	var wg sync.WaitGroup
	for i:=0; i<3; i++{
		wg.Add(1)
		go contextWorker(i+1, ctx, &wg)
	}

	time.Sleep(2*time.Second)
	cancel()

	wg.Wait()

	fmt.Println("Waited, to quit")
}

取消后,子例程会退出:

worker: 3
worker: 2
worker: 1
worker: 3
worker: 2
worker: 1
1 Context done: context canceled
2 Context done: context canceled
3 Context done: context canceled
Waited, to quit

若去掉主动cancel,则会等待超时自动取消:

2 Context done: context deadline exceeded
3 Context done: context deadline exceeded
1 Context done: context deadline exceeded

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/langs/996222.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存