带缓冲的 channel

前面介绍的 channel 作用更多是同步,在乒乓球游戏中,channel 是为了保证两个队员按照顺序执行。 但是并发最重要的功能是让多个运行实体(goroutine)能够同时做事情,而不是像打乒乓球那样, 一方接球的时候,另外一方只能在那等着什么都不做。

提到并发模型,最经典的就是生产者和消费者问题。 这个问题的描述是这样的:有一个生产数据的实体,称为生产者; 另外有一个消费数据的实体,称为消费者,它们之间通过固定大小的队列作为缓存来通信。生产者把产生数据, 并把数据放到队列中;同时,消费者从队列中取出数据,执行任务。而且,生产者在队列满的时候不会继续 往里面放数据,消费者在队列空的时候不能从里面读数据。

和乒乓球游戏不同,生产者和消费者不需要互相等待,因为在实现中,它们根本不知道对方的存在。 这个问题更多关注如何保证数据传输的正确性,生产者和消费者是解耦的,而且因为缓存队列的出现, 可能有不同的处理速率。

这一节我们看看如何用 go 语言来解决生产者和消费者的问题。

buffered channel

生产者和消费者之间通过缓存队列通信,不仅使它们功能解耦,不需要知道互相的存在。 在并发上还有一个重要的功能:不需要等待(准确的说,避免了大部分情况的等待,因为在缓存队列 满或者为空的时候还是会等待)。生产者制造出东西,没必要一定要等到有人来消费才能继续下去。

go 语言支持有缓存和无缓存的 channel, 前面几节讲到 channel 是通过 make 关键词创建的:

ch := make(chan int)

但其实,在创建 channel 的时候可以带上第二个参数,表示 channel 可以缓存多少个数据:

// 创建一个能缓存 10 个整数的 channel
ch := make(chan int, 10)

缓存 channel 的特点是,当往里面写入数据时,如果缓存还没有满,则会立即写入成功,不会阻塞等待; 当从里面读取数据的时候,如果缓存不空,则读取会立即成功,也不会阻塞等待。

无缓存的 channel 只是有缓存 channel 的特例,可以看做缓存长度为 0。也就是说,下面两种定义是等价的:

// 创建一个无缓存的 channel
ch := make(chan int)

// 等价于,缓存长度为 0 的 channel
ch := make(chan int, 0)

cap 函数可以获取有缓存 channel 缓存区的长度:

capacity := cap(ch)

len 函数可以获取有缓存 channel 中当前有多少数据,但是在一个并发的程序中,这个数据是 一直处于快速变化的,因此只适合作为参考(比如日志、测试或者调优)。

举个我们取快递的例子,无缓存 channel 是快递员打电话给你,一定要等你亲手签收;有缓存 channel 更像是快递员直接把包裹放到前台或者快递柜,等你有空的时候自己取就行。只有当快递非常重要时( goroutine 之间同步非常重要),才会采取前者;更一般的情况(goroutine 之间只是为了交换数据,对 同步不感兴趣),我们倾向于后者。

channel 的方向性

乒乓球游戏中,双方队员都需要发球和接球,在代码中表现为要对 table channel 做读取数据和写入数据两种操作。 但是在生产者和消费者中,生产者只会往 channel 中写入数据,消费者只会从 channel 中读取数据,这种情况 在实际中很常见,channel 作为参数传递给函数时基本上都能确定它是只读还是只写的。

从安全性角度考虑,应该秉承最小权限原则,让生产者不能从 channel 中读取数据,让消费者不能从 channel 中写入数据。 为此,go 语言定义了单向 channel 类型,只暴露读写操作中的一个。比如 chan<- int 是 int 类型的写 channel, 只允许往里面写入数据,不允许从里面读取数据;相反的,<-chan int 是 int 类型的读 channel,只允许从里面 读取数据,不允许往里面写入数据。

**NOTE:**在编译的时候,go 就会检测单向 channel 是否被正常使用,避免了运行时可能产生的错误。

另外,因为 close 的作用是保证不会再往 channel 中写入数据(还记得吧,我们可以从已经关闭的 channel 中读取数据), 所以只有写 channel 才能调用 close 函数,关闭只读 channel 会导致编译错误。

细心的读者可能发现,一个完全只读或者完全只写的 channel 是没有实际意义的。在实际应用中, 更多的是创建一个双向的 channel,然后根据使用情况把它转换成只读或者只写的 channel。

生产者-消费者 go 语言解决方案

跳过一个生产者 VS 一个消费者,以及一个生产者 VS 多个消费者,和一个消费者 VS 多个生产者的情况, 我们直接来看多个生产者和多个消费者的情况,完整的代码如下:

package main 

import (
    "fmt"
    "sync"
    "time"
)

const (
    // NumOfProducers 表示要运行多少个生产者
    NumOfProducers = 3

    // NumOfConsumers  表示要运行多少消费者
    NumOfConsumers = 5
)

// Producer 生产者结构体,只有一个字段,用来标识生产者的 ID
type Producer struct {
    producerID int
}

// 构建函数,返回一个生产者指针对象
func newProducer(ID int) *Producer {
    return &Producer{
        producerID: ID,
    }
}

