diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go
index dab5b9f..9ff1664 100644
--- a/benchmark/benchmark_test.go
+++ b/benchmark/benchmark_test.go
@@ -43,9 +43,9 @@ var defaultPoolConfig = poolConfig{
 }
 
 var pondSubjects = []subject{
-	{"Pond-Eager", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Eager}},
-	{"Pond-Balanced", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Balanced}},
-	{"Pond-Lazy", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Lazy}},
+	{"Pond-Eager", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Eager()}},
+	{"Pond-Balanced", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Balanced()}},
+	{"Pond-Lazy", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Lazy()}},
 }
 
 var otherSubjects = []subject{
diff --git a/pond.go b/pond.go
index 80bd88f..cb498c1 100644
--- a/pond.go
+++ b/pond.go
@@ -70,11 +70,12 @@ type WorkerPool struct {
 	idleWorkerCount    int32
 	completedTaskCount uint64
 	// Private properties
-	tasks           chan func()
-	dispatchedTasks chan func()
-	purgerQuit      chan struct{}
-	stopOnce        sync.Once
-	waitGroup       sync.WaitGroup
+	tasks                    chan func()
+	dispatchedTasks          chan func()
+	stopOnce                 sync.Once
+	waitGroup                sync.WaitGroup
+	lastResizeTime           time.Time
+	lastResizeCompletedTasks uint64
 	// Debug information
 	debug          bool
 	maxWorkerCount int
@@ -91,8 +92,9 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
 		maxWorkers:   maxWorkers,
 		maxCapacity:  maxCapacity,
 		idleTimeout:  defaultIdleTimeout,
-		strategy:     Balanced,
+		strategy:     Balanced(),
 		panicHandler: defaultPanicHandler,
+		debug:        false,
 	}
 
 	// Apply all options
@@ -117,7 +119,6 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
 	// Create internal channels
 	pool.tasks = make(chan func(), pool.maxCapacity)
 	pool.dispatchedTasks = make(chan func(), pool.maxWorkers)
-	pool.purgerQuit = make(chan struct{})
 
 	// Start dispatcher goroutine
 	pool.waitGroup.Add(1)
@@ -127,14 +128,6 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
 		pool.dispatch()
 	}()
 
-	// Start purger goroutine
-	pool.waitGroup.Add(1)
-	go func() {
-		defer pool.waitGroup.Done()
-
-		pool.purge()
-	}()
-
 	// Start minWorkers workers
 	if pool.minWorkers > 0 {
 		pool.startWorkers(pool.minWorkers, nil)
 	}
@@ -203,8 +196,8 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
 // Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
 func (p *WorkerPool) Stop() {
 	p.stopOnce.Do(func() {
-		// Send signal to stop the purger
-		close(p.purgerQuit)
+		// Close the tasks channel to prevent receiving new tasks
+		close(p.tasks)
 	})
 }
 
@@ -219,157 +212,181 @@ func (p *WorkerPool) StopAndWait() {
 
 // dispatch represents the work done by the dispatcher goroutine
 func (p *WorkerPool) dispatch() {
-	batch := make([]func(), 0)
-	batchSize := int(math.Max(float64(p.minWorkers), 1000))
-	var lastCompletedTasks uint64 = 0
-	var lastCycle time.Time = time.Now()
+	// Declare vars
+	var (
+		maxBatchSize            = 1000
+		batch                   = make([]func(), maxBatchSize)
+		batchSize               = int(math.Max(float64(p.minWorkers), 100))
+		idleWorkers             = 0
+		dispatchedToIdleWorkers = 0
+		dispatchedToNewWorkers  = 0
+		dispatchedBlocking      = 0
+		nextTask                func() = nil
+	)
 
-	for task := range p.tasks {
+	idleTimer := time.NewTimer(p.idleTimeout)
+	defer idleTimer.Stop()
 
-		idleCount := p.Idle()
-		dispatchedImmediately := 0
+	// Start dispatching cycle
+DispatchCycle:
+	for {
+		// Reset idle timer
+		idleTimer.Reset(p.idleTimeout)
 
-		// Dispatch up to idleCount tasks without blocking
-		nextTask := task
-	ImmediateDispatch:
-		for i := 0; i < idleCount; i++ {
-
-			// Attempt to dispatch
-			select {
-			case p.dispatchedTasks <- nextTask:
-				dispatchedImmediately++
-			default:
-				break ImmediateDispatch
+		select {
+		// Receive a task
+		case task, ok := <-p.tasks:
+			if !ok {
+				// Received the signal to exit
+				break DispatchCycle
 			}
 
-			// Attempt to receive another task
-			select {
-			case t, ok := <-p.tasks:
-				if !ok {
-					// Nothing to dispatch
-					nextTask = nil
-					break ImmediateDispatch
-				}
-				nextTask = t
-			default:
-				nextTask = nil
-				break ImmediateDispatch
+			idleWorkers = p.Idle()
+
+			// Dispatch tasks to idle workers
+			nextTask, dispatchedToIdleWorkers = p.dispatchToIdleWorkers(task, idleWorkers)
+			if nextTask == nil {
+				continue DispatchCycle
 			}
-		}
-		if nextTask == nil {
-			continue
-		}
-		// Start batching tasks
-		batch = append(batch, nextTask)
+			// Read up to batchSize tasks without blocking
+			p.receiveBatch(nextTask, &batch, batchSize)
 
-		// Read up to batchSize tasks without blocking
-	BulkReceive:
-		for i := 0; i < batchSize-1; i++ {
-			select {
-			case t, ok := <-p.tasks:
-				if !ok {
-					break BulkReceive
-				}
-				if t != nil {
-					batch = append(batch, t)
-				}
-			default:
-				break BulkReceive
-			}
-		}
+			// Resize the pool
+			dispatchedToNewWorkers = p.resizePool(batch, dispatchedToIdleWorkers)
 
-		// Resize the pool
-		now := time.Now()
-		delta := now.Sub(lastCycle)
-		workload := len(batch)
-		runningCount := p.Running()
-		lastCycle = now
-		currentCompletedTasks := atomic.LoadUint64(&p.completedTaskCount)
-		completedTasks := int(currentCompletedTasks - lastCompletedTasks)
-		if completedTasks < 0 {
-			completedTasks = 0
-		}
-		lastCompletedTasks = currentCompletedTasks
-		targetDelta := p.calculatePoolSizeDelta(runningCount, idleCount, workload+dispatchedImmediately, completedTasks, delta)
-
-		// Start up to targetDelta workers
-		dispatched := 0
-		if targetDelta > 0 {
-			p.startWorkers(targetDelta, batch)
-			dispatched = workload
-			if targetDelta < workload {
-				dispatched = targetDelta
-			}
-		} else if targetDelta < 0 {
-			// Kill targetDelta workers
-			for i := 0; i < -targetDelta; i++ {
-				p.dispatchedTasks <- nil
-			}
-		}
-
-		dispatchedBlocking := 0
-
-		if workload > dispatched {
-			for _, task := range batch[dispatched:] {
-				// Attempt to dispatch the task without blocking
-				select {
-				case p.dispatchedTasks <- task:
-				default:
-					// Block until a worker accepts this task
-					p.dispatchedTasks <- task
-					dispatchedBlocking++
+			dispatchedBlocking = 0
+			if len(batch) > dispatchedToNewWorkers {
+				for _, task := range batch[dispatchedToNewWorkers:] {
+					// Attempt to dispatch the task without blocking
+					select {
+					case p.dispatchedTasks <- task:
+					default:
+						// Block until a worker accepts this task
+						p.dispatchedTasks <- task
+						dispatchedBlocking++
+					}
 				}
 			}
-		}
 
-		// Adjust batch size
-		if dispatchedBlocking > 0 {
-			if batchSize > 1 {
-				batchSize = 1
-			}
-		} else {
-			maxBatchSize := runningCount + targetDelta
-			batchSize = batchSize * 2
-			if batchSize > maxBatchSize {
-				batchSize = maxBatchSize
+			// Adjust batch size
+			if dispatchedBlocking > 0 {
+				if batchSize > 1 {
+					batchSize = 1
+				}
+			} else {
+				batchSize = batchSize * 2
+				if batchSize > maxBatchSize {
+					batchSize = maxBatchSize
+				}
 			}
+		// Timed out waiting for any activity to happen, attempt to resize the pool
+		case <-idleTimer.C:
+			p.resizePool(batch[:0], 0)
 		}
-
-		// Clear batch slice
-		batch = nil
 	}
 
 	// Send signal to stop all workers
 	close(p.dispatchedTasks)
+
+	if p.debug {
+		fmt.Printf("Max workers: %d", p.maxWorkerCount)
+	}
 }
 
-// purge represents the work done by the purger goroutine
-func (p *WorkerPool) purge() {
-	ticker := time.NewTicker(p.idleTimeout)
-	defer ticker.Stop()
+func (p *WorkerPool) dispatchToIdleWorkers(task func(), limit int) (nextTask func(), dispatched int) {
 
-	for {
+	// Dispatch up to limit tasks without blocking
+	nextTask = task
+	for i := 0; i < limit; i++ {
+
+		// Attempt to dispatch without blocking
 		select {
-		// Timed out waiting for any activity to happen, attempt to resize the pool
-		case <-ticker.C:
-			if p.Idle() > 0 {
-				select {
-				case p.tasks <- nil:
-				default:
-					// If tasks channel is full, there's no need to resize the pool
-				}
+		case p.dispatchedTasks <- nextTask:
+			nextTask = nil
+			dispatched++
+		default:
+			// Could not dispatch, return the task
+			return
+		}
+
+		// Attempt to receive another task
+		select {
+		case t, ok := <-p.tasks:
+			if !ok {
+				// Nothing else to dispatch
+				nextTask = nil
+				return
 			}
-
-		// Received the signal to exit
-		case <-p.purgerQuit:
-
-			// Close the tasks channel to prevent receiving new tasks
-			close(p.tasks)
-
+			nextTask = t
+		default:
+			nextTask = nil
 			return
 		}
 	}
+
+	return
+}
+
+func (p *WorkerPool) receiveBatch(task func(), batch *[]func(), batchSize int) {
+
+	// Reset batch slice
+	*batch = (*batch)[:0]
+	*batch = append(*batch, task)
+
+	// Read up to batchSize tasks without blocking
+	for i := 0; i < batchSize-1; i++ {
+		select {
+		case t, ok := <-p.tasks:
+			if !ok {
+				return
+			}
+			if t != nil {
+				*batch = append(*batch, t)
+			}
+		default:
+			return
+		}
+	}
+}
+
+func (p *WorkerPool) resizePool(batch []func(), dispatchedToIdleWorkers int) int {
+
+	// Time to resize the pool
+	now := time.Now()
+	workload := len(batch)
+	currentCompletedTasks := atomic.LoadUint64(&p.completedTaskCount)
+	completedTasksDelta := int(currentCompletedTasks - p.lastResizeCompletedTasks)
+	if completedTasksDelta < 0 {
+		completedTasksDelta = 0
+	}
+	duration := 0 * time.Millisecond
+	if !p.lastResizeTime.IsZero() {
+		duration = now.Sub(p.lastResizeTime)
+	}
+	poolSizeDelta := p.calculatePoolSizeDelta(p.Running(), p.Idle(),
+		workload+dispatchedToIdleWorkers, completedTasksDelta, duration)
+
+	// Capture values for next resize cycle
+	p.lastResizeTime = now
+	p.lastResizeCompletedTasks = currentCompletedTasks
+
+	// Start up to poolSizeDelta workers
+	dispatched := 0
+	if poolSizeDelta > 0 {
+		p.startWorkers(poolSizeDelta, batch)
+		dispatched = workload
+		if poolSizeDelta < workload {
+			dispatched = poolSizeDelta
+		}
+	} else if poolSizeDelta < 0 {
+		// Kill poolSizeDelta workers
+		for i := 0; i < -poolSizeDelta; i++ {
+			p.dispatchedTasks <- nil
+		}
+	}
+
+	return dispatched
 }
 
 // calculatePoolSizeDelta calculates what's the delta to reach the ideal pool size based on the current size and workload
@@ -420,23 +437,25 @@ func (p *WorkerPool) startWorkers(count int, firstTasks []func()) {
 	p.waitGroup.Add(count)
 
 	// Launch workers
+	var firstTask func()
 	for i := 0; i < count; i++ {
-		var firstTask func() = nil
+		firstTask = nil
 		if i < len(firstTasks) {
 			firstTask = firstTasks[i]
 		}
-		worker(firstTask, p.dispatchedTasks, &p.idleWorkerCount, &p.completedTaskCount, func() {
-
-			// Decrement worker count
-			atomic.AddInt32(&p.workerCount, -1)
-
-			// Decrement waiting group semaphore
-			p.waitGroup.Done()
-
-		}, p.panicHandler)
+		go worker(firstTask, p.dispatchedTasks, &p.idleWorkerCount, &p.completedTaskCount, p.decrementWorkers, p.panicHandler)
 	}
 }
 
+func (p *WorkerPool) decrementWorkers() {
+
+	// Decrement worker count
+	atomic.AddInt32(&p.workerCount, -1)
+
+	// Decrement waiting group semaphore
+	p.waitGroup.Done()
+}
 
 // Group creates a new task group
 func (p *WorkerPool) Group() *TaskGroup {
 	return &TaskGroup{
@@ -447,53 +466,51 @@ func (p *WorkerPool) Group() *TaskGroup {
 
 // worker launches a worker goroutine
 func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, completedTaskCount *uint64, exitHandler func(), panicHandler func(interface{})) {
-	go func() {
-		defer func() {
-			if panic := recover(); panic != nil {
-				// Handle panic
-				panicHandler(panic)
+	defer func() {
+		if panic := recover(); panic != nil {
+			// Handle panic
+			panicHandler(panic)
 
-				// Restart goroutine
-				worker(nil, tasks, idleWorkerCount, completedTaskCount, exitHandler, panicHandler)
-			} else {
-				// Handle exit
-				exitHandler()
-			}
-		}()
+			// Restart goroutine
+			go worker(nil, tasks, idleWorkerCount, completedTaskCount, exitHandler, panicHandler)
+		} else {
+			// Handle exit
+			exitHandler()
+		}
+	}()
+
+	// We have received a task, execute it
+	func() {
+		// Increment idle count
+		defer atomic.AddInt32(idleWorkerCount, 1)
+		if firstTask != nil {
+			// Increment completed task count
+			defer atomic.AddUint64(completedTaskCount, 1)
+
+			firstTask()
+		}
+	}()
+
+	for task := range tasks {
+		if task == nil {
+			// We have received a signal to quit
+			return
+		}
+
+		// Decrement idle count
+		atomic.AddInt32(idleWorkerCount, -1)
 
 		// We have received a task, execute it
 		func() {
 			// Increment idle count
 			defer atomic.AddInt32(idleWorkerCount, 1)
-			if firstTask != nil {
-				// Increment completed task count
-				defer atomic.AddUint64(completedTaskCount, 1)
-				firstTask()
-			}
+			// Increment completed task count
+			defer atomic.AddUint64(completedTaskCount, 1)
+
+			task()
 		}()
-
-		for task := range tasks {
-			if task == nil {
-				// We have received a signal to quit
-				return
-			}
-
-			// Decrement idle count
-			atomic.AddInt32(idleWorkerCount, -1)
-
-			// We have received a task, execute it
-			func() {
-				// Increment idle count
-				defer atomic.AddInt32(idleWorkerCount, 1)
-
-				// Increment completed task count
-				defer atomic.AddUint64(completedTaskCount, 1)
-
-				task()
-			}()
-		}
-	}()
+	}
 }
 
 // TaskGroup represents a group of related tasks
diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go
index 11c2cb2..3485e8e 100644
--- a/pond_blackbox_test.go
+++ b/pond_blackbox_test.go
@@ -238,7 +238,7 @@ func TestSubmitWithPanic(t *testing.T) {
 
 func TestPoolWithCustomIdleTimeout(t *testing.T) {
 
-	pool := pond.New(1, 5, pond.IdleTimeout(2*time.Millisecond))
+	pool := pond.New(1, 5, pond.IdleTimeout(1*time.Millisecond))
 
 	// Submit a task
 	started := make(chan bool)
@@ -258,8 +258,8 @@ func TestPoolWithCustomIdleTimeout(t *testing.T) {
 	// Let the task complete
 	completed <- true
 
-	// Wait for idle timeout + 1ms
-	time.Sleep(3 * time.Millisecond)
+	// Wait for some time
+	time.Sleep(10 * time.Millisecond)
 
 	// Worker should have been killed
 	assertEqual(t, 0, pool.Running())
diff --git a/resizer.go b/resizer.go
index 74fe870..ec9666e 100644
--- a/resizer.go
+++ b/resizer.go
@@ -12,14 +12,14 @@ var (
 	// which can reduce throughput under certain conditions.
 	// This strategy is meant for worker pools that will operate at a small percentage of their capacity
 	// most of the time and may occasionally receive bursts of tasks.
-	Eager = DynamicResizer(1, 0.01)
+	Eager = func() ResizingStrategy { return DynamicResizer(1, 0.01) }
 	// Balanced tries to find a balance between responsiveness and throughput.
 	// It's the default strategy and it's suitable for general purpose worker pools or those
 	// that will operate close to 50% of their capacity most of the time.
-	Balanced = DynamicResizer(3, 0.01)
+	Balanced = func() ResizingStrategy { return DynamicResizer(3, 0.01) }
 	// Lazy maximizes throughput at the expense of responsiveness.
 	// This strategy is meant for worker pools that will operate close to their max. capacity most of the time.
-	Lazy = DynamicResizer(5, 0.01)
+	Lazy = func() ResizingStrategy { return DynamicResizer(5, 0.01) }
 )
 
 // dynamicResizer implements a configurable dynamic resizing strategy
@@ -29,6 +29,7 @@ type dynamicResizer struct {
 	incomingTasks  *ring.Ring
 	completedTasks *ring.Ring
 	duration       *ring.Ring
+	busyWorkers    *ring.Ring
 }
 
 // DynamicResizer creates a dynamic resizing strategy that gradually increases or decreases
@@ -58,15 +59,18 @@ func (r *dynamicResizer) reset() {
 	r.incomingTasks = ring.New(r.windowSize)
 	r.completedTasks = ring.New(r.windowSize)
 	r.duration = ring.New(r.windowSize)
+	r.busyWorkers = ring.New(r.windowSize)
 
 	// Initialize with 0s
 	for i := 0; i < r.windowSize; i++ {
 		r.incomingTasks.Value = 0
 		r.completedTasks.Value = 0
 		r.duration.Value = 0 * time.Second
+		r.busyWorkers.Value = 0
 		r.incomingTasks = r.incomingTasks.Next()
 		r.completedTasks = r.completedTasks.Next()
 		r.duration = r.duration.Next()
+		r.busyWorkers = r.busyWorkers.Next()
 	}
 }
 
@@ -94,18 +98,28 @@ func (r *dynamicResizer) totalDuration() time.Duration {
 	return valueSum
 }
 
-func (r *dynamicResizer) push(incomingTasks int, completedTasks int, duration time.Duration) {
+func (r *dynamicResizer) avgBusyWorkers() float64 {
+	var valueSum int = 0
+	r.busyWorkers.Do(func(value interface{}) {
+		valueSum += value.(int)
+	})
+	return float64(valueSum) / float64(r.windowSize)
+}
+
+func (r *dynamicResizer) push(incomingTasks, completedTasks, busyWorkers int, duration time.Duration) {
 	r.incomingTasks.Value = incomingTasks
 	r.completedTasks.Value = completedTasks
 	r.duration.Value = duration
+	r.busyWorkers.Value = busyWorkers
 	r.incomingTasks = r.incomingTasks.Next()
 	r.completedTasks = r.completedTasks.Next()
 	r.duration = r.duration.Next()
+	r.busyWorkers = r.busyWorkers.Next()
 }
 
 func (r *dynamicResizer) Resize(runningWorkers, idleWorkers, minWorkers, maxWorkers, incomingTasks, completedTasks int, duration time.Duration) int {
-	r.push(incomingTasks, completedTasks, duration)
+	r.push(incomingTasks, completedTasks, runningWorkers-idleWorkers, duration)
 
 	windowIncomingTasks := r.totalIncomingTasks()
 	windowCompletedTasks := r.totalCompletedTasks()
@@ -116,10 +130,24 @@ func (r *dynamicResizer) Resize(runningWorkers, idleWorkers, minWorkers, maxWork
 	if runningWorkers == 0 || windowCompletedTasks == 0 {
 		// No workers yet, create as many workers ar.incomingTasks-idleWorkers
 		delta := incomingTasks - idleWorkers
+		if delta < 0 {
+			delta = 0
+		}
 		return r.fitDelta(delta, runningWorkers, minWorkers, maxWorkers)
 	}
 
-	deltaRate := windowInputRate - windowOutputRate
+	// Calculate max throughput
+	avgBusyWorkers := r.avgBusyWorkers()
+	if avgBusyWorkers < 1 {
+		avgBusyWorkers = 1
+	}
+	windowWorkerRate := windowOutputRate / avgBusyWorkers
+	if windowWorkerRate < 1 {
+		windowWorkerRate = 1
+	}
+	maxOutputRate := windowWorkerRate * float64(runningWorkers)
+
+	deltaRate := windowInputRate - maxOutputRate
 
 	// No changes, do not resize
 	if deltaRate == 0 {
@@ -127,7 +155,6 @@ func (r *dynamicResizer) Resize(runningWorkers, idleWorkers, minWorkers, maxWork
 	}
 
 	// If delta % is below the defined tolerance, do not resize
-
 	if r.tolerance > 0 {
 		deltaPercentage := math.Abs(deltaRate / windowInputRate)
 		if deltaPercentage < r.tolerance {
@@ -136,10 +163,11 @@ func (r *dynamicResizer) Resize(runningWorkers, idleWorkers, minWorkers, maxWork
 	}
 
 	if deltaRate > 0 {
-		// Need to grow the pool
-		workerRate := windowOutputRate / float64(runningWorkers)
 		ratio := windowSecs / float64(r.windowSize)
-		delta := int(ratio*(deltaRate/workerRate)) - idleWorkers
+		delta := int(ratio * (deltaRate / windowWorkerRate))
+		if delta < 0 {
+			delta = 0
+		}
 		if deltaRate > 0 && delta < 1 {
 			delta = 1
 		}
diff --git a/resizer_test.go b/resizer_test.go
index c2d7901..749feb5 100644
--- a/resizer_test.go
+++ b/resizer_test.go
@@ -13,19 +13,19 @@ func TestResize(t *testing.T) {
 	assertEqual(t, 10, resizer.Resize(0, 0, 1, 100, 10, 0, 1*time.Second))
 
 	// Now the input rate grows but below the tolerance (10%)
-	assertEqual(t, 0, resizer.Resize(10, 10, 1, 100, 1, 10, 1*time.Second))
+	assertEqual(t, -1, resizer.Resize(10, 10, 1, 100, 1, 10, 1*time.Second))
 
 	// Now the input rate grows more
 	assertEqual(t, 90, resizer.Resize(10, 10, 1, 100, 100000, 11, 1*time.Second))
 
 	// Now there's no new tasks for 3 cycles
-	assertEqual(t, 0, resizer.Resize(10, 10, 1, 100, 0, 100011, 1*time.Second))
+	assertEqual(t, -1, resizer.Resize(10, 10, 1, 100, 0, 100011, 1*time.Second))
 	assertEqual(t, -1, resizer.Resize(10, 10, 1, 100, 0, 100011, 1*time.Second))
 	assertEqual(t, 0, resizer.Resize(1, 1, 1, 100, 0, 100011, 10*time.Second))
 }
 
 func TestEagerPool(t *testing.T) {
-	pool := New(100, 1000, Strategy(Eager))
+	pool := New(100, 1000, Strategy(Eager()))
 	pool.debug = true
 
 	for i := 0; i < 100; i++ {
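
Note: this change makes the preset resizing strategies (Eager, Balanced, Lazy) factory functions that return a fresh ResizingStrategy, so callers now invoke them when configuring a pool. A minimal usage sketch of the updated API follows; it is illustrative only, and the import path "github.com/alitto/pond" and the pool sizes are assumptions, not taken from this diff.

	package main

	import (
		"fmt"

		"github.com/alitto/pond" // assumed module path
	)

	func main() {
		// Each call to pond.Eager() returns its own strategy instance,
		// so two pools no longer share resizer state.
		pool := pond.New(10, 1000, pond.Strategy(pond.Eager()))
		defer pool.StopAndWait()

		for i := 0; i < 20; i++ {
			n := i
			pool.Submit(func() {
				fmt.Println("task", n)
			})
		}
	}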