Initial commit
This commit is contained in:
@@ -0,0 +1,9 @@
|
|||||||
|
module github.com/alitto/pond
|
||||||
|
|
||||||
|
go 1.14
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6
|
||||||
|
github.com/panjf2000/ants/v2 v2.3.1
|
||||||
|
github.com/stretchr/testify v1.5.1
|
||||||
|
)
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8=
|
||||||
|
github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w=
|
||||||
|
github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 h1:1Cy/haf7XO4OyrkGid0Wq5CMluIErbvDptVAt8UTy38=
|
||||||
|
github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs=
|
||||||
|
github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
|
||||||
|
github.com/panjf2000/ants/v2 v2.3.1 h1:9iOZHO5XlSO1Gs5K7x06uDFy8bkicWlhOKGh/TufAZg=
|
||||||
|
github.com/panjf2000/ants/v2 v2.3.1/go.mod h1:LtwNaBX6OeF5qRtQlaeGndalVwJlS2ueur7uwoAHbPA=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||||
|
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
|
||||||
|
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||||
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
|
||||||
|
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
@@ -0,0 +1,280 @@
|
|||||||
|
package pond
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"runtime/debug"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultIdleTimeout = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
func defaultPanicHandler(panic interface{}) {
|
||||||
|
fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option represents an option that can be passed when building a worker pool to customize it
|
||||||
|
type Option func(*WorkerPool)
|
||||||
|
|
||||||
|
// IdleTimeout allows to change the idle timeout for a worker pool
|
||||||
|
func IdleTimeout(idleTimeout time.Duration) Option {
|
||||||
|
return func(pool *WorkerPool) {
|
||||||
|
pool.idleTimeout = idleTimeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PanicHandler allows to change the panic handler function for a worker pool
|
||||||
|
func PanicHandler(panicHandler func(interface{})) Option {
|
||||||
|
return func(pool *WorkerPool) {
|
||||||
|
pool.panicHandler = panicHandler
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WorkerPool models a pool of workers
|
||||||
|
type WorkerPool struct {
|
||||||
|
maxWorkers int
|
||||||
|
maxCapacity int
|
||||||
|
idleTimeout time.Duration
|
||||||
|
workerCount int32
|
||||||
|
tasks chan func()
|
||||||
|
dispatchedTasks chan func()
|
||||||
|
purgerQuit chan struct{}
|
||||||
|
stopOnce sync.Once
|
||||||
|
waitGroup sync.WaitGroup
|
||||||
|
panicHandler func(interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a worker pool with that can scale up to the given number of workers and capacity
|
||||||
|
func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
|
||||||
|
|
||||||
|
pool := &WorkerPool{
|
||||||
|
maxWorkers: maxWorkers,
|
||||||
|
maxCapacity: maxCapacity,
|
||||||
|
idleTimeout: defaultIdleTimeout,
|
||||||
|
tasks: make(chan func(), maxCapacity),
|
||||||
|
dispatchedTasks: make(chan func(), maxWorkers),
|
||||||
|
purgerQuit: make(chan struct{}),
|
||||||
|
panicHandler: defaultPanicHandler,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply all options
|
||||||
|
for _, opt := range options {
|
||||||
|
opt(pool)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start dispatcher goroutine
|
||||||
|
pool.waitGroup.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer pool.waitGroup.Done()
|
||||||
|
|
||||||
|
pool.dispatch()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start purger goroutine
|
||||||
|
pool.waitGroup.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer pool.waitGroup.Done()
|
||||||
|
|
||||||
|
pool.purge()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Running returns the number of running workers
|
||||||
|
func (p *WorkerPool) Running() int {
|
||||||
|
return int(atomic.LoadInt32(&p.workerCount))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Submit sends a task to this worker pool for execution. If the queue is full,
|
||||||
|
// it will wait until the task can be enqueued.
|
||||||
|
func (p *WorkerPool) Submit(task func()) {
|
||||||
|
if task == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Submit the task to the task channel
|
||||||
|
p.tasks <- task
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubmitAndWait sends a task to this worker pool for execution and waits for it to complete
|
||||||
|
// before returning.
|
||||||
|
func (p *WorkerPool) SubmitAndWait(task func()) {
|
||||||
|
if task == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
p.Submit(func() {
|
||||||
|
defer close(done)
|
||||||
|
task()
|
||||||
|
})
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
|
||||||
|
func (p *WorkerPool) Stop() {
|
||||||
|
p.stop(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopAndWait causes this pool to stop accepting tasks, waiting for all tasks in the queue to complete
|
||||||
|
func (p *WorkerPool) StopAndWait() {
|
||||||
|
p.stop(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// dispatch represents the work done by the dispatcher goroutine
|
||||||
|
func (p *WorkerPool) dispatch() {
|
||||||
|
|
||||||
|
for task := range p.tasks {
|
||||||
|
if task == nil {
|
||||||
|
// Received the signal to exit gracefully
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
// Attempt to submit the task to a worker without blocking
|
||||||
|
case p.dispatchedTasks <- task:
|
||||||
|
if p.Running() == 0 {
|
||||||
|
p.startWorker()
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// Create a new worker if we haven't reached the limit yet
|
||||||
|
if p.Running() < p.maxWorkers {
|
||||||
|
p.startWorker()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Block until a worker accepts this task
|
||||||
|
p.dispatchedTasks <- task
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send signal to stop all workers
|
||||||
|
close(p.dispatchedTasks)
|
||||||
|
|
||||||
|
// Send signal to stop the purger
|
||||||
|
close(p.purgerQuit)
|
||||||
|
}
|
||||||
|
|
||||||
|
// purge represents the work done by the purger goroutine
|
||||||
|
func (p *WorkerPool) purge() {
|
||||||
|
ticker := time.NewTicker(p.idleTimeout)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// Timed out waiting for any activity to happen, attempt to stop an idle worker
|
||||||
|
case <-ticker.C:
|
||||||
|
if p.Running() > 0 {
|
||||||
|
select {
|
||||||
|
case p.dispatchedTasks <- nil:
|
||||||
|
default:
|
||||||
|
// If dispatchedTasks channel is full, no need to kill the worker
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Received the signal to exit
|
||||||
|
case <-p.purgerQuit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *WorkerPool) startWorker() {
|
||||||
|
// Increment worker count
|
||||||
|
atomic.AddInt32(&p.workerCount, 1)
|
||||||
|
|
||||||
|
// Increment waiting group semaphore
|
||||||
|
p.waitGroup.Add(1)
|
||||||
|
|
||||||
|
worker(p.dispatchedTasks, func() {
|
||||||
|
|
||||||
|
// Decrement worker count
|
||||||
|
atomic.AddInt32(&p.workerCount, -1)
|
||||||
|
|
||||||
|
// Decrement waiting group semaphore
|
||||||
|
p.waitGroup.Done()
|
||||||
|
|
||||||
|
}, p.panicHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
|
||||||
|
func (p *WorkerPool) stop(wait bool) {
|
||||||
|
|
||||||
|
p.stopOnce.Do(func() {
|
||||||
|
if wait {
|
||||||
|
// Make sure all queued tasks complete before stopping the dispatcher
|
||||||
|
p.tasks <- nil
|
||||||
|
|
||||||
|
// Close the tasks channel to prevent receiving new tasks
|
||||||
|
close(p.tasks)
|
||||||
|
|
||||||
|
// Wait for all goroutines to exit
|
||||||
|
p.waitGroup.Wait()
|
||||||
|
} else {
|
||||||
|
// Close the tasks channel to prevent receiving new tasks
|
||||||
|
close(p.tasks)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Group creates a new task group
|
||||||
|
func (p *WorkerPool) Group() *TaskGroup {
|
||||||
|
return &TaskGroup{
|
||||||
|
pool: p,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// worker launches a worker goroutine
|
||||||
|
func worker(tasks chan func(), exitHandler func(), panicHandler func(interface{})) {
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
if panic := recover(); panic != nil {
|
||||||
|
// Handle panic
|
||||||
|
panicHandler(panic)
|
||||||
|
|
||||||
|
// Restart goroutine
|
||||||
|
worker(tasks, exitHandler, panicHandler)
|
||||||
|
} else {
|
||||||
|
// Handle exit
|
||||||
|
exitHandler()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for task := range tasks {
|
||||||
|
if task == nil {
|
||||||
|
// We have received a signal to quit
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have received a task, execute it
|
||||||
|
task()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskGroup represents a group of related tasks
|
||||||
|
type TaskGroup struct {
|
||||||
|
pool *WorkerPool
|
||||||
|
waitGroup sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// Submit adds a task to this group and sends it to the worker pool to be executed.
|
||||||
|
func (g *TaskGroup) Submit(task func()) {
|
||||||
|
g.waitGroup.Add(1)
|
||||||
|
g.pool.Submit(func() {
|
||||||
|
defer g.waitGroup.Done()
|
||||||
|
task()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait waits until all the tasks in this group have completed. It returns
|
||||||
|
// a slice with all (non-nil) errors returned by tasks in this group.
|
||||||
|
func (g *TaskGroup) Wait() {
|
||||||
|
|
||||||
|
// Wait for all tasks to complete
|
||||||
|
g.waitGroup.Wait()
|
||||||
|
}
|
||||||
@@ -0,0 +1,113 @@
|
|||||||
|
package pond_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alitto/pond"
|
||||||
|
"github.com/gammazero/workerpool"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
taskCount = 1000000
|
||||||
|
taskDuration = 10 * time.Millisecond
|
||||||
|
workerCount = 200000
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkPond(b *testing.B) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
pool := pond.New(workerCount, taskCount)
|
||||||
|
defer pool.StopAndWait()
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
wg.Add(taskCount)
|
||||||
|
for i := 0; i < taskCount; i++ {
|
||||||
|
pool.Submit(func() {
|
||||||
|
time.Sleep(taskDuration)
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkPondGroup(b *testing.B) {
|
||||||
|
pool := pond.New(workerCount, taskCount)
|
||||||
|
defer pool.StopAndWait()
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
group := pool.Group()
|
||||||
|
for i := 0; i < taskCount; i++ {
|
||||||
|
group.Submit(func() {
|
||||||
|
time.Sleep(taskDuration)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
group.Wait()
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkGoroutines(b *testing.B) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
wg.Add(taskCount)
|
||||||
|
for i := 0; i < taskCount; i++ {
|
||||||
|
go func() {
|
||||||
|
time.Sleep(taskDuration)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkGammazeroWorkerpool(b *testing.B) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wp := workerpool.New(workerCount)
|
||||||
|
defer wp.StopWait()
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
wg.Add(taskCount)
|
||||||
|
for i := 0; i < taskCount; i++ {
|
||||||
|
wp.Submit(func() {
|
||||||
|
time.Sleep(taskDuration)
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkAnts(b *testing.B) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
p, _ := ants.NewPool(workerCount, ants.WithExpiryDuration(10*time.Second))
|
||||||
|
defer p.Release()
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
wg.Add(taskCount)
|
||||||
|
for i := 0; i < taskCount; i++ {
|
||||||
|
_ = p.Submit(func() {
|
||||||
|
time.Sleep(taskDuration)
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
+295
@@ -0,0 +1,295 @@
|
|||||||
|
package pond_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alitto/pond"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSubmitAndStopWaiting(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(1, 5)
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
var doneCount int32
|
||||||
|
for i := 0; i < 17; i++ {
|
||||||
|
pool.Submit(func() {
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all submitted tasks complete
|
||||||
|
pool.StopAndWait()
|
||||||
|
|
||||||
|
assert.Equal(int32(17), atomic.LoadInt32(&doneCount))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitAndStopWaitingWithMoreWorkersThanTasks(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(18, 5)
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
var doneCount int32
|
||||||
|
for i := 0; i < 17; i++ {
|
||||||
|
pool.Submit(func() {
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all submitted tasks complete
|
||||||
|
pool.StopAndWait()
|
||||||
|
|
||||||
|
assert.Equal(int32(17), atomic.LoadInt32(&doneCount))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitAndStopWithoutWaiting(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(1, 5)
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
started := make(chan bool)
|
||||||
|
completed := make(chan bool)
|
||||||
|
var doneCount int32
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
pool.Submit(func() {
|
||||||
|
started <- true
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
<-completed
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the first task started
|
||||||
|
<-started
|
||||||
|
|
||||||
|
// Stop without waiting for the rest of the tasks to start
|
||||||
|
pool.Stop()
|
||||||
|
|
||||||
|
// Let the first task complete now
|
||||||
|
completed <- true
|
||||||
|
|
||||||
|
// Only the first task should have been completed, the rest are discarded
|
||||||
|
assert.Equal(int32(1), atomic.LoadInt32(&doneCount))
|
||||||
|
|
||||||
|
// Make sure the exit lines in the worker pool are executed and covered
|
||||||
|
time.Sleep(6 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitWithNilTask(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(2, 5)
|
||||||
|
|
||||||
|
// Submit nil task
|
||||||
|
pool.Submit(nil)
|
||||||
|
|
||||||
|
// Wait until all submitted tasks complete
|
||||||
|
pool.StopAndWait()
|
||||||
|
|
||||||
|
assert.Equal(0, pool.Running())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitAndWait(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(1, 5)
|
||||||
|
defer pool.StopAndWait()
|
||||||
|
|
||||||
|
// Submit a task and wait for it to complete
|
||||||
|
var doneCount int32
|
||||||
|
pool.SubmitAndWait(func() {
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Equal(int32(1), atomic.LoadInt32(&doneCount))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitAndWaitWithNilTask(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(2, 5)
|
||||||
|
|
||||||
|
// Submit nil task
|
||||||
|
pool.SubmitAndWait(nil)
|
||||||
|
|
||||||
|
// Wait until all submitted tasks complete
|
||||||
|
pool.StopAndWait()
|
||||||
|
|
||||||
|
assert.Equal(0, pool.Running())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunning(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
workerCount := 5
|
||||||
|
taskCount := 10
|
||||||
|
pool := pond.New(workerCount, taskCount)
|
||||||
|
|
||||||
|
assert.Equal(0, pool.Running())
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
var started = make(chan struct{}, workerCount)
|
||||||
|
var completed = make(chan struct{}, workerCount)
|
||||||
|
for i := 0; i < taskCount; i++ {
|
||||||
|
pool.Submit(func() {
|
||||||
|
started <- struct{}{}
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
<-completed
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until half the tasks have started
|
||||||
|
for i := 0; i < taskCount/2; i++ {
|
||||||
|
<-started
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(workerCount, pool.Running())
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
|
||||||
|
// Make sure half the tasks tasks complete
|
||||||
|
for i := 0; i < taskCount/2; i++ {
|
||||||
|
completed <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until the rest of the tasks have started
|
||||||
|
for i := 0; i < taskCount/2; i++ {
|
||||||
|
<-started
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure all tasks complete
|
||||||
|
for i := 0; i < taskCount/2; i++ {
|
||||||
|
completed <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.StopAndWait()
|
||||||
|
|
||||||
|
assert.Equal(0, pool.Running())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitWithPanic(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(1, 5)
|
||||||
|
assert.Equal(0, pool.Running())
|
||||||
|
|
||||||
|
// Submit a task that panics
|
||||||
|
var doneCount int32
|
||||||
|
pool.Submit(func() {
|
||||||
|
arr := make([]string, 0)
|
||||||
|
fmt.Printf("Out of range value: %s", arr[1])
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Submit a task that completes normally
|
||||||
|
pool.Submit(func() {
|
||||||
|
time.Sleep(2 * time.Millisecond)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
pool.StopAndWait()
|
||||||
|
assert.Equal(0, pool.Running())
|
||||||
|
assert.Equal(int32(1), atomic.LoadInt32(&doneCount))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitWithIdleTimeout(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(1, 5, pond.IdleTimeout(2*time.Millisecond))
|
||||||
|
|
||||||
|
// Submit a task
|
||||||
|
started := make(chan bool)
|
||||||
|
completed := make(chan bool)
|
||||||
|
pool.Submit(func() {
|
||||||
|
<-started
|
||||||
|
time.Sleep(3 * time.Millisecond)
|
||||||
|
<-completed
|
||||||
|
})
|
||||||
|
|
||||||
|
// Make sure the first task has started
|
||||||
|
started <- true
|
||||||
|
|
||||||
|
// There should be 1 worker running
|
||||||
|
assert.Equal(1, pool.Running())
|
||||||
|
|
||||||
|
// Let the task complete
|
||||||
|
completed <- true
|
||||||
|
|
||||||
|
// Wait for idle timeout + 1ms
|
||||||
|
time.Sleep(3 * time.Millisecond)
|
||||||
|
|
||||||
|
// Worker should have been killed
|
||||||
|
assert.Equal(0, pool.Running())
|
||||||
|
|
||||||
|
pool.StopAndWait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitWithPanicHandler(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
var capturedPanic interface{} = nil
|
||||||
|
panicHandler := func(panic interface{}) {
|
||||||
|
capturedPanic = panic
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := pond.New(1, 5, pond.PanicHandler(panicHandler))
|
||||||
|
|
||||||
|
// Submit a task that panics
|
||||||
|
pool.Submit(func() {
|
||||||
|
panic("panic now!")
|
||||||
|
})
|
||||||
|
|
||||||
|
pool.StopAndWait()
|
||||||
|
|
||||||
|
// Panic should have been captured
|
||||||
|
assert.Equal("panic now!", capturedPanic)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGroupSubmit(t *testing.T) {
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
pool := pond.New(5, 5)
|
||||||
|
assert.Equal(0, pool.Running())
|
||||||
|
|
||||||
|
// Submit groups of tasks
|
||||||
|
var doneCount, taskCount int32
|
||||||
|
var groups []*pond.TaskGroup
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
group := pool.Group()
|
||||||
|
for j := 0; j < i+5; j++ {
|
||||||
|
group.Submit(func() {
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
taskCount++
|
||||||
|
}
|
||||||
|
groups = append(groups, group)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all groups to complete
|
||||||
|
for _, group := range groups {
|
||||||
|
group.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(int32(taskCount), atomic.LoadInt32(&doneCount))
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user