diff --git a/pond.go b/pond.go index 70ab373..986bde5 100644 --- a/pond.go +++ b/pond.go @@ -231,7 +231,7 @@ func (p *WorkerPool) TrySubmit(task func()) bool { func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) { if task == nil { - return false + return } if p.Stopped() { @@ -239,7 +239,7 @@ func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) { if mustSubmit { panic(ErrSubmitOnStoppedPool) } - return false + return } // Increment submitted and waiting task counters as soon as we receive a task @@ -256,35 +256,19 @@ func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) { } }() - runningWorkerCount := p.RunningWorkers() - - // Attempt to dispatch to an idle worker without blocking - if runningWorkerCount > 0 && p.IdleWorkers() > 0 { - select { - case p.tasks <- task: - submitted = true - return - default: - // No idle worker available, continue - } - } - // Start a worker as long as we haven't reached the limit - if runningWorkerCount < p.maxWorkers { - if ok := p.maybeStartWorker(task); ok { - submitted = true - return - } + if submitted = p.maybeStartWorker(task); submitted { + return } if !mustSubmit { + // Attempt to dispatch to an idle worker without blocking select { case p.tasks <- task: submitted = true return default: // Channel is full and can't wait for an idle worker, so need to exit - submitted = false return } } @@ -414,15 +398,20 @@ func (p *WorkerPool) stopIdleWorker() { } } -// startWorkers creates new worker goroutines to run the given tasks +// maybeStartWorker attempts to create a new worker goroutine to run the given task. +// If the worker pool has reached the maximum number of workers or there are idle workers, +// it will not create a new worker. func (p *WorkerPool) maybeStartWorker(firstTask func()) bool { - // Attempt to increment worker count if ok := p.incrementWorkerCount(); !ok { return false } - // Launch worker + if firstTask == nil { + atomic.AddInt32(&p.idleWorkerCount, 1) + } + + // Launch worker goroutine go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask) return true @@ -451,31 +440,44 @@ func (p *WorkerPool) executeTask(task func()) { atomic.AddUint64(&p.successfulTaskCount, 1) } -func (p *WorkerPool) incrementWorkerCount() bool { +func (p *WorkerPool) incrementWorkerCount() (incremented bool) { - // Attempt to increment worker count - p.mutex.Lock() runningWorkerCount := p.RunningWorkers() - // Execute the resizing strategy to determine if we can create more workers - if !p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) || runningWorkerCount >= p.maxWorkers { - p.mutex.Unlock() - return false + + // Reached max workers, do not create a new one + if runningWorkerCount >= p.maxWorkers { + return } - atomic.AddInt32(&p.workerCount, 1) - p.mutex.Unlock() - // Increment waiting group semaphore - p.workersWaitGroup.Add(1) + // Idle workers available, do not create a new one + if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 { + return + } - return true + p.mutex.Lock() + defer p.mutex.Unlock() + + // Execute the resizing strategy to determine if we can create more workers + incremented = p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) + + if incremented { + // Increment worker count + atomic.AddInt32(&p.workerCount, 1) + + // Increment waiting group semaphore + p.workersWaitGroup.Add(1) + } + + return } func (p *WorkerPool) decrementWorkerCount() { - // Decrement worker count p.mutex.Lock() + defer p.mutex.Unlock() + + // Decrement worker count atomic.AddInt32(&p.workerCount, -1) - p.mutex.Unlock() // Decrement waiting group semaphore p.workersWaitGroup.Done() @@ -502,9 +504,10 @@ func worker(context context.Context, firstTask func(), tasks <-chan func(), idle // We have received a task, execute it if firstTask != nil { taskExecutor(firstTask) + + // Increment idle count + atomic.AddInt32(idleWorkerCount, 1) } - // Increment idle count - atomic.AddInt32(idleWorkerCount, 1) for { select { diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go index 093af9c..f81280e 100644 --- a/pond_blackbox_test.go +++ b/pond_blackbox_test.go @@ -597,3 +597,27 @@ func TestConcurrentStopAndWait(t *testing.T) { wg.Wait() } + +func TestSubmitToIdleWorker(t *testing.T) { + + pool := pond.New(6, 0, pond.MinWorkers(3)) + + assertEqual(t, 3, pool.RunningWorkers()) + + // Submit task + var doneCount int32 + for i := 0; i < 3; i++ { + pool.Submit(func() { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + } + + // Verify no new workers were started + assertEqual(t, 3, pool.RunningWorkers()) + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assertEqual(t, int32(3), atomic.LoadInt32(&doneCount)) +} diff --git a/resizer.go b/resizer.go index 4bee95d..fb72e4c 100644 --- a/resizer.go +++ b/resizer.go @@ -2,7 +2,6 @@ package pond import ( "runtime" - "sync/atomic" ) var maxProcs = runtime.GOMAXPROCS(0) @@ -25,8 +24,8 @@ var ( // ratedResizer implements a rated resizing strategy type ratedResizer struct { - rate int - hits int32 + rate uint64 + hits uint64 } // RatedResizer creates a resizing strategy which can be configured @@ -40,7 +39,7 @@ func RatedResizer(rate int) ResizingStrategy { } return &ratedResizer{ - rate: rate, + rate: uint64(rate), } } @@ -50,7 +49,7 @@ func (r *ratedResizer) Resize(runningWorkers, minWorkers, maxWorkers int) bool { return true } - hits := int(atomic.AddInt32(&r.hits, 1)) + r.hits++ - return hits%r.rate == 1 + return r.hits%r.rate == 1 }