Simplify submit method and remove atomic counter from resizing strategies

This commit is contained in:
Alejandro Durante
2022-03-12 15:20:12 -03:00
parent 15e5de604b
commit 1c5208b35a
3 changed files with 72 additions and 46 deletions
+38 -35
View File
@@ -231,7 +231,7 @@ func (p *WorkerPool) TrySubmit(task func()) bool {
func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) { func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) {
if task == nil { if task == nil {
return false return
} }
if p.Stopped() { if p.Stopped() {
@@ -239,7 +239,7 @@ func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) {
if mustSubmit { if mustSubmit {
panic(ErrSubmitOnStoppedPool) panic(ErrSubmitOnStoppedPool)
} }
return false return
} }
// Increment submitted and waiting task counters as soon as we receive a task // Increment submitted and waiting task counters as soon as we receive a task
@@ -256,35 +256,19 @@ func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) {
} }
}() }()
runningWorkerCount := p.RunningWorkers()
// Attempt to dispatch to an idle worker without blocking
if runningWorkerCount > 0 && p.IdleWorkers() > 0 {
select {
case p.tasks <- task:
submitted = true
return
default:
// No idle worker available, continue
}
}
// Start a worker as long as we haven't reached the limit // Start a worker as long as we haven't reached the limit
if runningWorkerCount < p.maxWorkers { if submitted = p.maybeStartWorker(task); submitted {
if ok := p.maybeStartWorker(task); ok {
submitted = true
return return
} }
}
if !mustSubmit { if !mustSubmit {
// Attempt to dispatch to an idle worker without blocking
select { select {
case p.tasks <- task: case p.tasks <- task:
submitted = true submitted = true
return return
default: default:
// Channel is full and can't wait for an idle worker, so need to exit // Channel is full and can't wait for an idle worker, so need to exit
submitted = false
return return
} }
} }
@@ -414,15 +398,20 @@ func (p *WorkerPool) stopIdleWorker() {
} }
} }
// startWorkers creates new worker goroutines to run the given tasks // maybeStartWorker attempts to create a new worker goroutine to run the given task.
// If the worker pool has reached the maximum number of workers or there are idle workers,
// it will not create a new worker.
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool { func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
// Attempt to increment worker count
if ok := p.incrementWorkerCount(); !ok { if ok := p.incrementWorkerCount(); !ok {
return false return false
} }
// Launch worker if firstTask == nil {
atomic.AddInt32(&p.idleWorkerCount, 1)
}
// Launch worker goroutine
go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask) go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
return true return true
@@ -451,31 +440,44 @@ func (p *WorkerPool) executeTask(task func()) {
atomic.AddUint64(&p.successfulTaskCount, 1) atomic.AddUint64(&p.successfulTaskCount, 1)
} }
func (p *WorkerPool) incrementWorkerCount() bool { func (p *WorkerPool) incrementWorkerCount() (incremented bool) {
// Attempt to increment worker count
p.mutex.Lock()
runningWorkerCount := p.RunningWorkers() runningWorkerCount := p.RunningWorkers()
// Execute the resizing strategy to determine if we can create more workers
if !p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) || runningWorkerCount >= p.maxWorkers { // Reached max workers, do not create a new one
p.mutex.Unlock() if runningWorkerCount >= p.maxWorkers {
return false return
} }
// Idle workers available, do not create a new one
if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 {
return
}
p.mutex.Lock()
defer p.mutex.Unlock()
// Execute the resizing strategy to determine if we can create more workers
incremented = p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers)
if incremented {
// Increment worker count
atomic.AddInt32(&p.workerCount, 1) atomic.AddInt32(&p.workerCount, 1)
p.mutex.Unlock()
// Increment waiting group semaphore // Increment waiting group semaphore
p.workersWaitGroup.Add(1) p.workersWaitGroup.Add(1)
}
return true return
} }
func (p *WorkerPool) decrementWorkerCount() { func (p *WorkerPool) decrementWorkerCount() {
// Decrement worker count
p.mutex.Lock() p.mutex.Lock()
defer p.mutex.Unlock()
// Decrement worker count
atomic.AddInt32(&p.workerCount, -1) atomic.AddInt32(&p.workerCount, -1)
p.mutex.Unlock()
// Decrement waiting group semaphore // Decrement waiting group semaphore
p.workersWaitGroup.Done() p.workersWaitGroup.Done()
@@ -502,9 +504,10 @@ func worker(context context.Context, firstTask func(), tasks <-chan func(), idle
// We have received a task, execute it // We have received a task, execute it
if firstTask != nil { if firstTask != nil {
taskExecutor(firstTask) taskExecutor(firstTask)
}
// Increment idle count // Increment idle count
atomic.AddInt32(idleWorkerCount, 1) atomic.AddInt32(idleWorkerCount, 1)
}
for { for {
select { select {
+24
View File
@@ -597,3 +597,27 @@ func TestConcurrentStopAndWait(t *testing.T) {
wg.Wait() wg.Wait()
} }
func TestSubmitToIdleWorker(t *testing.T) {
pool := pond.New(6, 0, pond.MinWorkers(3))
assertEqual(t, 3, pool.RunningWorkers())
// Submit task
var doneCount int32
for i := 0; i < 3; i++ {
pool.Submit(func() {
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&doneCount, 1)
})
}
// Verify no new workers were started
assertEqual(t, 3, pool.RunningWorkers())
// Wait until all submitted tasks complete
pool.StopAndWait()
assertEqual(t, int32(3), atomic.LoadInt32(&doneCount))
}
+5 -6
View File
@@ -2,7 +2,6 @@ package pond
import ( import (
"runtime" "runtime"
"sync/atomic"
) )
var maxProcs = runtime.GOMAXPROCS(0) var maxProcs = runtime.GOMAXPROCS(0)
@@ -25,8 +24,8 @@ var (
// ratedResizer implements a rated resizing strategy // ratedResizer implements a rated resizing strategy
type ratedResizer struct { type ratedResizer struct {
rate int rate uint64
hits int32 hits uint64
} }
// RatedResizer creates a resizing strategy which can be configured // RatedResizer creates a resizing strategy which can be configured
@@ -40,7 +39,7 @@ func RatedResizer(rate int) ResizingStrategy {
} }
return &ratedResizer{ return &ratedResizer{
rate: rate, rate: uint64(rate),
} }
} }
@@ -50,7 +49,7 @@ func (r *ratedResizer) Resize(runningWorkers, minWorkers, maxWorkers int) bool {
return true return true
} }
hits := int(atomic.AddInt32(&r.hits, 1)) r.hits++
return hits%r.rate == 1 return r.hits%r.rate == 1
} }