@@ -30,7 +30,7 @@ Some common scenarios include:
|
||||
- Stopping a worker pool
|
||||
- Task panics are handled gracefully (configurable panic handler)
|
||||
- Supports Non-blocking and Blocking task submission modes (buffered / unbuffered)
|
||||
- Very high performance under heavy workloads (See [benchmarks](#benchmarks))
|
||||
- Very high performance under heavy workloads (See [benchmarks](./benchmark/README.md))
|
||||
- **New (since v1.3.0)**: configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy.
|
||||
- [API reference](https://pkg.go.dev/github.com/alitto/pond)
|
||||
|
||||
@@ -158,70 +158,31 @@ panicHandler := func(p interface{}) {
|
||||
pool := pond.New(10, 1000, pond.PanicHandler(panicHandler)))
|
||||
```
|
||||
- **Strategy**: Configures the strategy used to resize the pool when backpressure is detected. You can create a custom strategy by implementing the `pond.ResizingStrategy` interface or choose one of the 3 presets:
|
||||
- **Eager**: maximizes responsiveness at the expense of higher resource usage, which can reduce throughput under certain conditions. This strategy is meant for worker pools that will operate at a small percentage of their capacity most of the time and may occasionally receive bursts of tasks.
|
||||
- **Balanced**: tries to find a balance between responsiveness and throughput. It's suitable for general purpose worker pools or those that will operate close to 50% of their capacity most of the time. This is the default strategy.
|
||||
- **Eager**: maximizes responsiveness at the expense of higher resource usage, which can reduce throughput under certain conditions. This strategy is meant for worker pools that will operate at a small percentage of their capacity most of the time and may occasionally receive bursts of tasks. This is the default strategy.
|
||||
- **Balanced**: tries to find a balance between responsiveness and throughput. It's suitable for general purpose worker pools or those that will operate close to 50% of their capacity most of the time.
|
||||
- **Lazy**: maximizes throughput at the expense of responsiveness. This strategy is meant for worker pools that will operate close to their max. capacity most of the time.
|
||||
``` go
|
||||
// Example: create pools with different resizing strategies
|
||||
eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager))
|
||||
balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced))
|
||||
lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy))
|
||||
eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager()))
|
||||
balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced()))
|
||||
lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy()))
|
||||
```
|
||||
|
||||
### Resizing strategies
|
||||
|
||||
The following chart illustrates the behaviour of the different pool resizing strategies as the number of submitted tasks increases. Each line represents the number of worker goroutines in the pool (pool size) and the x-axis reflects the number of submitted tasks (cumulative).
|
||||
|
||||

|
||||
|
||||
As the name suggests, the "Eager" strategy always spawns an extra worker when there are no idles, which causes the pool to grow almost linearly with the number of submitted tasks. On the other end, the "Lazy" strategy creates one worker every N submitted tasks, where N is the maximum number of available CPUs ([GOMAXPROCS](https://golang.org/pkg/runtime/#GOMAXPROCS)). The "Balanced" strategy represents a middle ground between the previous two because it creates a worker every N/2 submitted tasks.
|
||||
|
||||
## API Reference
|
||||
|
||||
Full API reference is available at https://pkg.go.dev/github.com/alitto/pond
|
||||
|
||||
## Benchmarks
|
||||
|
||||
We ran a few [benchmarks](benchmark/benchmark_test.go) to see how _pond_'s performance compares against some of the most popular worker pool libraries available for Go ([ants](https://github.com/panjf2000/ants/) and [gammazero's workerpool](https://github.com/gammazero/workerpool)), as well as just launching unbounded goroutines and manually creating a goroutine worker pool (inspired by [gobyexample.com](https://gobyexample.com/worker-pools)), using either a buffered or an unbuffered channel to dispatch tasks.
|
||||
|
||||
The test consists of submitting 3 different workloads to each worker pool:
|
||||
- *1M-10ms*: 1 million tasks that sleep for 10 milliseconds (`time.Sleep(10*time.Millisecond)`)
|
||||
- *100k-500ms*: 100 thousand tasks that sleep for 500 milliseconds (`time.Sleep(500*time.Millisecond)`)
|
||||
- *10k-1000ms*: 10 thousand tasks that sleep for 1 second (`time.Sleep(1*time.Second)`)
|
||||
|
||||
All pools are configured to use a maximum of 200k workers and initialization times are taken into account.
|
||||
|
||||
Here are the results:
|
||||
|
||||
```bash
|
||||
goos: linux
|
||||
goarch: amd64
|
||||
pkg: github.com/alitto/pond/benchmark
|
||||
BenchmarkAll/1M-10ms/Pond-Eager-8 2 620347142 ns/op 82768720 B/op 1086686 allocs/op
|
||||
BenchmarkAll/1M-10ms/Pond-Balanced-8 2 578973910 ns/op 81339088 B/op 1083203 allocs/op
|
||||
BenchmarkAll/1M-10ms/Pond-Lazy-8 2 613344573 ns/op 84347248 B/op 1084987 allocs/op
|
||||
BenchmarkAll/1M-10ms/Goroutines-8 2 540765682 ns/op 98457168 B/op 1060433 allocs/op
|
||||
BenchmarkAll/1M-10ms/GoroutinePool-8 1 1157705614 ns/op 68137088 B/op 1409763 allocs/op
|
||||
BenchmarkAll/1M-10ms/BufferedPool-8 1 1158068370 ns/op 76426272 B/op 1412739 allocs/op
|
||||
BenchmarkAll/1M-10ms/Gammazero-8 1 1330312458 ns/op 34524328 B/op 1029692 allocs/op
|
||||
BenchmarkAll/1M-10ms/AntsPool-8 2 724231628 ns/op 37870404 B/op 1077297 allocs/op
|
||||
BenchmarkAll/100k-500ms/Pond-Eager-8 2 604180003 ns/op 31523028 B/op 349877 allocs/op
|
||||
BenchmarkAll/100k-500ms/Pond-Balanced-8 1 1060079592 ns/op 35520416 B/op 398779 allocs/op
|
||||
BenchmarkAll/100k-500ms/Pond-Lazy-8 1 1053705909 ns/op 35040512 B/op 392696 allocs/op
|
||||
BenchmarkAll/100k-500ms/Goroutines-8 2 551869174 ns/op 8000016 B/op 100001 allocs/op
|
||||
BenchmarkAll/100k-500ms/GoroutinePool-8 2 635442074 ns/op 20764560 B/op 299632 allocs/op
|
||||
BenchmarkAll/100k-500ms/BufferedPool-8 2 641683384 ns/op 21647840 B/op 299661 allocs/op
|
||||
BenchmarkAll/100k-500ms/Gammazero-8 2 667449574 ns/op 16241864 B/op 249664 allocs/op
|
||||
BenchmarkAll/100k-500ms/AntsPool-8 2 659853037 ns/op 37300372 B/op 549784 allocs/op
|
||||
BenchmarkAll/10k-1000ms/Pond-Eager-8 1 1014320653 ns/op 12135080 B/op 39692 allocs/op
|
||||
BenchmarkAll/10k-1000ms/Pond-Balanced-8 1 1015979207 ns/op 12083704 B/op 39518 allocs/op
|
||||
BenchmarkAll/10k-1000ms/Pond-Lazy-8 1 1036374161 ns/op 12046632 B/op 39366 allocs/op
|
||||
BenchmarkAll/10k-1000ms/Goroutines-8 1 1007837894 ns/op 800016 B/op 10001 allocs/op
|
||||
BenchmarkAll/10k-1000ms/GoroutinePool-8 1 1149536612 ns/op 21393024 B/op 222458 allocs/op
|
||||
BenchmarkAll/10k-1000ms/BufferedPool-8 1 1127286218 ns/op 20343584 B/op 219359 allocs/op
|
||||
BenchmarkAll/10k-1000ms/Gammazero-8 1 1023249222 ns/op 2019688 B/op 29374 allocs/op
|
||||
BenchmarkAll/10k-1000ms/AntsPool-8 1 1016280850 ns/op 4155904 B/op 59487 allocs/op
|
||||
PASS
|
||||
ok github.com/alitto/pond/benchmark 37.331s
|
||||
```
|
||||
|
||||
As you can see, _pond_'s resizing strategies (Eager, Balanced or Lazy) behave differently under different workloads and generally one of them outperforms the other worker pool implementations, except for launching unbounded goroutines.
|
||||
|
||||
Leaving aside the fact that launching unlimited goroutines defeats the goal of limiting concurrency over a resource, its performance is highly dependant on how much resources (CPU and memory) are available at a given time, which make it unpredictable and likely to cause starvation. In other words, it's generally not a good idea for production applications.
|
||||
|
||||
These tests were executed on a laptop with an 8-core CPU (Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz) and 16GB of RAM.
|
||||
See [Benchmarks](./benchmark/README.md).
|
||||
|
||||
## Resources
|
||||
|
||||
|
||||
@@ -0,0 +1,171 @@
|
||||
# pond - Benchmarks
|
||||
|
||||
We ran a few [benchmarks](./benchmark_test.go) to see how _pond_'s performance compares against some of the most popular worker pool libraries available for Go ([ants](https://github.com/panjf2000/ants/) and [gammazero's workerpool](https://github.com/gammazero/workerpool)), as well as just launching unbounded goroutines or manually creating a goroutine worker pool (inspired by [gobyexample.com](https://gobyexample.com/worker-pools)), using either a buffered or an unbuffered channel to dispatch tasks.
|
||||
|
||||
The test consists of simulating 5 different workload scenarios:
|
||||
- *1u-10Mt*: 1 user (goroutine) submitting 1 million tasks
|
||||
- *100u-10Kt*: 100 users submitting 10 thousand tasks each
|
||||
- *1Ku-1Kt*: 1000 users submitting 1000 tasks each (with a 10ms interval between tasks)
|
||||
- *10Ku-100t*: 10 thousand users submitting 100 tasks each (with a 10ms interval interval between tasks)
|
||||
- *1Mu-1t*: 1 million users submitting 1 task each
|
||||
|
||||
All pools are configured to use a maximum of 200k workers and initialization times are taken into account.
|
||||
|
||||
Here are the results of the benchmark when submitting an asynchronous task that just sleeps for 10ms (`time.Sleep(10 * time.Millisecond)`):
|
||||
|
||||
```bash
|
||||
go test -benchmem -run=^$ github.com/alitto/pond/benchmark -bench '^(BenchmarkAllSleep.*)$' -benchtime=3x -cpu=8
|
||||
goos: linux
|
||||
goarch: amd64
|
||||
pkg: github.com/alitto/pond/benchmark
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Pond-Eager-8 3 658233743 ns/op 13177872 B/op 83966 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Pond-Balanced-8 3 542151290 ns/op 12843525 B/op 80628 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Pond-Lazy-8 3 453957719 ns/op 13619120 B/op 88093 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Goroutines-8 3 452322853 ns/op 81737360 B/op 1009514 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/GoroutinePool-8 3 942987920 ns/op 22749680 B/op 270289 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/BufferedPool-8 3 971651135 ns/op 31039664 B/op 273297 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Gammazero-8 3 1165440763 ns/op 2003037 B/op 24369 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/AntsPool-8 3 640480375 ns/op 6467578 B/op 85681 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Pond-Eager-8 3 674275336 ns/op 34262096 B/op 420257 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Pond-Balanced-8 3 668055373 ns/op 26164368 B/op 325129 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Pond-Lazy-8 3 570578768 ns/op 19205968 B/op 200380 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Goroutines-8 3 566593280 ns/op 100025210 B/op 1049624 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/GoroutinePool-8 3 1037432189 ns/op 22381360 B/op 266476 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/BufferedPool-8 3 901981176 ns/op 30382512 B/op 266452 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Gammazero-8 3 1008801867 ns/op 2371378 B/op 28852 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/AntsPool-8 3 632685132 ns/op 12738146 B/op 170562 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Pond-Eager-8 3 860672256 ns/op 33584048 B/op 466470 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Pond-Balanced-8 3 605668097 ns/op 33586224 B/op 466493 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Pond-Lazy-8 3 462462043 ns/op 12380016 B/op 79389 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Goroutines-8 3 516023441 ns/op 80000048 B/op 1000002 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/GoroutinePool-8 3 1059879992 ns/op 22381552 B/op 266478 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/BufferedPool-8 3 990989556 ns/op 30387472 B/op 266503 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Gammazero-8 3 1062265159 ns/op 2400296 B/op 29203 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/AntsPool-8 3 668318364 ns/op 9868850 B/op 131752 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Pond-Eager-8 3 532311024 ns/op 13046192 B/op 91585 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Pond-Balanced-8 3 524989416 ns/op 10928912 B/op 53280 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Pond-Lazy-8 3 724182859 ns/op 10228752 B/op 40577 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Goroutines-8 3 459821171 ns/op 80000048 B/op 1000002 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/GoroutinePool-8 3 1007354405 ns/op 22378448 B/op 266446 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/BufferedPool-8 3 924880667 ns/op 30383568 B/op 266463 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Gammazero-8 3 1302254736 ns/op 2091101 B/op 25209 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/AntsPool-8 3 735473470 ns/op 11536946 B/op 150942 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Pond-Eager-8 3 829461779 ns/op 10932880 B/op 53259 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Pond-Balanced-8 3 794574849 ns/op 10551632 B/op 46156 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Pond-Lazy-8 3 685584019 ns/op 10807088 B/op 50711 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Goroutines-8 3 422483752 ns/op 80000048 B/op 1000002 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/GoroutinePool-8 3 1138926533 ns/op 22380240 B/op 266464 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/BufferedPool-8 3 1139537728 ns/op 30385872 B/op 266487 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Gammazero-8 3 1580322536 ns/op 36548029 B/op 334860 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/AntsPool-8 3 1005205084 ns/op 34527010 B/op 455767 allocs/op
|
||||
PASS
|
||||
ok github.com/alitto/pond/benchmark 138.009s
|
||||
```
|
||||
|
||||
And these are the results of the benchmark when submitting a synchronous task that just calculates a random float64 number between 0 and 1 (`rand.Float64()`):
|
||||
|
||||
```bash
|
||||
go test -benchmem -run=^$ github.com/alitto/pond/benchmark -bench '^(BenchmarkAllRand.*)$' -benchtime=3x -cpu=8
|
||||
goos: linux
|
||||
goarch: amd64
|
||||
pkg: github.com/alitto/pond/benchmark
|
||||
BenchmarkAllRandFloat64/1u-10Mt/Pond-Eager-8 3 221147440 ns/op 8024589 B/op 156 allocs/op
|
||||
BenchmarkAllRandFloat64/1u-10Mt/Pond-Balanced-8 3 206112267 ns/op 8014616 B/op 96 allocs/op
|
||||
BenchmarkAllRandFloat64/1u-10Mt/Pond-Lazy-8 3 228397720 ns/op 8014010 B/op 120 allocs/op
|
||||
BenchmarkAllRandFloat64/1u-10Mt/Goroutines-8 3 255098957 ns/op 173648 B/op 602 allocs/op
|
||||
BenchmarkAllRandFloat64/1u-10Mt/GoroutinePool-8 3 608808567 ns/op 6381850 B/op 66476 allocs/op
|
||||
BenchmarkAllRandFloat64/1u-10Mt/BufferedPool-8 3 646646774 ns/op 14388720 B/op 66516 allocs/op
|
||||
BenchmarkAllRandFloat64/1u-10Mt/Gammazero-8 3 556528977 ns/op 765 B/op 14 allocs/op
|
||||
BenchmarkAllRandFloat64/1u-10Mt/AntsPool-8 3 511636669 ns/op 1884922 B/op 25377 allocs/op
|
||||
BenchmarkAllRandFloat64/100u-10Kt/Pond-Eager-8 3 240239174 ns/op 8056522 B/op 1514 allocs/op
|
||||
BenchmarkAllRandFloat64/100u-10Kt/Pond-Balanced-8 3 244726370 ns/op 8009328 B/op 293 allocs/op
|
||||
BenchmarkAllRandFloat64/100u-10Kt/Pond-Lazy-8 3 302086894 ns/op 8322613 B/op 5889 allocs/op
|
||||
BenchmarkAllRandFloat64/100u-10Kt/Goroutines-8 3 426853026 ns/op 20887664 B/op 46804 allocs/op
|
||||
BenchmarkAllRandFloat64/100u-10Kt/GoroutinePool-8 3 713127672 ns/op 6380848 B/op 66471 allocs/op
|
||||
BenchmarkAllRandFloat64/100u-10Kt/BufferedPool-8 3 595125950 ns/op 14385456 B/op 66482 allocs/op
|
||||
BenchmarkAllRandFloat64/100u-10Kt/Gammazero-8 3 977617485 ns/op 13714 B/op 147 allocs/op
|
||||
BenchmarkAllRandFloat64/100u-10Kt/AntsPool-8 3 474116870 ns/op 122397 B/op 1797 allocs/op
|
||||
BenchmarkAllRandFloat64/1Ku-1Kt/Pond-Eager-8 3 338964329 ns/op 15464181 B/op 133763 allocs/op
|
||||
BenchmarkAllRandFloat64/1Ku-1Kt/Pond-Balanced-8 3 289530482 ns/op 8005776 B/op 110 allocs/op
|
||||
BenchmarkAllRandFloat64/1Ku-1Kt/Pond-Lazy-8 3 294471089 ns/op 8016704 B/op 419 allocs/op
|
||||
BenchmarkAllRandFloat64/1Ku-1Kt/Goroutines-8 3 490915673 ns/op 33891216 B/op 79199 allocs/op
|
||||
BenchmarkAllRandFloat64/1Ku-1Kt/GoroutinePool-8 3 732660817 ns/op 6405008 B/op 66722 allocs/op
|
||||
BenchmarkAllRandFloat64/1Ku-1Kt/BufferedPool-8 3 612328640 ns/op 14381232 B/op 66438 allocs/op
|
||||
BenchmarkAllRandFloat64/1Ku-1Kt/Gammazero-8 3 973129161 ns/op 78610 B/op 823 allocs/op
|
||||
BenchmarkAllRandFloat64/1Ku-1Kt/AntsPool-8 3 498528201 ns/op 458680 B/op 6466 allocs/op
|
||||
BenchmarkAllRandFloat64/10Ku-100t/Pond-Eager-8 3 292830436 ns/op 8006517 B/op 145 allocs/op
|
||||
BenchmarkAllRandFloat64/10Ku-100t/Pond-Balanced-8 3 290014999 ns/op 8008160 B/op 230 allocs/op
|
||||
BenchmarkAllRandFloat64/10Ku-100t/Pond-Lazy-8 3 288724307 ns/op 8004794 B/op 45 allocs/op
|
||||
BenchmarkAllRandFloat64/10Ku-100t/Goroutines-8 3 359906263 ns/op 6014704 B/op 62654 allocs/op
|
||||
BenchmarkAllRandFloat64/10Ku-100t/GoroutinePool-8 3 727553640 ns/op 6463184 B/op 67328 allocs/op
|
||||
BenchmarkAllRandFloat64/10Ku-100t/BufferedPool-8 3 753007382 ns/op 14385616 B/op 66484 allocs/op
|
||||
BenchmarkAllRandFloat64/10Ku-100t/Gammazero-8 3 1101229554 ns/op 400072 B/op 4172 allocs/op
|
||||
BenchmarkAllRandFloat64/10Ku-100t/AntsPool-8 3 559372453 ns/op 3569413 B/op 48919 allocs/op
|
||||
BenchmarkAllRandFloat64/1Mu-1t/Pond-Eager-8 3 571780271 ns/op 8006266 B/op 104 allocs/op
|
||||
BenchmarkAllRandFloat64/1Mu-1t/Pond-Balanced-8 3 593644124 ns/op 8005317 B/op 76 allocs/op
|
||||
BenchmarkAllRandFloat64/1Mu-1t/Pond-Lazy-8 3 556206022 ns/op 8006197 B/op 83 allocs/op
|
||||
BenchmarkAllRandFloat64/1Mu-1t/Goroutines-8 3 327126810 ns/op 784 B/op 9 allocs/op
|
||||
BenchmarkAllRandFloat64/1Mu-1t/GoroutinePool-8 3 833617738 ns/op 6382672 B/op 66490 allocs/op
|
||||
BenchmarkAllRandFloat64/1Mu-1t/BufferedPool-8 3 823267182 ns/op 14384528 B/op 66473 allocs/op
|
||||
BenchmarkAllRandFloat64/1Mu-1t/Gammazero-8 3 1494572327 ns/op 44507208 B/op 333672 allocs/op
|
||||
BenchmarkAllRandFloat64/1Mu-1t/AntsPool-8 3 745656046 ns/op 13846717 B/op 187657 allocs/op
|
||||
PASS
|
||||
ok github.com/alitto/pond/benchmark 93.386s
|
||||
```
|
||||
|
||||
As you can see, _pond_'s resizing strategies (Eager, Balanced or Lazy) behave differently under different workloads and generally one or more of these strategies outperform all the other worker pool implementations, including unbounded goroutines under some specific circumstances.
|
||||
|
||||
When running this benchmark with fewer available CPUs, the difference becomes even more significant. For instance, when using only 4 CPUs, _pond_ consistently outperforms launching unbounded goroutines.
|
||||
|
||||
Here are the results when using 4 CPUs and submitting the asynchronous task:
|
||||
|
||||
```bash
|
||||
go test -benchmem -run=^$ github.com/alitto/pond/benchmark -bench '^(BenchmarkAllSleep.*)$' -benchtime=3x -cpu=4
|
||||
goos: linux
|
||||
goarch: amd64
|
||||
pkg: github.com/alitto/pond/benchmark
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Pond-Eager-4 3 638667560 ns/op 15940784 B/op 121808 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Pond-Balanced-4 3 509435739 ns/op 14681840 B/op 104546 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Pond-Lazy-4 3 487545047 ns/op 12332784 B/op 72090 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Goroutines-4 3 622864989 ns/op 81063184 B/op 1005595 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/GoroutinePool-4 3 993853972 ns/op 23036848 B/op 273264 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/BufferedPool-4 3 1021714262 ns/op 30799696 B/op 270772 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/Gammazero-4 3 1130426539 ns/op 2032146 B/op 24702 allocs/op
|
||||
BenchmarkAllSleep10ms/1u-10Mt/AntsPool-4 3 665803493 ns/op 5754085 B/op 76788 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Pond-Eager-4 3 818847588 ns/op 35801530 B/op 414175 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Pond-Balanced-4 3 723634296 ns/op 33384848 B/op 419312 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Pond-Lazy-4 3 626540143 ns/op 21113296 B/op 233346 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Goroutines-4 3 695757011 ns/op 110921904 B/op 1071440 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/GoroutinePool-4 3 1099938785 ns/op 22391664 B/op 266583 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/BufferedPool-4 3 933443662 ns/op 30395696 B/op 266589 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/Gammazero-4 3 1082238814 ns/op 2753992 B/op 33478 allocs/op
|
||||
BenchmarkAllSleep10ms/100u-10Kt/AntsPool-4 3 753962177 ns/op 11095693 B/op 145928 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Pond-Eager-4 3 873225582 ns/op 33593424 B/op 466568 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Pond-Balanced-4 3 706629572 ns/op 32765488 B/op 449318 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Pond-Lazy-4 3 522098721 ns/op 15020368 B/op 126808 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Goroutines-4 3 648364814 ns/op 80000048 B/op 1000002 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/GoroutinePool-4 3 1180177588 ns/op 22390960 B/op 266576 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/BufferedPool-4 3 1025670406 ns/op 30396112 B/op 266593 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/Gammazero-4 3 1129172776 ns/op 3025912 B/op 36777 allocs/op
|
||||
BenchmarkAllSleep10ms/1Ku-1Kt/AntsPool-4 3 758293957 ns/op 7629184 B/op 102991 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Pond-Eager-4 3 645076503 ns/op 16746768 B/op 159207 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Pond-Balanced-4 3 554829418 ns/op 13056176 B/op 91944 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Pond-Lazy-4 3 514715338 ns/op 11595344 B/op 65146 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Goroutines-4 3 618594640 ns/op 80000048 B/op 1000002 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/GoroutinePool-4 3 1009094159 ns/op 22391344 B/op 266580 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/BufferedPool-4 3 888272088 ns/op 30393360 B/op 266565 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/Gammazero-4 3 1264327736 ns/op 2760306 B/op 33330 allocs/op
|
||||
BenchmarkAllSleep10ms/10Ku-100t/AntsPool-4 3 825426536 ns/op 12383949 B/op 166217 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Pond-Eager-4 3 1133354687 ns/op 10757456 B/op 50026 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Pond-Balanced-4 3 1040441038 ns/op 10220784 B/op 40357 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Pond-Lazy-4 3 1015159406 ns/op 10172880 B/op 39428 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Goroutines-4 3 687813489 ns/op 80000048 B/op 1000002 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/GoroutinePool-4 3 1196809101 ns/op 30276560 B/op 287129 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/BufferedPool-4 3 1258868445 ns/op 30664016 B/op 267309 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/Gammazero-4 3 1620357479 ns/op 33538610 B/op 345372 allocs/op
|
||||
BenchmarkAllSleep10ms/1Mu-1t/AntsPool-4 3 1123553898 ns/op 30075709 B/op 391551 allocs/op
|
||||
PASS
|
||||
ok github.com/alitto/pond/benchmark 152.726s
|
||||
```
|
||||
|
||||
These tests were executed on a laptop with an 8-core CPU (Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz) and 16GB of RAM.
|
||||
+200
-156
@@ -2,6 +2,7 @@ package benchmark
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -11,193 +12,236 @@ import (
|
||||
"github.com/panjf2000/ants/v2"
|
||||
)
|
||||
|
||||
type subject struct {
|
||||
name string
|
||||
factory poolFactory
|
||||
}
|
||||
|
||||
type poolSubmit func(func())
|
||||
type poolTeardown func()
|
||||
type poolFactory func() (poolSubmit, poolTeardown)
|
||||
|
||||
type workload struct {
|
||||
name string
|
||||
userCount int
|
||||
taskCount int
|
||||
taskDuration time.Duration
|
||||
taskInterval time.Duration
|
||||
task func()
|
||||
}
|
||||
|
||||
type subject struct {
|
||||
name string
|
||||
test poolTest
|
||||
config poolConfig
|
||||
}
|
||||
var maxWorkers = 200000
|
||||
|
||||
type poolConfig struct {
|
||||
minWorkers int
|
||||
maxWorkers int
|
||||
maxCapacity int
|
||||
strategy pond.ResizingStrategy
|
||||
}
|
||||
|
||||
type poolTest func(taskCount int, taskFunc func(), config poolConfig)
|
||||
|
||||
var workloads = []workload{
|
||||
{"1M-10ms", 1000000, 10 * time.Millisecond},
|
||||
{"100k-500ms", 100000, 500 * time.Millisecond},
|
||||
{"10k-1000ms", 10000, 1000 * time.Millisecond},
|
||||
}
|
||||
|
||||
var defaultPoolConfig = poolConfig{
|
||||
maxWorkers: 200000,
|
||||
}
|
||||
var workloads = []workload{{
|
||||
name: "1u-10Mt",
|
||||
userCount: 1,
|
||||
taskCount: 1000000,
|
||||
taskInterval: 0,
|
||||
}, {
|
||||
name: "100u-10Kt",
|
||||
userCount: 100,
|
||||
taskCount: 10000,
|
||||
taskInterval: 0,
|
||||
}, {
|
||||
name: "1Ku-1Kt",
|
||||
userCount: 1000,
|
||||
taskCount: 1000,
|
||||
taskInterval: 0,
|
||||
}, {
|
||||
name: "10Ku-100t",
|
||||
userCount: 10000,
|
||||
taskCount: 100,
|
||||
taskInterval: 0,
|
||||
}, {
|
||||
name: "1Mu-1t",
|
||||
userCount: 1000000,
|
||||
taskCount: 1,
|
||||
taskInterval: 0,
|
||||
}}
|
||||
|
||||
var pondSubjects = []subject{
|
||||
{"Pond-Eager", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Eager}},
|
||||
{"Pond-Balanced", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Balanced}},
|
||||
{"Pond-Lazy", pondPool, poolConfig{maxWorkers: defaultPoolConfig.maxWorkers, maxCapacity: 1000000, strategy: pond.Lazy}},
|
||||
{
|
||||
name: "Pond-Eager",
|
||||
factory: func() (poolSubmit, poolTeardown) {
|
||||
pool := pond.New(maxWorkers, 1000000, pond.Strategy(pond.Eager()))
|
||||
|
||||
return pool.Submit, pool.StopAndWait
|
||||
},
|
||||
}, {
|
||||
name: "Pond-Balanced",
|
||||
factory: func() (poolSubmit, poolTeardown) {
|
||||
pool := pond.New(maxWorkers, 1000000, pond.Strategy(pond.Balanced()))
|
||||
|
||||
return pool.Submit, pool.StopAndWait
|
||||
},
|
||||
}, {
|
||||
name: "Pond-Lazy",
|
||||
factory: func() (poolSubmit, poolTeardown) {
|
||||
pool := pond.New(maxWorkers, 1000000, pond.Strategy(pond.Lazy()))
|
||||
|
||||
return pool.Submit, pool.StopAndWait
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var otherSubjects = []subject{
|
||||
{"Goroutines", unboundedGoroutines, defaultPoolConfig},
|
||||
{"GoroutinePool", goroutinePool, defaultPoolConfig},
|
||||
{"BufferedPool", bufferedGoroutinePool, defaultPoolConfig},
|
||||
{"Gammazero", gammazeroWorkerpool, defaultPoolConfig},
|
||||
{"AntsPool", antsPool, defaultPoolConfig},
|
||||
{
|
||||
name: "Goroutines",
|
||||
factory: func() (poolSubmit, poolTeardown) {
|
||||
submit := func(taskFunc func()) {
|
||||
go func() {
|
||||
taskFunc()
|
||||
}()
|
||||
}
|
||||
return submit, func() {}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "GoroutinePool",
|
||||
factory: func() (poolSubmit, poolTeardown) {
|
||||
|
||||
var poolWg sync.WaitGroup
|
||||
taskChan := make(chan func())
|
||||
poolWg.Add(maxWorkers)
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
go func() {
|
||||
for task := range taskChan {
|
||||
task()
|
||||
}
|
||||
poolWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
submit := func(task func()) {
|
||||
taskChan <- task
|
||||
}
|
||||
teardown := func() {
|
||||
close(taskChan)
|
||||
poolWg.Wait()
|
||||
}
|
||||
|
||||
return submit, teardown
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "BufferedPool",
|
||||
factory: func() (poolSubmit, poolTeardown) {
|
||||
|
||||
var poolWg sync.WaitGroup
|
||||
taskChan := make(chan func(), 1000000)
|
||||
poolWg.Add(maxWorkers)
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
go func() {
|
||||
for task := range taskChan {
|
||||
task()
|
||||
}
|
||||
poolWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
submit := func(task func()) {
|
||||
taskChan <- task
|
||||
}
|
||||
teardown := func() {
|
||||
close(taskChan)
|
||||
poolWg.Wait()
|
||||
}
|
||||
|
||||
return submit, teardown
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Gammazero",
|
||||
factory: func() (poolSubmit, poolTeardown) {
|
||||
pool := workerpool.New(maxWorkers)
|
||||
return pool.Submit, pool.StopWait
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "AntsPool",
|
||||
factory: func() (poolSubmit, poolTeardown) {
|
||||
pool, _ := ants.NewPool(maxWorkers, ants.WithExpiryDuration(10*time.Second))
|
||||
submit := func(task func()) {
|
||||
pool.Submit(task)
|
||||
}
|
||||
return submit, pool.Release
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func BenchmarkPond(b *testing.B) {
|
||||
runBenchmarks(b, workloads, pondSubjects)
|
||||
func BenchmarkPondSleep10ms(b *testing.B) {
|
||||
sleep10ms := func() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
runBenchmarks(b, workloads, pondSubjects, sleep10ms)
|
||||
}
|
||||
|
||||
func BenchmarkAll(b *testing.B) {
|
||||
allSubjects := make([]subject, 0)
|
||||
allSubjects = append(allSubjects, pondSubjects...)
|
||||
allSubjects = append(allSubjects, otherSubjects...)
|
||||
runBenchmarks(b, workloads, allSubjects)
|
||||
func BenchmarkPondRandFloat64(b *testing.B) {
|
||||
randFloat64 := func() {
|
||||
rand.Float64()
|
||||
}
|
||||
runBenchmarks(b, workloads, pondSubjects, randFloat64)
|
||||
}
|
||||
|
||||
func runBenchmarks(b *testing.B, workloads []workload, subjects []subject) {
|
||||
func BenchmarkAllSleep10ms(b *testing.B) {
|
||||
subjects := make([]subject, 0)
|
||||
subjects = append(subjects, pondSubjects...)
|
||||
subjects = append(subjects, otherSubjects...)
|
||||
sleep10ms := func() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
runBenchmarks(b, workloads, subjects, sleep10ms)
|
||||
}
|
||||
|
||||
func BenchmarkAllRandFloat64(b *testing.B) {
|
||||
subjects := make([]subject, 0)
|
||||
subjects = append(subjects, pondSubjects...)
|
||||
subjects = append(subjects, otherSubjects...)
|
||||
randFloat64 := func() {
|
||||
rand.Float64()
|
||||
}
|
||||
runBenchmarks(b, workloads, subjects, randFloat64)
|
||||
}
|
||||
|
||||
func runBenchmarks(b *testing.B, workloads []workload, subjects []subject, task func()) {
|
||||
for _, workload := range workloads {
|
||||
taskFunc := func() {
|
||||
time.Sleep(workload.taskDuration)
|
||||
}
|
||||
for _, subject := range subjects {
|
||||
name := fmt.Sprintf("%s/%s", workload.name, subject.name)
|
||||
b.Run(name, func(b *testing.B) {
|
||||
testName := fmt.Sprintf("%s/%s", workload.name, subject.name)
|
||||
b.Run(testName, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
subject.test(workload.taskCount, taskFunc, subject.config)
|
||||
simulateWorkload(&workload, subject.factory, task)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pondPool(taskCount int, taskFunc func(), config poolConfig) {
|
||||
var wg sync.WaitGroup
|
||||
pool := pond.New(config.maxWorkers, config.maxCapacity,
|
||||
pond.MinWorkers(config.minWorkers),
|
||||
pond.Strategy(config.strategy))
|
||||
// Submit tasks
|
||||
wg.Add(taskCount)
|
||||
for n := 0; n < taskCount; n++ {
|
||||
pool.Submit(func() {
|
||||
taskFunc()
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
pool.StopAndWait()
|
||||
}
|
||||
func simulateWorkload(workload *workload, poolFactoy poolFactory, task func()) {
|
||||
|
||||
func unboundedGoroutines(taskCount int, taskFunc func(), config poolConfig) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(taskCount)
|
||||
for i := 0; i < taskCount; i++ {
|
||||
go func() {
|
||||
taskFunc()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func goroutinePool(taskCount int, taskFunc func(), config poolConfig) {
|
||||
// Start worker goroutines
|
||||
var poolWg sync.WaitGroup
|
||||
taskChan := make(chan func())
|
||||
poolWg.Add(config.maxWorkers)
|
||||
for i := 0; i < config.maxWorkers; i++ {
|
||||
go func() {
|
||||
for task := range taskChan {
|
||||
task()
|
||||
}
|
||||
poolWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// Submit tasks and wait for completion
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(taskCount)
|
||||
for i := 0; i < taskCount; i++ {
|
||||
taskChan <- func() {
|
||||
taskFunc()
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
close(taskChan)
|
||||
wg.Wait()
|
||||
poolWg.Wait()
|
||||
}
|
||||
|
||||
func bufferedGoroutinePool(taskCount int, taskFunc func(), config poolConfig) {
|
||||
// Start worker goroutines
|
||||
var poolWg sync.WaitGroup
|
||||
taskChan := make(chan func(), taskCount)
|
||||
poolWg.Add(config.maxWorkers)
|
||||
for i := 0; i < config.maxWorkers; i++ {
|
||||
go func() {
|
||||
for task := range taskChan {
|
||||
task()
|
||||
}
|
||||
poolWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// Submit tasks and wait for completion
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(taskCount)
|
||||
for i := 0; i < taskCount; i++ {
|
||||
taskChan <- func() {
|
||||
taskFunc()
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
close(taskChan)
|
||||
wg.Wait()
|
||||
poolWg.Wait()
|
||||
}
|
||||
|
||||
func gammazeroWorkerpool(taskCount int, taskFunc func(), config poolConfig) {
|
||||
// Create pool
|
||||
wp := workerpool.New(config.maxWorkers)
|
||||
defer wp.StopWait()
|
||||
poolSubmit, poolTeardown := poolFactoy()
|
||||
|
||||
// Submit tasks and wait for completion
|
||||
// Spawn one goroutine per simulated user
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(taskCount)
|
||||
for i := 0; i < taskCount; i++ {
|
||||
wp.Submit(func() {
|
||||
taskFunc()
|
||||
wg.Done()
|
||||
})
|
||||
wg.Add(workload.userCount * workload.taskCount)
|
||||
|
||||
testFunc := func() {
|
||||
task()
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
for i := 0; i < workload.userCount; i++ {
|
||||
go func() {
|
||||
// Every user submits tasksPerUser at the specified frequency
|
||||
for i := 0; i < workload.taskCount; i++ {
|
||||
poolSubmit(testFunc)
|
||||
if workload.taskInterval > 0 {
|
||||
time.Sleep(workload.taskInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func antsPool(taskCount int, taskFunc func(), config poolConfig) {
|
||||
// Create pool
|
||||
pool, _ := ants.NewPool(config.maxWorkers, ants.WithExpiryDuration(10*time.Second))
|
||||
defer pool.Release()
|
||||
|
||||
// Submit tasks and wait for completion
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(taskCount)
|
||||
for i := 0; i < taskCount; i++ {
|
||||
_ = pool.Submit(func() {
|
||||
taskFunc()
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
// Tear down
|
||||
poolTeardown()
|
||||
}
|
||||
|
||||
File diff suppressed because one or more lines are too long
|
After Width: | Height: | Size: 264 KiB |
@@ -2,7 +2,6 @@ package pond
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -22,7 +21,7 @@ func defaultPanicHandler(panic interface{}) {
|
||||
|
||||
// ResizingStrategy represents a pool resizing strategy
|
||||
type ResizingStrategy interface {
|
||||
Resize(runningWorkers, idleWorkers, minWorkers, maxWorkers, incomingTasks, completedTasks int, delta time.Duration) int
|
||||
Resize(runningWorkers, minWorkers, maxWorkers int) bool
|
||||
}
|
||||
|
||||
// Option represents an option that can be passed when instantiating a worker pool to customize it
|
||||
@@ -66,18 +65,13 @@ type WorkerPool struct {
|
||||
strategy ResizingStrategy
|
||||
panicHandler func(interface{})
|
||||
// Atomic counters
|
||||
workerCount int32
|
||||
idleWorkerCount int32
|
||||
completedTaskCount uint64
|
||||
workerCount int32
|
||||
idleWorkerCount int32
|
||||
// Private properties
|
||||
tasks chan func()
|
||||
dispatchedTasks chan func()
|
||||
purgerQuit chan struct{}
|
||||
stopOnce sync.Once
|
||||
waitGroup sync.WaitGroup
|
||||
// Debug information
|
||||
debug bool
|
||||
maxWorkerCount int
|
||||
tasks chan func()
|
||||
purgerQuit chan struct{}
|
||||
stopOnce sync.Once
|
||||
waitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
|
||||
@@ -91,7 +85,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
|
||||
maxWorkers: maxWorkers,
|
||||
maxCapacity: maxCapacity,
|
||||
idleTimeout: defaultIdleTimeout,
|
||||
strategy: Balanced,
|
||||
strategy: Eager(),
|
||||
panicHandler: defaultPanicHandler,
|
||||
}
|
||||
|
||||
@@ -116,17 +110,8 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
|
||||
|
||||
// Create internal channels
|
||||
pool.tasks = make(chan func(), pool.maxCapacity)
|
||||
pool.dispatchedTasks = make(chan func(), pool.maxWorkers)
|
||||
pool.purgerQuit = make(chan struct{})
|
||||
|
||||
// Start dispatcher goroutine
|
||||
pool.waitGroup.Add(1)
|
||||
go func() {
|
||||
defer pool.waitGroup.Done()
|
||||
|
||||
pool.dispatch()
|
||||
}()
|
||||
|
||||
// Start purger goroutine
|
||||
pool.waitGroup.Add(1)
|
||||
go func() {
|
||||
@@ -137,7 +122,9 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
|
||||
|
||||
// Start minWorkers workers
|
||||
if pool.minWorkers > 0 {
|
||||
pool.startWorkers(pool.minWorkers, nil)
|
||||
for i := 0; i < pool.minWorkers; i++ {
|
||||
pool.startWorker(nil)
|
||||
}
|
||||
}
|
||||
|
||||
return pool
|
||||
@@ -154,14 +141,51 @@ func (p *WorkerPool) Idle() int {
|
||||
}
|
||||
|
||||
// Submit sends a task to this worker pool for execution. If the queue is full,
|
||||
// it will wait until the task can be enqueued
|
||||
// it will wait until the task is dispatched to a worker goroutine.
|
||||
func (p *WorkerPool) Submit(task func()) {
|
||||
p.submit(task, true)
|
||||
}
|
||||
|
||||
// TrySubmit attempts to send a task to this worker pool for execution. If the queue is full,
|
||||
// it will not wait for a worker to become idle. It returns true if it was able to dispatch
|
||||
// the task and false otherwise.
|
||||
func (p *WorkerPool) TrySubmit(task func()) bool {
|
||||
return p.submit(task, false)
|
||||
}
|
||||
|
||||
func (p *WorkerPool) submit(task func(), waitForIdle bool) bool {
|
||||
if task == nil {
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// Submit the task to the task channel
|
||||
runningWorkerCount := p.Running()
|
||||
|
||||
// Attempt to dispatch to an idle worker without blocking
|
||||
if runningWorkerCount > 0 && p.Idle() > 0 {
|
||||
select {
|
||||
case p.tasks <- task:
|
||||
return true
|
||||
default:
|
||||
// No idle worker available, continue
|
||||
}
|
||||
}
|
||||
|
||||
maxWorkersReached := runningWorkerCount >= p.maxWorkers
|
||||
|
||||
// Exit if we have reached the max. number of workers and can't wait for an idle worker
|
||||
if maxWorkersReached && !waitForIdle {
|
||||
return false
|
||||
}
|
||||
|
||||
// Start a worker as long as we haven't reached the limit
|
||||
if !maxWorkersReached && p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) {
|
||||
p.startWorker(task)
|
||||
return true
|
||||
}
|
||||
|
||||
// Submit the task to the tasks channel and wait for it to be picked up by a worker
|
||||
p.tasks <- task
|
||||
return true
|
||||
}
|
||||
|
||||
// SubmitAndWait sends a task to this worker pool for execution and waits for it to complete
|
||||
@@ -203,7 +227,7 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
|
||||
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
|
||||
func (p *WorkerPool) Stop() {
|
||||
p.stopOnce.Do(func() {
|
||||
// Send signal to stop the purger
|
||||
// Send the signal to stop the purger goroutine
|
||||
close(p.purgerQuit)
|
||||
})
|
||||
}
|
||||
@@ -216,225 +240,56 @@ func (p *WorkerPool) StopAndWait() {
|
||||
p.waitGroup.Wait()
|
||||
}
|
||||
|
||||
// dispatch represents the work done by the dispatcher goroutine
|
||||
func (p *WorkerPool) dispatch() {
|
||||
// purge represents the work done by the purger goroutine
|
||||
func (p *WorkerPool) purge() {
|
||||
|
||||
batch := make([]func(), 0)
|
||||
batchSize := int(math.Max(float64(p.minWorkers), 1000))
|
||||
var lastCompletedTasks uint64 = 0
|
||||
var lastCycle time.Time = time.Now()
|
||||
idleTicker := time.NewTicker(p.idleTimeout)
|
||||
defer idleTicker.Stop()
|
||||
|
||||
for task := range p.tasks {
|
||||
|
||||
idleCount := p.Idle()
|
||||
dispatchedImmediately := 0
|
||||
|
||||
// Dispatch up to idleCount tasks without blocking
|
||||
nextTask := task
|
||||
ImmediateDispatch:
|
||||
for i := 0; i < idleCount; i++ {
|
||||
|
||||
// Attempt to dispatch
|
||||
select {
|
||||
case p.dispatchedTasks <- nextTask:
|
||||
dispatchedImmediately++
|
||||
default:
|
||||
break ImmediateDispatch
|
||||
}
|
||||
|
||||
// Attempt to receive another task
|
||||
select {
|
||||
case t, ok := <-p.tasks:
|
||||
if !ok {
|
||||
// Nothing to dispatch
|
||||
nextTask = nil
|
||||
break ImmediateDispatch
|
||||
}
|
||||
nextTask = t
|
||||
default:
|
||||
nextTask = nil
|
||||
break ImmediateDispatch
|
||||
Purge:
|
||||
for {
|
||||
select {
|
||||
// Timed out waiting for any activity to happen, attempt to kill an idle worker
|
||||
case <-idleTicker.C:
|
||||
if p.Idle() > 0 {
|
||||
p.tasks <- nil
|
||||
}
|
||||
case <-p.purgerQuit:
|
||||
break Purge
|
||||
}
|
||||
if nextTask == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Start batching tasks
|
||||
batch = append(batch, nextTask)
|
||||
|
||||
// Read up to batchSize tasks without blocking
|
||||
BulkReceive:
|
||||
for i := 0; i < batchSize-1; i++ {
|
||||
select {
|
||||
case t, ok := <-p.tasks:
|
||||
if !ok {
|
||||
break BulkReceive
|
||||
}
|
||||
if t != nil {
|
||||
batch = append(batch, t)
|
||||
}
|
||||
default:
|
||||
break BulkReceive
|
||||
}
|
||||
}
|
||||
|
||||
// Resize the pool
|
||||
now := time.Now()
|
||||
delta := now.Sub(lastCycle)
|
||||
workload := len(batch)
|
||||
runningCount := p.Running()
|
||||
lastCycle = now
|
||||
currentCompletedTasks := atomic.LoadUint64(&p.completedTaskCount)
|
||||
completedTasks := int(currentCompletedTasks - lastCompletedTasks)
|
||||
if completedTasks < 0 {
|
||||
completedTasks = 0
|
||||
}
|
||||
lastCompletedTasks = currentCompletedTasks
|
||||
targetDelta := p.calculatePoolSizeDelta(runningCount, idleCount, workload+dispatchedImmediately, completedTasks, delta)
|
||||
|
||||
// Start up to targetDelta workers
|
||||
dispatched := 0
|
||||
if targetDelta > 0 {
|
||||
p.startWorkers(targetDelta, batch)
|
||||
dispatched = workload
|
||||
if targetDelta < workload {
|
||||
dispatched = targetDelta
|
||||
}
|
||||
} else if targetDelta < 0 {
|
||||
// Kill targetDelta workers
|
||||
for i := 0; i < -targetDelta; i++ {
|
||||
p.dispatchedTasks <- nil
|
||||
}
|
||||
}
|
||||
|
||||
dispatchedBlocking := 0
|
||||
|
||||
if workload > dispatched {
|
||||
for _, task := range batch[dispatched:] {
|
||||
// Attempt to dispatch the task without blocking
|
||||
select {
|
||||
case p.dispatchedTasks <- task:
|
||||
default:
|
||||
// Block until a worker accepts this task
|
||||
p.dispatchedTasks <- task
|
||||
dispatchedBlocking++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adjust batch size
|
||||
if dispatchedBlocking > 0 {
|
||||
if batchSize > 1 {
|
||||
batchSize = 1
|
||||
}
|
||||
} else {
|
||||
maxBatchSize := runningCount + targetDelta
|
||||
batchSize = batchSize * 2
|
||||
if batchSize > maxBatchSize {
|
||||
batchSize = maxBatchSize
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batch slice
|
||||
batch = nil
|
||||
}
|
||||
|
||||
// Send signal to stop all workers
|
||||
close(p.dispatchedTasks)
|
||||
}
|
||||
close(p.tasks)
|
||||
|
||||
// purge represents the work done by the purger goroutine
|
||||
func (p *WorkerPool) purge() {
|
||||
ticker := time.NewTicker(p.idleTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
// Timed out waiting for any activity to happen, attempt to resize the pool
|
||||
case <-ticker.C:
|
||||
if p.Idle() > 0 {
|
||||
select {
|
||||
case p.tasks <- nil:
|
||||
default:
|
||||
// If tasks channel is full, there's no need to resize the pool
|
||||
}
|
||||
}
|
||||
|
||||
// Received the signal to exit
|
||||
case <-p.purgerQuit:
|
||||
|
||||
// Close the tasks channel to prevent receiving new tasks
|
||||
close(p.tasks)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// calculatePoolSizeDelta calculates what's the delta to reach the ideal pool size based on the current size and workload
|
||||
func (p *WorkerPool) calculatePoolSizeDelta(runningWorkers, idleWorkers,
|
||||
incomingTasks, completedTasks int, duration time.Duration) int {
|
||||
|
||||
delta := p.strategy.Resize(runningWorkers, idleWorkers, p.minWorkers, p.maxWorkers,
|
||||
incomingTasks, completedTasks, duration)
|
||||
|
||||
targetSize := runningWorkers + delta
|
||||
|
||||
// Cannot go below minWorkers
|
||||
if targetSize < p.minWorkers {
|
||||
targetSize = p.minWorkers
|
||||
}
|
||||
// Cannot go above maxWorkers
|
||||
if targetSize > p.maxWorkers {
|
||||
targetSize = p.maxWorkers
|
||||
}
|
||||
|
||||
if p.debug {
|
||||
// Print debugging information
|
||||
durationSecs := duration.Seconds()
|
||||
inputRate := float64(incomingTasks) / durationSecs
|
||||
outputRate := float64(completedTasks) / durationSecs
|
||||
message := fmt.Sprintf("%d\t%d\t%d\t%d\t\"%f\"\t\"%f\"\t%d\t\"%f\"\n",
|
||||
runningWorkers, idleWorkers, incomingTasks, completedTasks,
|
||||
inputRate, outputRate,
|
||||
delta, durationSecs)
|
||||
fmt.Printf(message)
|
||||
}
|
||||
|
||||
return targetSize - runningWorkers
|
||||
}
|
||||
|
||||
// startWorkers creates new worker goroutines to run the given tasks
|
||||
func (p *WorkerPool) startWorkers(count int, firstTasks []func()) {
|
||||
func (p *WorkerPool) startWorker(firstTask func()) {
|
||||
|
||||
// Increment worker count
|
||||
workerCount := atomic.AddInt32(&p.workerCount, int32(count))
|
||||
p.incrementWorkerCount()
|
||||
|
||||
// Collect debug information
|
||||
if p.debug && int(workerCount) > p.maxWorkerCount {
|
||||
p.maxWorkerCount = int(workerCount)
|
||||
}
|
||||
// Launch worker
|
||||
go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.panicHandler)
|
||||
}
|
||||
|
||||
func (p *WorkerPool) incrementWorkerCount() {
|
||||
|
||||
// Increment worker count
|
||||
atomic.AddInt32(&p.workerCount, 1)
|
||||
|
||||
// Increment waiting group semaphore
|
||||
p.waitGroup.Add(count)
|
||||
p.waitGroup.Add(1)
|
||||
}
|
||||
|
||||
// Launch workers
|
||||
for i := 0; i < count; i++ {
|
||||
var firstTask func() = nil
|
||||
if i < len(firstTasks) {
|
||||
firstTask = firstTasks[i]
|
||||
}
|
||||
worker(firstTask, p.dispatchedTasks, &p.idleWorkerCount, &p.completedTaskCount, func() {
|
||||
func (p *WorkerPool) decrementWorkerCount() {
|
||||
|
||||
// Decrement worker count
|
||||
atomic.AddInt32(&p.workerCount, -1)
|
||||
// Decrement worker count
|
||||
atomic.AddInt32(&p.workerCount, -1)
|
||||
|
||||
// Decrement waiting group semaphore
|
||||
p.waitGroup.Done()
|
||||
|
||||
}, p.panicHandler)
|
||||
}
|
||||
// Decrement waiting group semaphore
|
||||
p.waitGroup.Done()
|
||||
}
|
||||
|
||||
// Group creates a new task group
|
||||
@@ -445,55 +300,47 @@ func (p *WorkerPool) Group() *TaskGroup {
|
||||
}
|
||||
|
||||
// worker launches a worker goroutine
|
||||
func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, completedTaskCount *uint64, exitHandler func(), panicHandler func(interface{})) {
|
||||
func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHandler func(), panicHandler func(interface{})) {
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if panic := recover(); panic != nil {
|
||||
// Handle panic
|
||||
panicHandler(panic)
|
||||
defer func() {
|
||||
|
||||
// Restart goroutine
|
||||
worker(nil, tasks, idleWorkerCount, completedTaskCount, exitHandler, panicHandler)
|
||||
} else {
|
||||
// Handle exit
|
||||
exitHandler()
|
||||
}
|
||||
}()
|
||||
if panic := recover(); panic != nil {
|
||||
// Handle panic
|
||||
panicHandler(panic)
|
||||
|
||||
// We have received a task, execute it
|
||||
func() {
|
||||
// Increment idle count
|
||||
defer atomic.AddInt32(idleWorkerCount, 1)
|
||||
if firstTask != nil {
|
||||
// Increment completed task count
|
||||
defer atomic.AddUint64(completedTaskCount, 1)
|
||||
|
||||
firstTask()
|
||||
}
|
||||
}()
|
||||
|
||||
for task := range tasks {
|
||||
if task == nil {
|
||||
// We have received a signal to quit
|
||||
return
|
||||
}
|
||||
// Restart goroutine
|
||||
go worker(nil, tasks, idleWorkerCount, exitHandler, panicHandler)
|
||||
} else {
|
||||
// Handle normal exit
|
||||
exitHandler()
|
||||
|
||||
// Decrement idle count
|
||||
atomic.AddInt32(idleWorkerCount, -1)
|
||||
|
||||
// We have received a task, execute it
|
||||
func() {
|
||||
// Increment idle count
|
||||
defer atomic.AddInt32(idleWorkerCount, 1)
|
||||
|
||||
// Increment completed task count
|
||||
defer atomic.AddUint64(completedTaskCount, 1)
|
||||
|
||||
task()
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
// We have received a task, execute it
|
||||
if firstTask != nil {
|
||||
firstTask()
|
||||
}
|
||||
// Increment idle count
|
||||
atomic.AddInt32(idleWorkerCount, 1)
|
||||
|
||||
for task := range tasks {
|
||||
if task == nil {
|
||||
// We have received a signal to quit
|
||||
return
|
||||
}
|
||||
|
||||
// Decrement idle count
|
||||
atomic.AddInt32(idleWorkerCount, -1)
|
||||
|
||||
// We have received a task, execute it
|
||||
task()
|
||||
|
||||
// Increment idle count
|
||||
atomic.AddInt32(idleWorkerCount, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// TaskGroup represents a group of related tasks
|
||||
|
||||
+52
-3
@@ -164,6 +164,32 @@ func TestSubmitBeforeWithNilTask(t *testing.T) {
|
||||
assertEqual(t, 0, pool.Running())
|
||||
}
|
||||
|
||||
func TestTrySubmit(t *testing.T) {
|
||||
|
||||
pool := pond.New(1, 5)
|
||||
|
||||
// Submit a long-running task
|
||||
var doneCount int32
|
||||
pool.Submit(func() {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
atomic.AddInt32(&doneCount, 1)
|
||||
})
|
||||
|
||||
// Attempt to submit a task without blocking
|
||||
dispatched := pool.TrySubmit(func() {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
atomic.AddInt32(&doneCount, 1)
|
||||
})
|
||||
|
||||
// Task was not dispatched because the pool was full
|
||||
assertEqual(t, false, dispatched)
|
||||
|
||||
pool.StopAndWait()
|
||||
|
||||
// Only the first task must have executed
|
||||
assertEqual(t, int32(1), atomic.LoadInt32(&doneCount))
|
||||
}
|
||||
|
||||
func TestRunning(t *testing.T) {
|
||||
|
||||
workerCount := 5
|
||||
@@ -238,7 +264,7 @@ func TestSubmitWithPanic(t *testing.T) {
|
||||
|
||||
func TestPoolWithCustomIdleTimeout(t *testing.T) {
|
||||
|
||||
pool := pond.New(1, 5, pond.IdleTimeout(2*time.Millisecond))
|
||||
pool := pond.New(1, 5, pond.IdleTimeout(1*time.Millisecond))
|
||||
|
||||
// Submit a task
|
||||
started := make(chan bool)
|
||||
@@ -258,8 +284,8 @@ func TestPoolWithCustomIdleTimeout(t *testing.T) {
|
||||
// Let the task complete
|
||||
completed <- true
|
||||
|
||||
// Wait for idle timeout + 1ms
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
// Wait for some time
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Worker should have been killed
|
||||
assertEqual(t, 0, pool.Running())
|
||||
@@ -338,3 +364,26 @@ func TestGroupSubmit(t *testing.T) {
|
||||
|
||||
assertEqual(t, int32(taskCount), atomic.LoadInt32(&doneCount))
|
||||
}
|
||||
|
||||
func TestPoolWithCustomStrategy(t *testing.T) {
|
||||
|
||||
pool := pond.New(3, 3, pond.Strategy(pond.RatedResizer(2)))
|
||||
|
||||
// Submit 3 tasks
|
||||
group := pool.Group()
|
||||
for i := 0; i < 3; i++ {
|
||||
group.Submit(func() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
})
|
||||
}
|
||||
|
||||
// Wait for them to complete
|
||||
group.Wait()
|
||||
|
||||
// 2 workers should have been started
|
||||
assertEqual(t, 2, pool.Running())
|
||||
|
||||
pool.StopAndWait()
|
||||
|
||||
assertEqual(t, 0, pool.Running())
|
||||
}
|
||||
|
||||
+27
-133
@@ -1,162 +1,56 @@
|
||||
package pond
|
||||
|
||||
import (
|
||||
"container/ring"
|
||||
"math"
|
||||
"time"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
var maxProcs = runtime.GOMAXPROCS(0)
|
||||
|
||||
// Preset pool resizing strategies
|
||||
var (
|
||||
// Eager maximizes responsiveness at the expense of higher resource usage,
|
||||
// which can reduce throughput under certain conditions.
|
||||
// This strategy is meant for worker pools that will operate at a small percentage of their capacity
|
||||
// most of the time and may occasionally receive bursts of tasks.
|
||||
Eager = DynamicResizer(1, 0.01)
|
||||
// most of the time and may occasionally receive bursts of tasks. It's the default strategy.
|
||||
Eager = func() ResizingStrategy { return RatedResizer(1) }
|
||||
// Balanced tries to find a balance between responsiveness and throughput.
|
||||
// It's the default strategy and it's suitable for general purpose worker pools or those
|
||||
// It's suitable for general purpose worker pools or those
|
||||
// that will operate close to 50% of their capacity most of the time.
|
||||
Balanced = DynamicResizer(3, 0.01)
|
||||
Balanced = func() ResizingStrategy { return RatedResizer(maxProcs / 2) }
|
||||
// Lazy maximizes throughput at the expense of responsiveness.
|
||||
// This strategy is meant for worker pools that will operate close to their max. capacity most of the time.
|
||||
Lazy = DynamicResizer(5, 0.01)
|
||||
Lazy = func() ResizingStrategy { return RatedResizer(maxProcs) }
|
||||
)
|
||||
|
||||
// dynamicResizer implements a configurable dynamic resizing strategy
|
||||
type dynamicResizer struct {
|
||||
windowSize int
|
||||
tolerance float64
|
||||
incomingTasks *ring.Ring
|
||||
completedTasks *ring.Ring
|
||||
duration *ring.Ring
|
||||
// ratedResizer implements a rated resizing strategy
|
||||
type ratedResizer struct {
|
||||
rate int
|
||||
hits int32
|
||||
}
|
||||
|
||||
// DynamicResizer creates a dynamic resizing strategy that gradually increases or decreases
|
||||
// the size of the pool to match the rate of incoming tasks (input rate) with the rate of
|
||||
// completed tasks (output rate).
|
||||
// windowSize: determines how many cycles to consider when calculating input and output rates.
|
||||
// tolerance: defines a percentage (between 0 and 1)
|
||||
func DynamicResizer(windowSize int, tolerance float64) ResizingStrategy {
|
||||
// RatedResizer creates a resizing strategy which can be configured
|
||||
// to create workers at a specific rate when the pool has no idle workers.
|
||||
// rate: determines the number of tasks to receive before creating an extra worker.
|
||||
// A value of 3 can be interpreted as: "Create a new worker every 3 tasks".
|
||||
func RatedResizer(rate int) ResizingStrategy {
|
||||
|
||||
if windowSize < 1 {
|
||||
windowSize = 1
|
||||
}
|
||||
if tolerance < 0 {
|
||||
tolerance = 0
|
||||
if rate < 1 {
|
||||
rate = 1
|
||||
}
|
||||
|
||||
dynamicResizer := &dynamicResizer{
|
||||
windowSize: windowSize,
|
||||
tolerance: tolerance,
|
||||
}
|
||||
dynamicResizer.reset()
|
||||
return dynamicResizer
|
||||
}
|
||||
|
||||
func (r *dynamicResizer) reset() {
|
||||
// Create rings
|
||||
r.incomingTasks = ring.New(r.windowSize)
|
||||
r.completedTasks = ring.New(r.windowSize)
|
||||
r.duration = ring.New(r.windowSize)
|
||||
|
||||
// Initialize with 0s
|
||||
for i := 0; i < r.windowSize; i++ {
|
||||
r.incomingTasks.Value = 0
|
||||
r.completedTasks.Value = 0
|
||||
r.duration.Value = 0 * time.Second
|
||||
r.incomingTasks = r.incomingTasks.Next()
|
||||
r.completedTasks = r.completedTasks.Next()
|
||||
r.duration = r.duration.Next()
|
||||
return &ratedResizer{
|
||||
rate: rate,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *dynamicResizer) totalIncomingTasks() int {
|
||||
var valueSum int = 0
|
||||
r.incomingTasks.Do(func(value interface{}) {
|
||||
valueSum += value.(int)
|
||||
})
|
||||
return valueSum
|
||||
}
|
||||
func (r *ratedResizer) Resize(runningWorkers, minWorkers, maxWorkers int) bool {
|
||||
|
||||
func (r *dynamicResizer) totalCompletedTasks() int {
|
||||
var valueSum int = 0
|
||||
r.completedTasks.Do(func(value interface{}) {
|
||||
valueSum += value.(int)
|
||||
})
|
||||
return valueSum
|
||||
}
|
||||
|
||||
func (r *dynamicResizer) totalDuration() time.Duration {
|
||||
var valueSum time.Duration = 0
|
||||
r.duration.Do(func(value interface{}) {
|
||||
valueSum += value.(time.Duration)
|
||||
})
|
||||
return valueSum
|
||||
}
|
||||
|
||||
func (r *dynamicResizer) push(incomingTasks int, completedTasks int, duration time.Duration) {
|
||||
r.incomingTasks.Value = incomingTasks
|
||||
r.completedTasks.Value = completedTasks
|
||||
r.duration.Value = duration
|
||||
r.incomingTasks = r.incomingTasks.Next()
|
||||
r.completedTasks = r.completedTasks.Next()
|
||||
r.duration = r.duration.Next()
|
||||
}
|
||||
|
||||
func (r *dynamicResizer) Resize(runningWorkers, idleWorkers, minWorkers, maxWorkers, incomingTasks, completedTasks int, duration time.Duration) int {
|
||||
|
||||
r.push(incomingTasks, completedTasks, duration)
|
||||
|
||||
windowIncomingTasks := r.totalIncomingTasks()
|
||||
windowCompletedTasks := r.totalCompletedTasks()
|
||||
windowSecs := r.totalDuration().Seconds()
|
||||
windowInputRate := float64(windowIncomingTasks) / windowSecs
|
||||
windowOutputRate := float64(windowCompletedTasks) / windowSecs
|
||||
|
||||
if runningWorkers == 0 || windowCompletedTasks == 0 {
|
||||
// No workers yet, create as many workers ar.incomingTasks-idleWorkers
|
||||
delta := incomingTasks - idleWorkers
|
||||
return r.fitDelta(delta, runningWorkers, minWorkers, maxWorkers)
|
||||
if r.rate == 1 {
|
||||
return true
|
||||
}
|
||||
|
||||
deltaRate := windowInputRate - windowOutputRate
|
||||
hits := int(atomic.AddInt32(&r.hits, 1))
|
||||
|
||||
// No changes, do not resize
|
||||
if deltaRate == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// If delta % is below the defined tolerance, do not resize
|
||||
|
||||
if r.tolerance > 0 {
|
||||
deltaPercentage := math.Abs(deltaRate / windowInputRate)
|
||||
if deltaPercentage < r.tolerance {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
if deltaRate > 0 {
|
||||
// Need to grow the pool
|
||||
workerRate := windowOutputRate / float64(runningWorkers)
|
||||
ratio := windowSecs / float64(r.windowSize)
|
||||
delta := int(ratio*(deltaRate/workerRate)) - idleWorkers
|
||||
if deltaRate > 0 && delta < 1 {
|
||||
delta = 1
|
||||
}
|
||||
return r.fitDelta(delta, runningWorkers, minWorkers, maxWorkers)
|
||||
} else if deltaRate < 0 && idleWorkers > 0 {
|
||||
// Need to shrink the pool
|
||||
return r.fitDelta(-1, runningWorkers, minWorkers, maxWorkers)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (r *dynamicResizer) fitDelta(delta, current, min, max int) int {
|
||||
if current+delta < min {
|
||||
delta = -(current - min)
|
||||
}
|
||||
if current+delta > max {
|
||||
delta = max - current
|
||||
}
|
||||
return delta
|
||||
return hits%r.rate == 1
|
||||
}
|
||||
|
||||
+30
-26
@@ -2,39 +2,43 @@ package pond
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestResize(t *testing.T) {
|
||||
func TestRatedResizer(t *testing.T) {
|
||||
|
||||
resizer := DynamicResizer(3, 0.1)
|
||||
resizer := RatedResizer(3)
|
||||
|
||||
// First resize should grow the pool proportionally
|
||||
assertEqual(t, 10, resizer.Resize(0, 0, 1, 100, 10, 0, 1*time.Second))
|
||||
|
||||
// Now the input rate grows but below the tolerance (10%)
|
||||
assertEqual(t, 0, resizer.Resize(10, 10, 1, 100, 1, 10, 1*time.Second))
|
||||
|
||||
// Now the input rate grows more
|
||||
assertEqual(t, 90, resizer.Resize(10, 10, 1, 100, 100000, 11, 1*time.Second))
|
||||
|
||||
// Now there's no new tasks for 3 cycles
|
||||
assertEqual(t, 0, resizer.Resize(10, 10, 1, 100, 0, 100011, 1*time.Second))
|
||||
assertEqual(t, -1, resizer.Resize(10, 10, 1, 100, 0, 100011, 1*time.Second))
|
||||
assertEqual(t, 0, resizer.Resize(1, 1, 1, 100, 0, 100011, 10*time.Second))
|
||||
assertEqual(t, true, resizer.Resize(0, 0, 10))
|
||||
assertEqual(t, false, resizer.Resize(1, 0, 10))
|
||||
assertEqual(t, false, resizer.Resize(2, 0, 10))
|
||||
assertEqual(t, true, resizer.Resize(3, 0, 10))
|
||||
}
|
||||
|
||||
func TestEagerPool(t *testing.T) {
|
||||
pool := New(100, 1000, Strategy(Eager))
|
||||
pool.debug = true
|
||||
func TestRatedResizerWithRate1(t *testing.T) {
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
pool.Submit(func() {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
})
|
||||
}
|
||||
resizer := RatedResizer(1)
|
||||
|
||||
pool.StopAndWait()
|
||||
assertEqual(t, true, resizer.Resize(0, 0, 10))
|
||||
assertEqual(t, true, resizer.Resize(1, 0, 10))
|
||||
assertEqual(t, true, resizer.Resize(2, 0, 10))
|
||||
}
|
||||
|
||||
assertEqual(t, 100, pool.maxWorkerCount)
|
||||
func TestRatedResizerWithInvalidRate(t *testing.T) {
|
||||
|
||||
resizer := RatedResizer(0)
|
||||
|
||||
assertEqual(t, true, resizer.Resize(0, 0, 10))
|
||||
assertEqual(t, true, resizer.Resize(1, 0, 10))
|
||||
assertEqual(t, true, resizer.Resize(2, 0, 10))
|
||||
}
|
||||
|
||||
func TestPresetRatedResizers(t *testing.T) {
|
||||
|
||||
eager := Eager()
|
||||
balanced := Balanced()
|
||||
lazy := Lazy()
|
||||
|
||||
assertEqual(t, true, eager.Resize(0, 0, 10))
|
||||
assertEqual(t, true, balanced.Resize(0, 0, 10))
|
||||
assertEqual(t, true, lazy.Resize(0, 0, 10))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user