Merge pull request #34 from alitto/issue/22
Prevent deadlock when purging idle worker during task submission
This commit is contained in:
@@ -10,7 +10,7 @@ jobs:
|
|||||||
name: Test
|
name: Test
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x]
|
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x, 1.19.x]
|
||||||
os: [ubuntu-latest, macos-latest, windows-latest]
|
os: [ubuntu-latest, macos-latest, windows-latest]
|
||||||
runs-on: ${{ matrix.os }}
|
runs-on: ${{ matrix.os }}
|
||||||
steps:
|
steps:
|
||||||
@@ -21,7 +21,7 @@ jobs:
|
|||||||
- name: Checkout code
|
- name: Checkout code
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
- name: Test
|
- name: Test
|
||||||
run: go test -race -v ./
|
run: make test
|
||||||
codecov:
|
codecov:
|
||||||
name: Upload coverage report to Codecov
|
name: Upload coverage report to Codecov
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
@@ -33,9 +33,9 @@ jobs:
|
|||||||
- name: Checkout code
|
- name: Checkout code
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
- name: Test
|
- name: Test
|
||||||
run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./
|
run: make coverage
|
||||||
- uses: codecov/codecov-action@v2
|
- uses: codecov/codecov-action@v2
|
||||||
with:
|
with:
|
||||||
files: ./coverage.txt
|
files: coverage.out
|
||||||
fail_ci_if_error: true
|
fail_ci_if_error: true
|
||||||
verbose: true
|
verbose: true
|
||||||
|
|||||||
@@ -0,0 +1,5 @@
|
|||||||
|
test:
|
||||||
|
go test -race -v ./
|
||||||
|
|
||||||
|
coverage:
|
||||||
|
go test -race -v -coverprofile=coverage.out -covermode=atomic ./
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
module github.com/alitto/pond/examples/dynamic_size
|
module github.com/alitto/pond/examples/dynamic_size
|
||||||
|
|
||||||
go 1.18
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.7.1
|
github.com/alitto/pond v1.7.1
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
module github.com/alitto/pond/examples/fixed_size
|
module github.com/alitto/pond/examples/fixed_size
|
||||||
|
|
||||||
go 1.18
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.7.1
|
github.com/alitto/pond v1.7.1
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
module github.com/alitto/pond/examples/group_context
|
module github.com/alitto/pond/examples/group_context
|
||||||
|
|
||||||
go 1.18
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.7.1
|
github.com/alitto/pond v1.7.1
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
module github.com/alitto/pond/examples/pool_context
|
module github.com/alitto/pond/examples/pool_context
|
||||||
|
|
||||||
go 1.18
|
go 1.19
|
||||||
|
|
||||||
require github.com/alitto/pond v1.7.1
|
require github.com/alitto/pond v1.7.1
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
module github.com/alitto/pond/examples/fixed_size
|
module github.com/alitto/pond/examples/fixed_size
|
||||||
|
|
||||||
go 1.18
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.7.1
|
github.com/alitto/pond v1.7.1
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
module github.com/alitto/pond/examples/task_group
|
module github.com/alitto/pond/examples/task_group
|
||||||
|
|
||||||
go 1.18
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.7.1
|
github.com/alitto/pond v1.7.1
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
module github.com/alitto/pond
|
module github.com/alitto/pond
|
||||||
|
|
||||||
go 1.18
|
go 1.19
|
||||||
|
|||||||
@@ -357,6 +357,9 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
|
|||||||
p.tasksWaitGroup.Wait()
|
p.tasksWaitGroup.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset worker count
|
||||||
|
p.resetWorkerCount()
|
||||||
|
|
||||||
// Terminate all workers & purger goroutine
|
// Terminate all workers & purger goroutine
|
||||||
p.contextCancel()
|
p.contextCancel()
|
||||||
|
|
||||||
@@ -380,7 +383,7 @@ func (p *WorkerPool) purge() {
|
|||||||
select {
|
select {
|
||||||
// Timed out waiting for any activity to happen, attempt to stop an idle worker
|
// Timed out waiting for any activity to happen, attempt to stop an idle worker
|
||||||
case <-idleTicker.C:
|
case <-idleTicker.C:
|
||||||
p.stopIdleWorker()
|
p.maybeStopIdleWorker()
|
||||||
// Pool context was cancelled, exit
|
// Pool context was cancelled, exit
|
||||||
case <-p.context.Done():
|
case <-p.context.Done():
|
||||||
return
|
return
|
||||||
@@ -388,34 +391,41 @@ func (p *WorkerPool) purge() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// stopIdleWorker attempts to stop an idle worker by sending it a nil task
|
// maybeStopIdleWorker attempts to stop an idle worker by sending it a nil task
|
||||||
func (p *WorkerPool) stopIdleWorker() {
|
func (p *WorkerPool) maybeStopIdleWorker() bool {
|
||||||
if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers && !p.Stopped() {
|
|
||||||
p.tasks <- nil
|
if decremented := p.decrementWorkerCount(); !decremented {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send a nil task to stop an idle worker
|
||||||
|
p.tasks <- nil
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybeStartWorker attempts to create a new worker goroutine to run the given task.
|
// maybeStartWorker attempts to create a new worker goroutine to run the given task.
|
||||||
// If the worker pool has reached the maximum number of workers or there are idle workers,
|
// If the worker pool has reached the maximum number of workers or there are idle workers,
|
||||||
// it will not create a new worker.
|
// it will not create a new one.
|
||||||
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
|
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
|
||||||
|
|
||||||
if ok := p.incrementWorkerCount(); !ok {
|
if incremented := p.incrementWorkerCount(); !incremented {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if firstTask == nil {
|
if firstTask == nil {
|
||||||
|
// Worker starts idle
|
||||||
atomic.AddInt32(&p.idleWorkerCount, 1)
|
atomic.AddInt32(&p.idleWorkerCount, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Launch worker goroutine
|
// Launch worker goroutine
|
||||||
go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
|
go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// executeTask executes the given task and updates task-related counters
|
// executeTask executes the given task and updates task-related counters
|
||||||
func (p *WorkerPool) executeTask(task func()) {
|
func (p *WorkerPool) executeTask(task func(), isFirstTask bool) {
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if panic := recover(); panic != nil {
|
if panic := recover(); panic != nil {
|
||||||
@@ -424,60 +434,90 @@ func (p *WorkerPool) executeTask(task func()) {
|
|||||||
|
|
||||||
// Invoke panic handler
|
// Invoke panic handler
|
||||||
p.panicHandler(panic)
|
p.panicHandler(panic)
|
||||||
|
|
||||||
|
// Increment idle count
|
||||||
|
atomic.AddInt32(&p.idleWorkerCount, 1)
|
||||||
}
|
}
|
||||||
p.tasksWaitGroup.Done()
|
p.tasksWaitGroup.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Decrement idle count
|
||||||
|
if !isFirstTask {
|
||||||
|
atomic.AddInt32(&p.idleWorkerCount, -1)
|
||||||
|
}
|
||||||
|
|
||||||
// Decrement waiting task count
|
// Decrement waiting task count
|
||||||
atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))
|
atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))
|
||||||
|
|
||||||
|
// Execute task
|
||||||
task()
|
task()
|
||||||
|
|
||||||
// Increment successful task count
|
// Increment successful task count
|
||||||
atomic.AddUint64(&p.successfulTaskCount, 1)
|
atomic.AddUint64(&p.successfulTaskCount, 1)
|
||||||
|
|
||||||
|
// Increment idle count
|
||||||
|
atomic.AddInt32(&p.idleWorkerCount, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *WorkerPool) incrementWorkerCount() (incremented bool) {
|
func (p *WorkerPool) incrementWorkerCount() bool {
|
||||||
|
|
||||||
|
p.mutex.Lock()
|
||||||
|
defer p.mutex.Unlock()
|
||||||
|
|
||||||
runningWorkerCount := p.RunningWorkers()
|
runningWorkerCount := p.RunningWorkers()
|
||||||
|
|
||||||
// Reached max workers, do not create a new one
|
// Reached max workers, do not create a new one
|
||||||
if runningWorkerCount >= p.maxWorkers {
|
if runningWorkerCount >= p.maxWorkers {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Idle workers available, do not create a new one
|
// Idle workers available, do not create a new one
|
||||||
if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 {
|
if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
p.mutex.Lock()
|
// Execute the resizing strategy to determine if we should create more workers
|
||||||
defer p.mutex.Unlock()
|
if resize := p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers); !resize {
|
||||||
|
return false
|
||||||
// Execute the resizing strategy to determine if we can create more workers
|
|
||||||
incremented = p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers)
|
|
||||||
|
|
||||||
if incremented {
|
|
||||||
// Increment worker count
|
|
||||||
atomic.AddInt32(&p.workerCount, 1)
|
|
||||||
|
|
||||||
// Increment wait group
|
|
||||||
p.workersWaitGroup.Add(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
// Increment worker count
|
||||||
|
atomic.AddInt32(&p.workerCount, 1)
|
||||||
|
|
||||||
|
// Increment wait group
|
||||||
|
p.workersWaitGroup.Add(1)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *WorkerPool) decrementWorkerCount() {
|
func (p *WorkerPool) decrementWorkerCount() bool {
|
||||||
|
|
||||||
p.mutex.Lock()
|
p.mutex.Lock()
|
||||||
defer p.mutex.Unlock()
|
defer p.mutex.Unlock()
|
||||||
|
|
||||||
|
if p.IdleWorkers() <= 0 || p.RunningWorkers() <= p.minWorkers || p.Stopped() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Decrement worker count
|
// Decrement worker count
|
||||||
atomic.AddInt32(&p.workerCount, -1)
|
atomic.AddInt32(&p.workerCount, -1)
|
||||||
|
|
||||||
// Decrement wait group
|
// Decrement idle count
|
||||||
p.workersWaitGroup.Done()
|
atomic.AddInt32(&p.idleWorkerCount, -1)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *WorkerPool) resetWorkerCount() {
|
||||||
|
|
||||||
|
p.mutex.Lock()
|
||||||
|
defer p.mutex.Unlock()
|
||||||
|
|
||||||
|
// Reset worker count
|
||||||
|
atomic.StoreInt32(&p.workerCount, 0)
|
||||||
|
|
||||||
|
// Reset idle count
|
||||||
|
atomic.StoreInt32(&p.idleWorkerCount, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group creates a new task group
|
// Group creates a new task group
|
||||||
@@ -506,45 +546,3 @@ func (p *WorkerPool) GroupContext(ctx context.Context) (*TaskGroupWithContext, c
|
|||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
}, ctx
|
}, ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
// worker launches a worker goroutine
|
|
||||||
func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
// Decrement idle count
|
|
||||||
atomic.AddInt32(idleWorkerCount, -1)
|
|
||||||
|
|
||||||
// Handle normal exit
|
|
||||||
exitHandler()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// We have received a task, execute it
|
|
||||||
if firstTask != nil {
|
|
||||||
taskExecutor(firstTask)
|
|
||||||
|
|
||||||
// Increment idle count
|
|
||||||
atomic.AddInt32(idleWorkerCount, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-context.Done():
|
|
||||||
// Pool context was cancelled, exit
|
|
||||||
return
|
|
||||||
case task, ok := <-tasks:
|
|
||||||
if task == nil || !ok {
|
|
||||||
// We have received a signal to quit
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decrement idle count
|
|
||||||
atomic.AddInt32(idleWorkerCount, -1)
|
|
||||||
|
|
||||||
// We have received a task, execute it
|
|
||||||
taskExecutor(task)
|
|
||||||
|
|
||||||
// Increment idle count
|
|
||||||
atomic.AddInt32(idleWorkerCount, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
+28
-1
@@ -44,5 +44,32 @@ func TestPurgeAfterPoolStopped(t *testing.T) {
|
|||||||
|
|
||||||
// Simulate purger goroutine attempting to stop a worker after tasks channel is closed
|
// Simulate purger goroutine attempting to stop a worker after tasks channel is closed
|
||||||
atomic.StoreInt32(&pool.stopped, 1)
|
atomic.StoreInt32(&pool.stopped, 1)
|
||||||
pool.stopIdleWorker()
|
pool.maybeStopIdleWorker()
|
||||||
|
}
|
||||||
|
|
||||||
|
// See: https://github.com/alitto/pond/issues/33
|
||||||
|
func TestPurgeDuringSubmit(t *testing.T) {
|
||||||
|
|
||||||
|
pool := New(1, 1)
|
||||||
|
|
||||||
|
var doneCount int32
|
||||||
|
|
||||||
|
// Submit a task to ensure at least 1 worker is started
|
||||||
|
pool.SubmitAndWait(func() {
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
assertEqual(t, 1, pool.IdleWorkers())
|
||||||
|
|
||||||
|
// Stop an idle worker right before submitting another task
|
||||||
|
pool.maybeStopIdleWorker()
|
||||||
|
pool.Submit(func() {
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
pool.StopAndWait()
|
||||||
|
|
||||||
|
assertEqual(t, int32(2), atomic.LoadInt32(&doneCount))
|
||||||
|
assertEqual(t, 0, pool.RunningWorkers())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
package pond
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// worker represents a worker goroutine
|
||||||
|
func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool)) {
|
||||||
|
|
||||||
|
// If provided, execute the first task immediately, before listening to the tasks channel
|
||||||
|
if firstTask != nil {
|
||||||
|
taskExecutor(firstTask, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
waitGroup.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-context.Done():
|
||||||
|
// Pool context was cancelled, exit
|
||||||
|
return
|
||||||
|
case task, ok := <-tasks:
|
||||||
|
if task == nil || !ok {
|
||||||
|
// We have received a signal to exit
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have received a task, execute it
|
||||||
|
taskExecutor(task, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user