迹忆客 专注技术分享

当前位置:主页 > 学无止境 > 编程语言 >

Go Worker pool 工作池实现详解

作者:迹忆客 最近更新:2021/11/29 浏览次数:

Channel缓冲的重要用途之一是实现工作池。

通常,工作池是等待分配给它们的任务的线程的集合。 一旦他们完成分配的任务,他们就会再次为下一个任务提供服务。

我们将使用缓冲通道实现一个工作池。 我们的工作池将执行计算输入数字的数字总和的任务。 例如,如果传递了 234,则输出将为 9 (2 + 3 + 4)。 工作池的输入将是一个伪随机整数列表。

下面是我们的工作池的核心功能

  • 创建一个 Goroutines 池,它侦听输入缓冲通道,等待分配作业
  • 将作业添加到输入缓冲通道
  • 作业完成后将结果写入输出缓冲通道
  • 从输出缓冲通道读取和打印结果

我们将逐步实现这个程序,以使其更易于理解。

第一步将是创建表示作业和结果的结构体

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

每个 Job 结构体都有一个 id 和一个 randomno ,必须为其计算各个数字的总和。

Result 结构体有一个 job 字段,该字段是在 sumofdigits 字段中保存结果(单个数字的总和)的作业。

下一步是创建用于接收作业和写入输出的缓冲通道。

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)  

Worker Goroutines 在 jobs 缓冲通道上监听新任务。 任务完成后,将结果写入 results 缓冲通道。

下面我们定义一个 digits 函数,该函数的主要功能是计算各个数字的总和并返回它。 我们将给这个函数添加一个 2 秒的睡眠,只是为了模拟这个函数需要一些时间来计算结果的事实。

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

接下来我们定义一个 worker 函数。

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

上述函数创建一个从 jobs 通道读取数据的工作协程,使用当前作业和 digits 函数的返回值创建一个 Result 结构体,然后将结果写入 results 缓冲通道。 此函数将 WaitGroup wg 作为参数,当所有作业完成后,它将调用 Done() 方法。

createWorkerPool 函数将创建一个 worker Goroutines 池。

func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

上面的函数将要创建的 worker 数量作为参数。 它在创建 Goroutine 之前调用 wg.Add(1) 来增加 WaitGroup 计数器。 然后它通过将 WaitGroup wg 的指针传递给 worker 函数来创建 worker Goroutines。 在创建所需的 Goroutines 后,它通过调用 wg.Wait() 等待所有 Goroutines 完成它们的执行。 在所有 Goroutines 执行完毕后,它关闭 results 通道,因为所有 Goroutines 都已完成它们的执行,并且没有其他的协程会进一步写入 results 通道。

现在我们已经准备好工作池,让我们继续定义一个用来进行分配的函数。

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

上面的 allocate 函数以要创建的任务的数量作为输入参数,生成最大值为998的伪随机数,以随机数和for循环计数器i为id创建Job结构体,然后将它们写入 jobs 通道。 它在写入所有作业后关闭 jobs 通道。

下一步是创建读取 results 通道并打印输出的函数。

func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

result 函数读取 results 通道并打印 Job ID,输入的随机数,以及随机数的数字总和。 result 函数还将 接收done 通道作为参数,一旦它打印了所有结果,它就会写入该通道。

我们现在已经准备好了一切。 让我们继续完成最后一步,编写 main() 函数调用所有这些函数。

func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

我们首先保存程序的执行开始时间,在最后我们计算 endTime 和 startTime 之间的时间差并显示程序花费的总时间。 这是必要的,因为我们将通过改变 Goroutines 的数量来做一些基准测试。

noOfJobs 设置为 100,然后调用 allocate 将作业添加到 jobs 通道。

然后创建 done 通道并将其传递给 result Goroutine,以便它可以开始打印输出并在打印完所有内容后通知主协程。

最后,通过调用 createWorkerPool 函数创建了一个包含 10 个 job Goroutines 的池,然后 main 在 done 通道上等待所有要打印的结果。

下面是完整的程序。

package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

请在本地机器上运行此程序,以获得更准确的总时间计算。

上面程序执行结果如下

Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23  
Job id 9, input random no 150, sum of digits 6  
...
total time taken  20.025220391 seconds  

上面程序总共将打印 100 行,对应 100 个作业,最后将在最后一行打印程序运行所需的总时间。 没个人的输出将与这里的结果不同,因为 Goroutine 可以以任何顺序运行,并且总时间也会因硬件而异。 就我而言,程序完成大约需要 20 秒。

Go 工作池示例运行时间

现在让我们将主函数中的 noOfWorkers 增加到 20。我们已经将 worker 的数量增加了一倍。 由于 worker Goroutines 增加了(准确地说是翻了一番),程序完成所需的总时间应该减少(准确地说是减少一半)。

go 工作池示例2运行时间

在我的机器上运行时间是 10.00877495。

现在我们可以理解,随着 worker Goroutine 数量的增加,完成作业所需的总时间减少。我们可以将 main 函数中的 noOfJobs 和 noOfWorkers 设置为不同的值并分析运行结果。

除非注明转载,本站文章均为原创或翻译,欢迎转载,转载请以链接形式注明出处

本文地址:

迹忆客

专注技术分享,项目实战分享!

技术宅 乐于分享 7年编程经验
社交账号
  • https://www.github.com/onmpw
  • qq:1244347461

热门文章

教程更新

热门标签

Go