diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 1e81163..9d28663 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -21,7 +21,7 @@ jobs: - name: Checkout code uses: actions/checkout@v2 - name: Test - run: go test -race -v ./ + run: make test codecov: name: Upload coverage report to Codecov runs-on: ubuntu-latest @@ -33,9 +33,9 @@ jobs: - name: Checkout code uses: actions/checkout@v2 - name: Test - run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./ + run: make coverage - uses: codecov/codecov-action@v2 with: - files: ./coverage.txt + files: coverage.out fail_ci_if_error: true verbose: true diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2a9ba68 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +test: + go test -race -v ./ + +coverage: + go test -race -v -coverprofile=coverage.out -covermode=atomic ./ \ No newline at end of file diff --git a/pond.go b/pond.go index 307d710..9f16728 100644 --- a/pond.go +++ b/pond.go @@ -357,6 +357,9 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) { p.tasksWaitGroup.Wait() } + // Reset worker count + p.resetWorkerCount() + // Terminate all workers & purger goroutine p.contextCancel() @@ -380,7 +383,7 @@ func (p *WorkerPool) purge() { select { // Timed out waiting for any activity to happen, attempt to stop an idle worker case <-idleTicker.C: - p.stopIdleWorker() + p.maybeStopIdleWorker() // Pool context was cancelled, exit case <-p.context.Done(): return @@ -388,34 +391,41 @@ func (p *WorkerPool) purge() { } } -// stopIdleWorker attempts to stop an idle worker by sending it a nil task -func (p *WorkerPool) stopIdleWorker() { - if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers && !p.Stopped() { - p.tasks <- nil +// maybeStopIdleWorker attempts to stop an idle worker by sending it a nil task +func (p *WorkerPool) maybeStopIdleWorker() bool { + + if decremented := p.decrementWorkerCount(); !decremented { + return false } + + // Send a nil task to stop an idle worker + p.tasks <- nil + + return true } // maybeStartWorker attempts to create a new worker goroutine to run the given task. // If the worker pool has reached the maximum number of workers or there are idle workers, -// it will not create a new worker. +// it will not create a new one. func (p *WorkerPool) maybeStartWorker(firstTask func()) bool { - if ok := p.incrementWorkerCount(); !ok { + if incremented := p.incrementWorkerCount(); !incremented { return false } if firstTask == nil { + // Worker starts idle atomic.AddInt32(&p.idleWorkerCount, 1) } // Launch worker goroutine - go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask) + go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask) return true } // executeTask executes the given task and updates task-related counters -func (p *WorkerPool) executeTask(task func()) { +func (p *WorkerPool) executeTask(task func(), isFirstTask bool) { defer func() { if panic := recover(); panic != nil { @@ -424,60 +434,90 @@ func (p *WorkerPool) executeTask(task func()) { // Invoke panic handler p.panicHandler(panic) + + // Increment idle count + atomic.AddInt32(&p.idleWorkerCount, 1) } p.tasksWaitGroup.Done() }() + // Decrement idle count + if !isFirstTask { + atomic.AddInt32(&p.idleWorkerCount, -1) + } + // Decrement waiting task count atomic.AddUint64(&p.waitingTaskCount, ^uint64(0)) + // Execute task task() // Increment successful task count atomic.AddUint64(&p.successfulTaskCount, 1) + + // Increment idle count + atomic.AddInt32(&p.idleWorkerCount, 1) } -func (p *WorkerPool) incrementWorkerCount() (incremented bool) { +func (p *WorkerPool) incrementWorkerCount() bool { + + p.mutex.Lock() + defer p.mutex.Unlock() runningWorkerCount := p.RunningWorkers() // Reached max workers, do not create a new one if runningWorkerCount >= p.maxWorkers { - return + return false } // Idle workers available, do not create a new one if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 { - return + return false } - p.mutex.Lock() - defer p.mutex.Unlock() - - // Execute the resizing strategy to determine if we can create more workers - incremented = p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) - - if incremented { - // Increment worker count - atomic.AddInt32(&p.workerCount, 1) - - // Increment wait group - p.workersWaitGroup.Add(1) + // Execute the resizing strategy to determine if we should create more workers + if resize := p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers); !resize { + return false } - return + // Increment worker count + atomic.AddInt32(&p.workerCount, 1) + + // Increment wait group + p.workersWaitGroup.Add(1) + + return true } -func (p *WorkerPool) decrementWorkerCount() { +func (p *WorkerPool) decrementWorkerCount() bool { p.mutex.Lock() defer p.mutex.Unlock() + if p.IdleWorkers() <= 0 || p.RunningWorkers() <= p.minWorkers || p.Stopped() { + return false + } + // Decrement worker count atomic.AddInt32(&p.workerCount, -1) - // Decrement wait group - p.workersWaitGroup.Done() + // Decrement idle count + atomic.AddInt32(&p.idleWorkerCount, -1) + + return true +} + +func (p *WorkerPool) resetWorkerCount() { + + p.mutex.Lock() + defer p.mutex.Unlock() + + // Reset worker count + atomic.StoreInt32(&p.workerCount, 0) + + // Reset idle count + atomic.StoreInt32(&p.idleWorkerCount, 0) } // Group creates a new task group @@ -506,45 +546,3 @@ func (p *WorkerPool) GroupContext(ctx context.Context) (*TaskGroupWithContext, c cancel: cancel, }, ctx } - -// worker launches a worker goroutine -func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) { - - defer func() { - // Decrement idle count - atomic.AddInt32(idleWorkerCount, -1) - - // Handle normal exit - exitHandler() - }() - - // We have received a task, execute it - if firstTask != nil { - taskExecutor(firstTask) - - // Increment idle count - atomic.AddInt32(idleWorkerCount, 1) - } - - for { - select { - case <-context.Done(): - // Pool context was cancelled, exit - return - case task, ok := <-tasks: - if task == nil || !ok { - // We have received a signal to quit - return - } - - // Decrement idle count - atomic.AddInt32(idleWorkerCount, -1) - - // We have received a task, execute it - taskExecutor(task) - - // Increment idle count - atomic.AddInt32(idleWorkerCount, 1) - } - } -} diff --git a/pond_test.go b/pond_test.go index 9324167..76a4876 100644 --- a/pond_test.go +++ b/pond_test.go @@ -44,5 +44,32 @@ func TestPurgeAfterPoolStopped(t *testing.T) { // Simulate purger goroutine attempting to stop a worker after tasks channel is closed atomic.StoreInt32(&pool.stopped, 1) - pool.stopIdleWorker() + pool.maybeStopIdleWorker() +} + +// See: https://github.com/alitto/pond/issues/33 +func TestPurgeDuringSubmit(t *testing.T) { + + pool := New(1, 1) + + var doneCount int32 + + // Submit a task to ensure at least 1 worker is started + pool.SubmitAndWait(func() { + atomic.AddInt32(&doneCount, 1) + }) + + assertEqual(t, 1, pool.IdleWorkers()) + + // Stop an idle worker right before submitting another task + pool.maybeStopIdleWorker() + pool.Submit(func() { + atomic.AddInt32(&doneCount, 1) + }) + + pool.StopAndWait() + + assertEqual(t, int32(2), atomic.LoadInt32(&doneCount)) + assertEqual(t, 0, pool.RunningWorkers()) + } diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..1677c27 --- /dev/null +++ b/worker.go @@ -0,0 +1,35 @@ +package pond + +import ( + "context" + "sync" +) + +// worker represents a worker goroutine +func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool)) { + + // If provided, execute the first task immediately, before listening to the tasks channel + if firstTask != nil { + taskExecutor(firstTask, true) + } + + defer func() { + waitGroup.Done() + }() + + for { + select { + case <-context.Done(): + // Pool context was cancelled, exit + return + case task, ok := <-tasks: + if task == nil || !ok { + // We have received a signal to exit + return + } + + // We have received a task, execute it + taskExecutor(task, false) + } + } +}