Simplify resizing strategies

This commit is contained in:
alitto
2020-06-06 16:37:53 -03:00
parent f8b427ec5a
commit 0a4f0e9f32
7 changed files with 441 additions and 639 deletions
+97 -267
View File
@@ -2,7 +2,6 @@ package pond
import (
"fmt"
"math"
"runtime/debug"
"sync"
"sync/atomic"
@@ -22,7 +21,7 @@ func defaultPanicHandler(panic interface{}) {
// ResizingStrategy represents a pool resizing strategy
type ResizingStrategy interface {
Resize(runningWorkers, idleWorkers, minWorkers, maxWorkers, incomingTasks, completedTasks int, delta time.Duration) int
Resize(runningWorkers, minWorkers, maxWorkers int) bool
}
// Option represents an option that can be passed when instantiating a worker pool to customize it
@@ -66,19 +65,13 @@ type WorkerPool struct {
strategy ResizingStrategy
panicHandler func(interface{})
// Atomic counters
workerCount int32
idleWorkerCount int32
completedTaskCount uint64
workerCount int32
idleWorkerCount int32
// Private properties
tasks chan func()
dispatchedTasks chan func()
stopOnce sync.Once
waitGroup sync.WaitGroup
lastResizeTime time.Time
lastResizeCompletedTasks uint64
// Debug information
debug bool
maxWorkerCount int
tasks chan func()
purgerQuit chan struct{}
stopOnce sync.Once
waitGroup sync.WaitGroup
}
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
@@ -92,9 +85,8 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
maxWorkers: maxWorkers,
maxCapacity: maxCapacity,
idleTimeout: defaultIdleTimeout,
strategy: Balanced(),
strategy: Eager(),
panicHandler: defaultPanicHandler,
debug: false,
}
// Apply all options
@@ -118,19 +110,21 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
// Create internal channels
pool.tasks = make(chan func(), pool.maxCapacity)
pool.dispatchedTasks = make(chan func(), pool.maxWorkers)
pool.purgerQuit = make(chan struct{})
// Start dispatcher goroutine
// Start purger goroutine
pool.waitGroup.Add(1)
go func() {
defer pool.waitGroup.Done()
pool.dispatch()
pool.purge()
}()
// Start minWorkers workers
if pool.minWorkers > 0 {
pool.startWorkers(pool.minWorkers, nil)
for i := 0; i < pool.minWorkers; i++ {
pool.startWorker(nil)
}
}
return pool
@@ -147,14 +141,51 @@ func (p *WorkerPool) Idle() int {
}
// Submit sends a task to this worker pool for execution. If the queue is full,
// it will wait until the task can be enqueued
// it will wait until the task is dispatched to a worker goroutine.
func (p *WorkerPool) Submit(task func()) {
p.submit(task, true)
}
// TrySubmit attempts to send a task to this worker pool for execution. If the queue is full,
// it will not wait for a worker to become idle. It returns true if it was able to dispatch
// the task and false otherwise.
func (p *WorkerPool) TrySubmit(task func()) bool {
return p.submit(task, false)
}
func (p *WorkerPool) submit(task func(), waitForIdle bool) bool {
if task == nil {
return
return false
}
// Submit the task to the task channel
runningWorkerCount := p.Running()
// Attempt to dispatch to an idle worker without blocking
if runningWorkerCount > 0 && p.Idle() > 0 {
select {
case p.tasks <- task:
return true
default:
// No idle worker available, continue
}
}
maxWorkersReached := runningWorkerCount >= p.maxWorkers
// Exit if we have reached the max. number of workers and can't wait for an idle worker
if maxWorkersReached && !waitForIdle {
return false
}
// Start a worker as long as we haven't reached the limit
if !maxWorkersReached && p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) {
p.startWorker(task)
return true
}
// Submit the task to the tasks channel and wait for it to be picked up by a worker
p.tasks <- task
return true
}
// SubmitAndWait sends a task to this worker pool for execution and waits for it to complete
@@ -196,8 +227,8 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
func (p *WorkerPool) Stop() {
p.stopOnce.Do(func() {
// Close the tasks channel to prevent receiving new tasks
close(p.tasks)
// Send the signal to stop the purger goroutine
close(p.purgerQuit)
})
}
@@ -209,245 +240,50 @@ func (p *WorkerPool) StopAndWait() {
p.waitGroup.Wait()
}
// dispatch represents the work done by the dispatcher goroutine
func (p *WorkerPool) dispatch() {
// purge represents the work done by the purger goroutine
func (p *WorkerPool) purge() {
// Declare vars
var (
maxBatchSize = 1000
batch = make([]func(), maxBatchSize)
batchSize = int(math.Max(float64(p.minWorkers), 100))
idleWorkers = 0
dispatchedToIdleWorkers = 0
dispatchedToNewWorkers = 0
dispatchedBlocking = 0
nextTask func() = nil
)
idleTicker := time.NewTicker(p.idleTimeout)
defer idleTicker.Stop()
idleTimer := time.NewTimer(p.idleTimeout)
defer idleTimer.Stop()
// Start dispatching cycle
DispatchCycle:
Purge:
for {
// Reset idle timer
idleTimer.Reset(p.idleTimeout)
select {
// Receive a task
case task, ok := <-p.tasks:
if !ok {
// Received the signal to exit
break DispatchCycle
// Timed out waiting for any activity to happen, attempt to kill an idle worker
case <-idleTicker.C:
if p.Idle() > 0 {
p.tasks <- nil
}
idleWorkers = p.Idle()
// Dispatch tasks to idle workers
nextTask, dispatchedToIdleWorkers = p.dispatchToIdleWorkers(task, idleWorkers)
if nextTask == nil {
continue DispatchCycle
}
// Read up to batchSize tasks without blocking
p.receiveBatch(nextTask, &batch, batchSize)
// Resize the pool
dispatchedToNewWorkers = p.resizePool(batch, dispatchedToIdleWorkers)
dispatchedBlocking = 0
if len(batch) > dispatchedToNewWorkers {
for _, task := range batch[dispatchedToNewWorkers:] {
// Attempt to dispatch the task without blocking
select {
case p.dispatchedTasks <- task:
default:
// Block until a worker accepts this task
p.dispatchedTasks <- task
dispatchedBlocking++
}
}
}
// Adjust batch size
if dispatchedBlocking > 0 {
if batchSize > 1 {
batchSize = 1
}
} else {
batchSize = batchSize * 2
if batchSize > maxBatchSize {
batchSize = maxBatchSize
}
}
// Timed out waiting for any activity to happen, attempt to resize the pool
case <-idleTimer.C:
p.resizePool(batch[:0], 0)
case <-p.purgerQuit:
break Purge
}
}
// Send signal to stop all workers
close(p.dispatchedTasks)
close(p.tasks)
if p.debug {
fmt.Printf("Max workers: %d", p.maxWorkerCount)
}
}
func (p *WorkerPool) dispatchToIdleWorkers(task func(), limit int) (nextTask func(), dispatched int) {
// Dispatch up to limit tasks without blocking
nextTask = task
for i := 0; i < limit; i++ {
// Attempt to dispatch without blocking
select {
case p.dispatchedTasks <- nextTask:
nextTask = nil
dispatched++
default:
// Could not dispatch, return the task
return
}
// Attempt to receive another task
select {
case t, ok := <-p.tasks:
if !ok {
// Nothing else to dispatch
nextTask = nil
return
}
nextTask = t
default:
nextTask = nil
return
}
}
return
}
func (p *WorkerPool) receiveBatch(task func(), batch *[]func(), batchSize int) {
// Reset batch slice
*batch = (*batch)[:0]
*batch = append(*batch, task)
// Read up to batchSize tasks without blocking
for i := 0; i < batchSize-1; i++ {
select {
case t, ok := <-p.tasks:
if !ok {
return
}
if t != nil {
*batch = append(*batch, t)
}
default:
return
}
}
}
func (p *WorkerPool) resizePool(batch []func(), dispatchedToIdleWorkers int) int {
// Time to resize the pool
now := time.Now()
workload := len(batch)
currentCompletedTasks := atomic.LoadUint64(&p.completedTaskCount)
completedTasksDelta := int(currentCompletedTasks - p.lastResizeCompletedTasks)
if completedTasksDelta < 0 {
completedTasksDelta = 0
}
duration := 0 * time.Millisecond
if !p.lastResizeTime.IsZero() {
duration = now.Sub(p.lastResizeTime)
}
poolSizeDelta := p.calculatePoolSizeDelta(p.Running(), p.Idle(),
workload+dispatchedToIdleWorkers, completedTasksDelta, duration)
// Capture values for next resize cycle
p.lastResizeTime = now
p.lastResizeCompletedTasks = currentCompletedTasks
// Start up to poolSizeDelta workers
dispatched := 0
if poolSizeDelta > 0 {
p.startWorkers(poolSizeDelta, batch)
dispatched = workload
if poolSizeDelta < workload {
dispatched = poolSizeDelta
}
} else if poolSizeDelta < 0 {
// Kill poolSizeDelta workers
for i := 0; i < -poolSizeDelta; i++ {
p.dispatchedTasks <- nil
}
}
return dispatched
}
// calculatePoolSizeDelta calculates what's the delta to reach the ideal pool size based on the current size and workload
func (p *WorkerPool) calculatePoolSizeDelta(runningWorkers, idleWorkers,
incomingTasks, completedTasks int, duration time.Duration) int {
delta := p.strategy.Resize(runningWorkers, idleWorkers, p.minWorkers, p.maxWorkers,
incomingTasks, completedTasks, duration)
targetSize := runningWorkers + delta
// Cannot go below minWorkers
if targetSize < p.minWorkers {
targetSize = p.minWorkers
}
// Cannot go above maxWorkers
if targetSize > p.maxWorkers {
targetSize = p.maxWorkers
}
if p.debug {
// Print debugging information
durationSecs := duration.Seconds()
inputRate := float64(incomingTasks) / durationSecs
outputRate := float64(completedTasks) / durationSecs
message := fmt.Sprintf("%d\t%d\t%d\t%d\t\"%f\"\t\"%f\"\t%d\t\"%f\"\n",
runningWorkers, idleWorkers, incomingTasks, completedTasks,
inputRate, outputRate,
delta, durationSecs)
fmt.Printf(message)
}
return targetSize - runningWorkers
}
// startWorkers creates new worker goroutines to run the given tasks
func (p *WorkerPool) startWorkers(count int, firstTasks []func()) {
func (p *WorkerPool) startWorker(firstTask func()) {
// Increment worker count
workerCount := atomic.AddInt32(&p.workerCount, int32(count))
p.incrementWorkerCount()
// Collect debug information
if p.debug && int(workerCount) > p.maxWorkerCount {
p.maxWorkerCount = int(workerCount)
}
// Increment waiting group semaphore
p.waitGroup.Add(count)
// Launch workers
var firstTask func()
for i := 0; i < count; i++ {
firstTask = nil
if i < len(firstTasks) {
firstTask = firstTasks[i]
}
go worker(firstTask, p.dispatchedTasks, &p.idleWorkerCount, &p.completedTaskCount, p.decrementWorkers, p.panicHandler)
}
// Launch worker
go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.panicHandler)
}
func (p *WorkerPool) decrementWorkers() {
func (p *WorkerPool) incrementWorkerCount() {
// Increment worker count
atomic.AddInt32(&p.workerCount, 1)
// Increment waiting group semaphore
p.waitGroup.Add(1)
}
func (p *WorkerPool) decrementWorkerCount() {
// Decrement worker count
atomic.AddInt32(&p.workerCount, -1)
@@ -464,32 +300,31 @@ func (p *WorkerPool) Group() *TaskGroup {
}
// worker launches a worker goroutine
func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, completedTaskCount *uint64, exitHandler func(), panicHandler func(interface{})) {
func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHandler func(), panicHandler func(interface{})) {
defer func() {
if panic := recover(); panic != nil {
// Handle panic
panicHandler(panic)
// Restart goroutine
go worker(nil, tasks, idleWorkerCount, completedTaskCount, exitHandler, panicHandler)
go worker(nil, tasks, idleWorkerCount, exitHandler, panicHandler)
} else {
// Handle exit
// Handle normal exit
exitHandler()
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)
}
}()
// We have received a task, execute it
func() {
// Increment idle count
defer atomic.AddInt32(idleWorkerCount, 1)
if firstTask != nil {
// Increment completed task count
defer atomic.AddUint64(completedTaskCount, 1)
firstTask()
}
}()
if firstTask != nil {
firstTask()
}
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
for task := range tasks {
if task == nil {
@@ -501,15 +336,10 @@ func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, complet
atomic.AddInt32(idleWorkerCount, -1)
// We have received a task, execute it
func() {
// Increment idle count
defer atomic.AddInt32(idleWorkerCount, 1)
task()
// Increment completed task count
defer atomic.AddUint64(completedTaskCount, 1)
task()
}()
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}
}