Prevent race condition in stopped flag

This commit is contained in:
Alejandro Durante
2022-03-12 17:27:58 -03:00
parent c0b34be41c
commit b52e83255c
2 changed files with 7 additions and 10 deletions
+6 -9
View File
@@ -93,8 +93,7 @@ type WorkerPool struct {
workersWaitGroup sync.WaitGroup workersWaitGroup sync.WaitGroup
tasksWaitGroup sync.WaitGroup tasksWaitGroup sync.WaitGroup
mutex sync.Mutex mutex sync.Mutex
stopped bool stopped int32
stoppedOnce sync.Once
} }
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers). // New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
@@ -213,7 +212,7 @@ func (p *WorkerPool) CompletedTasks() uint64 {
// Stopped returns true if the pool has been stopped and is no longer accepting tasks, and false otherwise. // Stopped returns true if the pool has been stopped and is no longer accepting tasks, and false otherwise.
func (p *WorkerPool) Stopped() bool { func (p *WorkerPool) Stopped() bool {
return p.stopped return atomic.LoadInt32(&p.stopped) == 1
} }
// Submit sends a task to this worker pool for execution. If the queue is full, // Submit sends a task to this worker pool for execution. If the queue is full,
@@ -350,10 +349,8 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) {
} }
func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) { func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
// Mark pool as stopped (only once, in case multiple concurrent calls to StopAndWait are made) // Mark pool as stopped
p.stoppedOnce.Do(func() { atomic.StoreInt32(&p.stopped, 1)
p.stopped = true
})
if waitForQueuedTasksToComplete { if waitForQueuedTasksToComplete {
// Wait for all queued tasks to complete // Wait for all queued tasks to complete
@@ -464,7 +461,7 @@ func (p *WorkerPool) incrementWorkerCount() (incremented bool) {
// Increment worker count // Increment worker count
atomic.AddInt32(&p.workerCount, 1) atomic.AddInt32(&p.workerCount, 1)
// Increment waiting group semaphore // Increment wait group
p.workersWaitGroup.Add(1) p.workersWaitGroup.Add(1)
} }
@@ -479,7 +476,7 @@ func (p *WorkerPool) decrementWorkerCount() {
// Decrement worker count // Decrement worker count
atomic.AddInt32(&p.workerCount, -1) atomic.AddInt32(&p.workerCount, -1)
// Decrement waiting group semaphore // Decrement wait group
p.workersWaitGroup.Done() p.workersWaitGroup.Done()
} }
+1 -1
View File
@@ -43,6 +43,6 @@ func TestPurgeAfterPoolStopped(t *testing.T) {
assertEqual(t, 1, pool.RunningWorkers()) assertEqual(t, 1, pool.RunningWorkers())
// Simulate purger goroutine attempting to stop a worker after tasks channel is closed // Simulate purger goroutine attempting to stop a worker after tasks channel is closed
pool.stopped = true atomic.StoreInt32(&pool.stopped, 1)
pool.stopIdleWorker() pool.stopIdleWorker()
} }