请问你如何实现一个goroutine池?并描述一下其实现原理
具体实现上,可以使用带缓冲通道限制goroutine的数量,并使用sync.WaitGroup等工具调度运行的goroutine。当需要执行任务时,如果池中有空闲的goroutine则由其中一个goroutine接受任务并运行;如果没有空闲的goroutine则等待直到有goroutine执行完毕并释放出空闲状态。该方法可以有效地复用goroutine,高效利用系统资源。同时,还可以控制goroutine的数量及节省goroutine创建及销毁的开销。
package main
import (
"fmt"
"sync"
)
type ThreadPool struct {
jobs chan func()
wg sync.WaitGroup
}
// NewThreadPool 返回一个goroutine池
func NewThreadPool(numWorkers int) *ThreadPool {
tp := &ThreadPool{
jobs: make(chan func()),
}
tp.wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
defer tp.wg.Done()
for job := range tp.jobs {
job()
}
}()
}
return tp
}
// AddJob 添加一个工作任务到goroutine池
func (tp *ThreadPool) AddJob(job func()) {
tp.jobs <- job
}
// Wait 等待所有任务结束并关闭goroutine池
func (tp *ThreadPool) Wait() {
close(tp.jobs)
tp.wg.Wait()
}
func main() {
// 初始化一个goroutine池,10个并发goroutine
tp := NewThreadPool(10)
// 添加5个工作任务
for i := 0; i < 5; i++ {
n := i
tp.AddJob(func() {
fmt.Println("Processing job", n)
})
}
// 等待所有任务完成
tp.Wait()
}
上面是一种较为简单的实现,还有另外一种实现,也可以参考
package main
import "sync"
type job struct {
payload Payload
handler JobHandler
}
type JobHandler func(payload Payload)
type Payload interface{}
type Pool struct {
jobs chan job
workers int
wait sync.WaitGroup
errorHandler func(error)
}
func NewPool(workers int, errorHandler func(error)) *Pool {
return &Pool{
jobs: make(chan job),
workers: workers,
errorHandler: errorHandler,
}
}
func (p *Pool) Start() {
for i := 0; i < p.workers; i++ {
go p.worker()
}
}
func (p *Pool) Job(payload Payload, handler JobHandler) {
p.wait.Add(1)
p.jobs <- job{payload, handler}
}
func (p *Pool) Shutdown() {
close(p.jobs)
p.wait.Wait()
}
func (p *Pool) worker() {
defer p.wait.Done()
for {
select {
case j, ok := <-p.jobs:
if !ok {
return
}
j.handler(j.payload)
}
}
}
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付