From 7b9b533e58b903de15ec04482260978b148b15b0 Mon Sep 17 00:00:00 2001 From: adurantecredify Date: Tue, 24 Mar 2020 15:49:58 -0300 Subject: [PATCH] Initial commit --- go.mod | 9 ++ go.sum | 23 ++++ pond.go | 280 ++++++++++++++++++++++++++++++++++++++ pond_benchmark_test.go | 113 ++++++++++++++++ pond_test.go | 295 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 720 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pond.go create mode 100644 pond_benchmark_test.go create mode 100644 pond_test.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c7b69af --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/alitto/pond + +go 1.14 + +require ( + github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 + github.com/panjf2000/ants/v2 v2.3.1 + github.com/stretchr/testify v1.5.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d0f64de --- /dev/null +++ b/go.sum @@ -0,0 +1,23 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8SkmSO/O5WAIX/j79ll3kuqv5VdYt9J8= +github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= +github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 h1:1Cy/haf7XO4OyrkGid0Wq5CMluIErbvDptVAt8UTy38= +github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs= +github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M= +github.com/panjf2000/ants/v2 v2.3.1 h1:9iOZHO5XlSO1Gs5K7x06uDFy8bkicWlhOKGh/TufAZg= +github.com/panjf2000/ants/v2 v2.3.1/go.mod h1:LtwNaBX6OeF5qRtQlaeGndalVwJlS2ueur7uwoAHbPA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pond.go b/pond.go new file mode 100644 index 0000000..0928b88 --- /dev/null +++ b/pond.go @@ -0,0 +1,280 @@ +package pond + +import ( + "fmt" + "runtime/debug" + "sync" + "sync/atomic" + "time" +) + +const ( + defaultIdleTimeout = 5 * time.Second +) + +func defaultPanicHandler(panic interface{}) { + fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack())) +} + +// Option represents an option that can be passed when building a worker pool to customize it +type Option func(*WorkerPool) + +// IdleTimeout allows to change the idle timeout for a worker pool +func IdleTimeout(idleTimeout time.Duration) Option { + return func(pool *WorkerPool) { + pool.idleTimeout = idleTimeout + } +} + +// PanicHandler allows to change the panic handler function for a worker pool +func PanicHandler(panicHandler func(interface{})) Option { + return func(pool *WorkerPool) { + pool.panicHandler = panicHandler + } +} + +// WorkerPool models a pool of workers +type WorkerPool struct { + maxWorkers int + maxCapacity int + idleTimeout time.Duration + workerCount int32 + tasks chan func() + dispatchedTasks chan func() + purgerQuit chan struct{} + stopOnce sync.Once + waitGroup sync.WaitGroup + panicHandler func(interface{}) +} + +// New creates a worker pool with that can scale up to the given number of workers and capacity +func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { + + pool := &WorkerPool{ + maxWorkers: maxWorkers, + maxCapacity: maxCapacity, + idleTimeout: defaultIdleTimeout, + tasks: make(chan func(), maxCapacity), + dispatchedTasks: make(chan func(), maxWorkers), + purgerQuit: make(chan struct{}), + panicHandler: defaultPanicHandler, + } + + // Apply all options + for _, opt := range options { + opt(pool) + } + + // Start dispatcher goroutine + pool.waitGroup.Add(1) + go func() { + defer pool.waitGroup.Done() + + pool.dispatch() + }() + + // Start purger goroutine + pool.waitGroup.Add(1) + go func() { + defer pool.waitGroup.Done() + + pool.purge() + }() + + return pool +} + +// Running returns the number of running workers +func (p *WorkerPool) Running() int { + return int(atomic.LoadInt32(&p.workerCount)) +} + +// Submit sends a task to this worker pool for execution. If the queue is full, +// it will wait until the task can be enqueued. +func (p *WorkerPool) Submit(task func()) { + if task == nil { + return + } + + // Submit the task to the task channel + p.tasks <- task +} + +// SubmitAndWait sends a task to this worker pool for execution and waits for it to complete +// before returning. +func (p *WorkerPool) SubmitAndWait(task func()) { + if task == nil { + return + } + + done := make(chan struct{}) + p.Submit(func() { + defer close(done) + task() + }) + <-done +} + +// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit +func (p *WorkerPool) Stop() { + p.stop(false) +} + +// StopAndWait causes this pool to stop accepting tasks, waiting for all tasks in the queue to complete +func (p *WorkerPool) StopAndWait() { + p.stop(true) +} + +// dispatch represents the work done by the dispatcher goroutine +func (p *WorkerPool) dispatch() { + + for task := range p.tasks { + if task == nil { + // Received the signal to exit gracefully + break + } + + select { + // Attempt to submit the task to a worker without blocking + case p.dispatchedTasks <- task: + if p.Running() == 0 { + p.startWorker() + } + default: + // Create a new worker if we haven't reached the limit yet + if p.Running() < p.maxWorkers { + p.startWorker() + } + + // Block until a worker accepts this task + p.dispatchedTasks <- task + } + } + + // Send signal to stop all workers + close(p.dispatchedTasks) + + // Send signal to stop the purger + close(p.purgerQuit) +} + +// purge represents the work done by the purger goroutine +func (p *WorkerPool) purge() { + ticker := time.NewTicker(p.idleTimeout) + defer ticker.Stop() + + for { + select { + // Timed out waiting for any activity to happen, attempt to stop an idle worker + case <-ticker.C: + if p.Running() > 0 { + select { + case p.dispatchedTasks <- nil: + default: + // If dispatchedTasks channel is full, no need to kill the worker + } + } + + // Received the signal to exit + case <-p.purgerQuit: + return + } + } +} + +func (p *WorkerPool) startWorker() { + // Increment worker count + atomic.AddInt32(&p.workerCount, 1) + + // Increment waiting group semaphore + p.waitGroup.Add(1) + + worker(p.dispatchedTasks, func() { + + // Decrement worker count + atomic.AddInt32(&p.workerCount, -1) + + // Decrement waiting group semaphore + p.waitGroup.Done() + + }, p.panicHandler) +} + +// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit +func (p *WorkerPool) stop(wait bool) { + + p.stopOnce.Do(func() { + if wait { + // Make sure all queued tasks complete before stopping the dispatcher + p.tasks <- nil + + // Close the tasks channel to prevent receiving new tasks + close(p.tasks) + + // Wait for all goroutines to exit + p.waitGroup.Wait() + } else { + // Close the tasks channel to prevent receiving new tasks + close(p.tasks) + } + }) +} + +// Group creates a new task group +func (p *WorkerPool) Group() *TaskGroup { + return &TaskGroup{ + pool: p, + } +} + +// worker launches a worker goroutine +func worker(tasks chan func(), exitHandler func(), panicHandler func(interface{})) { + + go func() { + defer func() { + if panic := recover(); panic != nil { + // Handle panic + panicHandler(panic) + + // Restart goroutine + worker(tasks, exitHandler, panicHandler) + } else { + // Handle exit + exitHandler() + } + }() + + for task := range tasks { + if task == nil { + // We have received a signal to quit + return + } + + // We have received a task, execute it + task() + } + }() +} + +// TaskGroup represents a group of related tasks +type TaskGroup struct { + pool *WorkerPool + waitGroup sync.WaitGroup +} + +// Submit adds a task to this group and sends it to the worker pool to be executed. +func (g *TaskGroup) Submit(task func()) { + g.waitGroup.Add(1) + g.pool.Submit(func() { + defer g.waitGroup.Done() + task() + }) +} + +// Wait waits until all the tasks in this group have completed. It returns +// a slice with all (non-nil) errors returned by tasks in this group. +func (g *TaskGroup) Wait() { + + // Wait for all tasks to complete + g.waitGroup.Wait() +} diff --git a/pond_benchmark_test.go b/pond_benchmark_test.go new file mode 100644 index 0000000..2d9f2a7 --- /dev/null +++ b/pond_benchmark_test.go @@ -0,0 +1,113 @@ +package pond_test + +import ( + "sync" + "testing" + "time" + + "github.com/alitto/pond" + "github.com/gammazero/workerpool" + "github.com/panjf2000/ants/v2" +) + +const ( + taskCount = 1000000 + taskDuration = 10 * time.Millisecond + workerCount = 200000 +) + +func BenchmarkPond(b *testing.B) { + var wg sync.WaitGroup + pool := pond.New(workerCount, taskCount) + defer pool.StopAndWait() + + // Submit tasks + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + pool.Submit(func() { + time.Sleep(taskDuration) + wg.Done() + }) + } + wg.Wait() + } + b.StopTimer() +} + +func BenchmarkPondGroup(b *testing.B) { + pool := pond.New(workerCount, taskCount) + defer pool.StopAndWait() + + // Submit tasks + b.ResetTimer() + for i := 0; i < b.N; i++ { + group := pool.Group() + for i := 0; i < taskCount; i++ { + group.Submit(func() { + time.Sleep(taskDuration) + }) + } + group.Wait() + } + b.StopTimer() +} + +func BenchmarkGoroutines(b *testing.B) { + var wg sync.WaitGroup + + // Submit tasks + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + go func() { + time.Sleep(taskDuration) + wg.Done() + }() + } + wg.Wait() + } + b.StopTimer() +} + +func BenchmarkGammazeroWorkerpool(b *testing.B) { + var wg sync.WaitGroup + wp := workerpool.New(workerCount) + defer wp.StopWait() + + // Submit tasks + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + wp.Submit(func() { + time.Sleep(taskDuration) + wg.Done() + }) + } + wg.Wait() + } + b.StopTimer() +} + +func BenchmarkAnts(b *testing.B) { + var wg sync.WaitGroup + p, _ := ants.NewPool(workerCount, ants.WithExpiryDuration(10*time.Second)) + defer p.Release() + + // Submit tasks + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + _ = p.Submit(func() { + time.Sleep(taskDuration) + wg.Done() + }) + } + wg.Wait() + } + b.StopTimer() +} diff --git a/pond_test.go b/pond_test.go new file mode 100644 index 0000000..1d018bb --- /dev/null +++ b/pond_test.go @@ -0,0 +1,295 @@ +package pond_test + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/alitto/pond" + "github.com/stretchr/testify/assert" +) + +func TestSubmitAndStopWaiting(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(1, 5) + + // Submit tasks + var doneCount int32 + for i := 0; i < 17; i++ { + pool.Submit(func() { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + } + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assert.Equal(int32(17), atomic.LoadInt32(&doneCount)) +} + +func TestSubmitAndStopWaitingWithMoreWorkersThanTasks(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(18, 5) + + // Submit tasks + var doneCount int32 + for i := 0; i < 17; i++ { + pool.Submit(func() { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + } + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assert.Equal(int32(17), atomic.LoadInt32(&doneCount)) +} + +func TestSubmitAndStopWithoutWaiting(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(1, 5) + + // Submit tasks + started := make(chan bool) + completed := make(chan bool) + var doneCount int32 + for i := 0; i < 5; i++ { + pool.Submit(func() { + started <- true + time.Sleep(5 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + <-completed + }) + } + + // Make sure the first task started + <-started + + // Stop without waiting for the rest of the tasks to start + pool.Stop() + + // Let the first task complete now + completed <- true + + // Only the first task should have been completed, the rest are discarded + assert.Equal(int32(1), atomic.LoadInt32(&doneCount)) + + // Make sure the exit lines in the worker pool are executed and covered + time.Sleep(6 * time.Millisecond) +} + +func TestSubmitWithNilTask(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(2, 5) + + // Submit nil task + pool.Submit(nil) + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assert.Equal(0, pool.Running()) +} + +func TestSubmitAndWait(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(1, 5) + defer pool.StopAndWait() + + // Submit a task and wait for it to complete + var doneCount int32 + pool.SubmitAndWait(func() { + time.Sleep(5 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + + assert.Equal(int32(1), atomic.LoadInt32(&doneCount)) +} + +func TestSubmitAndWaitWithNilTask(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(2, 5) + + // Submit nil task + pool.SubmitAndWait(nil) + + // Wait until all submitted tasks complete + pool.StopAndWait() + + assert.Equal(0, pool.Running()) +} + +func TestRunning(t *testing.T) { + + assert := assert.New(t) + + workerCount := 5 + taskCount := 10 + pool := pond.New(workerCount, taskCount) + + assert.Equal(0, pool.Running()) + + // Submit tasks + var started = make(chan struct{}, workerCount) + var completed = make(chan struct{}, workerCount) + for i := 0; i < taskCount; i++ { + pool.Submit(func() { + started <- struct{}{} + time.Sleep(1 * time.Millisecond) + <-completed + }) + } + + // Wait until half the tasks have started + for i := 0; i < taskCount/2; i++ { + <-started + } + + assert.Equal(workerCount, pool.Running()) + time.Sleep(1 * time.Millisecond) + + // Make sure half the tasks tasks complete + for i := 0; i < taskCount/2; i++ { + completed <- struct{}{} + } + + // Wait until the rest of the tasks have started + for i := 0; i < taskCount/2; i++ { + <-started + } + + // Make sure all tasks complete + for i := 0; i < taskCount/2; i++ { + completed <- struct{}{} + } + + pool.StopAndWait() + + assert.Equal(0, pool.Running()) +} + +func TestSubmitWithPanic(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(1, 5) + assert.Equal(0, pool.Running()) + + // Submit a task that panics + var doneCount int32 + pool.Submit(func() { + arr := make([]string, 0) + fmt.Printf("Out of range value: %s", arr[1]) + atomic.AddInt32(&doneCount, 1) + }) + + // Submit a task that completes normally + pool.Submit(func() { + time.Sleep(2 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + + pool.StopAndWait() + assert.Equal(0, pool.Running()) + assert.Equal(int32(1), atomic.LoadInt32(&doneCount)) +} + +func TestSubmitWithIdleTimeout(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(1, 5, pond.IdleTimeout(2*time.Millisecond)) + + // Submit a task + started := make(chan bool) + completed := make(chan bool) + pool.Submit(func() { + <-started + time.Sleep(3 * time.Millisecond) + <-completed + }) + + // Make sure the first task has started + started <- true + + // There should be 1 worker running + assert.Equal(1, pool.Running()) + + // Let the task complete + completed <- true + + // Wait for idle timeout + 1ms + time.Sleep(3 * time.Millisecond) + + // Worker should have been killed + assert.Equal(0, pool.Running()) + + pool.StopAndWait() +} + +func TestSubmitWithPanicHandler(t *testing.T) { + + assert := assert.New(t) + + var capturedPanic interface{} = nil + panicHandler := func(panic interface{}) { + capturedPanic = panic + } + + pool := pond.New(1, 5, pond.PanicHandler(panicHandler)) + + // Submit a task that panics + pool.Submit(func() { + panic("panic now!") + }) + + pool.StopAndWait() + + // Panic should have been captured + assert.Equal("panic now!", capturedPanic) +} + +func TestGroupSubmit(t *testing.T) { + + assert := assert.New(t) + + pool := pond.New(5, 5) + assert.Equal(0, pool.Running()) + + // Submit groups of tasks + var doneCount, taskCount int32 + var groups []*pond.TaskGroup + for i := 0; i < 5; i++ { + group := pool.Group() + for j := 0; j < i+5; j++ { + group.Submit(func() { + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&doneCount, 1) + }) + taskCount++ + } + groups = append(groups, group) + } + + // Wait for all groups to complete + for _, group := range groups { + group.Wait() + } + + assert.Equal(int32(taskCount), atomic.LoadInt32(&doneCount)) +}