Pool context option & stop with timeout

This commit is contained in:
Alejandro Durante
2022-01-02 09:38:32 -03:00
parent 9d7bd8fe08
commit 6c719078e5
9 changed files with 243 additions and 60 deletions
+96 -50
View File
@@ -1,6 +1,7 @@
package pond
import (
"context"
"errors"
"fmt"
"runtime/debug"
@@ -54,22 +55,31 @@ func Strategy(strategy ResizingStrategy) Option {
}
}
// PanicHandler allows to change the panic handler function for a worker pool
// PanicHandler allows to change the panic handler function of a worker pool
func PanicHandler(panicHandler func(interface{})) Option {
return func(pool *WorkerPool) {
pool.panicHandler = panicHandler
}
}
// Context configures a parent context on a worker pool to stop all workers when it is cancelled
func Context(parentCtx context.Context) Option {
return func(pool *WorkerPool) {
pool.context, pool.contextCancel = context.WithCancel(parentCtx)
}
}
// WorkerPool models a pool of workers
type WorkerPool struct {
// Configurable settings
maxWorkers int
maxCapacity int
minWorkers int
idleTimeout time.Duration
strategy ResizingStrategy
panicHandler func(interface{})
maxWorkers int
maxCapacity int
minWorkers int
idleTimeout time.Duration
strategy ResizingStrategy
panicHandler func(interface{})
context context.Context
contextCancel context.CancelFunc
// Atomic counters
workerCount int32
idleWorkerCount int32
@@ -78,12 +88,11 @@ type WorkerPool struct {
successfulTaskCount uint64
failedTaskCount uint64
// Private properties
tasks chan func()
purgerQuit chan struct{}
stopOnce sync.Once
waitGroup sync.WaitGroup
mutex sync.Mutex
stopped bool
tasks chan func()
stopOnce sync.Once
waitGroup sync.WaitGroup
mutex sync.Mutex
stopped bool
}
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
@@ -120,17 +129,16 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
pool.idleTimeout = defaultIdleTimeout
}
// Create internal channels
// Initialize base context (if not already set)
if pool.context == nil {
Context(context.Background())(pool)
}
// Create tasks channel
pool.tasks = make(chan func(), pool.maxCapacity)
pool.purgerQuit = make(chan struct{})
// Start purger goroutine
pool.waitGroup.Add(1)
go func() {
defer pool.waitGroup.Done()
pool.purge()
}()
go pool.purge()
// Start minWorkers workers
if pool.minWorkers > 0 {
@@ -298,7 +306,7 @@ func (p *WorkerPool) SubmitAndWait(task func()) {
}
// SubmitBefore attempts to send a task for execution to this worker pool but aborts it
// if the task did not start before the given deadline
// if the task did not start before the given deadline.
func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
if task == nil {
return
@@ -318,23 +326,59 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
})
}
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
// Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks.
// Tasks being processed by workers will continue until completion unless the process is terminated.
// This method can only be called once.
func (p *WorkerPool) Stop() {
p.stopOnce.Do(func() {
// Mark pool as stopped
p.stopped = true
// Send the signal to stop the purger goroutine
close(p.purgerQuit)
// Stop accepting new tasks
close(p.tasks)
// Terminate all workers & purger goroutine
p.contextCancel()
})
}
// StopAndWait causes this pool to stop accepting tasks, waiting for all tasks in the queue to complete
// StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue
// to complete before returning. This method can only be called once.
func (p *WorkerPool) StopAndWait() {
p.Stop()
p.stopOnce.Do(func() {
// Mark pool as stopped
p.stopped = true
// Wait for all goroutines to exit
p.waitGroup.Wait()
// Stop accepting new tasks
close(p.tasks)
// Wait for all workers to exit
p.waitGroup.Wait()
// Terminate all workers & purger goroutine
p.contextCancel()
})
}
// StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning
// or until the given deadline is reached, whichever comes first. This method can only be called once.
func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) {
// Detect if worker pool is already stopped
workersDone := make(chan struct{})
go func() {
p.StopAndWait()
workersDone <- struct{}{}
}()
// Wait until either all workers have exited or the deadline is reached
select {
case <-workersDone:
return
case <-time.After(deadline):
p.contextCancel()
return
}
}
// purge represents the work done by the purger goroutine
@@ -343,7 +387,6 @@ func (p *WorkerPool) purge() {
idleTicker := time.NewTicker(p.idleTimeout)
defer idleTicker.Stop()
Purge:
for {
select {
// Timed out waiting for any activity to happen, attempt to kill an idle worker
@@ -351,14 +394,11 @@ Purge:
if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers {
p.tasks <- nil
}
case <-p.purgerQuit:
break Purge
// Pool context was cancelled, exit
case <-p.context.Done():
return
}
}
// Send signal to stop all workers
close(p.tasks)
}
// startWorkers creates new worker goroutines to run the given tasks
@@ -370,7 +410,7 @@ func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
}
// Launch worker
go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
return true
}
@@ -435,7 +475,7 @@ func (p *WorkerPool) Group() *TaskGroup {
}
// worker launches a worker goroutine
func worker(firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
defer func() {
// Decrement idle count
@@ -452,20 +492,26 @@ func worker(firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitH
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
for task := range tasks {
if task == nil {
// We have received a signal to quit
for {
select {
case <-context.Done():
// Pool context was cancelled, exit
return
case task, ok := <-tasks:
if task == nil || !ok {
// We have received a signal to quit
return
}
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)
// We have received a task, execute it
taskExecutor(task)
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)
// We have received a task, execute it
taskExecutor(task)
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}
}