golang语言实现生产消费模型

单一生产者,单channel,单一消费者

适用于吞吐量排序: 生产者<消费者<channel

生产消费模型

graph LR;

    生产者-->channel;
    channel-->消费者;

程序关闭时的时序图说明如何保证优雅停止消费和已经产出的消息处理完成

sequenceDiagram
    participant 操作系统
    participant go应用程序监听signal的协程
    participant go应用程序生产者的协程
    participant go应用程序消费者的协程
    participant go应用程序主协程
    操作系统->>go应用程序监听signal的协程: kill signal
    go应用程序监听signal的协程->>go应用程序生产者的协程: 通知停止生产
    go应用程序生产者的协程->>go应用程序消费者的协程: 生产者关闭所持有的channel
    go应用程序消费者的协程->>go应用程序主协程: 消费任务处理完毕

当接受到应用程序kill信号的时候,通知生产者停止生产,然后关闭生产者所拥有的channel。 消费者监听到消费的channel关闭的时候跳出任务处理。主线程阻塞至所有的消费者关闭完成则退出。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main

import (
	"context"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

type Message struct {
	id int
}

/*
   生产者
*/
type Producer struct {
	taskChannel chan<- Message
	ctx         context.Context
}

func NewProducer(taskChannel chan<- Message, ctx context.Context) *Producer {
	p := &Producer{taskChannel: taskChannel,
		ctx: ctx}
	return p
}

func (p *Producer) Produce(name string) {
    random := rand.New(rand.NewSource(1024))
    for {
        select {
        // 判断是否要退出了
        case <-p.ctx.Done():
            log.Printf("%s produce break", name)
            // produce在监听到种终止事件之后关闭channel
            close(p.taskChannel)
            return
        default:
            // 模拟生产者获取数据
            num := random.Int()
            // 间隔一秒产出一个数据
            time.Sleep(1 * time.Second)
            p.taskChannel <- Message{id: num}
        }
    }
}

/*
   消费者
*/
type Consumer struct {
	taskChannel <-chan Message
	wg          *sync.WaitGroup
}

func NewConsumer(taskChannel <-chan Message, wg *sync.WaitGroup) *Consumer {
	c := &Consumer{taskChannel: taskChannel,
		wg: wg}
	return c
}

func (c *Consumer) Consume(name string) {
    for {
        select {
        case task, ok := <-c.taskChannel:
            // 模拟消费者处理数据
            if ok {
                log.Printf("%s consumer  deal: %d", name, task.id)
            } else {
                // 数据消费完成,且channel关闭的时候,通知主线程消费者处理管道中的数据完成
                log.Printf("%s consumer done ", name)
                goto flag
            }
        }
    }
    // 跳出多层循环
flag:
    // 消费者标识所有消息已经处理完成
    defer c.wg.Done()
}

func main() {
	log.Print("main start")
	parent := context.Background()
	wc, cancelFunc := context.WithCancel(parent)

	go func() {
		signals := make(chan os.Signal)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// 这个协程 就阻塞这里等待结果
		<-signals
		// 监听进程kill信号,通知produce停止产出
		log.Printf("start deal kill signal")
		cancelFunc()
		log.Printf("end deal kill signal")
	}()

	// 用于判断所有消费者处理完成
	wg := &sync.WaitGroup{}

	taskChannel := make(chan Message)
	producer := NewProducer(taskChannel, wc)
	consumer := NewConsumer(taskChannel, wg)

	// 一个生产者
	go producer.Produce("p")

	// 一个消费者
	wg.Add(1)
	go consumer.Consume("c")

	// 等待consumer完成
	wg.Wait()
	log.Print("main end")
}

运行程序,终止程序的日志。可以看到关闭时的顺序是生产者停止生产后消费者处理完消息,主协程退出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
2023/10/31 21:50:26 main start
2023/10/31 21:50:27 c consumer  deal: 882526541242338811
2023/10/31 21:50:28 c consumer  deal: 6993343009486954757
2023/10/31 21:50:29 c consumer  deal: 3359683908485645107
2023/10/31 21:50:30 c consumer  deal: 5363318930287979427
2023/10/31 21:50:31 start deal kill signal
2023/10/31 21:50:31 end deal kill signal
2023/10/31 21:50:31 p produce break
2023/10/31 21:50:31 c consumer  deal: 7051871835526759457
2023/10/31 21:50:31 c consumer done 
2023/10/31 21:50:31 main end

单一生产者,单channel,多消费者

适用于吞吐量排序:消费者<生产者<channel 这个时候增加消费者的数量可以提高吞吐。

生产消费模型

graph LR;

    生产者-->channel;
    channel-->消费者1;
    channel-->消费者2;
    channel-->消费者3;

代码在单一消费者基础上修改,只需要多个开启多个消费者协程即可

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main

import (
	"context"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"
)

type Message struct {
	id int
}

/*
   生产者
*/
type Producer struct {
	taskChannel chan<- Message
	ctx         context.Context
}

func NewProducer(taskChannel chan<- Message, ctx context.Context) *Producer {
	p := &Producer{taskChannel: taskChannel,
		ctx: ctx}
	return p
}

func (p *Producer) Produce(name string) {
    random := rand.New(rand.NewSource(1024))
    for {
        select {
        // 判断是否要退出了
        case <-p.ctx.Done():
            log.Printf("%s produce break", name)
            // produce在监听到种终止事件之后关闭channel
            close(p.taskChannel)
            return
        default:
            // 模拟生产者获取数据
            num := random.Int()
            // 间隔一秒产出一个数据
            time.Sleep(1 * time.Second)
            p.taskChannel <- Message{id: num}
        }
    }
}

/*
   消费者
*/
type Consumer struct {
	taskChannel <-chan Message
	wg          *sync.WaitGroup
}

func NewConsumer(taskChannel <-chan Message, wg *sync.WaitGroup) *Consumer {
	c := &Consumer{taskChannel: taskChannel,
		wg: wg}
	return c
}

func (c *Consumer) Consume(name string) {
    for {
        select {
        case task, ok := <-c.taskChannel:
            // 模拟消费者处理数据
            if ok {
                log.Printf("%s consumer  deal: %d", name, task.id)
            } else {
                // 数据消费完成,且channel关闭的时候,通知主线程消费者处理管道中的数据完成
                log.Printf("%s consumer done ", name)
                goto flag
            }
        }
    }
    // 跳出多层循环
flag:
    // 消费者标识所有消息已经处理完成
    defer c.wg.Done()
}

func (c *Consumer) GoConsume(name string, goNum int) {
	for i := 0; i < goNum; i++ {
		c.wg.Add(1)
		go func(goName string) {
			c.Consume(goName)
		}(name + "-" + strconv.Itoa(i))
	}
}

func main() {
	log.Print("main start")
	parent := context.Background()
	wc, cancelFunc := context.WithCancel(parent)

	go func() {
		signals := make(chan os.Signal)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// 这个协程 就阻塞这里等待结果
		<-signals
		// 监听进程kill信号,通知produce停止产出
		log.Printf("start deal kill signal")
		cancelFunc()
		log.Printf("end deal kill signal")
	}()

	// 用于判断所有消费者处理完成
	wg := &sync.WaitGroup{}

	taskChannel := make(chan Message)
	producer := NewProducer(taskChannel, wc)
	consumer := NewConsumer(taskChannel, wg)

	// 一个生产者
	go producer.Produce("p")

	// 一个消费者
	consumer.GoConsume("c",3)

	// 等待consumer完成
	wg.Wait()
	log.Print("main end")
}

同样运行程序,终止程序的日志。可以看到关闭时的顺序是生产者停止生产后消费者处理完消息,主协程退出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
2023/10/31 21:57:20 main start
2023/10/31 21:57:21 c-2 consumer  deal: 882526541242338811
2023/10/31 21:57:22 c-0 consumer  deal: 6993343009486954757
2023/10/31 21:57:23 c-1 consumer  deal: 3359683908485645107
2023/10/31 21:57:24 c-2 consumer  deal: 5363318930287979427
2023/10/31 21:57:25 c-0 consumer  deal: 7051871835526759457
2023/10/31 21:57:26 c-1 consumer  deal: 6221605816674811576
2023/10/31 21:57:26 start deal kill signal
2023/10/31 21:57:26 end deal kill signal
2023/10/31 21:57:27 p produce break
2023/10/31 21:57:27 c-2 consumer  deal: 5938020141159581395
2023/10/31 21:57:27 c-2 consumer done 
2023/10/31 21:57:27 c-0 consumer done 
2023/10/31 21:57:27 c-1 consumer done 
2023/10/31 21:57:27 main end

单一生产者,多channel,多消费者

  1. 适用于生产者速度大于消费者,且大于单个channel的时候。这时候提高channel的数量解决channel的瓶颈,同时可以提升消费者的数量。 吞吐量:消费者<channel<生产者
  2. 适用于生产者速度大于消费者,且需要按照一定规则保证处理顺序的时候。这个时候可以扩展channel数量,按照规则额hash到指定的channel,每个channel 绑定一个协程处理。吞吐量:消费者<生产者<channel

实际生产消费模型中一般是第二种情况居多。

生产消费模型

graph LR;

    生产者1-->channel1;
    生产者1-->channel2;
    生产者1-->channel3;
    channel1-->消费者1;
    channel2-->消费者2;
    channel3-->消费者3;

代码将一个生产者支持绑定多个channel即可,然后将channel分配给消费者。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package main

import (
	"context"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"
)

type Message struct {
	id int
}

/*
   生产者
*/
type Producer struct {
	taskChannels []chan Message
	ctx          context.Context
}

func NewProducer(taskChannels []chan Message, ctx context.Context) *Producer {
	p := &Producer{taskChannels: taskChannels,
		ctx: ctx}
	return p
}

func (p *Producer) Produce(name string) {
    random := rand.New(rand.NewSource(1024))
    for {
        select {
        // 判断是否要退出了
        case <-p.ctx.Done():
            log.Printf("%s produce break", name)
            // produce在监听到种终止事件之后关闭channel
            for _, ch := range p.taskChannels {
                // 关闭所有持有的channel
                close(ch)
            }
            return
        default:
            // 模拟生产者获取数据
            num := random.Int()
            // 间隔一秒产出一个数据
            time.Sleep(1 * time.Second)

            p.taskChannels[num%len(p.taskChannels)] <- Message{id: num}
        }
    }
}

/*
   消费者
*/
type Consumer struct {
	taskChannel <-chan Message
	wg          *sync.WaitGroup
}

func NewConsumer(taskChannel <-chan Message, wg *sync.WaitGroup) *Consumer {
	c := &Consumer{taskChannel: taskChannel,
		wg: wg}
	return c
}

func (c *Consumer) Consume(name string) {
    for {
        select {
        case task, ok := <-c.taskChannel:
            // 模拟消费者处理数据
            if ok {
                log.Printf("%s consumer  deal: %d", name, task.id)
            } else {
                // 数据消费完成,且channel关闭的时候,通知主线程消费者处理管道中的数据完成
                log.Printf("%s consumer done ", name)
                goto flag
            }
        }
    }
    // 跳出多层循环
flag:
    // 消费者标识所有消息已经处理完成
    defer c.wg.Done()
}

func (c *Consumer) GoConsume(name string, goNum int) {
	for i := 0; i < goNum; i++ {
		c.wg.Add(1)
		go func(goName string) {
			c.Consume(goName)
		}(name + "-" + strconv.Itoa(i))
	}
}

func main() {
	log.Print("main start")
	parent := context.Background()
	wc, cancelFunc := context.WithCancel(parent)

	go func() {
		signals := make(chan os.Signal)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// 这个协程 就阻塞这里等待结果
		<-signals
		// 监听进程kill信号,通知produce停止产出
		log.Printf("start deal kill signal")
		cancelFunc()
		log.Printf("end deal kill signal")
	}()

	// 用于判断所有消费者处理完成
	wg := &sync.WaitGroup{}

	channelNum := 3

	taskChannels := make([]chan Message, channelNum)
	for i := range taskChannels {
		taskChannels[i] = make(chan Message)
	}
	// 创建持有多个channel的生产者
	producer := NewProducer(taskChannels, wc)

	// 创建多个消费者,每个消费者最多持有一个channel
	consumers := make([]*Consumer, len(taskChannels))
	for i := 0; i < len(taskChannels); i++ {
		consumers[i] = NewConsumer(taskChannels[i], wg)
	}

	// 启动生产者
	go producer.Produce("p")

	// 每个消费者的的数量
	consumerGoNum := 1
	// 启动消费者
	for i, consumer := range consumers {
		consumer.GoConsume("c"+strconv.Itoa(i), consumerGoNum)
	}


	// 等待consumer完成
	wg.Wait()
	log.Print("main end")
}

程序运行输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
2023/11/01 11:28:35 main start
2023/11/01 11:28:36 c1-0 consumer  deal: 882526541242338811
2023/11/01 11:28:37 c2-0 consumer  deal: 6993343009486954757
2023/11/01 11:28:38 c1-0 consumer  deal: 3359683908485645107
2023/11/01 11:28:39 c0-0 consumer  deal: 5363318930287979427
2023/11/01 11:28:40 c2-0 consumer  deal: 7051871835526759457
2023/11/01 11:28:41 c1-0 consumer  deal: 6221605816674811576
2023/11/01 11:28:41 start deal kill signal
2023/11/01 11:28:41 end deal kill signal
2023/11/01 11:28:42 p produce break
2023/11/01 11:28:42 c0-0 consumer done 
2023/11/01 11:28:42 c2-0 consumer done 
2023/11/01 11:28:42 c1-0 consumer  deal: 5938020141159581395
2023/11/01 11:28:42 c1-0 consumer done 
2023/11/01 11:28:42 main end

单一生产者,多channel,单一消费者

适用于吞吐量:channel<生产者<消费者 的场景,提升channel的数量提高吞吐量

生产消费模型

graph LR;

    生产者1-->channel1;
    生产者1-->channel2;
    生产者1-->channel3;
    channel1-->消费者1;
    channel2-->消费者1;
    channel3-->消费者1;

代码实现上,将消费者支持绑定多个channel即可. 当消费者协程绑定的所有channel都close的时候主协程才能退出。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package main

import (
	"context"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"
)

type Message struct {
	id int
}

/*
   生产者
*/
type Producer struct {
	taskChannels []chan Message
	ctx          context.Context
}

func NewProducer(taskChannels []chan Message, ctx context.Context) *Producer {
	p := &Producer{taskChannels: taskChannels,
		ctx: ctx}
	return p
}

func (p *Producer) Produce(name string) {
    random := rand.New(rand.NewSource(1024))
    for {
        select {
        // 判断是否要退出了
        case <-p.ctx.Done():
            log.Printf("%s produce break", name)
            // produce在监听到种终止事件之后关闭channel
            for _, ch := range p.taskChannels {
                // 关闭所有持有的channel
                close(ch)
            }
            return
        default:
            // 模拟生产者获取数据
            num := random.Intn(len(p.taskChannels))
            // 间隔一秒产出一个数据
            time.Sleep(1 * time.Second)

            p.taskChannels[num] <- Message{id: num}
        }
    }
}

/*
   消费者
*/
type Consumer struct {
	taskChannel []chan Message
	wg          *sync.WaitGroup
}

func NewConsumer(taskChannel []chan Message, wg *sync.WaitGroup) *Consumer {
	c := &Consumer{taskChannel: taskChannel,
		wg: wg}
	return c
}

func (c *Consumer) Consume(name string) {
    channelClose := make(map[int]bool, len(c.taskChannel))
    random := rand.New(rand.NewSource(1024))
    for {
        index := random.Intn(len(c.taskChannel))
        select {
        case task, ok := <-c.taskChannel[index]:
            // 模拟消费者处理数据
            if ok {
                log.Printf("%s consumer  deal: %d", name, task.id)
            } else {
                channelClose[index] = true
                if len(channelClose) == len(c.taskChannel) {
                    // 检查所持有的所有channel已经关闭
                    // 数据消费完成,且channel关闭的时候,通知主线程消费者处理管道中的数据完成
                    log.Printf("%s consumer done ", name)
                    goto flag
                }
            }
        default:
            // 当存在多个channel的时候,避免等待一个一直阻塞,尝试获取其他channel
        }
    }
    // 跳出多层循环
flag:
    // 消费者标识所有消息已经处理完成
    defer c.wg.Done()
}

func (c *Consumer) GoConsume(name string, goNum int) {
	for i := 0; i < goNum; i++ {
		c.wg.Add(1)
		go func(goName string) {
			c.Consume(goName)
		}(name + "-" + strconv.Itoa(i))
	}
}

func main() {
	log.Print("main start")
	parent := context.Background()
	wc, cancelFunc := context.WithCancel(parent)

	go func() {
		signals := make(chan os.Signal)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// 这个协程 就阻塞这里等待结果
		<-signals
		// 监听进程kill信号,通知produce停止产出
		log.Printf("start deal kill signal")
		cancelFunc()
		log.Printf("end deal kill signal")
	}()

	// 用于判断所有消费者处理完成
	wg := &sync.WaitGroup{}

	channelNum := 3

	taskChannels := make([]chan Message, channelNum)
	for i := range taskChannels {
		taskChannels[i] = make(chan Message)
	}
	// 创建持有多个channel的生产者
	producer := NewProducer(taskChannels, wc)

	// 创建持有多个channel的消费者
	consumer := NewConsumer(taskChannels, wg)

	// 启动生产者
	go producer.Produce("p")

	// 消费者的的数量
	consumerGoNum := 1
	// 启动消费者
	consumer.GoConsume("c", consumerGoNum)

	// 等待consumer完成
	wg.Wait()
	log.Print("main end")
}

程序运行输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
2023/11/01 12:23:58 main start
2023/11/01 12:23:59 c-0 consumer  deal: 0
2023/11/01 12:24:00 c-0 consumer  deal: 1
2023/11/01 12:24:01 c-0 consumer  deal: 2
2023/11/01 12:24:02 c-0 consumer  deal: 2
2023/11/01 12:24:03 start deal kill signal
2023/11/01 12:24:03 end deal kill signal
2023/11/01 12:24:03 c-0 consumer  deal: 0
2023/11/01 12:24:03 p produce break
2023/11/01 12:24:03 c-0 consumer done 
2023/11/01 12:24:03 main end

多生产者,单channel,单消费者

适用于吞吐量排序:生产者< <消费者<channel 生产者的数量可以提高吞吐量的同时避免channel和消费者的浪费

生产消费模型

graph LR;

    生产者1-->channel;
    生产者2-->channel;
    生产者3-->channel;
    生产者4-->channel;
    channel-->消费者1;
    channel-->消费者1;
    channel-->消费者1;

将代码支持多生产者创建即可,需要注意的是go进程接受到kill信号之后,要确保所有的生产者都接受到了cancel信号停止生产后再关闭channel避免消息的丢失和 生产者向已经关闭的channel中发送消息的情况出现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package main

import (
	"context"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"
)

type Message struct {
	id int
}

/*
   生产者
*/
type Producer struct {
	taskChannels []chan Message
	ctx          context.Context
	// 用于等待所有的生产者全部接受到停止信号停止处理
	wg           *sync.WaitGroup
	// 所有生产者接受到停止信号的时候,只有一个生产者去关闭所有taskChannels
	once         *sync.Once
}

func NewProducer(taskChannels []chan Message, ctx context.Context) *Producer {
	p := &Producer{taskChannels: taskChannels,
		ctx:  ctx,
		wg:   &sync.WaitGroup{},
		once: &sync.Once{}}
	return p
}

func (p *Producer) Produce(name string) {
	random := rand.New(rand.NewSource(1024))
	for {
		select {
		// 判断是否要退出了
		case <-p.ctx.Done():
			log.Printf("%s produce break start", name)
			// 通知通知监听者组任务的协程我受到取消信号了
			p.wg.Done()
			// 等待所有协程都处理完成
			p.wg.Wait()
			log.Printf("%s produce break done", name)
			p.once.Do(func() {
				// produce在监听到种终止事件之后关闭channel,所有的produce只执行一次
				for _, ch := range p.taskChannels {
					// 关闭所有持有的channel
					close(ch)
				}
			})
			// 这个之后的代码是否能执行到就不一定了,因为channel关闭就可能导致主协程wait终止掉

			return
		default:
			// 模拟生产者获取数据
			num := random.Intn(len(p.taskChannels))
			// 间隔一秒产出一个数据
			time.Sleep(1 * time.Second)

			p.taskChannels[num] <- Message{id: num}
		}
	}
}

func (p *Producer) GoProduce(name string, goNum int) {
	for i := 0; i < goNum; i++ {
		p.wg.Add(1)
		go func(goName string) {
			p.Produce(goName)
		}(name + "-" + strconv.Itoa(i))
	}
}

/*
   消费者
*/
type Consumer struct {
	taskChannel []chan Message
	wg          *sync.WaitGroup
}

func NewConsumer(taskChannel []chan Message, wg *sync.WaitGroup) *Consumer {
	c := &Consumer{taskChannel: taskChannel,
		wg: wg}
	return c
}

func (c *Consumer) Consume(name string) {
	channelClose := make(map[int]bool, len(c.taskChannel))
	random := rand.New(rand.NewSource(1024))
	for {
		index := random.Intn(len(c.taskChannel))
		select {
		case task, ok := <-c.taskChannel[index]:
			// 模拟消费者处理数据
			if ok {
				log.Printf("%s consumer  deal: %d", name, task.id)
			} else {
				// 每个生产者都去确认自己所消费的channel都已经关闭
				channelClose[index] = true
				if len(channelClose) == len(c.taskChannel) {
					// 检查所持有的所有channel已经关闭
					// 数据消费完成,且channel关闭的时候,通知主线程消费者处理管道中的数据完成
					log.Printf("%s consumer done ", name)
					goto flag
				}
			}
		default:
			// 当存在多个channel的时候,避免等待一个一直阻塞,尝试获取其他channel
		}
	}
	// 跳出多层循环
flag:
	// 消费者标识所有消息已经处理完成
	defer c.wg.Done()
}

func (c *Consumer) GoConsume(name string, goNum int) {
	for i := 0; i < goNum; i++ {
		c.wg.Add(1)
		go func(goName string) {
			c.Consume(goName)
		}(name + "-" + strconv.Itoa(i))
	}
}

func main() {
	log.Print("main start")
	parent := context.Background()
	wc, cancelFunc := context.WithCancel(parent)

	go func() {
		signals := make(chan os.Signal)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// 这个协程 就阻塞这里等待结果
		<-signals
		// 监听进程kill信号,通知produce停止产出
		log.Printf("start deal kill signal")
		cancelFunc()
		log.Printf("end deal kill signal")
	}()

	// 用于判断所有消费者处理完成
	wg := &sync.WaitGroup{}

	// 生产者的数量
	producerNum := 4

	// 管道的数量
	channelNum := 1

	// 消费者的的数量
	consumerGoNum := 1

	// channel创建
	taskChannels := make([]chan Message, channelNum)
	for i := range taskChannels {
		taskChannels[i] = make(chan Message)
	}
	// 生产者创建
	producer := NewProducer(taskChannels, wc)

	// 消费者创建
	consumer := NewConsumer(taskChannels, wg)

	// 启动生产者
	producer.GoProduce("p", producerNum)

	// 启动消费者
	consumer.GoConsume("c", consumerGoNum)

	// 等待consumer完成
	wg.Wait()
	log.Print("main end")
}

程序的运行日志

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2023/11/01 13:35:32 main start
2023/11/01 13:35:33 c-0 consumer  deal: 0
2023/11/01 13:35:33 c-0 consumer  deal: 0
2023/11/01 13:35:33 c-0 consumer  deal: 0
2023/11/01 13:35:33 c-0 consumer  deal: 0
2023/11/01 13:35:34 c-0 consumer  deal: 0
2023/11/01 13:35:34 c-0 consumer  deal: 0
2023/11/01 13:35:34 c-0 consumer  deal: 0
2023/11/01 13:35:34 c-0 consumer  deal: 0
2023/11/01 13:35:35 c-0 consumer  deal: 0
2023/11/01 13:35:35 c-0 consumer  deal: 0
2023/11/01 13:35:35 c-0 consumer  deal: 0
2023/11/01 13:35:35 c-0 consumer  deal: 0
2023/11/01 13:35:36 start deal kill signal
2023/11/01 13:35:36 end deal kill signal
2023/11/01 13:35:36 c-0 consumer  deal: 0
2023/11/01 13:35:36 p-0 produce break start
2023/11/01 13:35:36 c-0 consumer  deal: 0
2023/11/01 13:35:36 c-0 consumer  deal: 0
2023/11/01 13:35:36 c-0 consumer  deal: 0
2023/11/01 13:35:36 p-2 produce break start
2023/11/01 13:35:36 p-1 produce break start
2023/11/01 13:35:36 p-3 produce break start
2023/11/01 13:35:36 p-3 produce break done
2023/11/01 13:35:36 c-0 consumer done 
2023/11/01 13:35:36 p-2 produce break done
2023/11/01 13:35:36 p-1 produce break done
2023/11/01 13:35:36 p-0 produce break done
2023/11/01 13:35:36 main end

多生产者,单channel,多消费者

适用于吞吐量排序:生产者<消费者< <channel 或 消费者<生产者< <channel 这时候提高消费者和生产者的数量保证消费平衡,且减少channel资源的浪费

生产消费模型

graph LR;

    生产者1-->channel;
    生产者2-->channel;
    生产者3-->channel;
    生产者4-->channel;
    channel-->消费者1;
    channel-->消费者2;
    channel-->消费者3;

代码只需要将上述章节中的 consumerGoNum 变量改成3即可。

多生产者,多channel,单一消费者

适用于吞吐量排序: 生产者<channel< < 消费者

生产消费模型

graph LR;

    生产者1-->channel1;
    生产者2-->channel2;
    生产者3-->channel3;
    channel1-->消费者1;
    channel2-->消费者1;
    channel3-->消费者1;

不需要修改consumer和producer的代码,只需要修改main方法中编排生产者和消费者的逻辑即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func main() {
	log.Print("main start")
	parent := context.Background()
	wc, cancelFunc := context.WithCancel(parent)

	go func() {
		signals := make(chan os.Signal)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// 这个协程 就阻塞这里等待结果
		<-signals
		// 监听进程kill信号,通知produce停止产出
		log.Printf("start deal kill signal")
		cancelFunc()
		log.Printf("end deal kill signal")
	}()

	// 用于判断所有消费者处理完成
	wg := &sync.WaitGroup{}

	// 生产者的数量
	producerGoNum := 1

	// 管道的数量
	channelNum := 1

	// 消费者的的数量
	consumerGoNum := 1

	// new生产者的个数
	newProducerNum := 3

	// 创建生产者
	producers := make([]*Producer, newProducerNum)
	// all task channel
	var allTaskChannel []chan Message
	for i := 0; i < newProducerNum; i++ {
		// channel创建
		taskChannels := make([]chan Message, channelNum)
		for i := range taskChannels {
			taskChannels[i] = make(chan Message)
			allTaskChannel = append(allTaskChannel, taskChannels[i])
		}
		// 生产者创建
		producers[i] = NewProducer(taskChannels, wc)
	}

	// 消费者创建
	consumer := NewConsumer(allTaskChannel, wg)
	// 启动消费者
	consumer.GoConsume("c", consumerGoNum)

	// 启动生产者
	for i, producer := range producers {
		// 启动生产者
		producer.GoProduce("p"+strconv.Itoa(i), producerGoNum)
	}

	// 等待consumer完成
	wg.Wait()
	log.Print("main end")
}

多生产者,多channel,多消费者

适用于生产者、消费者、channel之间需要按照特殊的比例配置才能达到吞吐能力平衡的状态,较少使用。因为大部分场景只需要保证所有消费者和channel的吞吐量 高于所有生产者即可,甚至还需要一些消费能力的冗余,因此用其它生产消费模型也能实现。

生产消费模型

graph LR;

    生产者1-->channel1;
    生产者1-->channel2;
    生产者2-->channel1;
    生产者2-->channel2;
    channel1-->消费者1;
    channel1-->消费者2;
    channel2-->消费者1;
    channel2-->消费者2;

代码实现只需要修改如下变量即可

1
2
3
4
5
6
7
producerNum := 2

// 管道的数量
channelNum := 2

// 消费者的的数量
consumerGoNum := 2
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func main() {
	log.Print("main start")
	parent := context.Background()
	wc, cancelFunc := context.WithCancel(parent)

	go func() {
		signals := make(chan os.Signal)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// 这个协程 就阻塞这里等待结果
		<-signals
		// 监听进程kill信号,通知produce停止产出
		log.Printf("start deal kill signal")
		cancelFunc()
		log.Printf("end deal kill signal")
	}()

	// 用于判断所有消费者处理完成
	wg := &sync.WaitGroup{}

	// 生产者的数量
	producerNum := 2

	// 管道的数量
	channelNum := 2

	// 消费者的的数量
	consumerGoNum := 2

	// channel创建
	taskChannels := make([]chan Message, channelNum)
	for i := range taskChannels {
		taskChannels[i] = make(chan Message)
	}
	// 生产者创建
	producer := NewProducer(taskChannels, wc)

	// 消费者创建
	consumer := NewConsumer(taskChannels, wg)

	// 启动生产者
	producer.GoProduce("p", producerNum)

	// 启动消费者
	consumer.GoConsume("c", consumerGoNum)

	// 等待consumer完成
	wg.Wait()
	log.Print("main end")
}