channel

2021/08/18 go探索发现 共 4140 字,约 12 分钟

云原生科学-Go探索发现: 管道(channel)

管道提供了协程间的通信方式,类似于unix管道,管道的数据只能同时被一个groutine访问。

只有一个缓冲区的管道,写入数据类似于加锁,读出数据类似于释放锁。

特性

初始化

声明变量 var ch make int管道的值为nil

make()

ch1:=make(chan int) //无缓冲区
ch2:=make(chan string,5) //缓冲区

管道操作

操作符

ch:=make(chan int,10)
ch <- 1 //数据写入管道
d:=<-ch //读数据从管道
数据读写

管道没有缓冲区,读取数据会阻塞,直到有协程向管道写入数据。类似地向管道写入数据也会阻塞,直到有协程读数据。

管道有缓冲区但没有数据时,从管道读数据也会阻塞。缓冲区已满,写数据会阻塞。

值为nil管道,读写永久阻塞。

close()可以关闭管道,向关闭的管道写数据会panic,但关闭管道仍然可读。

  • x,ok:=ch 第一个变量表示读出的数据,第二变量表示是否成功读取了数据,第二个变量不用于指示管道的关闭状态。

第二个变量经常理解为管道的关闭状态,管道的关闭有两种状态,缓冲区已经没有数据,缓冲区还有数据。

  • 第一种情况,会读到相应类型的零值,第二个变量为false.
  • 第二种情况,第一个变量为读到的数据,第二个变量为true.

无缓冲 channel、缓冲 channel 关闭后读取(x, ok := <-ch),都不会阻塞,都返回零值和 false。

for-range

通过for-range可以持续地从管道读数据,当管道中没有数据时会阻塞当前协程,与读管道时的阻塞处理机制一样。 当管道被关闭,for-range读取完缓冲区的数据后优雅地结束,不需要通过val,ok:=<-ch形式判断ok的布尔值。

数据结构

type hchan struct {
	// 队列中剩余的元素个数
	qcount   uint
	// 队列长度,即缓冲区长度
	dataqsiz uint
	// 指向底层循环数组的指针
	// 只针对有缓冲的 channel
	buf      unsafe.Pointer
	// 每个元素大小
	elemsize uint16
	// chan 是否被关闭的标志
	closed   uint32
	// chan 中元素类型
	elemtype *_type // element type
	// 已发送元素在循环数组中的索引
	sendx    uint   // send index
	// 已接收元素在循环数组中的索引
	recvx    uint   // receive index
	// 等待接收的 goroutine 队列
	recvq    waitq  // list of recv waiters
	// 等待发送的 goroutine 队列
	sendq    waitq  // list of send waiters

	// 保护 hchan 中所有字段
	lock mutex
}

chan内部实现了一个环形队列作为其缓冲区,队列的长度是在创建chan时制定的。

  • buf 指向队列内存
  • dataqsiz 队列长度

循环队列实现(缓冲区)

循环队列就是一个定长数组配合两个指针(head、tail)。这两个指针随着入队(Enqueue)和出队(Dequeue)操作递增,但会对数组长度取模,使得指针回到头部,实现“环形”。这样就可以最大限度利用数组空间。

基本结构

用一个定长数组来存储数据。 设置两个指针(或索引)head(队首)和tail(队尾),分别用于出队和入队的位置。 入队操作(Enqueue):

把元素加入 queue[tail]。 然后 tail = (tail + 1) % cap。这样 tail 超过末尾时能“绕回”头部。 出队操作(Dequeue):

取出 queue[head] 元素。 然后 head = (head + 1) % cap,同理实现循环。 判空判满:

判空通常是 head == tail 并且队列没有元素(有两种实现方式,可能还要额外计数)。 判满常用“空一格”方案,即队列大小为 cap+1,tail 追上 head 时认为满,这样方便区分空和满的状态。 空间利用效率高:

应用场景:

特点:元素不断被加入和移除时,数组空间会被循环利用,无需数据整体移动。 非常适合生产者/消费者模式、高性能队列、Ring Buffer等需求场景。 Go语言的有缓冲channel底层缓冲实现就是循环队列。

手写例子

package main

import (
    "fmt"
)

type MyQueue struct {
    data        []int
    head, tail  int
    cap, length int
}

