golang设计一个goroutine池

Posted by kzdgt on Tuesday, May 16, 2023

请问你如何实现一个goroutine池?并描述一下其实现原理

具体实现上,可以使用带缓冲通道限制goroutine的数量,并使用sync.WaitGroup等工具调度运行的goroutine。当需要执行任务时,如果池中有空闲的goroutine则由其中一个goroutine接受任务并运行;如果没有空闲的goroutine则等待直到有goroutine执行完毕并释放出空闲状态。该方法可以有效地复用goroutine,高效利用系统资源。同时,还可以控制goroutine的数量及节省goroutine创建及销毁的开销。

package main

import (
    "fmt"
    "sync"
)

type ThreadPool struct {
    jobs chan func()
    wg   sync.WaitGroup
}

// NewThreadPool 返回一个goroutine池
func NewThreadPool(numWorkers int) *ThreadPool {
    tp := &ThreadPool{
        jobs: make(chan func()),
    }
    tp.wg.Add(numWorkers)
    for i := 0; i < numWorkers; i++ {
        go func() {
            defer tp.wg.Done()
            for job := range tp.jobs {
                job()
            }
        }()
    }
    return tp
}

// AddJob 添加一个工作任务到goroutine池
func (tp *ThreadPool) AddJob(job func()) {
    tp.jobs <- job
}

// Wait 等待所有任务结束并关闭goroutine池
func (tp *ThreadPool) Wait() {
    close(tp.jobs)
    tp.wg.Wait()
}

func main() {
    // 初始化一个goroutine池,10个并发goroutine
    tp := NewThreadPool(10)
    
    // 添加5个工作任务
    for i := 0; i < 5; i++ {
        n := i
        tp.AddJob(func() {
            fmt.Println("Processing job", n)
        })
    }
    
    // 等待所有任务完成
    tp.Wait()
}

上面是一种较为简单的实现,还有另外一种实现,也可以参考

package main

import "sync"

type job struct {
	payload Payload
	handler JobHandler
}

type JobHandler func(payload Payload)

type Payload interface{}

type Pool struct {
	jobs         chan job
	workers      int
	wait         sync.WaitGroup
	errorHandler func(error)
}

func NewPool(workers int, errorHandler func(error)) *Pool {
	return &Pool{
		jobs:         make(chan job),
		workers:      workers,
		errorHandler: errorHandler,
	}
}

func (p *Pool) Start() {
	for i := 0; i < p.workers; i++ {
		go p.worker()
	}
}

func (p *Pool) Job(payload Payload, handler JobHandler) {
	p.wait.Add(1)
	p.jobs <- job{payload, handler}
}

func (p *Pool) Shutdown() {
	close(p.jobs)
	p.wait.Wait()
}

func (p *Pool) worker() {
	defer p.wait.Done()

	for {
		select {
		case j, ok := <-p.jobs:
			if !ok {
				return
			}

			j.handler(j.payload)
		}
	}
}

「真诚赞赏,手留余香」

kzdgt Blog

真诚赞赏,手留余香

使用微信扫描二维码完成支付