diff --git a/pond.go b/pond.go index 0928b88..dc2ac7c 100644 --- a/pond.go +++ b/pond.go @@ -16,6 +16,16 @@ func defaultPanicHandler(panic interface{}) { fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack())) } +func linearGrowthFn(workerCount, minWorkers, maxWorkers int) int { + if workerCount < minWorkers { + return minWorkers + } + if workerCount < maxWorkers { + return 1 + } + return 0 +} + // Option represents an option that can be passed when building a worker pool to customize it type Option func(*WorkerPool) @@ -33,8 +43,16 @@ func PanicHandler(panicHandler func(interface{})) Option { } } +// MinWorkers allows to change the minimum number of workers of a worker pool +func MinWorkers(minWorkers int) Option { + return func(pool *WorkerPool) { + pool.minWorkers = minWorkers + } +} + // WorkerPool models a pool of workers type WorkerPool struct { + minWorkers int maxWorkers int maxCapacity int idleTimeout time.Duration @@ -45,6 +63,7 @@ type WorkerPool struct { stopOnce sync.Once waitGroup sync.WaitGroup panicHandler func(interface{}) + growthFn func(int, int, int) int } // New creates a worker pool with that can scale up to the given number of workers and capacity @@ -58,6 +77,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { dispatchedTasks: make(chan func(), maxWorkers), purgerQuit: make(chan struct{}), panicHandler: defaultPanicHandler, + growthFn: linearGrowthFn, } // Apply all options @@ -65,6 +85,14 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { opt(pool) } + // Make sure options are consistent + if pool.maxWorkers <= 0 { + pool.maxWorkers = 1 + } + if pool.minWorkers > pool.maxWorkers { + pool.minWorkers = pool.maxWorkers + } + // Start dispatcher goroutine pool.waitGroup.Add(1) go func() { @@ -81,6 +109,11 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { pool.purge() }() + // Start minWorkers workers + if pool.minWorkers > 0 { + pool.startWorkers() + } + return pool } @@ -138,12 +171,12 @@ func (p *WorkerPool) dispatch() { // Attempt to submit the task to a worker without blocking case p.dispatchedTasks <- task: if p.Running() == 0 { - p.startWorker() + p.startWorkers() } default: // Create a new worker if we haven't reached the limit yet if p.Running() < p.maxWorkers { - p.startWorker() + p.startWorkers() } // Block until a worker accepts this task @@ -167,7 +200,7 @@ func (p *WorkerPool) purge() { select { // Timed out waiting for any activity to happen, attempt to stop an idle worker case <-ticker.C: - if p.Running() > 0 { + if p.Running() > p.minWorkers { select { case p.dispatchedTasks <- nil: default: @@ -182,22 +215,34 @@ func (p *WorkerPool) purge() { } } -func (p *WorkerPool) startWorker() { +func (p *WorkerPool) startWorkers() { + + count := p.growthFn(p.Running(), p.minWorkers, p.maxWorkers) + + if count == 0 { + return + } + // Increment worker count - atomic.AddInt32(&p.workerCount, 1) + atomic.AddInt32(&p.workerCount, int32(count)) // Increment waiting group semaphore - p.waitGroup.Add(1) + p.waitGroup.Add(count) - worker(p.dispatchedTasks, func() { + //go func() { + for i := 0; i < count; i++ { + worker(p.dispatchedTasks, func() { - // Decrement worker count - atomic.AddInt32(&p.workerCount, -1) + // Decrement worker count + atomic.AddInt32(&p.workerCount, -1) - // Decrement waiting group semaphore - p.waitGroup.Done() + // Decrement waiting group semaphore + p.waitGroup.Done() + + }, p.panicHandler) + } + //}() - }, p.panicHandler) } // Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit diff --git a/pond_benchmark_test.go b/pond_benchmark_test.go index 2d9f2a7..cebb7bf 100644 --- a/pond_benchmark_test.go +++ b/pond_benchmark_test.go @@ -11,9 +11,9 @@ import ( ) const ( - taskCount = 1000000 - taskDuration = 10 * time.Millisecond - workerCount = 200000 + taskCount = 10000 + taskDuration = 1 * time.Millisecond + workerCount = 100 ) func BenchmarkPond(b *testing.B) { @@ -36,6 +36,26 @@ func BenchmarkPond(b *testing.B) { b.StopTimer() } +func BenchmarkPondMinWorkers(b *testing.B) { + var wg sync.WaitGroup + pool := pond.New(workerCount, taskCount, pond.MinWorkers(workerCount)) + defer pool.StopAndWait() + + // Submit tasks + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + pool.Submit(func() { + time.Sleep(taskDuration) + wg.Done() + }) + } + wg.Wait() + } + b.StopTimer() +} + func BenchmarkPondGroup(b *testing.B) { pool := pond.New(workerCount, taskCount) defer pool.StopAndWait() @@ -72,6 +92,64 @@ func BenchmarkGoroutines(b *testing.B) { b.StopTimer() } +func BenchmarkGoroutinePool(b *testing.B) { + var wg sync.WaitGroup + + // Submit tasks + b.ResetTimer() + for i := 0; i < b.N; i++ { + taskChan := make(chan func()) + wg.Add(workerCount) + // Start worker goroutines + for i := 0; i < workerCount; i++ { + go func() { + for task := range taskChan { + task() + } + wg.Done() + }() + } + // Submit tasks + for i := 0; i < taskCount; i++ { + taskChan <- func() { + time.Sleep(taskDuration) + } + } + close(taskChan) + wg.Wait() + } + b.StopTimer() +} + +func BenchmarkBufferedGoroutinePool(b *testing.B) { + var wg sync.WaitGroup + + // Submit tasks + b.ResetTimer() + for i := 0; i < b.N; i++ { + taskChan := make(chan func(), taskCount) + wg.Add(workerCount) + // Start worker goroutines + for i := 0; i < workerCount; i++ { + go func() { + for task := range taskChan { + task() + } + wg.Done() + }() + } + // Submit tasks + for i := 0; i < taskCount; i++ { + taskChan <- func() { + time.Sleep(taskDuration) + } + } + close(taskChan) + wg.Wait() + } + b.StopTimer() +} + func BenchmarkGammazeroWorkerpool(b *testing.B) { var wg sync.WaitGroup wp := workerpool.New(workerCount) diff --git a/pond_test.go b/pond_test.go index 1d018bb..c70e6d6 100644 --- a/pond_test.go +++ b/pond_test.go @@ -209,7 +209,7 @@ func TestSubmitWithPanic(t *testing.T) { assert.Equal(int32(1), atomic.LoadInt32(&doneCount)) } -func TestSubmitWithIdleTimeout(t *testing.T) { +func TestPoolWithCustomIdleTimeout(t *testing.T) { assert := assert.New(t) @@ -242,7 +242,7 @@ func TestSubmitWithIdleTimeout(t *testing.T) { pool.StopAndWait() } -func TestSubmitWithPanicHandler(t *testing.T) { +func TestPoolWithCustomPanicHandler(t *testing.T) { assert := assert.New(t) @@ -264,6 +264,32 @@ func TestSubmitWithPanicHandler(t *testing.T) { assert.Equal("panic now!", capturedPanic) } +func TestPoolWithCustomMinWorkers(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(10, 5, pond.MinWorkers(10)) + + // Submit a task that panics + started := make(chan struct{}) + completed := make(chan struct{}) + pool.Submit(func() { + <-started + completed <- struct{}{} + }) + + started <- struct{}{} + + // 10 workers should have been started + assert.Equal(10, pool.Running()) + + <-completed + + pool.StopAndWait() + + assert.Equal(0, pool.Running()) +} + func TestGroupSubmit(t *testing.T) { assert := assert.New(t)