1. 为什么需要考虑控制协程数?

我们都知道协程(goroutine)是一种轻量级的线程,在单个线程中运行多个协程,从而实现高并发和高效的并行处理。

在Go语言中开一个协程非常方便,在需要通过协程来执行的函数时,直接在函数前加go关键字就可以,并且一个协程只需要消耗几kb的内存空间。

那为什么在使用协程的时候需要注意控制协程数呢?

  • 系统资源限制:协程创建也是需要消耗几kb的内存空间,虽然并不是很多,但是一个系统资源是有限的,如果一次性创建过多的协程但没有来得及执行完并回收这样就会造成内存大的大量消耗,影响程序系统的性能和稳定性。

  • 协程调度器:在go中协程是基于GMP模型运行,在模型GMP中G就是指协程,P是指调度器,M是指系统线程,当我们创建一个协程会先写入到协程的队列中由P调度分配给M执行,过程中调度器P会根据一定的策略来分配协程的执行时间和资源。如果同时创建大量的协程,会导致调度器的负担增加,从而影响系统的性能和稳定性。

  • 任务类型和执行时间:在我们的系统中会存在多种类型的任务,而任务在运行中需要的协程数和执行行时间是不同的。例如,I/O密集型任务需要较多的协程来处理,而CPU密集型任务需要较少的协程来处理。如果同时创建大量的协程,会导致系统的负载不均衡,从而影响系统的性能和稳定性。因此,需要根据任务类型和执行时间来控制协程数,以保持系统的负载均衡

1.1. 举个栗子:如下代码

示例1

package main


import (
    "fmt"
    "math"
    "time"
)

func main() {
    maxCount := math.MaxInt64
    for i := 0; i < maxCount; i++ {
        go func(i int) {
           //各种各样的业务逻辑处理
            fmt.Printf("go func num: %d\n", i)
            time.Sleep(time.Second)
        }(i)
    }
}

对程序运行会发现系统抛出异常

panic: too many concurrent operations on a single file or socket (max 1048575)

异常信息中描述创建了太多的协程,超过了系统协程数支持的上限1048575,由此可以看出系统并不能无限创建协程。为什么是1048575了?这其实是对单个file/socket的并发操作个数超过了系统上限,这个是标准输出造成的,具体一点,就是文件句柄数量达到限制。

示例2:

package main


import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10000000; i++ {
        wg.Add(1)
        go func() {
        defer wg.Done()
            time.Sleep(2 * time.Minute)
        }()
    }
    wg.Wait()
    fmt.Println("main finished")
}

如果你的系统内存不多则会出现【errno 1455,即Out of Memory错误】因为默认每个 goroutine 占用 8KB 内存,那么一台8GB内存的机器大约能创建8GB/8KB = 1000000 个 goroutine,而我电脑的运行内存是16G。但在运行中可以看到我系统CPU和内存使用达到98%。

1.2. 总结

因此在工作中需要根据场景来控制协程数,避免系统资源的浪费和调度器的负担增加,保持系统的负载均衡,从而提高系统的性能和稳定性。

在go中控制协程数的方式有很多种如

  • 基于sync

  • 利用协程池

  • 基于channel实现

    等等,在本次内容中主要讲解基于channel实现的协程数控制

2. channel控制协程数的原理

在go语言中,channel是一种用于协程之间进行通信的机制。它可以在不同的协程之间传递数据,实现协程之间的同步和异步操作,为什么可以通过channel来控制协程数?

这和channel的特点离不开关系,我们可以把channel理解为一条管道,协程可以向管道中发送数据,也可以从管道中接收数据。如果管道中有数据,接收者就可以读取数据并进行处理;如果管道中没有数据,接收者就会阻塞等待,直到管道中有数据为止。同样地,发送者如果向管道中发送数据,如果管道已满,发送者就会阻塞等待,直到管道有空间为止.

利用这个特性,可以通过创建一个有缓冲的channel,并设置其缓冲区大小,来控制协程的数量。例如,如果创建一个缓冲区大小为n的channel,那么最多只有n个协程可以同时向该channel发送数据,否则发送协程会被阻塞,直到有其他协程从该channel接收数据。同样地,最多只有n个协程可以同时从该channel接收数据,否则接收协程会被阻塞,直到有其他协程向该channel发送数据。

因此,利用channel的阻塞特性,可以实现一种简单的协程调度方式,限制同时执行的协程数量,从而控制协程的数量。

3. 实践应用

接下来我们用channel来控制协程

package main

import (
    "fmt"
    "math"
    "time"
)

func main() {
    maxCount := math.MaxInt64
    maxConcurrent := 10 // 最大并发数为10

    ch := make(chan struct{}, maxConcurrent) // 创建一个容量为10的channel

    for i := 0; i < maxCount; i++ {
        ch <- struct{}{} // 在channel中放入一个空结构体
        go func(i int) {
            // 做一些各种各样的业务逻辑处理
            fmt.Printf("go func num: %d\n", i)
            time.Sleep(time.Second)
            <-ch // 从channel中取出一个空结构体
        }(i)
    }

}

输出

go func num: 1
go func num: 4
go func num: 3
go func num: 5
go func num: 6
go func num: 7
go func num: 8
go func num: 9
go func num: 2
go func num: 10
go func num: 11
go func num: 12
go func num: 13
......

创建了一个容量为10的channel,用于控制并发执行的协程数。在执行循环时,先将10个空结构体放入channel中,这样一开始就有10个协程可以同时执行。随着协程的执行,每个协程都会在执行完业务逻辑后从channel中取出一个空结构体,这样就有空余的协程可以继续执行。当channel中的空结构体全部被取出时,后续的协程将会阻塞在ch <- struct{}{}这一行代码处,直到有协程从channel中取出一个空结构体,才能继续执行。

通过这种方式,可以控制并发执行的协程数不超过10个,从而避免资源竞争和性能下降的问题。

原文:

作者:海马  创建时间:2023-11-23 09:03
最后编辑:海马  更新时间:2025-01-27 10:55