// Run 就是生产者的核心逻辑,产生数据,并放到 channel 中
func (p* Producer) Run(ch chan<- string){
    for i:=0; i<5; i++{
        fmt.Printf("producer %d put data %d\n", p.producerID, i)
        data := fmt.Sprintf("data %d from producer %d", i, p.producerID)
        ch <- data

		// 休息一段时间模拟生产者工作时间花费
        time.Sleep(time.Microsecond * 10)
    }
}

// Consumer  消费者结构体,也有一个标识消费者身份的 ID
type Consumer struct {
    consumerID int
}

func newConsumer(ID int) *Consumer {
    return &Consumer{
        consumerID: ID,
    }
}

// Run 消费者的核心逻辑:不断从 channel 中读取数据进行处理
func (c *Consumer) Run(ch <-chan string){
    for {
        data, ok := <- ch
        if !ok {
            fmt.Printf("consumer %d: detect channel close\n", c.consumerID)
            return
        }

        fmt.Printf("consumer %d got: %s\n", c.consumerID, data)
        time.Sleep(time.Microsecond * 10)
    }
}

func main(){
    buffer := make(chan string, 10)

    // 以 goroutine 运行多个生产者
    prodWg := sync.WaitGroup{}
    prodWg.Add(NumOfProducers)
    for i:=0; i<NumOfProducers;i++ {
        go func(ID int){
            p := newProducer(ID)
            p.Run(buffer)
            prodWg.Done()
        }(i+1)
    }

	// 以 goroutine 运行多个消费者
    consumerWg := sync.WaitGroup{}
    consumerWg.Add(NumOfConsumers)
    //run consumers
    for i:=0; i<NumOfConsumers; i++{
        go func(ID int){
            c := newConsumer(ID)
            c.Run(buffer)
            consumerWg.Done()
        }(i+1)
    }

    // 等生产者都运行完成,则关闭 channel
    prodWg.Wait()
    close(buffer)

    // 消费者任务完成后,则退出程序
    consumerWg.Wait()
    fmt.Printf("exit...\n")
}

这段程序中,我们分别把生产者和消费者都封装为一个结构体,每个结构体有一个用来标识身份的 ID。 然后各自定义了 Run 方法接受 channel 作为参数,运行核心的逻辑。

在实际的程序中,生产者和消费者很可能会一直运行,不会自动退出,除非程序被手动结束。而在我们的例子中, 为了让运行结束,我们让每个生产者只生产特定数量的数据;然后关闭 channel,当消费者读取数据发现 channel 关闭了,就知道已经没有数据要处理,因此也会退出。

可以看到,我们只创建一个 channel,然后传递给不同的 goroutine 使用,而且这个 channel 是有缓存的,可以 存放 10 个数据:

buffer := make(chan string, 10)

其次,为了等待生产者和消费者 goroutine 运行完成,我们封装了匿名函数。需要注意的是, 匿名函数接受 ID 作为参数,每次调用会把 i+1 传递过去,这是函数闭包的特性;如果直接使用下面的代码:

for i:=0; i<NumOfProducers;i++ {
    go func(){
        p := newProducer(i+1)
        p.Run(buffer)
        prodWg.Done()
    }()
}

不使用参数传递,而是直接使用 i+1,那么所有的生产者内部变量 producerID 其实都指向同一个值,也就是说它们的 ID 将会一样。

最后,我们在 main 函数中创建的 channel 是双向的,但是生产者接收的 channel 是只写的,而消费者接受的 channel 是只读的。这是因为在做参数传递时,go 会自动做类型转换。

运行以上程序,可能会得到类似下面的结果:

producer 2 put data 0
producer 1 put data 0
producer 3 put data 0
consumer 2 got: data 0 from producer 3
consumer 1 got: data 0 from producer 2
consumer 5 got: data 0 from producer 1
producer 1 put data 1
consumer 3 got: data 1 from producer 1
producer 3 put data 1
consumer 4 got: data 1 from producer 3
producer 2 put data 1
consumer 2 got: data 1 from producer 2
producer 3 put data 2
producer 1 put data 2
consumer 1 got: data 2 from producer 3
producer 2 put data 2
consumer 3 got: data 2 from producer 2
consumer 5 got: data 2 from producer 1
producer 3 put data 3
consumer 4 got: data 3 from producer 3
producer 1 put data 3
consumer 2 got: data 3 from producer 1
producer 2 put data 3
consumer 1 got: data 3 from producer 2
producer 3 put data 4
producer 1 put data 4
consumer 5 got: data 4 from producer 1
consumer 3 got: data 4 from producer 3
producer 2 put data 4
consumer 4 got: data 4 from producer 2
consumer 2: detect channel close
consumer 3: detect channel close
consumer 5: detect channel close
consumer 1: detect channel close
consumer 4: detect channel close
exit...

因为并发程序的特定,每次运行的结果不一定完全相同。

充当缓存队列的 channel 有效解决了生产者和消费者之间数据同步的问题,而且也能缓解速率不平衡的问题。 但是后面这个问题并没有那么简单,如果生产者生产数据的速率明显大于消费者,缓存队列会经常是满的, 那么生产者就需要等待;反之如果消费者消费数据的速率明显大于生产者,缓存队列会经常为空,消费者需要等待。 一般情况下,应该事先预估两者的速率,然后配置合适的缓存大小以及生产者和消费者的数量。

Last updated