GO如何实现协程池管理?本篇文章小编给大家分享一下GO实现协程池管理代码方法,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
使用channel实现协程池
通过 Channel 实现 Goroutine Pool,缺点是会造成协程的频繁开辟和注销,但好在简单灵活通用。
package main import ( "fmt" "io/ioutil" "net/http" "sync" ) // Pool goroutine Pool type Pool struct { queue chan int wg *sync.WaitGroup } // New 新建一个协程池 func New(size int) *Pool { if size <= 0 { size = 1 } return &Pool{ queue: make(chan int, size), wg: &sync.WaitGroup{}, } } // Add 新增一个执行 func (p *Pool) Add(delta int) { // delta为正数就添加 for i := 0; i < delta; i++ { p.queue <- 1 } // delta为负数就减少 for i := 0; i > delta; i-- { <-p.queue } p.wg.Add(delta) } // Done 执行完成减一 func (p *Pool) Done() { <-p.queue p.wg.Done() } func (p *Pool) Wait() { p.wg.Wait() } func main() { // 这里限制100个并发 pool := New(100) // sync.WaitGroup{} //假设需要发送1000万个http请求,然后我并发100个协程取完成这件事 for i := 0; i < 10000000; i++ { pool.Add(1) //发现已存在100个人正在发了,那么就会卡住,直到有人完成了宣布自己退出协程了 go func(i int) { resp, err := http.Get("https://www.baidu.com") if err != nil { fmt.Println(i, err) } else { defer resp.Body.Close() result, _ := ioutil.ReadAll(resp.Body) fmt.Println(i, string(result)) } pool.Done() }(i) } pool.Wait() }
消费者模式实现协程池
频繁对协程开辟与剔除,如果对性能有着很高的要求,建议优化成固定数目的协程取 channel 里面取数据进行消费,这样可以避免协程的创建与注销。
package main import ( "fmt" "strconv" "sync" ) // 任务对象 type task struct { Production Consumer } // 设置消费者数目,也就是work pool大小 func (t *task) setConsumerPoolSize(poolSize int) { t.Production.Jobs = make(chan *Job, poolSize*10) t.Consumer.WorkPoolNum = poolSize } // 任务数据对象 type Job struct { Data string } func NewTask(handler func(jobs chan *Job) (b bool)) (t *task) { t = &task{ Production: Production{Jobs: make(chan *Job, 100)}, Consumer: Consumer{WorkPoolNum: 10, Handler: handler}, } return } type Production struct { Jobs chan *Job } func (c Production) AddData(data *Job) { c.Jobs <- data } type Consumer struct { WorkPoolNum int Handler func(chan *Job) (b bool) Wg sync.WaitGroup } // 异步开启多个work去处理任务,但是所有work执行完毕才会退出程序 func (c *Consumer) disposeData(data chan *Job) { for i := 0; i <= c.WorkPoolNum; i++ { c.Wg.Add(1) go func() { defer func() { c.Wg.Done() }() c.Handler(data) }() } c.Wg.Wait() } func main() { // 实现一个用于处理数据的闭包,实现业务代码 consumerHandler := func(jobs chan *Job) (b bool) { for jobs := range jobs { fmt.Println(jobs) } return } // new一个任务处理对象 t := NewTask(consumerHandler) t.setConsumerPoolSize(500) // 500个协程同时消费 // 根据自己的业务去生成数据通过AddData方法添加数据到生成channel,这里是100万条数据 go func() { for i := 0; i < 1000000; i++ { job := new(Job) iStr := strconv.Itoa(i) job.Data = "定义任务数据格式" + iStr t.AddData(job) } }() // 消费者消费数据 t.Consumer.disposeData(t.Production.Jobs) }