Merge pull request #22 from alitto/feature/1.6.1
Improve handling of tasks submitted to a stopped pool
This commit is contained in:
+5
-3
@@ -1,11 +1,13 @@
|
|||||||
module github.com/alitto/pond/benchmark
|
module github.com/alitto/pond/benchmark
|
||||||
|
|
||||||
go 1.14
|
go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.3.0
|
github.com/alitto/pond v1.3.0
|
||||||
github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6
|
github.com/gammazero/workerpool v1.1.2
|
||||||
github.com/panjf2000/ants/v2 v2.4.0
|
github.com/panjf2000/ants/v2 v2.4.7
|
||||||
)
|
)
|
||||||
|
|
||||||
|
require github.com/gammazero/deque v0.1.0 // indirect
|
||||||
|
|
||||||
replace github.com/alitto/pond => ../
|
replace github.com/alitto/pond => ../
|
||||||
|
|||||||
+6
-7
@@ -1,18 +1,17 @@
|
|||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8=
|
github.com/gammazero/deque v0.1.0 h1:f9LnNmq66VDeuAlSAapemq/U7hJ2jpIWa4c09q8Dlik=
|
||||||
github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w=
|
github.com/gammazero/deque v0.1.0/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M=
|
||||||
github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 h1:1Cy/haf7XO4OyrkGid0Wq5CMluIErbvDptVAt8UTy38=
|
github.com/gammazero/workerpool v1.1.2 h1:vuioDQbgrz4HoaCi2q1HLlOXdpbap5AET7xu5/qj87g=
|
||||||
github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs=
|
github.com/gammazero/workerpool v1.1.2/go.mod h1:UelbXcO0zCIGFcufcirHhq2/xtLXJdQ29qZNlXG9OjQ=
|
||||||
github.com/panjf2000/ants/v2 v2.4.0 h1:embKPQeNWMRbnrRKURv4TXJwjQRWMEAfqZT6Pe5hZNc=
|
github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M=
|
||||||
github.com/panjf2000/ants/v2 v2.4.0/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
|
github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
|
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package pond
|
package pond
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -14,6 +15,11 @@ const (
|
|||||||
defaultIdleTimeout = 5 * time.Second
|
defaultIdleTimeout = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrSubmitOnStoppedPool is thrown when attempting to submit a task to a pool that has been stopped
|
||||||
|
ErrSubmitOnStoppedPool = errors.New("worker pool has been stopped and is no longer accepting tasks")
|
||||||
|
)
|
||||||
|
|
||||||
// defaultPanicHandler is the default panic handler
|
// defaultPanicHandler is the default panic handler
|
||||||
func defaultPanicHandler(panic interface{}) {
|
func defaultPanicHandler(panic interface{}) {
|
||||||
fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack()))
|
fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack()))
|
||||||
@@ -77,6 +83,7 @@ type WorkerPool struct {
|
|||||||
stopOnce sync.Once
|
stopOnce sync.Once
|
||||||
waitGroup sync.WaitGroup
|
waitGroup sync.WaitGroup
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
|
stopped bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
|
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
|
||||||
@@ -193,6 +200,11 @@ func (p *WorkerPool) CompletedTasks() uint64 {
|
|||||||
return p.SuccessfulTasks() + p.FailedTasks()
|
return p.SuccessfulTasks() + p.FailedTasks()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stopped returns true if the pool has been stopped and is no longer accepting tasks, and false otherwise.
|
||||||
|
func (p *WorkerPool) Stopped() bool {
|
||||||
|
return p.stopped
|
||||||
|
}
|
||||||
|
|
||||||
// Submit sends a task to this worker pool for execution. If the queue is full,
|
// Submit sends a task to this worker pool for execution. If the queue is full,
|
||||||
// it will wait until the task is dispatched to a worker goroutine.
|
// it will wait until the task is dispatched to a worker goroutine.
|
||||||
func (p *WorkerPool) Submit(task func()) {
|
func (p *WorkerPool) Submit(task func()) {
|
||||||
@@ -206,11 +218,19 @@ func (p *WorkerPool) TrySubmit(task func()) bool {
|
|||||||
return p.submit(task, false)
|
return p.submit(task, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *WorkerPool) submit(task func(), canWaitForIdleWorker bool) (submitted bool) {
|
func (p *WorkerPool) submit(task func(), mustSubmit bool) (submitted bool) {
|
||||||
if task == nil {
|
if task == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if p.Stopped() {
|
||||||
|
// Pool is stopped and caller must submit the task
|
||||||
|
if mustSubmit {
|
||||||
|
panic(ErrSubmitOnStoppedPool)
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Increment submitted and waiting task counters as soon as we receive a task
|
// Increment submitted and waiting task counters as soon as we receive a task
|
||||||
atomic.AddUint64(&p.submittedTaskCount, 1)
|
atomic.AddUint64(&p.submittedTaskCount, 1)
|
||||||
atomic.AddUint64(&p.waitingTaskCount, 1)
|
atomic.AddUint64(&p.waitingTaskCount, 1)
|
||||||
@@ -244,7 +264,7 @@ func (p *WorkerPool) submit(task func(), canWaitForIdleWorker bool) (submitted b
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !canWaitForIdleWorker {
|
if !mustSubmit {
|
||||||
select {
|
select {
|
||||||
case p.tasks <- task:
|
case p.tasks <- task:
|
||||||
submitted = true
|
submitted = true
|
||||||
@@ -301,6 +321,9 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
|
|||||||
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
|
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
|
||||||
func (p *WorkerPool) Stop() {
|
func (p *WorkerPool) Stop() {
|
||||||
p.stopOnce.Do(func() {
|
p.stopOnce.Do(func() {
|
||||||
|
// Mark pool as stopped
|
||||||
|
p.stopped = true
|
||||||
|
|
||||||
// Send the signal to stop the purger goroutine
|
// Send the signal to stop the purger goroutine
|
||||||
close(p.purgerQuit)
|
close(p.purgerQuit)
|
||||||
})
|
})
|
||||||
@@ -412,7 +435,7 @@ func (p *WorkerPool) Group() *TaskGroup {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// worker launches a worker goroutine
|
// worker launches a worker goroutine
|
||||||
func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
|
func worker(firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// Decrement idle count
|
// Decrement idle count
|
||||||
|
|||||||
@@ -197,6 +197,20 @@ func TestTrySubmit(t *testing.T) {
|
|||||||
assertEqual(t, int32(1), atomic.LoadInt32(&doneCount))
|
assertEqual(t, int32(1), atomic.LoadInt32(&doneCount))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTrySubmitOnStoppedPool(t *testing.T) {
|
||||||
|
|
||||||
|
// Create a pool and stop it immediately
|
||||||
|
pool := pond.New(1, 0)
|
||||||
|
assertEqual(t, false, pool.Stopped())
|
||||||
|
pool.StopAndWait()
|
||||||
|
assertEqual(t, true, pool.Stopped())
|
||||||
|
|
||||||
|
submitted := pool.TrySubmit(func() {})
|
||||||
|
|
||||||
|
// Task should not be accepted by the pool
|
||||||
|
assertEqual(t, false, submitted)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSubmitToIdle(t *testing.T) {
|
func TestSubmitToIdle(t *testing.T) {
|
||||||
|
|
||||||
pool := pond.New(1, 5)
|
pool := pond.New(1, 5)
|
||||||
@@ -224,6 +238,27 @@ func TestSubmitToIdle(t *testing.T) {
|
|||||||
assertEqual(t, int(0), pool.IdleWorkers())
|
assertEqual(t, int(0), pool.IdleWorkers())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSubmitOnStoppedPool(t *testing.T) {
|
||||||
|
|
||||||
|
// Create a pool and stop it immediately
|
||||||
|
pool := pond.New(1, 0)
|
||||||
|
assertEqual(t, false, pool.Stopped())
|
||||||
|
pool.StopAndWait()
|
||||||
|
assertEqual(t, true, pool.Stopped())
|
||||||
|
|
||||||
|
// Attempt to submit a task on a stopped pool
|
||||||
|
var err interface{} = nil
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
err = recover()
|
||||||
|
}()
|
||||||
|
pool.Submit(func() {})
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Call to Submit should have failed with ErrSubmitOnStoppedPool error
|
||||||
|
assertEqual(t, pond.ErrSubmitOnStoppedPool, err)
|
||||||
|
}
|
||||||
|
|
||||||
func TestRunning(t *testing.T) {
|
func TestRunning(t *testing.T) {
|
||||||
|
|
||||||
workerCount := 5
|
workerCount := 5
|
||||||
|
|||||||
Reference in New Issue
Block a user