channel

2021/08/18 go探索发现 共 2402 字,约 7 分钟

云原生科学-Go探索发现: 管道(channel)

管道提供了协程间的通信方式,类似于unix管道,管道的数据只能同时被一个groutine访问。

只有一个缓冲区的管道,写入数据类似于加锁,读出数据类似于释放锁。

特性

初始化

声明变量 var ch make int管道的值为nil

make()

ch1:=make(chan int) //无缓冲区
ch2:=make(chan string,5) //缓冲区

管道操作

操作符

ch:=make(chan int,10)
ch <- 1 //数据写入管道
d:=<-ch //读数据从管道
数据读写

管道没有缓冲区,读取数据会阻塞,直到有协程向管道写入数据。类似地向管道写入数据也会阻塞,直到有协程读数据。

管道有缓冲区但没有数据时,从管道读数据也会阻塞。缓冲区已满,写数据会阻塞。

值为nil管道,读写永久阻塞。

close()可以关闭管道,向关闭的管道写数据会panic,但关闭管道仍然可读。

  • x,ok:=ch 第一个变量表示读出的数据,第二变量表示是否成功读取了数据,第二个变量不用于指示管道的关闭状态。

第二个变量经常理解为管道的关闭状态,管道的关闭有两种状态,缓冲区已经没有数据,缓冲区还有数据。

  • 第一种情况,会读到相应类型的零值,第二个变量为false.
  • 第二种情况,第一个变量为读到的数据,第二个变量为true.

for-range

通过for-range可以持续地从管道读数据,当管道中没有数据时会阻塞当前协程,与读管道时的阻塞处理机制一样。 当管道被关闭,for-range读取完缓冲区的数据后优雅地结束,不需要通过val,ok:=<-ch形式判断ok的布尔值。

数据结构

type hchan struct {
	// 队列中剩余的元素个数
	qcount   uint
	// 队列长度,即缓冲区长度
	dataqsiz uint
	// 指向底层循环数组的指针
	// 只针对有缓冲的 channel
	buf      unsafe.Pointer
	// 每个元素大小
	elemsize uint16
	// chan 是否被关闭的标志
	closed   uint32
	// chan 中元素类型
	elemtype *_type // element type
	// 已发送元素在循环数组中的索引
	sendx    uint   // send index
	// 已接收元素在循环数组中的索引
	recvx    uint   // receive index
	// 等待接收的 goroutine 队列
	recvq    waitq  // list of recv waiters
	// 等待发送的 goroutine 队列
	sendq    waitq  // list of send waiters

	// 保护 hchan 中所有字段
	lock mutex
}

chan内部实现了一个环形队列作为其缓冲区,队列的长度是在创建chan时制定的。

  • buf 指向队列内存
  • dataqsiz 队列长度

等待队列

从管道读数据时,如果管道缓冲区为空或没有缓冲区,则当前协程会被阻塞,并被加入recvq队列。写数据时,缓冲区已满或没有缓冲区,则当前协程会阻塞,并加入sendq队列。

处于等待队列的中的协程会在其他协程操作管道时被唤醒。

example

package main

import (
	"fmt"
	"sync"
	"time"
	//"time"
)

var p chan int
var wg sync.WaitGroup

func get() {
	defer wg.Done()
	//for-range 可以持续的从管道读出数据,管道没有数据会阻塞当前协程。
	//即使管道被关闭,也会优雅的结束,不会读零值。
	for x := range p {
		fmt.Println("get:", x)
	}
}
func get2() {

	defer wg.Done()
	//for range 实现方式  通过判断ok来确认数据是否取完
	for {
		x, ok := <-p
		if !ok {
			//只有p已经关闭 取到零值才是false
			break
		}
		fmt.Println("get:", x)
	}
}

//ch struct{} 空结构体占的空间很小
func push() {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		p <- i
		fmt.Println("push data:", i)
		time.Sleep(2 * time.Millisecond)
	}
	//一定要关 否则会死锁
	close(p)
}
func main() {
	p = make(chan int, 1)
	wg.Add(2)
	go push()
	go get()
	wg.Wait()

}

nil channel 例子

nil管道的作用是使case没有多余的数据可读,它也被永久阻塞。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// 不断向channel c中发送[0,10)的随机数
func send(c chan int) {
	for {
		c <- rand.Intn(10)
	}
}

func add(c chan int) {
	sum := 0

	// 1秒后,将向t.C通道发送时间点,使其可读
	t := time.NewTimer(1 * time.Second)

	for {
		// 一秒内,将一直选择第一个case
		// 一秒后,t.C可读,将选择第二个case
		// c变成nil channel后,两个case分支都将一直阻塞
		select {
		case input := <-c:
			// 不断读取c中的随机数据进行加总
			sum = sum + input
		case <-t.C:
			c = nil
			fmt.Println(sum)
		}
	}
}

func main() {
	c := make(chan int)
	go add(c)
	go send(c)
	// 给3秒时间让前两个goroutine有足够时间运行
	time.Sleep(3 * time.Second)
}

文档信息

Search

    Table of Contents