Prevent deadlock when purging idle worker during task submission

This commit is contained in:
Alejandro Durante
2022-08-28 18:53:17 -03:00
parent d01340a7a2
commit ac8953dc20
5 changed files with 139 additions and 74 deletions
+3 -3
View File
@@ -21,7 +21,7 @@ jobs:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v2 uses: actions/checkout@v2
- name: Test - name: Test
run: go test -race -v ./ run: make test
codecov: codecov:
name: Upload coverage report to Codecov name: Upload coverage report to Codecov
runs-on: ubuntu-latest runs-on: ubuntu-latest
@@ -33,9 +33,9 @@ jobs:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v2 uses: actions/checkout@v2
- name: Test - name: Test
run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./ run: make coverage
- uses: codecov/codecov-action@v2 - uses: codecov/codecov-action@v2
with: with:
files: ./coverage.txt files: coverage.out
fail_ci_if_error: true fail_ci_if_error: true
verbose: true verbose: true
+5
View File
@@ -0,0 +1,5 @@
test:
go test -race -v ./
coverage:
go test -race -v -coverprofile=coverage.out -covermode=atomic ./
+68 -70
View File
@@ -357,6 +357,9 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
p.tasksWaitGroup.Wait() p.tasksWaitGroup.Wait()
} }
// Reset worker count
p.resetWorkerCount()
// Terminate all workers & purger goroutine // Terminate all workers & purger goroutine
p.contextCancel() p.contextCancel()
@@ -380,7 +383,7 @@ func (p *WorkerPool) purge() {
select { select {
// Timed out waiting for any activity to happen, attempt to stop an idle worker // Timed out waiting for any activity to happen, attempt to stop an idle worker
case <-idleTicker.C: case <-idleTicker.C:
p.stopIdleWorker() p.maybeStopIdleWorker()
// Pool context was cancelled, exit // Pool context was cancelled, exit
case <-p.context.Done(): case <-p.context.Done():
return return
@@ -388,34 +391,41 @@ func (p *WorkerPool) purge() {
} }
} }
// stopIdleWorker attempts to stop an idle worker by sending it a nil task // maybeStopIdleWorker attempts to stop an idle worker by sending it a nil task
func (p *WorkerPool) stopIdleWorker() { func (p *WorkerPool) maybeStopIdleWorker() bool {
if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers && !p.Stopped() {
p.tasks <- nil if decremented := p.decrementWorkerCount(); !decremented {
return false
} }
// Send a nil task to stop an idle worker
p.tasks <- nil
return true
} }
// maybeStartWorker attempts to create a new worker goroutine to run the given task. // maybeStartWorker attempts to create a new worker goroutine to run the given task.
// If the worker pool has reached the maximum number of workers or there are idle workers, // If the worker pool has reached the maximum number of workers or there are idle workers,
// it will not create a new worker. // it will not create a new one.
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool { func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
if ok := p.incrementWorkerCount(); !ok { if incremented := p.incrementWorkerCount(); !incremented {
return false return false
} }
if firstTask == nil { if firstTask == nil {
// Worker starts idle
atomic.AddInt32(&p.idleWorkerCount, 1) atomic.AddInt32(&p.idleWorkerCount, 1)
} }
// Launch worker goroutine // Launch worker goroutine
go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask) go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask)
return true return true
} }
// executeTask executes the given task and updates task-related counters // executeTask executes the given task and updates task-related counters
func (p *WorkerPool) executeTask(task func()) { func (p *WorkerPool) executeTask(task func(), isFirstTask bool) {
defer func() { defer func() {
if panic := recover(); panic != nil { if panic := recover(); panic != nil {
@@ -424,60 +434,90 @@ func (p *WorkerPool) executeTask(task func()) {
// Invoke panic handler // Invoke panic handler
p.panicHandler(panic) p.panicHandler(panic)
// Increment idle count
atomic.AddInt32(&p.idleWorkerCount, 1)
} }
p.tasksWaitGroup.Done() p.tasksWaitGroup.Done()
}() }()
// Decrement idle count
if !isFirstTask {
atomic.AddInt32(&p.idleWorkerCount, -1)
}
// Decrement waiting task count // Decrement waiting task count
atomic.AddUint64(&p.waitingTaskCount, ^uint64(0)) atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))
// Execute task
task() task()
// Increment successful task count // Increment successful task count
atomic.AddUint64(&p.successfulTaskCount, 1) atomic.AddUint64(&p.successfulTaskCount, 1)
// Increment idle count
atomic.AddInt32(&p.idleWorkerCount, 1)
} }
func (p *WorkerPool) incrementWorkerCount() (incremented bool) { func (p *WorkerPool) incrementWorkerCount() bool {
p.mutex.Lock()
defer p.mutex.Unlock()
runningWorkerCount := p.RunningWorkers() runningWorkerCount := p.RunningWorkers()
// Reached max workers, do not create a new one // Reached max workers, do not create a new one
if runningWorkerCount >= p.maxWorkers { if runningWorkerCount >= p.maxWorkers {
return return false
} }
// Idle workers available, do not create a new one // Idle workers available, do not create a new one
if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 { if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 {
return return false
} }
p.mutex.Lock() // Execute the resizing strategy to determine if we should create more workers
defer p.mutex.Unlock() if resize := p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers); !resize {
return false
// Execute the resizing strategy to determine if we can create more workers
incremented = p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers)
if incremented {
// Increment worker count
atomic.AddInt32(&p.workerCount, 1)
// Increment wait group
p.workersWaitGroup.Add(1)
} }
return // Increment worker count
atomic.AddInt32(&p.workerCount, 1)
// Increment wait group
p.workersWaitGroup.Add(1)
return true
} }
func (p *WorkerPool) decrementWorkerCount() { func (p *WorkerPool) decrementWorkerCount() bool {
p.mutex.Lock() p.mutex.Lock()
defer p.mutex.Unlock() defer p.mutex.Unlock()
if p.IdleWorkers() <= 0 || p.RunningWorkers() <= p.minWorkers || p.Stopped() {
return false
}
// Decrement worker count // Decrement worker count
atomic.AddInt32(&p.workerCount, -1) atomic.AddInt32(&p.workerCount, -1)
// Decrement wait group // Decrement idle count
p.workersWaitGroup.Done() atomic.AddInt32(&p.idleWorkerCount, -1)
return true
}
func (p *WorkerPool) resetWorkerCount() {
p.mutex.Lock()
defer p.mutex.Unlock()
// Reset worker count
atomic.StoreInt32(&p.workerCount, 0)
// Reset idle count
atomic.StoreInt32(&p.idleWorkerCount, 0)
} }
// Group creates a new task group // Group creates a new task group
@@ -506,45 +546,3 @@ func (p *WorkerPool) GroupContext(ctx context.Context) (*TaskGroupWithContext, c
cancel: cancel, cancel: cancel,
}, ctx }, ctx
} }
// worker launches a worker goroutine
func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
defer func() {
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)
// Handle normal exit
exitHandler()
}()
// We have received a task, execute it
if firstTask != nil {
taskExecutor(firstTask)
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}
for {
select {
case <-context.Done():
// Pool context was cancelled, exit
return
case task, ok := <-tasks:
if task == nil || !ok {
// We have received a signal to quit
return
}
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)
// We have received a task, execute it
taskExecutor(task)
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}
}
}
+28 -1
View File
@@ -44,5 +44,32 @@ func TestPurgeAfterPoolStopped(t *testing.T) {
// Simulate purger goroutine attempting to stop a worker after tasks channel is closed // Simulate purger goroutine attempting to stop a worker after tasks channel is closed
atomic.StoreInt32(&pool.stopped, 1) atomic.StoreInt32(&pool.stopped, 1)
pool.stopIdleWorker() pool.maybeStopIdleWorker()
}
// See: https://github.com/alitto/pond/issues/33
func TestPurgeDuringSubmit(t *testing.T) {
pool := New(1, 1)
var doneCount int32
// Submit a task to ensure at least 1 worker is started
pool.SubmitAndWait(func() {
atomic.AddInt32(&doneCount, 1)
})
assertEqual(t, 1, pool.IdleWorkers())
// Stop an idle worker right before submitting another task
pool.maybeStopIdleWorker()
pool.Submit(func() {
atomic.AddInt32(&doneCount, 1)
})
pool.StopAndWait()
assertEqual(t, int32(2), atomic.LoadInt32(&doneCount))
assertEqual(t, 0, pool.RunningWorkers())
} }
+35
View File
@@ -0,0 +1,35 @@
package pond
import (
"context"
"sync"
)
// worker represents a worker goroutine
func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool)) {
// If provided, execute the first task immediately, before listening to the tasks channel
if firstTask != nil {
taskExecutor(firstTask, true)
}
defer func() {
waitGroup.Done()
}()
for {
select {
case <-context.Done():
// Pool context was cancelled, exit
return
case task, ok := <-tasks:
if task == nil || !ok {
// We have received a signal to exit
return
}
// We have received a task, execute it
taskExecutor(task, false)
}
}
}