From 5a66a00af99caad12478b858e4c67bba570a889c Mon Sep 17 00:00:00 2001 From: Alejandro Durante Date: Sun, 6 Mar 2022 11:41:28 -0300 Subject: [PATCH 1/3] Prevent "send on closed channel" in purger goroutine --- pond.go | 83 ++++++++++++++++++++++++++++------------------------ pond_test.go | 17 +++++++++++ 2 files changed, 61 insertions(+), 39 deletions(-) diff --git a/pond.go b/pond.go index 9f29c5b..2ca413b 100644 --- a/pond.go +++ b/pond.go @@ -88,11 +88,11 @@ type WorkerPool struct { successfulTaskCount uint64 failedTaskCount uint64 // Private properties - tasks chan func() - stopOnce sync.Once - waitGroup sync.WaitGroup - mutex sync.Mutex - stopped bool + tasks chan func() + workersWaitGroup sync.WaitGroup + tasksWaitGroup sync.WaitGroup + mutex sync.Mutex + stopped bool } // New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers). @@ -242,12 +242,14 @@ func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) { // Increment submitted and waiting task counters as soon as we receive a task atomic.AddUint64(&p.submittedTaskCount, 1) atomic.AddUint64(&p.waitingTaskCount, 1) + p.tasksWaitGroup.Add(1) defer func() { if !submitted { // Task was not sumitted to the pool, decrement submitted and waiting task counters atomic.AddUint64(&p.submittedTaskCount, ^uint64(0)) atomic.AddUint64(&p.waitingTaskCount, ^uint64(0)) + p.tasksWaitGroup.Done() } }() @@ -328,46 +330,24 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) { // Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks. // Tasks being processed by workers will continue until completion unless the process is terminated. -// This method can only be called once. func (p *WorkerPool) Stop() { - p.stopOnce.Do(func() { - // Mark pool as stopped - p.stopped = true - - // Stop accepting new tasks - close(p.tasks) - - // Terminate all workers & purger goroutine - p.contextCancel() - }) + p.stop(false) } // StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue -// to complete before returning. This method can only be called once. +// to complete before returning. func (p *WorkerPool) StopAndWait() { - p.stopOnce.Do(func() { - // Mark pool as stopped - p.stopped = true - - // Stop accepting new tasks - close(p.tasks) - - // Wait for all workers to exit - p.waitGroup.Wait() - - // Terminate all workers & purger goroutine - p.contextCancel() - }) + p.stop(true) } // StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning -// or until the given deadline is reached, whichever comes first. This method can only be called once. +// or until the given deadline is reached, whichever comes first. func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) { - // Detect if worker pool is already stopped + // Launch goroutine to detect when worker pool has stopped gracefully workersDone := make(chan struct{}) go func() { - p.StopAndWait() + p.stop(true) workersDone <- struct{}{} }() @@ -381,6 +361,25 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) { } } +func (p *WorkerPool) stop(waitForCompletion bool) { + + // Mark pool as stopped + p.stopped = true + + // Wait for all queued tasks to complete + if waitForCompletion { + p.tasksWaitGroup.Wait() + } + + // Terminate all workers & purger goroutine + p.contextCancel() + + // Wait for all workers to exit + if waitForCompletion { + p.workersWaitGroup.Wait() + } +} + // purge represents the work done by the purger goroutine func (p *WorkerPool) purge() { @@ -389,11 +388,9 @@ func (p *WorkerPool) purge() { for { select { - // Timed out waiting for any activity to happen, attempt to kill an idle worker + // Timed out waiting for any activity to happen, attempt to stop an idle worker case <-idleTicker.C: - if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers { - p.tasks <- nil - } + p.stopIdleWorker() // Pool context was cancelled, exit case <-p.context.Done(): return @@ -401,6 +398,13 @@ func (p *WorkerPool) purge() { } } +// stopIdleWorker attempts to stop an idle worker by sending it a nil task +func (p *WorkerPool) stopIdleWorker() { + if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers && !p.Stopped() { + p.tasks <- nil + } +} + // startWorkers creates new worker goroutines to run the given tasks func (p *WorkerPool) maybeStartWorker(firstTask func()) bool { @@ -426,6 +430,7 @@ func (p *WorkerPool) executeTask(task func()) { // Invoke panic handler p.panicHandler(panic) } + p.tasksWaitGroup.Done() }() // Decrement waiting task count @@ -451,7 +456,7 @@ func (p *WorkerPool) incrementWorkerCount() bool { p.mutex.Unlock() // Increment waiting group semaphore - p.waitGroup.Add(1) + p.workersWaitGroup.Add(1) return true } @@ -464,7 +469,7 @@ func (p *WorkerPool) decrementWorkerCount() { p.mutex.Unlock() // Decrement waiting group semaphore - p.waitGroup.Done() + p.workersWaitGroup.Done() } // Group creates a new task group diff --git a/pond_test.go b/pond_test.go index e56aca2..9785543 100644 --- a/pond_test.go +++ b/pond_test.go @@ -1,6 +1,7 @@ package pond import ( + "sync/atomic" "testing" "time" ) @@ -29,3 +30,19 @@ func TestNewWithInconsistentOptions(t *testing.T) { assertEqual(t, 1, pool.minWorkers) assertEqual(t, defaultIdleTimeout, pool.idleTimeout) } + +func TestPurgeAfterPoolStopped(t *testing.T) { + + pool := New(1, 1) + + var doneCount int32 + pool.SubmitAndWait(func() { + atomic.AddInt32(&doneCount, 1) + }) + assertEqual(t, int32(1), atomic.LoadInt32(&doneCount)) + assertEqual(t, 1, pool.RunningWorkers()) + + // Simulate purger goroutine attempting to stop a worker after tasks channel is closed + pool.stopped = true + pool.stopIdleWorker() +} From fe39f934963a60e73a75960c64b931584fed70d8 Mon Sep 17 00:00:00 2001 From: chenyahui Date: Mon, 7 Mar 2022 18:22:12 +0800 Subject: [PATCH 2/3] Wait purge goroutine exit in stop function to prevent send on closed channel --- pond.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pond.go b/pond.go index 2ca413b..12453b3 100644 --- a/pond.go +++ b/pond.go @@ -91,6 +91,7 @@ type WorkerPool struct { tasks chan func() workersWaitGroup sync.WaitGroup tasksWaitGroup sync.WaitGroup + purgerWaitGroup sync.WaitGroup mutex sync.Mutex stopped bool } @@ -138,6 +139,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { pool.tasks = make(chan func(), pool.maxCapacity) // Start purger goroutine + pool.purgerWaitGroup.Add(1) go pool.purge() // Start minWorkers workers @@ -377,11 +379,14 @@ func (p *WorkerPool) stop(waitForCompletion bool) { // Wait for all workers to exit if waitForCompletion { p.workersWaitGroup.Wait() + p.purgerWaitGroup.Wait() + close(p.tasks) } } // purge represents the work done by the purger goroutine func (p *WorkerPool) purge() { + defer p.purgerWaitGroup.Done() idleTicker := time.NewTicker(p.idleTimeout) defer idleTicker.Stop() From dd812951b9c27b5ab6bd97af492303ddb2afb9e7 Mon Sep 17 00:00:00 2001 From: chenyahui Date: Tue, 8 Mar 2022 11:07:50 +0800 Subject: [PATCH 3/3] Use Channel instead of purgerWaitGroup --- pond.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/pond.go b/pond.go index 12453b3..9667300 100644 --- a/pond.go +++ b/pond.go @@ -91,7 +91,7 @@ type WorkerPool struct { tasks chan func() workersWaitGroup sync.WaitGroup tasksWaitGroup sync.WaitGroup - purgerWaitGroup sync.WaitGroup + purgerDoneChan chan struct{} mutex sync.Mutex stopped bool } @@ -139,7 +139,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { pool.tasks = make(chan func(), pool.maxCapacity) // Start purger goroutine - pool.purgerWaitGroup.Add(1) + pool.purgerDoneChan = make(chan struct{}) go pool.purge() // Start minWorkers workers @@ -333,13 +333,13 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) { // Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks. // Tasks being processed by workers will continue until completion unless the process is terminated. func (p *WorkerPool) Stop() { - p.stop(false) + go p.stop() } // StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue // to complete before returning. func (p *WorkerPool) StopAndWait() { - p.stop(true) + p.stop() } // StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning @@ -349,7 +349,7 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) { // Launch goroutine to detect when worker pool has stopped gracefully workersDone := make(chan struct{}) go func() { - p.stop(true) + p.stop() workersDone <- struct{}{} }() @@ -363,30 +363,29 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) { } } -func (p *WorkerPool) stop(waitForCompletion bool) { - +func (p *WorkerPool) stop() { // Mark pool as stopped p.stopped = true // Wait for all queued tasks to complete - if waitForCompletion { - p.tasksWaitGroup.Wait() - } + p.tasksWaitGroup.Wait() // Terminate all workers & purger goroutine p.contextCancel() // Wait for all workers to exit - if waitForCompletion { - p.workersWaitGroup.Wait() - p.purgerWaitGroup.Wait() - close(p.tasks) - } + p.workersWaitGroup.Wait() + + // Wait purger goroutine to exit + <-p.purgerDoneChan + + // close tasks channel + close(p.tasks) } // purge represents the work done by the purger goroutine func (p *WorkerPool) purge() { - defer p.purgerWaitGroup.Done() + defer func() { p.purgerDoneChan <- struct{}{} }() idleTicker := time.NewTicker(p.idleTimeout) defer idleTicker.Stop()