云原生科学-Go探索发现: 管道(channel)
管道提供了协程间的通信方式,类似于unix管道,管道的数据只能同时被一个groutine访问。
只有一个缓冲区的管道,写入数据类似于加锁,读出数据类似于释放锁。
特性
初始化
声明变量 var ch make int
管道的值为nil
make()
ch1:=make(chan int) //无缓冲区
ch2:=make(chan string,5) //缓冲区
管道操作
操作符
ch:=make(chan int,10)
ch <- 1 //数据写入管道
d:=<-ch //读数据从管道
数据读写
管道没有缓冲区,读取数据会阻塞,直到有协程向管道写入数据。类似地向管道写入数据也会阻塞,直到有协程读数据。
管道有缓冲区但没有数据时,从管道读数据也会阻塞。缓冲区已满,写数据会阻塞。
值为nil管道,读写永久阻塞。
close()可以关闭管道,向关闭的管道写数据会panic,但关闭管道仍然可读。
x,ok:=ch
第一个变量表示读出的数据,第二变量表示是否成功读取了数据,第二个变量不用于指示管道的关闭状态。
第二个变量经常理解为管道的关闭状态,管道的关闭有两种状态,缓冲区已经没有数据,缓冲区还有数据。
- 第一种情况,会读到相应类型的零值,第二个变量为false.
- 第二种情况,第一个变量为读到的数据,第二个变量为true.
for-range
通过for-range可以持续地从管道读数据,当管道中没有数据时会阻塞当前协程,与读管道时的阻塞处理机制一样。 当管道被关闭,for-range读取完缓冲区的数据后优雅地结束,不需要通过val,ok:=<-ch
形式判断ok
的布尔值。
数据结构
type hchan struct {
// 队列中剩余的元素个数
qcount uint
// 队列长度,即缓冲区长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// 每个元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
// 保护 hchan 中所有字段
lock mutex
}
chan内部实现了一个环形队列作为其缓冲区,队列的长度是在创建chan时制定的。
- buf 指向队列内存
- dataqsiz 队列长度
等待队列
从管道读数据时,如果管道缓冲区为空或没有缓冲区,则当前协程会被阻塞,并被加入recvq队列。写数据时,缓冲区已满或没有缓冲区,则当前协程会阻塞,并加入sendq队列。
处于等待队列的中的协程会在其他协程操作管道时被唤醒。
example
package main
import (
"fmt"
"sync"
"time"
//"time"
)
var p chan int
var wg sync.WaitGroup
func get() {
defer wg.Done()
//for-range 可以持续的从管道读出数据,管道没有数据会阻塞当前协程。
//即使管道被关闭,也会优雅的结束,不会读零值。
for x := range p {
fmt.Println("get:", x)
}
}
func get2() {
defer wg.Done()
//for range 实现方式 通过判断ok来确认数据是否取完
for {
x, ok := <-p
if !ok {
//只有p已经关闭 取到零值才是false
break
}
fmt.Println("get:", x)
}
}
//ch struct{} 空结构体占的空间很小
func push() {
defer wg.Done()
for i := 0; i < 10; i++ {
p <- i
fmt.Println("push data:", i)
time.Sleep(2 * time.Millisecond)
}
//一定要关 否则会死锁
close(p)
}
func main() {
p = make(chan int, 1)
wg.Add(2)
go push()
go get()
wg.Wait()
}
nil channel 例子
nil管道的作用是使case没有多余的数据可读,它也被永久阻塞。
package main
import (
"fmt"
"math/rand"
"time"
)
// 不断向channel c中发送[0,10)的随机数
func send(c chan int) {
for {
c <- rand.Intn(10)
}
}
func add(c chan int) {
sum := 0
// 1秒后,将向t.C通道发送时间点,使其可读
t := time.NewTimer(1 * time.Second)
for {
// 一秒内,将一直选择第一个case
// 一秒后,t.C可读,将选择第二个case
// c变成nil channel后,两个case分支都将一直阻塞
select {
case input := <-c:
// 不断读取c中的随机数据进行加总
sum = sum + input
case <-t.C:
c = nil
fmt.Println(sum)
}
}
}
func main() {
c := make(chan int)
go add(c)
go send(c)
// 给3秒时间让前两个goroutine有足够时间运行
time.Sleep(3 * time.Second)
}
文档信息
- 本文作者:Beast
- 本文链接:https://beastpu.github.io/2021/08/18/go-06/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)