diff --git a/README.md b/README.md index 1cc987f..ced26fa 100644 --- a/README.md +++ b/README.md @@ -31,11 +31,12 @@ Some common scenarios include: - Task panics are handled gracefully (configurable panic handler) - Supports Non-blocking and Blocking task submission modes (buffered / unbuffered) - Very high performance under heavy workloads (See [benchmarks](#benchmarks)) +- **New (since v1.3.0)**: configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy. - [API reference](https://pkg.go.dev/github.com/alitto/pond) ## How to install -```powershell +```bash go get -u github.com/alitto/pond ``` @@ -155,6 +156,16 @@ panicHandler := func(p interface{}) { // This will create a pool that will handle panics using a custom panic handler pool := pond.New(10, 1000, pond.PanicHandler(panicHandler))) +``` +- **Strategy**: Configures the strategy used to resize the pool when backpressure is detected. You can create a custom strategy by implementing the `pond.ResizingStrategy` interface or choose one of the 3 presets: + - **Eager**: maximizes responsiveness at the expense of higher resource usage, which can reduce throughput under certain conditions. This strategy is meant for worker pools that will operate at a small percentage of their capacity most of the time and may occasionally receive bursts of tasks. + - **Balanced**: tries to find a balance between responsiveness and throughput. It's suitable for general purpose worker pools or those that will operate close to 50% of their capacity most of the time. This is the default strategy. + - **Lazy**: maximizes throughput at the expense of responsiveness. This strategy is meant for worker pools that will operate close to their max. capacity most of the time. +``` go +// Example: create pools with different resizing strategies +eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager)) +balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced)) +lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy)) ``` ## API Reference @@ -163,30 +174,50 @@ Full API reference is available at https://pkg.go.dev/github.com/alitto/pond ## Benchmarks -We ran a few [benchmarks](benchmark/benchmark_test.go) to show how _pond_'s performance compares against some of the most popular worker pool libraries available for Go ([ants](https://github.com/panjf2000/ants/) and [gammazero's workerpool](https://github.com/gammazero/workerpool)). +We ran a few [benchmarks](benchmark/benchmark_test.go) to see how _pond_'s performance compares against some of the most popular worker pool libraries available for Go ([ants](https://github.com/panjf2000/ants/) and [gammazero's workerpool](https://github.com/gammazero/workerpool)), as well as just launching unbounded goroutines and manually creating a goroutine worker pool (inspired by [gobyexample.com](https://gobyexample.com/worker-pools)), using either a buffered or an unbuffered channel to dispatch tasks. -We also included benchmarks to compare it against just launching 1M goroutines and manually creating a goroutine worker pool (inspired by [gobyexample.com](https://gobyexample.com/worker-pools)), using either a buffered or an unbuffered channel to dispatch tasks. 
+The test consists of submitting 3 different workloads to each worker pool: +- *1M-10ms*: 1 million tasks that sleep for 10 milliseconds (`time.Sleep(10*time.Millisecond)`) +- *100k-500ms*: 100 thousand tasks that sleep for 500 milliseconds (`time.Sleep(500*time.Millisecond)`) +- *10k-1000ms*: 10 thousand tasks that sleep for 1 second (`time.Sleep(1*time.Second)`) -The test consists of submitting 1 million tasks to the pool, each of them simulating a 10ms operation by executing `time.Sleep(10 * time.Millisecond)`. All pools are configured to use a maximum of 200k workers and initialization times are not taken into account. + All pools are configured to use a maximum of 200k workers and initialization times are taken into account. Here are the results: -```powershell +```bash goos: linux goarch: amd64 pkg: github.com/alitto/pond/benchmark -BenchmarkPond-8 2 503513856 ns/op 65578500 B/op 1057273 allocs/op -BenchmarkGoroutines-8 3 444264750 ns/op 81560042 B/op 1003312 allocs/op -BenchmarkGoroutinePool-8 1 1035752534 ns/op 79889952 B/op 512480 allocs/op -BenchmarkBufferedGoroutinePool-8 2 968502858 ns/op 51945376 B/op 419122 allocs/op -BenchmarkGammazeroWorkerpool-8 1 1413724148 ns/op 18018800 B/op 1023746 allocs/op -BenchmarkAnts-8 2 665947820 ns/op 19401172 B/op 1046906 allocs/op +BenchmarkAll/1M-10ms/Pond-Eager-8 2 620347142 ns/op 82768720 B/op 1086686 allocs/op +BenchmarkAll/1M-10ms/Pond-Balanced-8 2 578973910 ns/op 81339088 B/op 1083203 allocs/op +BenchmarkAll/1M-10ms/Pond-Lazy-8 2 613344573 ns/op 84347248 B/op 1084987 allocs/op +BenchmarkAll/1M-10ms/Goroutines-8 2 540765682 ns/op 98457168 B/op 1060433 allocs/op +BenchmarkAll/1M-10ms/GoroutinePool-8 1 1157705614 ns/op 68137088 B/op 1409763 allocs/op +BenchmarkAll/1M-10ms/BufferedPool-8 1 1158068370 ns/op 76426272 B/op 1412739 allocs/op +BenchmarkAll/1M-10ms/Gammazero-8 1 1330312458 ns/op 34524328 B/op 1029692 allocs/op +BenchmarkAll/1M-10ms/AntsPool-8 2 724231628 ns/op 37870404 B/op 1077297 allocs/op +BenchmarkAll/100k-500ms/Pond-Eager-8 2 604180003 ns/op 31523028 B/op 349877 allocs/op +BenchmarkAll/100k-500ms/Pond-Balanced-8 1 1060079592 ns/op 35520416 B/op 398779 allocs/op +BenchmarkAll/100k-500ms/Pond-Lazy-8 1 1053705909 ns/op 35040512 B/op 392696 allocs/op +BenchmarkAll/100k-500ms/Goroutines-8 2 551869174 ns/op 8000016 B/op 100001 allocs/op +BenchmarkAll/100k-500ms/GoroutinePool-8 2 635442074 ns/op 20764560 B/op 299632 allocs/op +BenchmarkAll/100k-500ms/BufferedPool-8 2 641683384 ns/op 21647840 B/op 299661 allocs/op +BenchmarkAll/100k-500ms/Gammazero-8 2 667449574 ns/op 16241864 B/op 249664 allocs/op +BenchmarkAll/100k-500ms/AntsPool-8 2 659853037 ns/op 37300372 B/op 549784 allocs/op +BenchmarkAll/10k-1000ms/Pond-Eager-8 1 1014320653 ns/op 12135080 B/op 39692 allocs/op +BenchmarkAll/10k-1000ms/Pond-Balanced-8 1 1015979207 ns/op 12083704 B/op 39518 allocs/op +BenchmarkAll/10k-1000ms/Pond-Lazy-8 1 1036374161 ns/op 12046632 B/op 39366 allocs/op +BenchmarkAll/10k-1000ms/Goroutines-8 1 1007837894 ns/op 800016 B/op 10001 allocs/op +BenchmarkAll/10k-1000ms/GoroutinePool-8 1 1149536612 ns/op 21393024 B/op 222458 allocs/op +BenchmarkAll/10k-1000ms/BufferedPool-8 1 1127286218 ns/op 20343584 B/op 219359 allocs/op +BenchmarkAll/10k-1000ms/Gammazero-8 1 1023249222 ns/op 2019688 B/op 29374 allocs/op +BenchmarkAll/10k-1000ms/AntsPool-8 1 1016280850 ns/op 4155904 B/op 59487 allocs/op PASS -ok github.com/alitto/pond/benchmark 12.109s -Success: Benchmarks passed. 
+ok github.com/alitto/pond/benchmark 37.331s ``` -As you can see, _pond_ (503.5ms) outperforms _ants_ (665.9ms), _Gammazero's workerpool_ (1413.7ms), unbuffered goruotine pool (1035.8ms) and buffered goroutine pool (968.5ms) but it falls behind unlimited goroutines (444.3ms). +As you can see, _pond_'s resizing strategies (Eager, Balanced or Lazy) behave differently under different workloads and generally one of them outperforms the other worker pool implementations, except for launching unbounded goroutines. Leaving aside the fact that launching unlimited goroutines defeats the goal of limiting concurrency over a resource, its performance is highly dependent on how many resources (CPU and memory) are available at a given time, which makes it unpredictable and likely to cause starvation. In other words, it's generally not a good idea for production applications. diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index 8014954..dab5b9f 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -1,6 +1,7 @@ package benchmark import ( + "fmt" "sync" "testing" "time" @@ -10,144 +11,193 @@ import ( "github.com/panjf2000/ants/v2" ) -const ( - taskCount = 1000000 - taskDuration = 10 * time.Millisecond - workerCount = 200000 -) +type workload struct { + name string + taskCount int + taskDuration time.Duration +} -func testFunc() { - time.Sleep(taskDuration) +type subject struct { + name string + test poolTest + config poolConfig +} + +type poolConfig struct { + minWorkers int + maxWorkers int + maxCapacity int + strategy pond.ResizingStrategy +} + +type poolTest func(taskCount int, taskFunc func(), config poolConfig) + +var workloads = []workload{ + {"1M-10ms", 1000000, 10 * time.Millisecond}, + {"100k-500ms", 100000, 500 * time.Millisecond}, + {"10k-1000ms", 10000, 1000 * time.Millisecond}, +} + +var defaultPoolConfig = poolConfig{ + maxWorkers: 200000, +} + +var pondSubjects = []subject{ + {"Pond-Eager", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Eager}}, + {"Pond-Balanced", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Balanced}}, + {"Pond-Lazy", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Lazy}}, +} + +var otherSubjects = []subject{ + {"Goroutines", unboundedGoroutines, defaultPoolConfig}, + {"GoroutinePool", goroutinePool, defaultPoolConfig}, + {"BufferedPool", bufferedGoroutinePool, defaultPoolConfig}, + {"Gammazero", gammazeroWorkerpool, defaultPoolConfig}, + {"AntsPool", antsPool, defaultPoolConfig}, } func BenchmarkPond(b *testing.B) { - var wg sync.WaitGroup - pool := pond.New(workerCount, taskCount) - defer pool.StopAndWait() + runBenchmarks(b, workloads, pondSubjects) +} - // Submit tasks - b.ResetTimer() - for i := 0; i < b.N; i++ { - wg.Add(taskCount) - for i := 0; i < taskCount; i++ { - pool.Submit(func() { - testFunc() - wg.Done() +func BenchmarkAll(b *testing.B) { + allSubjects := make([]subject, 0) + allSubjects = append(allSubjects, pondSubjects...) + allSubjects = append(allSubjects, otherSubjects...)
+ runBenchmarks(b, workloads, allSubjects) +} + +func runBenchmarks(b *testing.B, workloads []workload, subjects []subject) { + for _, workload := range workloads { + taskFunc := func() { + time.Sleep(workload.taskDuration) + } + for _, subject := range subjects { + name := fmt.Sprintf("%s/%s", workload.name, subject.name) + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + subject.test(workload.taskCount, taskFunc, subject.config) + } }) } - wg.Wait() } - b.StopTimer() } -func BenchmarkGoroutines(b *testing.B) { +func pondPool(taskCount int, taskFunc func(), config poolConfig) { var wg sync.WaitGroup - + pool := pond.New(config.maxWorkers, config.maxCapacity, + pond.MinWorkers(config.minWorkers), + pond.Strategy(config.strategy)) // Submit tasks - b.ResetTimer() - for i := 0; i < b.N; i++ { - wg.Add(taskCount) - for i := 0; i < taskCount; i++ { - go func() { - testFunc() - wg.Done() - }() - } - wg.Wait() + wg.Add(taskCount) + for n := 0; n < taskCount; n++ { + pool.Submit(func() { + taskFunc() + wg.Done() + }) } - b.StopTimer() + wg.Wait() + pool.StopAndWait() } -func BenchmarkGoroutinePool(b *testing.B) { +func unboundedGoroutines(taskCount int, taskFunc func(), config poolConfig) { var wg sync.WaitGroup - - // Submit tasks - b.ResetTimer() - for i := 0; i < b.N; i++ { - taskChan := make(chan func()) - wg.Add(workerCount) - // Start worker goroutines - for i := 0; i < workerCount; i++ { - go func() { - for task := range taskChan { - task() - } - wg.Done() - }() - } - // Submit tasks - for i := 0; i < taskCount; i++ { - taskChan <- testFunc - } - close(taskChan) - wg.Wait() + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + go func() { + taskFunc() + wg.Done() + }() } - b.StopTimer() + wg.Wait() } -func BenchmarkBufferedGoroutinePool(b *testing.B) { - var wg sync.WaitGroup - - // Submit tasks - b.ResetTimer() - for i := 0; i < b.N; i++ { - taskChan := make(chan func(), taskCount) - wg.Add(workerCount) - // Start worker goroutines - for i := 0; i < workerCount; i++ { - go func() { - for task := range taskChan { - task() - } - wg.Done() - }() - } - // Submit tasks - for i := 0; i < taskCount; i++ { - taskChan <- testFunc - } - close(taskChan) - wg.Wait() +func goroutinePool(taskCount int, taskFunc func(), config poolConfig) { + // Start worker goroutines + var poolWg sync.WaitGroup + taskChan := make(chan func()) + poolWg.Add(config.maxWorkers) + for i := 0; i < config.maxWorkers; i++ { + go func() { + for task := range taskChan { + task() + } + poolWg.Done() + }() } - b.StopTimer() + + // Submit tasks and wait for completion + var wg sync.WaitGroup + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + taskChan <- func() { + taskFunc() + wg.Done() + } + } + close(taskChan) + wg.Wait() + poolWg.Wait() } -func BenchmarkGammazeroWorkerpool(b *testing.B) { +func bufferedGoroutinePool(taskCount int, taskFunc func(), config poolConfig) { + // Start worker goroutines + var poolWg sync.WaitGroup + taskChan := make(chan func(), taskCount) + poolWg.Add(config.maxWorkers) + for i := 0; i < config.maxWorkers; i++ { + go func() { + for task := range taskChan { + task() + } + poolWg.Done() + }() + } + + // Submit tasks and wait for completion var wg sync.WaitGroup - wp := workerpool.New(workerCount) + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + taskChan <- func() { + taskFunc() + wg.Done() + } + } + close(taskChan) + wg.Wait() + poolWg.Wait() +} + +func gammazeroWorkerpool(taskCount int, taskFunc func(), config poolConfig) { + // Create pool + wp := 
workerpool.New(config.maxWorkers) defer wp.StopWait() - // Submit tasks - b.ResetTimer() - for i := 0; i < b.N; i++ { - wg.Add(taskCount) - for i := 0; i < taskCount; i++ { - wp.Submit(func() { - testFunc() - wg.Done() - }) - } - wg.Wait() - } - b.StopTimer() -} - -func BenchmarkAnts(b *testing.B) { + // Submit tasks and wait for completion var wg sync.WaitGroup - p, _ := ants.NewPool(workerCount, ants.WithExpiryDuration(10*time.Second)) - defer p.Release() - - // Submit tasks - b.ResetTimer() - for i := 0; i < b.N; i++ { - wg.Add(taskCount) - for i := 0; i < taskCount; i++ { - _ = p.Submit(func() { - testFunc() - wg.Done() - }) - } - wg.Wait() + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + wp.Submit(func() { + taskFunc() + wg.Done() + }) } - b.StopTimer() + wg.Wait() +} + +func antsPool(taskCount int, taskFunc func(), config poolConfig) { + // Create pool + pool, _ := ants.NewPool(config.maxWorkers, ants.WithExpiryDuration(10*time.Second)) + defer pool.Release() + + // Submit tasks and wait for completion + var wg sync.WaitGroup + wg.Add(taskCount) + for i := 0; i < taskCount; i++ { + _ = pool.Submit(func() { + taskFunc() + wg.Done() + }) + } + wg.Wait() } diff --git a/benchmark/go.mod b/benchmark/go.mod index e30eefa..0d016be 100644 --- a/benchmark/go.mod +++ b/benchmark/go.mod @@ -3,7 +3,9 @@ module github.com/alitto/pond/benchmark go 1.14 require ( - github.com/alitto/pond v1.2.0 + github.com/alitto/pond v1.3.0 github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 - github.com/panjf2000/ants/v2 v2.3.1 + github.com/panjf2000/ants/v2 v2.4.0 ) + +replace github.com/alitto/pond => ../ diff --git a/benchmark/go.sum b/benchmark/go.sum index 9d6bbe3..840d8cd 100644 --- a/benchmark/go.sum +++ b/benchmark/go.sum @@ -1,5 +1,3 @@ -github.com/alitto/pond v1.2.0 h1:5WFAG2MbRflLIMyoIT8MpyNDzI7k7bNsuZzVflS/yIg= -github.com/alitto/pond v1.2.0/go.mod h1:VkhnWZhFBtkzZgpjXEyQ4Skyf8nfcGXsZ7PTklN29a4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -7,8 +5,8 @@ github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46 h1:iX4+rD9Fjdx8Skm github.com/gammazero/deque v0.0.0-20200227231300-1e9af0e52b46/go.mod h1:D90+MBHVc9Sk1lJAbEVgws0eYEurY4mv2TDso3Nxh3w= github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6 h1:1Cy/haf7XO4OyrkGid0Wq5CMluIErbvDptVAt8UTy38= github.com/gammazero/workerpool v0.0.0-20200311205957-7b00833861c6/go.mod h1:/XWO2YAUUpPi3smDlFBl0vpX0JHwUomDM/oRMwRmnSs= -github.com/panjf2000/ants/v2 v2.3.1 h1:9iOZHO5XlSO1Gs5K7x06uDFy8bkicWlhOKGh/TufAZg= -github.com/panjf2000/ants/v2 v2.3.1/go.mod h1:LtwNaBX6OeF5qRtQlaeGndalVwJlS2ueur7uwoAHbPA= +github.com/panjf2000/ants/v2 v2.4.0 h1:embKPQeNWMRbnrRKURv4TXJwjQRWMEAfqZT6Pe5hZNc= +github.com/panjf2000/ants/v2 v2.4.0/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/examples/dynamic_size.go b/examples/dynamic_size/dynamic_size.go similarity index 100% rename from examples/dynamic_size.go rename to examples/dynamic_size/dynamic_size.go diff --git a/examples/dynamic_size/go.mod b/examples/dynamic_size/go.mod new file mode 100644 index 
0000000..00f8792 --- /dev/null +++ b/examples/dynamic_size/go.mod @@ -0,0 +1,9 @@ +module github.com/alitto/pond/examples/dynamic_size + +go 1.14 + +require ( + github.com/alitto/pond v1.3.0 +) + +replace github.com/alitto/pond => ../../ diff --git a/examples/fixed_size.go b/examples/fixed_size/fixed_size.go similarity index 100% rename from examples/fixed_size.go rename to examples/fixed_size/fixed_size.go diff --git a/examples/fixed_size/go.mod b/examples/fixed_size/go.mod new file mode 100644 index 0000000..51566e3 --- /dev/null +++ b/examples/fixed_size/go.mod @@ -0,0 +1,9 @@ +module github.com/alitto/pond/examples/fixed_size + +go 1.14 + +require ( + github.com/alitto/pond v1.3.0 +) + +replace github.com/alitto/pond => ../../ diff --git a/examples/task_group/go.mod b/examples/task_group/go.mod new file mode 100644 index 0000000..fd4fcad --- /dev/null +++ b/examples/task_group/go.mod @@ -0,0 +1,9 @@ +module github.com/alitto/pond/examples/task_group + +go 1.14 + +require ( + github.com/alitto/pond v1.3.0 +) + +replace github.com/alitto/pond => ../../ diff --git a/examples/task_group.go b/examples/task_group/task_group.go similarity index 100% rename from examples/task_group.go rename to examples/task_group/task_group.go diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/pond.go b/pond.go index b4c9ac1..80bd88f 100644 --- a/pond.go +++ b/pond.go @@ -2,6 +2,7 @@ package pond import ( "fmt" + "math" "runtime/debug" "sync" "sync/atomic" @@ -19,15 +20,9 @@ func defaultPanicHandler(panic interface{}) { fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack())) } -// linearGrowthFn is a function that determines how many workers to create when backpressure is detected -func linearGrowthFn(workerCount, minWorkers, maxWorkers int) int { - if workerCount < minWorkers { - return minWorkers - } - if workerCount < maxWorkers { - return 1 - } - return 0 +// ResizingStrategy represents a pool resizing strategy +type ResizingStrategy interface { + Resize(runningWorkers, idleWorkers, minWorkers, maxWorkers, incomingTasks, completedTasks int, delta time.Duration) int } // Option represents an option that can be passed when instantiating a worker pool to customize it @@ -40,13 +35,6 @@ func IdleTimeout(idleTimeout time.Duration) Option { } } -// PanicHandler allows to change the panic handler function for a worker pool -func PanicHandler(panicHandler func(interface{})) Option { - return func(pool *WorkerPool) { - pool.panicHandler = panicHandler - } -} - // MinWorkers allows to change the minimum number of workers of a worker pool func MinWorkers(minWorkers int) Option { return func(pool *WorkerPool) { @@ -54,20 +42,42 @@ func MinWorkers(minWorkers int) Option { } } +// Strategy allows to change the strategy used to resize the pool +func Strategy(strategy ResizingStrategy) Option { + return func(pool *WorkerPool) { + pool.strategy = strategy + } +} + +// PanicHandler allows to change the panic handler function for a worker pool +func PanicHandler(panicHandler func(interface{})) Option { + return func(pool *WorkerPool) { + pool.panicHandler = panicHandler + } +} + // WorkerPool models a pool of workers type WorkerPool struct { - minWorkers int - maxWorkers int - maxCapacity int - idleTimeout time.Duration - workerCount int32 + // Configurable settings + maxWorkers int + maxCapacity int + minWorkers int + idleTimeout time.Duration + strategy ResizingStrategy + panicHandler func(interface{}) + // Atomic counters + workerCount int32 + 
idleWorkerCount int32 + completedTaskCount uint64 + // Private properties tasks chan func() dispatchedTasks chan func() purgerQuit chan struct{} stopOnce sync.Once waitGroup sync.WaitGroup - panicHandler func(interface{}) - growthFn func(int, int, int) int + // Debug information + debug bool + maxWorkerCount int } // New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers). @@ -81,9 +91,8 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { maxWorkers: maxWorkers, maxCapacity: maxCapacity, idleTimeout: defaultIdleTimeout, - purgerQuit: make(chan struct{}), + strategy: Balanced, panicHandler: defaultPanicHandler, - growthFn: linearGrowthFn, } // Apply all options @@ -105,9 +114,10 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { pool.idleTimeout = defaultIdleTimeout } - // Create channels + // Create internal channels pool.tasks = make(chan func(), pool.maxCapacity) pool.dispatchedTasks = make(chan func(), pool.maxWorkers) + pool.purgerQuit = make(chan struct{}) // Start dispatcher goroutine pool.waitGroup.Add(1) @@ -127,7 +137,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool { // Start minWorkers workers if pool.minWorkers > 0 { - pool.startWorkers() + pool.startWorkers(pool.minWorkers, nil) } return pool @@ -138,6 +148,11 @@ func (p *WorkerPool) Running() int { return int(atomic.LoadInt32(&p.workerCount)) } +// Idle returns the number of idle workers +func (p *WorkerPool) Idle() int { + return int(atomic.LoadInt32(&p.idleWorkerCount)) +} + // Submit sends a task to this worker pool for execution. If the queue is full, // it will wait until the task can be enqueued func (p *WorkerPool) Submit(task func()) { @@ -188,8 +203,8 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) { // Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit func (p *WorkerPool) Stop() { p.stopOnce.Do(func() { - // Close the tasks channel to prevent receiving new tasks - close(p.tasks) + // Send signal to stop the purger + close(p.purgerQuit) }) } @@ -204,38 +219,120 @@ func (p *WorkerPool) StopAndWait() { // dispatch represents the work done by the dispatcher goroutine func (p *WorkerPool) dispatch() { - batchSize := p.maxWorkers batch := make([]func(), 0) + batchSize := int(math.Max(float64(p.minWorkers), 1000)) + var lastCompletedTasks uint64 = 0 + var lastCycle time.Time = time.Now() for task := range p.tasks { - batch = append(batch, task) - // Read up to batchSize - 1 tasks without blocking - BulkReceive: - for i := 0; i < batchSize; i++ { + idleCount := p.Idle() + dispatchedImmediately := 0 + + // Dispatch up to idleCount tasks without blocking + nextTask := task + ImmediateDispatch: + for i := 0; i < idleCount; i++ { + + // Attempt to dispatch select { - case t := <-p.tasks: - batch = append(batch, t) + case p.dispatchedTasks <- nextTask: + dispatchedImmediately++ + default: + break ImmediateDispatch + } + + // Attempt to receive another task + select { + case t, ok := <-p.tasks: + if !ok { + // Nothing to dispatch + nextTask = nil + break ImmediateDispatch + } + nextTask = t + default: + nextTask = nil + break ImmediateDispatch + } + } + if nextTask == nil { + continue + } + + // Start batching tasks + batch = append(batch, nextTask) + + // Read up to batchSize tasks without blocking + BulkReceive: + for i := 0; i < batchSize-1; i++ { + select { + case t, ok := <-p.tasks: + if !ok { + break BulkReceive + } + if t != nil { + batch = 
append(batch, t) + } default: break BulkReceive } } - for _, task := range batch { - select { - // Attempt to submit the task to a worker without blocking - case p.dispatchedTasks <- task: - if p.Running() == 0 { - p.startWorkers() - } - default: - // Create a new worker if we haven't reached the limit yet - if p.Running() < p.maxWorkers { - p.startWorkers() - } + // Resize the pool + now := time.Now() + delta := now.Sub(lastCycle) + workload := len(batch) + runningCount := p.Running() + lastCycle = now + currentCompletedTasks := atomic.LoadUint64(&p.completedTaskCount) + completedTasks := int(currentCompletedTasks - lastCompletedTasks) + if completedTasks < 0 { + completedTasks = 0 + } + lastCompletedTasks = currentCompletedTasks + targetDelta := p.calculatePoolSizeDelta(runningCount, idleCount, workload+dispatchedImmediately, completedTasks, delta) - // Block until a worker accepts this task - p.dispatchedTasks <- task + // Start up to targetDelta workers + dispatched := 0 + if targetDelta > 0 { + p.startWorkers(targetDelta, batch) + dispatched = workload + if targetDelta < workload { + dispatched = targetDelta + } + } else if targetDelta < 0 { + // Kill targetDelta workers + for i := 0; i < -targetDelta; i++ { + p.dispatchedTasks <- nil + } + } + + dispatchedBlocking := 0 + + if workload > dispatched { + for _, task := range batch[dispatched:] { + // Attempt to dispatch the task without blocking + select { + case p.dispatchedTasks <- task: + default: + // Block until a worker accepts this task + p.dispatchedTasks <- task + dispatchedBlocking++ + } + } + } + + // Adjust batch size + if dispatchedBlocking > 0 { + if batchSize > 1 { + batchSize = 1 + } + } else { + maxBatchSize := runningCount + targetDelta + batchSize = batchSize * 2 + if batchSize > maxBatchSize { + batchSize = maxBatchSize } } @@ -243,8 +340,8 @@ func (p *WorkerPool) dispatch() { batch = nil } - // Send signal to stop the purger - close(p.purgerQuit) + // Send signal to stop all workers + close(p.dispatchedTasks) } // purge represents the work done by the purger goroutine @@ -254,40 +351,81 @@ func (p *WorkerPool) purge() { for { select { - // Timed out waiting for any activity to happen, attempt to stop an idle worker + // Timed out waiting for any activity to happen, attempt to resize the pool case <-ticker.C: - if p.Running() > p.minWorkers { + if p.Idle() > 0 { select { - case p.dispatchedTasks <- nil: + case p.tasks <- nil: default: - // If dispatchedTasks channel is full, no need to kill the worker + // If tasks channel is full, there's no need to resize the pool } } // Received the signal to exit case <-p.purgerQuit: - // Send signal to stop all workers - close(p.dispatchedTasks) + // Close the tasks channel to prevent receiving new tasks + close(p.tasks) return } } } -// startWorkers launches worker goroutines according to the growth function -func (p *WorkerPool) startWorkers() { +// calculatePoolSizeDelta calculates what's the delta to reach the ideal pool size based on the current size and workload +func (p *WorkerPool) calculatePoolSizeDelta(runningWorkers, idleWorkers, + incomingTasks, completedTasks int, duration time.Duration) int { - count := p.growthFn(p.Running(), p.minWorkers, p.maxWorkers) + delta := p.strategy.Resize(runningWorkers, idleWorkers, p.minWorkers, p.maxWorkers, + incomingTasks, completedTasks, duration) + + targetSize := runningWorkers + delta + + // Cannot go below minWorkers + if targetSize < p.minWorkers { + targetSize = p.minWorkers + } + // Cannot go above maxWorkers + if targetSize 
> p.maxWorkers { + targetSize = p.maxWorkers + } + + if p.debug { + // Print debugging information + durationSecs := duration.Seconds() + inputRate := float64(incomingTasks) / durationSecs + outputRate := float64(completedTasks) / durationSecs + message := fmt.Sprintf("%d\t%d\t%d\t%d\t\"%f\"\t\"%f\"\t%d\t\"%f\"\n", + runningWorkers, idleWorkers, incomingTasks, completedTasks, + inputRate, outputRate, + delta, durationSecs) + fmt.Printf(message) + } + + return targetSize - runningWorkers +} + +// startWorkers creates new worker goroutines to run the given tasks +func (p *WorkerPool) startWorkers(count int, firstTasks []func()) { // Increment worker count - atomic.AddInt32(&p.workerCount, int32(count)) + workerCount := atomic.AddInt32(&p.workerCount, int32(count)) + + // Collect debug information + if p.debug && int(workerCount) > p.maxWorkerCount { + p.maxWorkerCount = int(workerCount) + } // Increment waiting group semaphore p.waitGroup.Add(count) + // Launch workers for i := 0; i < count; i++ { - worker(p.dispatchedTasks, func() { + var firstTask func() = nil + if i < len(firstTasks) { + firstTask = firstTasks[i] + } + worker(firstTask, p.dispatchedTasks, &p.idleWorkerCount, &p.completedTaskCount, func() { // Decrement worker count atomic.AddInt32(&p.workerCount, -1) @@ -307,7 +445,7 @@ func (p *WorkerPool) Group() *TaskGroup { } // worker launches a worker goroutine -func worker(tasks chan func(), exitHandler func(), panicHandler func(interface{})) { +func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, completedTaskCount *uint64, exitHandler func(), panicHandler func(interface{})) { go func() { defer func() { @@ -316,21 +454,44 @@ func worker(tasks chan func(), exitHandler func(), panicHandler func(interface{} panicHandler(panic) // Restart goroutine - worker(tasks, exitHandler, panicHandler) + worker(nil, tasks, idleWorkerCount, completedTaskCount, exitHandler, panicHandler) } else { // Handle exit exitHandler() } }() + // We have received a task, execute it + func() { + // Increment idle count + defer atomic.AddInt32(idleWorkerCount, 1) + if firstTask != nil { + // Increment completed task count + defer atomic.AddUint64(completedTaskCount, 1) + + firstTask() + } + }() + for task := range tasks { if task == nil { // We have received a signal to quit return } + // Decrement idle count + atomic.AddInt32(idleWorkerCount, -1) + // We have received a task, execute it - task() + func() { + // Increment idle count + defer atomic.AddInt32(idleWorkerCount, 1) + + // Increment completed task count + defer atomic.AddUint64(completedTaskCount, 1) + + task() + }() } }() } diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go index 34b8072..c4a7f51 100644 --- a/pond_blackbox_test.go +++ b/pond_blackbox_test.go @@ -313,7 +313,7 @@ func TestPoolWithCustomMinWorkers(t *testing.T) { func TestGroupSubmit(t *testing.T) { - pool := pond.New(5, 5) + pool := pond.New(5, 25) assertEqual(t, 0, pool.Running()) // Submit groups of tasks diff --git a/pond_test.go b/pond_test.go index cc6c747..e56aca2 100644 --- a/pond_test.go +++ b/pond_test.go @@ -29,10 +29,3 @@ func TestNewWithInconsistentOptions(t *testing.T) { assertEqual(t, 1, pool.minWorkers) assertEqual(t, defaultIdleTimeout, pool.idleTimeout) } - -func TestLinearGrowthFn(t *testing.T) { - - assertEqual(t, 1, linearGrowthFn(0, 1, 1)) - assertEqual(t, 1, linearGrowthFn(0, 1, 2)) - assertEqual(t, 0, linearGrowthFn(3, 1, 3)) -} diff --git a/resizer.go b/resizer.go new file mode 100644 index 0000000..74fe870 --- /dev/null +++ 
b/resizer.go @@ -0,0 +1,162 @@ +package pond + +import ( + "container/ring" + "math" + "time" +) + +// Preset pool resizing strategies +var ( + // Eager maximizes responsiveness at the expense of higher resource usage, + // which can reduce throughput under certain conditions. + // This strategy is meant for worker pools that will operate at a small percentage of their capacity + // most of the time and may occasionally receive bursts of tasks. + Eager = DynamicResizer(1, 0.01) + // Balanced tries to find a balance between responsiveness and throughput. + // It's the default strategy and it's suitable for general purpose worker pools or those + // that will operate close to 50% of their capacity most of the time. + Balanced = DynamicResizer(3, 0.01) + // Lazy maximizes throughput at the expense of responsiveness. + // This strategy is meant for worker pools that will operate close to their max. capacity most of the time. + Lazy = DynamicResizer(5, 0.01) +) + +// dynamicResizer implements a configurable dynamic resizing strategy +type dynamicResizer struct { + windowSize int + tolerance float64 + incomingTasks *ring.Ring + completedTasks *ring.Ring + duration *ring.Ring +} + +// DynamicResizer creates a dynamic resizing strategy that gradually increases or decreases +// the size of the pool to match the rate of incoming tasks (input rate) with the rate of +// completed tasks (output rate). +// windowSize: determines how many cycles to consider when calculating input and output rates. +// tolerance: defines a percentage (between 0 and 1) below which the relative difference between +// input and output rates is ignored and the pool is not resized. +func DynamicResizer(windowSize int, tolerance float64) ResizingStrategy { + + if windowSize < 1 { + windowSize = 1 + } + if tolerance < 0 { + tolerance = 0 + } + + dynamicResizer := &dynamicResizer{ + windowSize: windowSize, + tolerance: tolerance, + } + dynamicResizer.reset() + return dynamicResizer +} + +func (r *dynamicResizer) reset() { + // Create rings + r.incomingTasks = ring.New(r.windowSize) + r.completedTasks = ring.New(r.windowSize) + r.duration = ring.New(r.windowSize) + + // Initialize with 0s + for i := 0; i < r.windowSize; i++ { + r.incomingTasks.Value = 0 + r.completedTasks.Value = 0 + r.duration.Value = 0 * time.Second + r.incomingTasks = r.incomingTasks.Next() + r.completedTasks = r.completedTasks.Next() + r.duration = r.duration.Next() + } +} + +func (r *dynamicResizer) totalIncomingTasks() int { + var valueSum int = 0 + r.incomingTasks.Do(func(value interface{}) { + valueSum += value.(int) + }) + return valueSum +} + +func (r *dynamicResizer) totalCompletedTasks() int { + var valueSum int = 0 + r.completedTasks.Do(func(value interface{}) { + valueSum += value.(int) + }) + return valueSum +} + +func (r *dynamicResizer) totalDuration() time.Duration { + var valueSum time.Duration = 0 + r.duration.Do(func(value interface{}) { + valueSum += value.(time.Duration) + }) + return valueSum +} + +func (r *dynamicResizer) push(incomingTasks int, completedTasks int, duration time.Duration) { + r.incomingTasks.Value = incomingTasks + r.completedTasks.Value = completedTasks + r.duration.Value = duration + r.incomingTasks = r.incomingTasks.Next() + r.completedTasks = r.completedTasks.Next() + r.duration = r.duration.Next() +} + +func (r *dynamicResizer) Resize(runningWorkers, idleWorkers, minWorkers, maxWorkers, incomingTasks, completedTasks int, duration time.Duration) int { + + r.push(incomingTasks, completedTasks, duration) + + windowIncomingTasks := r.totalIncomingTasks() + windowCompletedTasks := r.totalCompletedTasks() + windowSecs :=
r.totalDuration().Seconds() + windowInputRate := float64(windowIncomingTasks) / windowSecs + windowOutputRate := float64(windowCompletedTasks) / windowSecs + + if runningWorkers == 0 || windowCompletedTasks == 0 { + // No workers yet or no completed tasks in the window: create as many workers as incomingTasks - idleWorkers + delta := incomingTasks - idleWorkers + return r.fitDelta(delta, runningWorkers, minWorkers, maxWorkers) + } + + deltaRate := windowInputRate - windowOutputRate + + // No changes, do not resize + if deltaRate == 0 { + return 0 + } + + // If delta % is below the defined tolerance, do not resize + + if r.tolerance > 0 { + deltaPercentage := math.Abs(deltaRate / windowInputRate) + if deltaPercentage < r.tolerance { + return 0 + } + } + + if deltaRate > 0 { + // Need to grow the pool + workerRate := windowOutputRate / float64(runningWorkers) + ratio := windowSecs / float64(r.windowSize) + delta := int(ratio*(deltaRate/workerRate)) - idleWorkers + if deltaRate > 0 && delta < 1 { + delta = 1 + } + return r.fitDelta(delta, runningWorkers, minWorkers, maxWorkers) + } else if deltaRate < 0 && idleWorkers > 0 { + // Need to shrink the pool + return r.fitDelta(-1, runningWorkers, minWorkers, maxWorkers) + } + return 0 +} + +func (r *dynamicResizer) fitDelta(delta, current, min, max int) int { + if current+delta < min { + delta = -(current - min) + } + if current+delta > max { + delta = max - current + } + return delta +} diff --git a/resizer_test.go b/resizer_test.go new file mode 100644 index 0000000..c2d7901 --- /dev/null +++ b/resizer_test.go @@ -0,0 +1,40 @@ +package pond + +import ( + "testing" + "time" +) + +func TestResize(t *testing.T) { + + resizer := DynamicResizer(3, 0.1) + + // First resize should grow the pool proportionally + assertEqual(t, 10, resizer.Resize(0, 0, 1, 100, 10, 0, 1*time.Second)) + + // Now the input rate grows but below the tolerance (10%) + assertEqual(t, 0, resizer.Resize(10, 10, 1, 100, 1, 10, 1*time.Second)) + + // Now the input rate grows more + assertEqual(t, 90, resizer.Resize(10, 10, 1, 100, 100000, 11, 1*time.Second)) + + // Now there are no new tasks for 3 cycles + assertEqual(t, 0, resizer.Resize(10, 10, 1, 100, 0, 100011, 1*time.Second)) + assertEqual(t, -1, resizer.Resize(10, 10, 1, 100, 0, 100011, 1*time.Second)) + assertEqual(t, 0, resizer.Resize(1, 1, 1, 100, 0, 100011, 10*time.Second)) +} + +func TestEagerPool(t *testing.T) { + pool := New(100, 1000, Strategy(Eager)) + pool.debug = true + + for i := 0; i < 100; i++ { + pool.Submit(func() { + time.Sleep(1 * time.Millisecond) + }) + } + + pool.StopAndWait() + + assertEqual(t, 100, pool.maxWorkerCount) +}
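For reference, below is a minimal sketch of a custom resizing strategy built on the `pond.ResizingStrategy` interface introduced by this change. The `fixedStepResizer` type and its thresholds are hypothetical, chosen only to illustrate the contract: return a positive delta to grow the pool, a negative one to shrink it, and zero to keep the current size (the pool itself clamps the resulting size between `minWorkers` and `maxWorkers`).

```go
package main

import (
	"time"

	"github.com/alitto/pond"
)

// fixedStepResizer is a hypothetical strategy: grow by one worker while tasks
// arrive faster than they complete, shrink by one when the pool sits idle.
type fixedStepResizer struct{}

func (s fixedStepResizer) Resize(runningWorkers, idleWorkers, minWorkers, maxWorkers,
	incomingTasks, completedTasks int, delta time.Duration) int {
	if incomingTasks > completedTasks && runningWorkers < maxWorkers {
		return 1 // add one worker
	}
	if incomingTasks == 0 && idleWorkers > 0 && runningWorkers > minWorkers {
		return -1 // remove one worker
	}
	return 0 // keep the current size
}

func main() {
	// A custom strategy is passed the same way as the presets;
	// pond.Strategy(pond.DynamicResizer(4, 0.05)) would work here too.
	pool := pond.New(10, 1000, pond.Strategy(fixedStepResizer{}))
	defer pool.StopAndWait()

	for i := 0; i < 100; i++ {
		pool.Submit(func() { time.Sleep(10 * time.Millisecond) })
	}
}
```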