diff --git a/.gitignore b/.gitignore index 66fd13c..f2dd955 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,3 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out - -# Dependency directories (remove the comment below to include it) -# vendor/ diff --git a/README.md b/README.md index b8007cf..0814f4f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,47 @@ # pond -pond: Minimalistic and High-performance goroutine worker pool writen in Go +pond: Minimalistic and High-performance goroutine worker pool written in Go + +## Features: + +- Managing and recycling a massive number of goroutines automatically +- Purging overdue goroutines periodically +- Minimalistic API for submitting tasks, getting the number of running goroutines, stopping the pool and more. +- Zero dependencies +- Handle task panics gracefully +- Efficient memory usage +- Blocking and Nonblocking modes supported + +## How to install + +```powershell +go get -u github.com/alitto/pond +``` + +## How to use + +``` go +package main + +import ( + "fmt" + + "github.com/alitto/pond" +) + +func main() { + + // Create a pool with 100 workers + pool := pond.New(100, 1000) + + // Submit 1000 tasks + for i := 0; i < 1000; i++ { + n := i + pool.Submit(func() { + fmt.Printf("Running task #%d\n", n) + }) + } + + // Stop the pool and wait for all submitted tasks to complete + pool.StopAndWait() +} +``` \ No newline at end of file diff --git a/pond_benchmark_test.go b/benchmark/benchmark_test.go similarity index 88% rename from pond_benchmark_test.go rename to benchmark/benchmark_test.go index cebb7bf..0c66f54 100644 --- a/pond_benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -1,4 +1,4 @@ -package pond_test +package benchmark import ( "sync" @@ -11,11 +11,15 @@ import ( ) const ( - taskCount = 10000 + taskCount = 1000000 taskDuration = 1 * time.Millisecond - workerCount = 100 + workerCount = 20000 ) +func testFunc() { + time.Sleep(taskDuration) +} + func BenchmarkPond(b *testing.B) { var wg sync.WaitGroup pool := pond.New(workerCount, taskCount) @@ -27,7 +31,7 @@ func BenchmarkPond(b *testing.B) { wg.Add(taskCount) for i := 0; i < taskCount; i++ { pool.Submit(func() { - time.Sleep(taskDuration) + testFunc() wg.Done() }) } @@ -47,7 +51,7 @@ func BenchmarkPondMinWorkers(b *testing.B) { wg.Add(taskCount) for i := 0; i < taskCount; i++ { pool.Submit(func() { - time.Sleep(taskDuration) + testFunc() wg.Done() }) } @@ -65,9 +69,7 @@ func BenchmarkPondGroup(b *testing.B) { for i := 0; i < b.N; i++ { group := pool.Group() for i := 0; i < taskCount; i++ { - group.Submit(func() { - time.Sleep(taskDuration) - }) + group.Submit(testFunc) } group.Wait() } @@ -83,7 +85,7 @@ func BenchmarkGoroutines(b *testing.B) { wg.Add(taskCount) for i := 0; i < taskCount; i++ { go func() { - time.Sleep(taskDuration) + testFunc() wg.Done() }() } @@ -111,9 +113,7 @@ func BenchmarkGoroutinePool(b *testing.B) { } // Submit tasks for i := 0; i < taskCount; i++ { - taskChan <- func() { - time.Sleep(taskDuration) - } + taskChan <- testFunc } close(taskChan) wg.Wait() @@ -140,9 +140,7 @@ func BenchmarkBufferedGoroutinePool(b *testing.B) { } // Submit tasks for i := 0; i < taskCount; i++ { - taskChan <- func() { - time.Sleep(taskDuration) - } + taskChan <- testFunc } close(taskChan) wg.Wait() @@ -161,7 +159,7 @@ func BenchmarkGammazeroWorkerpool(b *testing.B) { wg.Add(taskCount) for i := 0; i < taskCount; i++ { wp.Submit(func() { - time.Sleep(taskDuration) + testFunc() wg.Done() }) } @@ -181,7 +179,7 @@ func BenchmarkAnts(b *testing.B) { wg.Add(taskCount) for i := 0; i < taskCount; i++ { _ = p.Submit(func() { - time.Sleep(taskDuration) + testFunc() wg.Done() }) } diff --git a/benchmark/go.mod b/benchmark/go.mod new file mode 100644 index 0000000..7094d81 --- /dev/null +++ b/benchmark/go.mod @@ -0,0 +1,9 @@ +module github.com/alitto/pond/benchmark + +go 1.14 + +require ( + github.com/alitto/pond v0.0.0-20200328140845-2664f1bbde39 + github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 + github.com/panjf2000/ants/v2 v2.3.1 +) diff --git a/go.sum b/benchmark/go.sum similarity index 88% rename from go.sum rename to benchmark/go.sum index d0f64de..164166d 100644 --- a/go.sum +++ b/benchmark/go.sum @@ -1,4 +1,5 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/alitto/pond v0.0.0-20200328140845-2664f1bbde39 h1:SIo+wt6R2/Kr95aTdvNuu4ByPknfHbJPfGbJMoi85q4= +github.com/alitto/pond v0.0.0-20200328140845-2664f1bbde39/go.mod h1:EDWzkjFhFR5/owRwSl7DXKa2yuSQmPB0g6C/YZSAxCo= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -6,7 +7,6 @@ github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8Skm github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 h1:1Cy/haf7XO4OyrkGid0Wq5CMluIErbvDptVAt8UTy38= github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs= -github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M= github.com/panjf2000/ants/v2 v2.3.1 h1:9iOZHO5XlSO1Gs5K7x06uDFy8bkicWlhOKGh/TufAZg= github.com/panjf2000/ants/v2 v2.3.1/go.mod h1:LtwNaBX6OeF5qRtQlaeGndalVwJlS2ueur7uwoAHbPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -17,7 +17,6 @@ github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/examples/basic.go b/examples/basic.go new file mode 100644 index 0000000..17efba2 --- /dev/null +++ b/examples/basic.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + + "github.com/alitto/pond" +) + +func main() { + + // Create a pool with 100 workers + pool := pond.New(100, 1000) + + // Submit 1000 tasks + for i := 0; i < 1000; i++ { + n := i + pool.Submit(func() { + fmt.Printf("Running task #%d\n", n) + }) + } + + // Stop the pool and wait for all submitted tasks to complete + pool.StopAndWait() +} diff --git a/go.mod b/go.mod index c7b69af..f05cdfc 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,3 @@ module github.com/alitto/pond go 1.14 - -require ( - github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 - github.com/panjf2000/ants/v2 v2.3.1 - github.com/stretchr/testify v1.5.1 -) diff --git a/pond.go b/pond.go index dc2ac7c..a2f2e9e 100644 --- a/pond.go +++ b/pond.go @@ -9,13 +9,17 @@ import ( ) const ( + // defaultIdleTimeout defines the default idle timeout to use when not explicitly specified + // via the IdleTimeout() option defaultIdleTimeout = 5 * time.Second ) +// defaultPanicHandler is the default panic handler func defaultPanicHandler(panic interface{}) { fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack())) } +// linearGrowthFn is a function that determines how many workers to create when backpressure is detected func linearGrowthFn(workerCount, minWorkers, maxWorkers int) int { if workerCount < minWorkers { return minWorkers @@ -26,7 +30,7 @@ func linearGrowthFn(workerCount, minWorkers, maxWorkers int) int { return 0 } -// Option represents an option that can be passed when building a worker pool to customize it +// Option represents an option that can be passed when instantiating a worker pool to customize it type Option func(*WorkerPool) // IdleTimeout allows to change the idle timeout for a worker pool @@ -66,18 +70,20 @@ type WorkerPool struct { growthFn func(int, int, int) int } -// New creates a worker pool with that can scale up to the given number of workers and capacity +// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers). +// The maxCapacity parameter determines the number of tasks that can be submitted to this pool without blocking, +// because it defines the size of the buffered channel used to receive tasks. +// The options parameter can take a list of functions to customize configuration values on this worker pool. func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { + // Instantiate the pool pool := &WorkerPool{ - maxWorkers: maxWorkers, - maxCapacity: maxCapacity, - idleTimeout: defaultIdleTimeout, - tasks: make(chan func(), maxCapacity), - dispatchedTasks: make(chan func(), maxWorkers), - purgerQuit: make(chan struct{}), - panicHandler: defaultPanicHandler, - growthFn: linearGrowthFn, + maxWorkers: maxWorkers, + maxCapacity: maxCapacity, + idleTimeout: defaultIdleTimeout, + purgerQuit: make(chan struct{}), + panicHandler: defaultPanicHandler, + growthFn: linearGrowthFn, } // Apply all options @@ -92,6 +98,16 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { if pool.minWorkers > pool.maxWorkers { pool.minWorkers = pool.maxWorkers } + if pool.maxCapacity < 0 { + pool.maxCapacity = 0 + } + if pool.idleTimeout < 0 { + pool.idleTimeout = defaultIdleTimeout + } + + // Create channels + pool.tasks = make(chan func(), pool.maxCapacity) + pool.dispatchedTasks = make(chan func(), pool.maxWorkers) // Start dispatcher goroutine pool.waitGroup.Add(1) @@ -123,7 +139,7 @@ func (p *WorkerPool) Running() int { } // Submit sends a task to this worker pool for execution. If the queue is full, -// it will wait until the task can be enqueued. +// it will wait until the task can be enqueued func (p *WorkerPool) Submit(task func()) { if task == nil { return @@ -134,7 +150,7 @@ func (p *WorkerPool) Submit(task func()) { } // SubmitAndWait sends a task to this worker pool for execution and waits for it to complete -// before returning. +// before returning func (p *WorkerPool) SubmitAndWait(task func()) { if task == nil { return @@ -148,40 +164,83 @@ func (p *WorkerPool) SubmitAndWait(task func()) { <-done } +// SubmitBefore attempts to send a task for execution to this worker pool but aborts it +// if the task did not start before the given deadline +func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) { + if task == nil { + return + } + + timer := time.NewTimer(deadline) + p.Submit(func() { + select { + case <-timer.C: + // Deadline was reached, abort the task + default: + // Deadline not reached, execute the task + defer timer.Stop() + + task() + } + }) +} + // Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit func (p *WorkerPool) Stop() { - p.stop(false) + p.stopOnce.Do(func() { + // Close the tasks channel to prevent receiving new tasks + close(p.tasks) + }) } // StopAndWait causes this pool to stop accepting tasks, waiting for all tasks in the queue to complete func (p *WorkerPool) StopAndWait() { - p.stop(true) + p.Stop() + + // Wait for all goroutines to exit + p.waitGroup.Wait() } // dispatch represents the work done by the dispatcher goroutine func (p *WorkerPool) dispatch() { + batchSize := p.maxWorkers + batch := make([]func(), 0) + for task := range p.tasks { - if task == nil { - // Received the signal to exit gracefully - break + batch = append(batch, task) + + // Read up to batchSize - 1 tasks without blocking + BulkReceive: + for i := 0; i < batchSize; i++ { + select { + case t := <-p.tasks: + batch = append(batch, t) + default: + break BulkReceive + } } - select { - // Attempt to submit the task to a worker without blocking - case p.dispatchedTasks <- task: - if p.Running() == 0 { - p.startWorkers() - } - default: - // Create a new worker if we haven't reached the limit yet - if p.Running() < p.maxWorkers { - p.startWorkers() - } + for _, task := range batch { + select { + // Attempt to submit the task to a worker without blocking + case p.dispatchedTasks <- task: + if p.Running() == 0 { + p.startWorkers() + } + default: + // Create a new worker if we haven't reached the limit yet + if p.Running() < p.maxWorkers { + p.startWorkers() + } - // Block until a worker accepts this task - p.dispatchedTasks <- task + // Block until a worker accepts this task + p.dispatchedTasks <- task + } } + + // Clear batch slice + batch = nil } // Send signal to stop all workers @@ -215,21 +274,17 @@ func (p *WorkerPool) purge() { } } +// startWorkers launches worker goroutines according to the growth function func (p *WorkerPool) startWorkers() { count := p.growthFn(p.Running(), p.minWorkers, p.maxWorkers) - if count == 0 { - return - } - // Increment worker count atomic.AddInt32(&p.workerCount, int32(count)) // Increment waiting group semaphore p.waitGroup.Add(count) - //go func() { for i := 0; i < count; i++ { worker(p.dispatchedTasks, func() { @@ -241,28 +296,6 @@ func (p *WorkerPool) startWorkers() { }, p.panicHandler) } - //}() - -} - -// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit -func (p *WorkerPool) stop(wait bool) { - - p.stopOnce.Do(func() { - if wait { - // Make sure all queued tasks complete before stopping the dispatcher - p.tasks <- nil - - // Close the tasks channel to prevent receiving new tasks - close(p.tasks) - - // Wait for all goroutines to exit - p.waitGroup.Wait() - } else { - // Close the tasks channel to prevent receiving new tasks - close(p.tasks) - } - }) } // Group creates a new task group @@ -307,7 +340,7 @@ type TaskGroup struct { waitGroup sync.WaitGroup } -// Submit adds a task to this group and sends it to the worker pool to be executed. +// Submit adds a task to this group and sends it to the worker pool to be executed func (g *TaskGroup) Submit(task func()) { g.waitGroup.Add(1) g.pool.Submit(func() { @@ -316,8 +349,7 @@ func (g *TaskGroup) Submit(task func()) { }) } -// Wait waits until all the tasks in this group have completed. It returns -// a slice with all (non-nil) errors returned by tasks in this group. +// Wait waits until all the tasks in this group have completed func (g *TaskGroup) Wait() { // Wait for all tasks to complete diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go new file mode 100644 index 0000000..34b8072 --- /dev/null +++ b/pond_blackbox_test.go @@ -0,0 +1,340 @@ +package pond_test + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/alitto/pond" +) + +func assertEqual(t *testing.T, expected interface{}, actual interface{}) { + if expected != actual { + t.Helper() + t.Errorf("Expected %T(%v) but was %T(%v)", expected, expected, actual, actual) + } +} + +func TestSubmitAndStopWaiting(t *testing.T) { + + pool := pond.New(1, 5) + + // Submit tasks + var doneCount int32 + for i := 0; i < 17; i++ { + pool.Submit(func() { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + } + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assertEqual(t, int32(17), atomic.LoadInt32(&doneCount)) +} + +func TestSubmitAndStopWaitingWithMoreWorkersThanTasks(t *testing.T) { + + pool := pond.New(18, 5) + + // Submit tasks + var doneCount int32 + for i := 0; i < 17; i++ { + pool.Submit(func() { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + } + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assertEqual(t, int32(17), atomic.LoadInt32(&doneCount)) +} + +func TestSubmitAndStopWithoutWaiting(t *testing.T) { + + pool := pond.New(1, 5) + + // Submit tasks + started := make(chan bool) + completed := make(chan bool) + var doneCount int32 + for i := 0; i < 5; i++ { + pool.Submit(func() { + started <- true + time.Sleep(5 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + <-completed + }) + } + + // Make sure the first task started + <-started + + // Stop without waiting for the rest of the tasks to start + pool.Stop() + + // Let the first task complete now + completed <- true + + // Only the first task should have been completed, the rest are discarded + assertEqual(t, int32(1), atomic.LoadInt32(&doneCount)) + + // Make sure the exit lines in the worker pool are executed and covered + time.Sleep(6 * time.Millisecond) +} + +func TestSubmitWithNilTask(t *testing.T) { + + pool := pond.New(2, 5) + + // Submit nil task + pool.Submit(nil) + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assertEqual(t, 0, pool.Running()) +} + +func TestSubmitAndWait(t *testing.T) { + + pool := pond.New(1, 5) + defer pool.StopAndWait() + + // Submit a task and wait for it to complete + var doneCount int32 + pool.SubmitAndWait(func() { + time.Sleep(5 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + + assertEqual(t, int32(1), atomic.LoadInt32(&doneCount)) +} + +func TestSubmitAndWaitWithNilTask(t *testing.T) { + + pool := pond.New(2, 5) + + // Submit nil task + pool.SubmitAndWait(nil) + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assertEqual(t, 0, pool.Running()) +} + +func TestSubmitBefore(t *testing.T) { + + pool := pond.New(1, 5) + + // Submit a long-running task + var doneCount int32 + pool.SubmitBefore(func() { + time.Sleep(5 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }, 1*time.Millisecond) + + // Submit a task that times out after 2ms + pool.SubmitBefore(func() { + time.Sleep(5 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }, 2*time.Millisecond) + + pool.StopAndWait() + + // Only the first task must have executed + assertEqual(t, int32(1), atomic.LoadInt32(&doneCount)) +} + +func TestSubmitBeforeWithNilTask(t *testing.T) { + + pool := pond.New(3, 5) + + // Submit nil task + pool.SubmitBefore(nil, 1*time.Millisecond) + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assertEqual(t, 0, pool.Running()) +} + +func TestRunning(t *testing.T) { + + workerCount := 5 + taskCount := 10 + pool := pond.New(workerCount, taskCount) + + assertEqual(t, 0, pool.Running()) + + // Submit tasks + var started = make(chan struct{}, workerCount) + var completed = make(chan struct{}, workerCount) + for i := 0; i < taskCount; i++ { + pool.Submit(func() { + started <- struct{}{} + time.Sleep(1 * time.Millisecond) + <-completed + }) + } + + // Wait until half the tasks have started + for i := 0; i < taskCount/2; i++ { + <-started + } + + assertEqual(t, workerCount, pool.Running()) + time.Sleep(1 * time.Millisecond) + + // Make sure half the tasks tasks complete + for i := 0; i < taskCount/2; i++ { + completed <- struct{}{} + } + + // Wait until the rest of the tasks have started + for i := 0; i < taskCount/2; i++ { + <-started + } + + // Make sure all tasks complete + for i := 0; i < taskCount/2; i++ { + completed <- struct{}{} + } + + pool.StopAndWait() + + assertEqual(t, 0, pool.Running()) +} + +func TestSubmitWithPanic(t *testing.T) { + + pool := pond.New(1, 5) + assertEqual(t, 0, pool.Running()) + + // Submit a task that panics + var doneCount int32 + pool.Submit(func() { + arr := make([]string, 0) + fmt.Printf("Out of range value: %s", arr[1]) + atomic.AddInt32(&doneCount, 1) + }) + + // Submit a task that completes normally + pool.Submit(func() { + time.Sleep(2 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + + pool.StopAndWait() + + assertEqual(t, 0, pool.Running()) + assertEqual(t, int32(1), atomic.LoadInt32(&doneCount)) +} + +func TestPoolWithCustomIdleTimeout(t *testing.T) { + + pool := pond.New(1, 5, pond.IdleTimeout(2*time.Millisecond)) + + // Submit a task + started := make(chan bool) + completed := make(chan bool) + pool.Submit(func() { + <-started + time.Sleep(3 * time.Millisecond) + <-completed + }) + + // Make sure the first task has started + started <- true + + // There should be 1 worker running + assertEqual(t, 1, pool.Running()) + + // Let the task complete + completed <- true + + // Wait for idle timeout + 1ms + time.Sleep(3 * time.Millisecond) + + // Worker should have been killed + assertEqual(t, 0, pool.Running()) + + pool.StopAndWait() +} + +func TestPoolWithCustomPanicHandler(t *testing.T) { + + var capturedPanic interface{} = nil + panicHandler := func(panic interface{}) { + capturedPanic = panic + } + + pool := pond.New(1, 5, pond.PanicHandler(panicHandler)) + + // Submit a task that panics + pool.Submit(func() { + panic("panic now!") + }) + + pool.StopAndWait() + + // Panic should have been captured + assertEqual(t, "panic now!", capturedPanic) +} + +func TestPoolWithCustomMinWorkers(t *testing.T) { + + pool := pond.New(10, 5, pond.MinWorkers(10)) + + // Submit a task that panics + started := make(chan struct{}) + completed := make(chan struct{}) + pool.Submit(func() { + <-started + completed <- struct{}{} + }) + + started <- struct{}{} + + // 10 workers should have been started + assertEqual(t, 10, pool.Running()) + + <-completed + + pool.StopAndWait() + + assertEqual(t, 0, pool.Running()) +} + +func TestGroupSubmit(t *testing.T) { + + pool := pond.New(5, 5) + assertEqual(t, 0, pool.Running()) + + // Submit groups of tasks + var doneCount, taskCount int32 + var groups []*pond.TaskGroup + for i := 0; i < 5; i++ { + group := pool.Group() + for j := 0; j < i+5; j++ { + group.Submit(func() { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + taskCount++ + } + groups = append(groups, group) + } + + // Wait for all groups to complete + for _, group := range groups { + group.Wait() + } + + assertEqual(t, int32(taskCount), atomic.LoadInt32(&doneCount)) +} diff --git a/pond_test.go b/pond_test.go index c70e6d6..cc6c747 100644 --- a/pond_test.go +++ b/pond_test.go @@ -1,321 +1,38 @@ -package pond_test +package pond import ( - "fmt" - "sync/atomic" "testing" "time" - - "github.com/alitto/pond" - "github.com/stretchr/testify/assert" ) -func TestSubmitAndStopWaiting(t *testing.T) { - - assert := assert.New(t) - - pool := pond.New(1, 5) - - // Submit tasks - var doneCount int32 - for i := 0; i < 17; i++ { - pool.Submit(func() { - time.Sleep(1 * time.Millisecond) - atomic.AddInt32(&doneCount, 1) - }) +func assertEqual(t *testing.T, expected interface{}, actual interface{}) { + if expected != actual { + t.Helper() + t.Errorf("Expected %T(%v) but was %T(%v)", expected, expected, actual, actual) } - - // Wait until all submitted tasks complete - pool.StopAndWait() - - assert.Equal(int32(17), atomic.LoadInt32(&doneCount)) } -func TestSubmitAndStopWaitingWithMoreWorkersThanTasks(t *testing.T) { +func TestNew(t *testing.T) { - assert := assert.New(t) - - pool := pond.New(18, 5) - - // Submit tasks - var doneCount int32 - for i := 0; i < 17; i++ { - pool.Submit(func() { - time.Sleep(1 * time.Millisecond) - atomic.AddInt32(&doneCount, 1) - }) - } - - // Wait until all submitted tasks complete - pool.StopAndWait() - - assert.Equal(int32(17), atomic.LoadInt32(&doneCount)) + pool := New(17, 10, MinWorkers(2), IdleTimeout(1*time.Second)) + assertEqual(t, 17, pool.maxWorkers) + assertEqual(t, 10, pool.maxCapacity) + assertEqual(t, 2, pool.minWorkers) + assertEqual(t, 1*time.Second, pool.idleTimeout) } -func TestSubmitAndStopWithoutWaiting(t *testing.T) { +func TestNewWithInconsistentOptions(t *testing.T) { - assert := assert.New(t) - - pool := pond.New(1, 5) - - // Submit tasks - started := make(chan bool) - completed := make(chan bool) - var doneCount int32 - for i := 0; i < 5; i++ { - pool.Submit(func() { - started <- true - time.Sleep(5 * time.Millisecond) - atomic.AddInt32(&doneCount, 1) - <-completed - }) - } - - // Make sure the first task started - <-started - - // Stop without waiting for the rest of the tasks to start - pool.Stop() - - // Let the first task complete now - completed <- true - - // Only the first task should have been completed, the rest are discarded - assert.Equal(int32(1), atomic.LoadInt32(&doneCount)) - - // Make sure the exit lines in the worker pool are executed and covered - time.Sleep(6 * time.Millisecond) + pool := New(-10, -5, MinWorkers(20), IdleTimeout(-1*time.Second)) + assertEqual(t, 1, pool.maxWorkers) + assertEqual(t, 0, pool.maxCapacity) + assertEqual(t, 1, pool.minWorkers) + assertEqual(t, defaultIdleTimeout, pool.idleTimeout) } -func TestSubmitWithNilTask(t *testing.T) { +func TestLinearGrowthFn(t *testing.T) { - assert := assert.New(t) - - pool := pond.New(2, 5) - - // Submit nil task - pool.Submit(nil) - - // Wait until all submitted tasks complete - pool.StopAndWait() - - assert.Equal(0, pool.Running()) -} - -func TestSubmitAndWait(t *testing.T) { - - assert := assert.New(t) - - pool := pond.New(1, 5) - defer pool.StopAndWait() - - // Submit a task and wait for it to complete - var doneCount int32 - pool.SubmitAndWait(func() { - time.Sleep(5 * time.Millisecond) - atomic.AddInt32(&doneCount, 1) - }) - - assert.Equal(int32(1), atomic.LoadInt32(&doneCount)) -} - -func TestSubmitAndWaitWithNilTask(t *testing.T) { - - assert := assert.New(t) - - pool := pond.New(2, 5) - - // Submit nil task - pool.SubmitAndWait(nil) - - // Wait until all submitted tasks complete - pool.StopAndWait() - - assert.Equal(0, pool.Running()) -} - -func TestRunning(t *testing.T) { - - assert := assert.New(t) - - workerCount := 5 - taskCount := 10 - pool := pond.New(workerCount, taskCount) - - assert.Equal(0, pool.Running()) - - // Submit tasks - var started = make(chan struct{}, workerCount) - var completed = make(chan struct{}, workerCount) - for i := 0; i < taskCount; i++ { - pool.Submit(func() { - started <- struct{}{} - time.Sleep(1 * time.Millisecond) - <-completed - }) - } - - // Wait until half the tasks have started - for i := 0; i < taskCount/2; i++ { - <-started - } - - assert.Equal(workerCount, pool.Running()) - time.Sleep(1 * time.Millisecond) - - // Make sure half the tasks tasks complete - for i := 0; i < taskCount/2; i++ { - completed <- struct{}{} - } - - // Wait until the rest of the tasks have started - for i := 0; i < taskCount/2; i++ { - <-started - } - - // Make sure all tasks complete - for i := 0; i < taskCount/2; i++ { - completed <- struct{}{} - } - - pool.StopAndWait() - - assert.Equal(0, pool.Running()) -} - -func TestSubmitWithPanic(t *testing.T) { - - assert := assert.New(t) - - pool := pond.New(1, 5) - assert.Equal(0, pool.Running()) - - // Submit a task that panics - var doneCount int32 - pool.Submit(func() { - arr := make([]string, 0) - fmt.Printf("Out of range value: %s", arr[1]) - atomic.AddInt32(&doneCount, 1) - }) - - // Submit a task that completes normally - pool.Submit(func() { - time.Sleep(2 * time.Millisecond) - atomic.AddInt32(&doneCount, 1) - }) - - pool.StopAndWait() - assert.Equal(0, pool.Running()) - assert.Equal(int32(1), atomic.LoadInt32(&doneCount)) -} - -func TestPoolWithCustomIdleTimeout(t *testing.T) { - - assert := assert.New(t) - - pool := pond.New(1, 5, pond.IdleTimeout(2*time.Millisecond)) - - // Submit a task - started := make(chan bool) - completed := make(chan bool) - pool.Submit(func() { - <-started - time.Sleep(3 * time.Millisecond) - <-completed - }) - - // Make sure the first task has started - started <- true - - // There should be 1 worker running - assert.Equal(1, pool.Running()) - - // Let the task complete - completed <- true - - // Wait for idle timeout + 1ms - time.Sleep(3 * time.Millisecond) - - // Worker should have been killed - assert.Equal(0, pool.Running()) - - pool.StopAndWait() -} - -func TestPoolWithCustomPanicHandler(t *testing.T) { - - assert := assert.New(t) - - var capturedPanic interface{} = nil - panicHandler := func(panic interface{}) { - capturedPanic = panic - } - - pool := pond.New(1, 5, pond.PanicHandler(panicHandler)) - - // Submit a task that panics - pool.Submit(func() { - panic("panic now!") - }) - - pool.StopAndWait() - - // Panic should have been captured - assert.Equal("panic now!", capturedPanic) -} - -func TestPoolWithCustomMinWorkers(t *testing.T) { - - assert := assert.New(t) - - pool := pond.New(10, 5, pond.MinWorkers(10)) - - // Submit a task that panics - started := make(chan struct{}) - completed := make(chan struct{}) - pool.Submit(func() { - <-started - completed <- struct{}{} - }) - - started <- struct{}{} - - // 10 workers should have been started - assert.Equal(10, pool.Running()) - - <-completed - - pool.StopAndWait() - - assert.Equal(0, pool.Running()) -} - -func TestGroupSubmit(t *testing.T) { - - assert := assert.New(t) - - pool := pond.New(5, 5) - assert.Equal(0, pool.Running()) - - // Submit groups of tasks - var doneCount, taskCount int32 - var groups []*pond.TaskGroup - for i := 0; i < 5; i++ { - group := pool.Group() - for j := 0; j < i+5; j++ { - group.Submit(func() { - time.Sleep(1 * time.Millisecond) - atomic.AddInt32(&doneCount, 1) - }) - taskCount++ - } - groups = append(groups, group) - } - - // Wait for all groups to complete - for _, group := range groups { - group.Wait() - } - - assert.Equal(int32(taskCount), atomic.LoadInt32(&doneCount)) + assertEqual(t, 1, linearGrowthFn(0, 1, 1)) + assertEqual(t, 1, linearGrowthFn(0, 1, 2)) + assertEqual(t, 0, linearGrowthFn(3, 1, 3)) }