Use Channel instead of purgerWaitGroup

This commit is contained in:
chenyahui
2022-03-08 11:07:50 +08:00
parent fe39f93496
commit dd812951b9
+15 -16
View File
@@ -91,7 +91,7 @@ type WorkerPool struct {
tasks chan func() tasks chan func()
workersWaitGroup sync.WaitGroup workersWaitGroup sync.WaitGroup
tasksWaitGroup sync.WaitGroup tasksWaitGroup sync.WaitGroup
purgerWaitGroup sync.WaitGroup purgerDoneChan chan struct{}
mutex sync.Mutex mutex sync.Mutex
stopped bool stopped bool
} }
@@ -139,7 +139,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
pool.tasks = make(chan func(), pool.maxCapacity) pool.tasks = make(chan func(), pool.maxCapacity)
// Start purger goroutine // Start purger goroutine
pool.purgerWaitGroup.Add(1) pool.purgerDoneChan = make(chan struct{})
go pool.purge() go pool.purge()
// Start minWorkers workers // Start minWorkers workers
@@ -333,13 +333,13 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
// Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks. // Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks.
// Tasks being processed by workers will continue until completion unless the process is terminated. // Tasks being processed by workers will continue until completion unless the process is terminated.
func (p *WorkerPool) Stop() { func (p *WorkerPool) Stop() {
p.stop(false) go p.stop()
} }
// StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue // StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue
// to complete before returning. // to complete before returning.
func (p *WorkerPool) StopAndWait() { func (p *WorkerPool) StopAndWait() {
p.stop(true) p.stop()
} }
// StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning // StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning
@@ -349,7 +349,7 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) {
// Launch goroutine to detect when worker pool has stopped gracefully // Launch goroutine to detect when worker pool has stopped gracefully
workersDone := make(chan struct{}) workersDone := make(chan struct{})
go func() { go func() {
p.stop(true) p.stop()
workersDone <- struct{}{} workersDone <- struct{}{}
}() }()
@@ -363,30 +363,29 @@ func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) {
} }
} }
func (p *WorkerPool) stop(waitForCompletion bool) { func (p *WorkerPool) stop() {
// Mark pool as stopped // Mark pool as stopped
p.stopped = true p.stopped = true
// Wait for all queued tasks to complete // Wait for all queued tasks to complete
if waitForCompletion { p.tasksWaitGroup.Wait()
p.tasksWaitGroup.Wait()
}
// Terminate all workers & purger goroutine // Terminate all workers & purger goroutine
p.contextCancel() p.contextCancel()
// Wait for all workers to exit // Wait for all workers to exit
if waitForCompletion { p.workersWaitGroup.Wait()
p.workersWaitGroup.Wait()
p.purgerWaitGroup.Wait() // Wait purger goroutine to exit
close(p.tasks) <-p.purgerDoneChan
}
// close tasks channel
close(p.tasks)
} }
// purge represents the work done by the purger goroutine // purge represents the work done by the purger goroutine
func (p *WorkerPool) purge() { func (p *WorkerPool) purge() {
defer p.purgerWaitGroup.Done() defer func() { p.purgerDoneChan <- struct{}{} }()
idleTicker := time.NewTicker(p.idleTimeout) idleTicker := time.NewTicker(p.idleTimeout)
defer idleTicker.Stop() defer idleTicker.Stop()