// 创建队列
func NewMyQueue(cap int) *MyQueue {
    return &MyQueue{
        data:   make([]int, cap+1), // +1 用于判满判空区分
        cap:    cap + 1, 
        head:   0,
        tail:   0,
        length: 0,
    }
}

// 判空
func (q *MyQueue) IsEmpty() bool {
    return q.head == q.tail
}

// 判满
func (q *MyQueue) IsFull() bool {
    return (q.tail+1)%q.cap == q.head
}

// 入队
func (q *MyQueue) Enqueue(x int) bool {
    if q.IsFull() {
        return false
    }
    q.data[q.tail] = x
    q.tail = (q.tail + 1) % q.cap
    q.length++
    return true
}

// 出队
func (q *MyQueue) Dequeue() (int, bool) {
    if q.IsEmpty() {
        return 0, false
    }
    val := q.data[q.head]
    q.head = (q.head + 1) % q.cap
    q.length--
    return val, true
}

// 查看队头元素
func (q *MyQueue) Front() (int, bool) {
    if q.IsEmpty() {
        return 0, false
    }
    return q.data[q.head], true
}

func main() {
    q := NewMyQueue(3)
    fmt.Println(q.Enqueue(1))
    fmt.Println(q.Enqueue(2))
    fmt.Println(q.Enqueue(3)) // 超出容量,入队失败
    fmt.Println(q.Dequeue())
    fmt.Println(q.Enqueue(4))
    for !q.IsEmpty() {
        val, _ := q.Dequeue()
        fmt.Println(val)
    }
}

取模过程

cap := 10
tail := 0

tail = (tail + 1) % cap // (0 + 1) % 10 = 1
fmt.Println(tail)       // 输出1

tail = (tail + 1) % cap // (1 + 1) % 10 = 2
fmt.Println(tail)       // 输出2

// 一直加到 9
tail = (tail + 1) % cap // (8 + 1) % 10 = 9
fmt.Println(tail)       // 输出9

tail = (tail + 1) % cap // (9 + 1) % 10 = 0
fmt.Println(tail)       // 输出0

只有当 tail = 9,再加 1 后,(9+1) % 10 = 0,这时 tail “回到了队头”,形成循环。

等待队列

从管道读数据时,如果管道缓冲区为空或没有缓冲区,则当前协程会被阻塞,并被加入recvq队列。写数据时,缓冲区已满或没有缓冲区,则当前协程会阻塞,并加入sendq队列。

处于等待队列的中的协程会在其他协程操作管道时被唤醒。

example

package main

import (
	"fmt"
	"sync"
	"time"
	//"time"
)

var p chan int
var wg sync.WaitGroup

func get() {
	defer wg.Done()
	//for-range 可以持续的从管道读出数据,管道没有数据会阻塞当前协程。
	//即使管道被关闭,也会优雅的结束,不会读零值。
	for x := range p {
		fmt.Println("get:", x)
	}
}
func get2() {

	defer wg.Done()
	//for range 实现方式  通过判断ok来确认数据是否取完
	for {
		x, ok := <-p
		if !ok {
			//只有p已经关闭 取到零值才是false
			break
		}
		fmt.Println("get:", x)
	}
}

//ch struct{} 空结构体占的空间很小
func push() {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		p <- i
		fmt.Println("push data:", i)
		time.Sleep(2 * time.Millisecond)
	}
	//一定要关 否则会死锁
	close(p)
}
func main() {
	p = make(chan int, 1)
	wg.Add(2)
	go push()
	go get()
	wg.Wait()

}

nil channel 例子

nil管道的作用是使case没有多余的数据可读,它也被永久阻塞。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// 不断向channel c中发送[0,10)的随机数
func send(c chan int) {
	for {
		c <- rand.Intn(10)
	}
}

func add(c chan int) {
	sum := 0

	// 1秒后,将向t.C通道发送时间点,使其可读
	t := time.NewTimer(1 * time.Second)

	for {
		// 一秒内,将一直选择第一个case
		// 一秒后,t.C可读,将选择第二个case
		// c变成nil channel后,两个case分支都将一直阻塞
		select {
		case input := <-c:
			// 不断读取c中的随机数据进行加总
			sum = sum + input
		case <-t.C:
			c = nil
			fmt.Println(sum)
		}
	}
}

func main() {
	c := make(chan int)
	go add(c)
	go send(c)
	// 给3秒时间让前两个goroutine有足够时间运行
	time.Sleep(3 * time.Second)
}

文档信息

Search

    Table of Contents