Fix corner cases and improve benchmarks section

This commit is contained in:
alitto
2020-06-07 10:53:27 -03:00
parent dfbb355ab5
commit 5f7cd69f18
6 changed files with 43 additions and 16 deletions
+28 -12
View File
@@ -72,6 +72,7 @@ type WorkerPool struct {
purgerQuit chan struct{}
stopOnce sync.Once
waitGroup sync.WaitGroup
mutex sync.Mutex
}
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
@@ -123,7 +124,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
// Start minWorkers workers
if pool.minWorkers > 0 {
for i := 0; i < pool.minWorkers; i++ {
pool.startWorker(nil)
pool.maybeStartWorker(nil)
}
}
@@ -178,8 +179,7 @@ func (p *WorkerPool) submit(task func(), waitForIdle bool) bool {
}
// Start a worker as long as we haven't reached the limit
if !maxWorkersReached && p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) {
p.startWorker(task)
if ok := p.maybeStartWorker(task); ok {
return true
}
@@ -251,7 +251,7 @@ Purge:
select {
// Timed out waiting for any activity to happen, attempt to kill an idle worker
case <-idleTicker.C:
if p.Idle() > 0 {
if p.Idle() > 0 && p.Running() > p.minWorkers {
p.tasks <- nil
}
case <-p.purgerQuit:
@@ -265,28 +265,44 @@ Purge:
}
// startWorkers creates new worker goroutines to run the given tasks
func (p *WorkerPool) startWorker(firstTask func()) {
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
// Increment worker count
p.incrementWorkerCount()
// Attempt to increment worker count
if ok := p.incrementWorkerCount(); !ok {
return false
}
// Launch worker
go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.panicHandler)
return true
}
func (p *WorkerPool) incrementWorkerCount() {
func (p *WorkerPool) incrementWorkerCount() bool {
// Increment worker count
// Attempt to increment worker count
p.mutex.Lock()
runningWorkerCount := p.Running()
// Execute the resizing strategy to determine if we can create more workers
if !p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) || runningWorkerCount >= p.maxWorkers {
p.mutex.Unlock()
return false
}
atomic.AddInt32(&p.workerCount, 1)
p.mutex.Unlock()
// Increment waiting group semaphore
p.waitGroup.Add(1)
return true
}
func (p *WorkerPool) decrementWorkerCount() {
// Decrement worker count
p.mutex.Lock()
atomic.AddInt32(&p.workerCount, -1)
p.mutex.Unlock()
// Decrement waiting group semaphore
p.waitGroup.Done()
@@ -311,11 +327,11 @@ func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHan
// Restart goroutine
go worker(nil, tasks, idleWorkerCount, exitHandler, panicHandler)
} else {
// Handle normal exit
exitHandler()
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)
// Handle normal exit
exitHandler()
}
}()