From dd812951b9c27b5ab6bd97af492303ddb2afb9e7 Mon Sep 17 00:00:00 2001 From: chenyahui Date: Tue, 8 Mar 2022 11:07:50 +0800 Subject: [PATCH] Use Channel instead of purgerWaitGroup --- pond.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/pond.go b/pond.go index 12453b3..9667300 100644 --- a/pond.go +++ b/pond.go @@ -91,7 +91,7 @@ type WorkerPool struct { tasks chan func() workersWaitGroup sync.WaitGroup tasksWaitGroup sync.WaitGroup - purgerWaitGroup sync.WaitGroup + purgerDoneChan chan struct{} mutex sync.Mutex stopped bool } @@ -139,7 +139,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { pool.tasks = make(chan func(), pool.maxCapacity) // Start purger goroutine - pool.purgerWaitGroup.Add(1) + pool.purgerDoneChan = make(chan struct{}) go pool.purge() // Start minWorkers workers @@ -333,13 +333,13 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) { // Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks. // Tasks being processed by workers will continue until completion unless the process is terminated. func (p *WorkerPool) Stop() { - p.stop(false) + go p.stop() } // StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue // to complete before returning. func (p *WorkerPool) StopAndWait() { - p.stop(true) + p.stop() } // StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning @@ -349,7 +349,7 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) { // Launch goroutine to detect when worker pool has stopped gracefully workersDone := make(chan struct{}) go func() { - p.stop(true) + p.stop() workersDone <- struct{}{} }() @@ -363,30 +363,29 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) { } } -func (p *WorkerPool) stop(waitForCompletion bool) { - +func (p *WorkerPool) stop() { // Mark pool as stopped p.stopped = true // Wait for all queued tasks to complete - if waitForCompletion { - p.tasksWaitGroup.Wait() - } + p.tasksWaitGroup.Wait() // Terminate all workers & purger goroutine p.contextCancel() // Wait for all workers to exit - if waitForCompletion { - p.workersWaitGroup.Wait() - p.purgerWaitGroup.Wait() - close(p.tasks) - } + p.workersWaitGroup.Wait() + + // Wait purger goroutine to exit + <-p.purgerDoneChan + + // close tasks channel + close(p.tasks) } // purge represents the work done by the purger goroutine func (p *WorkerPool) purge() { - defer p.purgerWaitGroup.Done() + defer func() { p.purgerDoneChan <- struct{}{} }() idleTicker := time.NewTicker(p.idleTimeout) defer idleTicker.Stop()