golang中goroutine由运行时管理,使用go关键字就可以方便快捷的创建一个goroutine,受限于服务器硬件内存大小,如果不对goroutine数量进行限制,会出现Out of Memory错误。但是goroutine泄漏引发的血案,想必各位gopher都经历过,通过协程池限制goroutine数一个有效避免泄漏的手段,但是自己手动实现一个协程池,总是会兼顾不到各种场景,比如释放,处理panic,动态扩容等。那么ants是公认的优秀实现协程池。
ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果
学习一个库先从结构看起吧,pool、pool_func、ants初始化一个pool等操作都在这里
编辑切换为居中
ants库代码结构
介绍完了主要的库文件后,我们进行逐个的了解,具体的使用,我们可以结合官方的使用案例进行了解,这里就不进行展开了。
创建Pool对象需调用ants.NewPool(size, options)函数,返回一个pool的指针
先看Pool的接口,对我们创建的Pool先做个初步印象
编辑切换为居中
Pool结构
// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
opts := loadOptions(options...)
if size <= 0 {
size = -1
}
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
if opts.Logger == nil {
opts.Logger = defaultLogger
}
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
// Start a goroutine to clean up expired workers periodically.
go p.purgePeriodically()
return p, nil
}
ants.NewPool创建Pool过程
ants.PoolWithFunc创建PoolWithFunc和New.Pool整体的结构很像,多了个poolFunc func(interface{})字段,也就是提交到池子的函数,然后workers的类型不一样
可以查看出pool中的worker在整个流程起着很重要的作用,也就是ants中为每个任务都是由 worker 对象来处理的,每个work都会创建一个goroutine来处理任务,ants中的worker结构如下
type goWorker struct {
//work的所属者
pool *Pool
//任务通道,通过这个发送给goWorker
task chan func()
//将work放入到队列时更新
recycleTime time.Time
}
从ants.Pool创建对象Pool的过程第四步可以看出,通过newWorkerArray创建workers,因为workerArray是个接口,有如下方法。
type workerArray interface {
len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
retrieveExpiry(duration time.Duration) []*goWorker
reset()
}
通过newWorkerArray,返回实现了workerArray接口的workerStack,这里newWorkerArray其实是用了个工厂方法来实现的,根据传入的类型,并不需要知道具体实现了接口的结构体,只要实现了workerArray接口就可以返回实现者的结构体,然后调用具体的实现
Submit(task func())接收一个func作为参数,将task通过通道task将类型为func的函数给到goWorker,然后调用retrieveWorker返回一个可用的worker给task
func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
p.lock.Lock()
w = p.workers.detach()
if w != nil { // first try to fetch the worker from the queue
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock()
spawnWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking {
p.lock.Unlock()
return
}
retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.cond.Wait() // block and wait for an available worker
p.blockingNum--
var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
p.lock.Unlock()
}
return
}
执行过程分析:
func (w *goWorker) run() {
w.pool.incRunning()
go func() {
for f := range w.task {
if f == nil {
return
}
f()
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
释放和重启Pool分别调用了Release和Reboot,这两个函数都在ants.Pool这个文件中可以找到,具体实现这里做个简单说明
下面这个是NewPool变量workerCachesyn类型sync.Pool创建goWorker对象的代码
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
workerChanCap作为容量,这个变量定义在ants.go文件中的定义如下:
// workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
workerChanCap = func() int {
// Use blocking channel if GOMAXPROCS=1.
// This switches context from sender to receiver immediately,
// which results in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the sender might be dragged down if the receiver is CPU-bound.
return 1
}()
ants参考了著名的 Web框架fasthttp的实现。当GOMAXPROCS为 1时(即操作系统线程数为1),向通道task发送会挂起发送 goroutine,将执行流程转向接收goroutine,这能提升接收处理性能。如果GOMAXPROCS大于1,ants使用带缓冲的通道,为了防止接收 goroutine 是 CPU密集的,导致发送 goroutine 被阻塞。
在NewPool中lock,其实给lock初始化了一个自旋锁,这里是利用atomic.CompareAndSwapUint32()这个原子操作实现的,在加锁失败后不会等待,而是继续尝试,提高了加锁减锁的性能
在开发中刚好遇到需要ants,这次也做个记录作为分享,其实慢慢的会发现三方库的xx_test用例是最好的学习例子,希望能和大家一起知其然知其所以然,加油!
参考文档