From 61e1a00a7647c4d091a883e9ea2668dd178684d8 Mon Sep 17 00:00:00 2001 From: Alejandro Durante Date: Sat, 25 Dec 2021 20:57:46 -0300 Subject: [PATCH 1/2] Improve handling of tasks submitted to a stopped pool --- pond.go | 29 ++++++++++++++++++++++++++--- pond_blackbox_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/pond.go b/pond.go index 10a80ea..58d82e5 100644 --- a/pond.go +++ b/pond.go @@ -1,6 +1,7 @@ package pond import ( + "errors" "fmt" "runtime/debug" "sync" @@ -14,6 +15,11 @@ const ( defaultIdleTimeout = 5 * time.Second ) +var ( + // SubmitOnStoppedPoolError is thrown when attempting to submit a task to a pool that has been stopped + SubmitOnStoppedPoolError = errors.New("worker pool has been stopped and is no longer accepting tasks") +) + // defaultPanicHandler is the default panic handler func defaultPanicHandler(panic interface{}) { fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack())) @@ -77,6 +83,7 @@ type WorkerPool struct { stopOnce sync.Once waitGroup sync.WaitGroup mutex sync.Mutex + stopped bool } // New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers). @@ -193,6 +200,11 @@ func (p *WorkerPool) CompletedTasks() uint64 { return p.SuccessfulTasks() + p.FailedTasks() } +// Stopped returns true if the pool has been stopped and is no longer accepting tasks, and false otherwise. +func (p *WorkerPool) Stopped() bool { + return p.stopped +} + // Submit sends a task to this worker pool for execution. If the queue is full, // it will wait until the task is dispatched to a worker goroutine. func (p *WorkerPool) Submit(task func()) { @@ -206,11 +218,19 @@ func (p *WorkerPool) TrySubmit(task func()) bool { return p.submit(task, false) } -func (p *WorkerPool) submit(task func(), canWaitForIdleWorker bool) (submitted bool) { +func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) { if task == nil { return false } + if p.Stopped() { + // Pool is stopped and caller must submit the task + if mustSubmit { + panic(SubmitOnStoppedPoolError) + } + return false + } + // Increment submitted and waiting task counters as soon as we receive a task atomic.AddUint64(&p.submittedTaskCount, 1) atomic.AddUint64(&p.waitingTaskCount, 1) @@ -244,7 +264,7 @@ func (p *WorkerPool) submit(task func(), canWaitForIdleWorker bool) (submitted b } } - if !canWaitForIdleWorker { + if !mustSubmit { select { case p.tasks <- task: submitted = true @@ -301,6 +321,9 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) { // Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit func (p *WorkerPool) Stop() { p.stopOnce.Do(func() { + // Mark pool as stopped + p.stopped = true + // Send the signal to stop the purger goroutine close(p.purgerQuit) }) @@ -412,7 +435,7 @@ func (p *WorkerPool) Group() *TaskGroup { } // worker launches a worker goroutine -func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) { +func worker(firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) { defer func() { // Decrement idle count diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go index de52402..b9f90f2 100644 --- a/pond_blackbox_test.go +++ b/pond_blackbox_test.go @@ -197,6 +197,20 @@ func TestTrySubmit(t *testing.T) { assertEqual(t, int32(1), atomic.LoadInt32(&doneCount)) } +func TestTrySubmitOnStoppedPool(t *testing.T) { + + // Create a pool and stop it immediately + pool := pond.New(1, 0) + assertEqual(t, false, pool.Stopped()) + pool.StopAndWait() + assertEqual(t, true, pool.Stopped()) + + submitted := pool.TrySubmit(func() {}) + + // Task should not be accepted by the pool + assertEqual(t, false, submitted) +} + func TestSubmitToIdle(t *testing.T) { pool := pond.New(1, 5) @@ -224,6 +238,27 @@ func TestSubmitToIdle(t *testing.T) { assertEqual(t, int(0), pool.IdleWorkers()) } +func TestSubmitOnStoppedPool(t *testing.T) { + + // Create a pool and stop it immediately + pool := pond.New(1, 0) + assertEqual(t, false, pool.Stopped()) + pool.StopAndWait() + assertEqual(t, true, pool.Stopped()) + + // Attempt to submit a task on a stopped pool + var err interface{} = nil + func() { + defer func() { + err = recover() + }() + pool.Submit(func() {}) + }() + + // Call to Submit should have failed with SubmitOnStoppedPoolError error + assertEqual(t, pond.SubmitOnStoppedPoolError, err) +} + func TestRunning(t *testing.T) { workerCount := 5 From f833392da4edaf5b9d7934210dd739d66cfdf3bd Mon Sep 17 00:00:00 2001 From: Alejandro Durante Date: Sun, 26 Dec 2021 10:40:19 -0300 Subject: [PATCH 2/2] Update dependencies in benchmark --- benchmark/go.mod | 8 +++++--- benchmark/go.sum | 13 ++++++------- pond.go | 6 +++--- pond_blackbox_test.go | 4 ++-- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/benchmark/go.mod b/benchmark/go.mod index 0d016be..cc573f3 100644 --- a/benchmark/go.mod +++ b/benchmark/go.mod @@ -1,11 +1,13 @@ module github.com/alitto/pond/benchmark -go 1.14 +go 1.17 require ( github.com/alitto/pond v1.3.0 - github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 - github.com/panjf2000/ants/v2 v2.4.0 + github.com/gammazero/workerpool v1.1.2 + github.com/panjf2000/ants/v2 v2.4.7 ) +require github.com/gammazero/deque v0.1.0 // indirect + replace github.com/alitto/pond => ../ diff --git a/benchmark/go.sum b/benchmark/go.sum index 840d8cd..4df4784 100644 --- a/benchmark/go.sum +++ b/benchmark/go.sum @@ -1,18 +1,17 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8= -github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= -github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 h1:1Cy/haf7XO4OyrkGid0Wq5CMluIErbvDptVAt8UTy38= -github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs= -github.com/panjf2000/ants/v2 v2.4.0 h1:embKPQeNWMRbnrRKURv4TXJwjQRWMEAfqZT6Pe5hZNc= -github.com/panjf2000/ants/v2 v2.4.0/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= +github.com/gammazero/deque v0.1.0 h1:f9LnNmq66VDeuAlSAapemq/U7hJ2jpIWa4c09q8Dlik= +github.com/gammazero/deque v0.1.0/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M= +github.com/gammazero/workerpool v1.1.2 h1:vuioDQbgrz4HoaCi2q1HLlOXdpbap5AET7xu5/qj87g= +github.com/gammazero/workerpool v1.1.2/go.mod h1:UelbXcO0zCIGFcufcirHhq2/xtLXJdQ29qZNlXG9OjQ= +github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M= +github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= diff --git a/pond.go b/pond.go index 58d82e5..c5c849a 100644 --- a/pond.go +++ b/pond.go @@ -16,8 +16,8 @@ const ( ) var ( - // SubmitOnStoppedPoolError is thrown when attempting to submit a task to a pool that has been stopped - SubmitOnStoppedPoolError = errors.New("worker pool has been stopped and is no longer accepting tasks") + // ErrSubmitOnStoppedPool is thrown when attempting to submit a task to a pool that has been stopped + ErrSubmitOnStoppedPool = errors.New("worker pool has been stopped and is no longer accepting tasks") ) // defaultPanicHandler is the default panic handler @@ -226,7 +226,7 @@ func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) { if p.Stopped() { // Pool is stopped and caller must submit the task if mustSubmit { - panic(SubmitOnStoppedPoolError) + panic(ErrSubmitOnStoppedPool) } return false } diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go index b9f90f2..88934db 100644 --- a/pond_blackbox_test.go +++ b/pond_blackbox_test.go @@ -255,8 +255,8 @@ func TestSubmitOnStoppedPool(t *testing.T) { pool.Submit(func() {}) }() - // Call to Submit should have failed with SubmitOnStoppedPoolError error - assertEqual(t, pond.SubmitOnStoppedPoolError, err) + // Call to Submit should have failed with ErrSubmitOnStoppedPool error + assertEqual(t, pond.ErrSubmitOnStoppedPool, err) } func TestRunning(t *testing.T) {