Merge pull request #24 from alitto/feature/1.7.0
Pool context option & stop with timeout
This commit is contained in:
@@ -0,0 +1,41 @@
|
|||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
- main
|
||||||
|
pull_request:
|
||||||
|
name: Build
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
name: Test
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
go-version: [1.15.x, 1.16.x, 1.17.x]
|
||||||
|
os: [ubuntu-latest, macos-latest, windows-latest]
|
||||||
|
runs-on: ${{ matrix.os }}
|
||||||
|
steps:
|
||||||
|
- name: Install Go
|
||||||
|
uses: actions/setup-go@v2
|
||||||
|
with:
|
||||||
|
go-version: ${{ matrix.go-version }}
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
- name: Test
|
||||||
|
run: go test -race -v ./
|
||||||
|
codecov:
|
||||||
|
name: Upload coverage report to Codecov
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Install Go
|
||||||
|
uses: actions/setup-go@v2
|
||||||
|
with:
|
||||||
|
go-version: ${{ matrix.go-version }}
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
- name: Test
|
||||||
|
run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./
|
||||||
|
- uses: codecov/codecov-action@v2
|
||||||
|
with:
|
||||||
|
files: ./coverage.txt
|
||||||
|
fail_ci_if_error: true
|
||||||
|
verbose: true
|
||||||
-28
@@ -1,28 +0,0 @@
|
|||||||
language: go
|
|
||||||
|
|
||||||
os:
|
|
||||||
- linux
|
|
||||||
- osx
|
|
||||||
- windows
|
|
||||||
|
|
||||||
go:
|
|
||||||
- 1.11.x
|
|
||||||
- 1.12.x
|
|
||||||
- 1.13.x
|
|
||||||
- 1.14.x
|
|
||||||
- 1.15.x
|
|
||||||
- 1.16.x
|
|
||||||
- 1.17.x
|
|
||||||
|
|
||||||
# Enable Go Modules
|
|
||||||
env:
|
|
||||||
- GO111MODULE=on
|
|
||||||
|
|
||||||
# Skip go get
|
|
||||||
install: true
|
|
||||||
|
|
||||||
script:
|
|
||||||
- go test ./ -race -coverprofile=coverage.txt -covermode=atomic -v
|
|
||||||
|
|
||||||
after_success:
|
|
||||||
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then curl -s https://codecov.io/bash > .codecov && chmod +x .codecov && ./.codecov; else bash <(curl -s https://codecov.io/bash); fi
|
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
<a title="Build Status" target="_blank" href="https://travis-ci.com/alitto/pond"><img src="https://travis-ci.com/alitto/pond.svg?branch=master&status=passed"></a>
|
<img alt="Build status" src="https://github.com/alitto/pond/actions/workflows/main.yml/badge.svg?branch=master&event=push">
|
||||||
<a title="Codecov" target="_blank" href="https://codecov.io/gh/alitto/pond"><img src="https://codecov.io/gh/alitto/pond/branch/master/graph/badge.svg"></a>
|
<a title="Codecov" target="_blank" href="https://codecov.io/gh/alitto/pond"><img src="https://codecov.io/gh/alitto/pond/branch/master/graph/badge.svg"></a>
|
||||||
<a title="Release" target="_blank" href="https://github.com/alitto/pond/releases"><img src="https://img.shields.io/github/v/release/alitto/pond"></a>
|
<a title="Release" target="_blank" href="https://github.com/alitto/pond/releases"><img src="https://img.shields.io/github/v/release/alitto/pond"></a>
|
||||||
<a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/alitto/pond"><img src="https://goreportcard.com/badge/github.com/alitto/pond"></a>
|
<a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/alitto/pond"><img src="https://goreportcard.com/badge/github.com/alitto/pond"></a>
|
||||||
@@ -31,8 +31,9 @@ Some common scenarios include:
|
|||||||
- Task panics are handled gracefully (configurable panic handler)
|
- Task panics are handled gracefully (configurable panic handler)
|
||||||
- Supports Non-blocking and Blocking task submission modes (buffered / unbuffered)
|
- Supports Non-blocking and Blocking task submission modes (buffered / unbuffered)
|
||||||
- Very high performance and efficient resource usage under heavy workloads, even outperforming unbounded goroutines in some scenarios (See [benchmarks](./benchmark/README.md))
|
- Very high performance and efficient resource usage under heavy workloads, even outperforming unbounded goroutines in some scenarios (See [benchmarks](./benchmark/README.md))
|
||||||
- **New (since v1.3.0)**: configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy.
|
- Configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy.
|
||||||
- **New (since v1.5.0)**: complete pool metrics such as number of running workers, tasks waiting in the queue [and more](#metrics--monitoring).
|
- Complete pool metrics such as number of running workers, tasks waiting in the queue [and more](#metrics--monitoring).
|
||||||
|
- **New (since v1.7.0)**: configurable parent context and graceful shutdown with deadline.
|
||||||
- [API reference](https://pkg.go.dev/github.com/alitto/pond)
|
- [API reference](https://pkg.go.dev/github.com/alitto/pond)
|
||||||
|
|
||||||
## How to install
|
## How to install
|
||||||
@@ -168,6 +169,11 @@ eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager()))
|
|||||||
balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced()))
|
balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced()))
|
||||||
lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy()))
|
lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy()))
|
||||||
```
|
```
|
||||||
|
- **Context**: Configures a parent context on this pool to stop all workers when it is cancelled. The default value `context.Background()`. Example:
|
||||||
|
``` go
|
||||||
|
// This creates a pool that is stopped when myCtx is cancelled
|
||||||
|
pool := pond.New(10, 1000, pond.Context(myCtx))
|
||||||
|
```
|
||||||
|
|
||||||
### Resizing strategies
|
### Resizing strategies
|
||||||
|
|
||||||
@@ -177,6 +183,13 @@ The following chart illustrates the behaviour of the different pool resizing str
|
|||||||
|
|
||||||
As the name suggests, the "Eager" strategy always spawns an extra worker when there are no idles, which causes the pool to grow almost linearly with the number of submitted tasks. On the other end, the "Lazy" strategy creates one worker every N submitted tasks, where N is the maximum number of available CPUs ([GOMAXPROCS](https://golang.org/pkg/runtime/#GOMAXPROCS)). The "Balanced" strategy represents a middle ground between the previous two because it creates a worker every N/2 submitted tasks.
|
As the name suggests, the "Eager" strategy always spawns an extra worker when there are no idles, which causes the pool to grow almost linearly with the number of submitted tasks. On the other end, the "Lazy" strategy creates one worker every N submitted tasks, where N is the maximum number of available CPUs ([GOMAXPROCS](https://golang.org/pkg/runtime/#GOMAXPROCS)). The "Balanced" strategy represents a middle ground between the previous two because it creates a worker every N/2 submitted tasks.
|
||||||
|
|
||||||
|
### Stopping a pool
|
||||||
|
|
||||||
|
There are 3 methods available to stop a pool and release associated resources:
|
||||||
|
- `pool.Stop()`: stop accepting new tasks and signal all workers to stop processing new tasks. Tasks being processed by workers will continue until completion unless the process is terminated.
|
||||||
|
- `pool.StopAndWait()`: stop accepting new tasks and wait until all running and queued tasks have completed before returning.
|
||||||
|
- `pool.StopAndWaitFor(deadline time.Duration)`: similar to `StopAndWait` but with a deadline to prevent waiting indefinitely.
|
||||||
|
|
||||||
### Metrics & monitoring
|
### Metrics & monitoring
|
||||||
|
|
||||||
Each worker pool instance exposes useful metrics that can be queried through the following methods:
|
Each worker pool instance exposes useful metrics that can be queried through the following methods:
|
||||||
@@ -194,6 +207,14 @@ Each worker pool instance exposes useful metrics that can be queried through the
|
|||||||
|
|
||||||
In our [Prometheus example](./examples/prometheus/prometheus.go) we showcase how to configure collectors for these metrics and expose them to Prometheus.
|
In our [Prometheus example](./examples/prometheus/prometheus.go) we showcase how to configure collectors for these metrics and expose them to Prometheus.
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
- [Creating a worker pool with dynamic size](./examples/dynamic_size/dynamic_size.go)
|
||||||
|
- [Creating a worker pool with fixed size](./examples/fixed_size/fixed_size.go)
|
||||||
|
- [Creating a worker pool with a Context](./examples/pool_context/pool_context.go)
|
||||||
|
- [Exporting worker pool metrics to Prometheus](./examples/prometheus/prometheus.go)
|
||||||
|
- [Submitting groups of related tasks](./examples/task_group/task_group.go)
|
||||||
|
|
||||||
## API Reference
|
## API Reference
|
||||||
|
|
||||||
Full API reference is available at https://pkg.go.dev/github.com/alitto/pond
|
Full API reference is available at https://pkg.go.dev/github.com/alitto/pond
|
||||||
|
|||||||
+1
-1
@@ -3,7 +3,7 @@ module github.com/alitto/pond/benchmark
|
|||||||
go 1.17
|
go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.3.0
|
github.com/alitto/pond v1.6.1
|
||||||
github.com/gammazero/workerpool v1.1.2
|
github.com/gammazero/workerpool v1.1.2
|
||||||
github.com/panjf2000/ants/v2 v2.4.7
|
github.com/panjf2000/ants/v2 v2.4.7
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/dynamic_size
|
|||||||
go 1.17
|
go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.5.1
|
github.com/alitto/pond v1.6.1
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/alitto/pond => ../../
|
replace github.com/alitto/pond => ../../
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/fixed_size
|
|||||||
go 1.17
|
go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.5.1
|
github.com/alitto/pond v1.6.1
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/alitto/pond => ../../
|
replace github.com/alitto/pond => ../../
|
||||||
|
|||||||
@@ -0,0 +1,7 @@
|
|||||||
|
module github.com/alitto/pond/examples/pool_context
|
||||||
|
|
||||||
|
go 1.17
|
||||||
|
|
||||||
|
require github.com/alitto/pond v1.6.1
|
||||||
|
|
||||||
|
replace github.com/alitto/pond => ../../
|
||||||
@@ -0,0 +1,35 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alitto/pond"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Pressing Ctrl+C while this program is running will cause the program to terminate gracefully.
|
||||||
|
// Tasks being processed will continue until they finish, but queued tasks are cancelled.
|
||||||
|
func main() {
|
||||||
|
|
||||||
|
// Create a context that will be cancelled when the user presses Ctrl+C (process receives termination signal).
|
||||||
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
// Create a pool and pass the context to it.
|
||||||
|
pool := pond.New(1, 1000, pond.Context(ctx))
|
||||||
|
defer pool.StopAndWait()
|
||||||
|
|
||||||
|
// Submit several long runnning tasks
|
||||||
|
var count int = 100
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
n := i
|
||||||
|
pool.Submit(func() {
|
||||||
|
fmt.Printf("Task #%d started\n", n)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
fmt.Printf("Task #%d finished\n", n)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/fixed_size
|
|||||||
go 1.17
|
go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.5.1
|
github.com/alitto/pond v1.6.1
|
||||||
github.com/prometheus/client_golang v1.9.0
|
github.com/prometheus/client_golang v1.9.0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/task_group
|
|||||||
go 1.17
|
go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alitto/pond v1.5.1
|
github.com/alitto/pond v1.6.1
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/alitto/pond => ../../
|
replace github.com/alitto/pond => ../../
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package pond
|
package pond
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
@@ -54,22 +55,31 @@ func Strategy(strategy ResizingStrategy) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PanicHandler allows to change the panic handler function for a worker pool
|
// PanicHandler allows to change the panic handler function of a worker pool
|
||||||
func PanicHandler(panicHandler func(interface{})) Option {
|
func PanicHandler(panicHandler func(interface{})) Option {
|
||||||
return func(pool *WorkerPool) {
|
return func(pool *WorkerPool) {
|
||||||
pool.panicHandler = panicHandler
|
pool.panicHandler = panicHandler
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Context configures a parent context on a worker pool to stop all workers when it is cancelled
|
||||||
|
func Context(parentCtx context.Context) Option {
|
||||||
|
return func(pool *WorkerPool) {
|
||||||
|
pool.context, pool.contextCancel = context.WithCancel(parentCtx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WorkerPool models a pool of workers
|
// WorkerPool models a pool of workers
|
||||||
type WorkerPool struct {
|
type WorkerPool struct {
|
||||||
// Configurable settings
|
// Configurable settings
|
||||||
maxWorkers int
|
maxWorkers int
|
||||||
maxCapacity int
|
maxCapacity int
|
||||||
minWorkers int
|
minWorkers int
|
||||||
idleTimeout time.Duration
|
idleTimeout time.Duration
|
||||||
strategy ResizingStrategy
|
strategy ResizingStrategy
|
||||||
panicHandler func(interface{})
|
panicHandler func(interface{})
|
||||||
|
context context.Context
|
||||||
|
contextCancel context.CancelFunc
|
||||||
// Atomic counters
|
// Atomic counters
|
||||||
workerCount int32
|
workerCount int32
|
||||||
idleWorkerCount int32
|
idleWorkerCount int32
|
||||||
@@ -78,12 +88,11 @@ type WorkerPool struct {
|
|||||||
successfulTaskCount uint64
|
successfulTaskCount uint64
|
||||||
failedTaskCount uint64
|
failedTaskCount uint64
|
||||||
// Private properties
|
// Private properties
|
||||||
tasks chan func()
|
tasks chan func()
|
||||||
purgerQuit chan struct{}
|
stopOnce sync.Once
|
||||||
stopOnce sync.Once
|
waitGroup sync.WaitGroup
|
||||||
waitGroup sync.WaitGroup
|
mutex sync.Mutex
|
||||||
mutex sync.Mutex
|
stopped bool
|
||||||
stopped bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
|
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
|
||||||
@@ -120,17 +129,16 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
|
|||||||
pool.idleTimeout = defaultIdleTimeout
|
pool.idleTimeout = defaultIdleTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create internal channels
|
// Initialize base context (if not already set)
|
||||||
|
if pool.context == nil {
|
||||||
|
Context(context.Background())(pool)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create tasks channel
|
||||||
pool.tasks = make(chan func(), pool.maxCapacity)
|
pool.tasks = make(chan func(), pool.maxCapacity)
|
||||||
pool.purgerQuit = make(chan struct{})
|
|
||||||
|
|
||||||
// Start purger goroutine
|
// Start purger goroutine
|
||||||
pool.waitGroup.Add(1)
|
go pool.purge()
|
||||||
go func() {
|
|
||||||
defer pool.waitGroup.Done()
|
|
||||||
|
|
||||||
pool.purge()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Start minWorkers workers
|
// Start minWorkers workers
|
||||||
if pool.minWorkers > 0 {
|
if pool.minWorkers > 0 {
|
||||||
@@ -298,7 +306,7 @@ func (p *WorkerPool) SubmitAndWait(task func()) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SubmitBefore attempts to send a task for execution to this worker pool but aborts it
|
// SubmitBefore attempts to send a task for execution to this worker pool but aborts it
|
||||||
// if the task did not start before the given deadline
|
// if the task did not start before the given deadline.
|
||||||
func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
|
func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
|
||||||
if task == nil {
|
if task == nil {
|
||||||
return
|
return
|
||||||
@@ -318,23 +326,59 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
|
// Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks.
|
||||||
|
// Tasks being processed by workers will continue until completion unless the process is terminated.
|
||||||
|
// This method can only be called once.
|
||||||
func (p *WorkerPool) Stop() {
|
func (p *WorkerPool) Stop() {
|
||||||
p.stopOnce.Do(func() {
|
p.stopOnce.Do(func() {
|
||||||
// Mark pool as stopped
|
// Mark pool as stopped
|
||||||
p.stopped = true
|
p.stopped = true
|
||||||
|
|
||||||
// Send the signal to stop the purger goroutine
|
// Stop accepting new tasks
|
||||||
close(p.purgerQuit)
|
close(p.tasks)
|
||||||
|
|
||||||
|
// Terminate all workers & purger goroutine
|
||||||
|
p.contextCancel()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// StopAndWait causes this pool to stop accepting tasks, waiting for all tasks in the queue to complete
|
// StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue
|
||||||
|
// to complete before returning. This method can only be called once.
|
||||||
func (p *WorkerPool) StopAndWait() {
|
func (p *WorkerPool) StopAndWait() {
|
||||||
p.Stop()
|
p.stopOnce.Do(func() {
|
||||||
|
// Mark pool as stopped
|
||||||
|
p.stopped = true
|
||||||
|
|
||||||
// Wait for all goroutines to exit
|
// Stop accepting new tasks
|
||||||
p.waitGroup.Wait()
|
close(p.tasks)
|
||||||
|
|
||||||
|
// Wait for all workers to exit
|
||||||
|
p.waitGroup.Wait()
|
||||||
|
|
||||||
|
// Terminate all workers & purger goroutine
|
||||||
|
p.contextCancel()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning
|
||||||
|
// or until the given deadline is reached, whichever comes first. This method can only be called once.
|
||||||
|
func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) {
|
||||||
|
|
||||||
|
// Detect if worker pool is already stopped
|
||||||
|
workersDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
p.StopAndWait()
|
||||||
|
workersDone <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait until either all workers have exited or the deadline is reached
|
||||||
|
select {
|
||||||
|
case <-workersDone:
|
||||||
|
return
|
||||||
|
case <-time.After(deadline):
|
||||||
|
p.contextCancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// purge represents the work done by the purger goroutine
|
// purge represents the work done by the purger goroutine
|
||||||
@@ -343,7 +387,6 @@ func (p *WorkerPool) purge() {
|
|||||||
idleTicker := time.NewTicker(p.idleTimeout)
|
idleTicker := time.NewTicker(p.idleTimeout)
|
||||||
defer idleTicker.Stop()
|
defer idleTicker.Stop()
|
||||||
|
|
||||||
Purge:
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// Timed out waiting for any activity to happen, attempt to kill an idle worker
|
// Timed out waiting for any activity to happen, attempt to kill an idle worker
|
||||||
@@ -351,14 +394,11 @@ Purge:
|
|||||||
if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers {
|
if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers {
|
||||||
p.tasks <- nil
|
p.tasks <- nil
|
||||||
}
|
}
|
||||||
case <-p.purgerQuit:
|
// Pool context was cancelled, exit
|
||||||
break Purge
|
case <-p.context.Done():
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send signal to stop all workers
|
|
||||||
close(p.tasks)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// startWorkers creates new worker goroutines to run the given tasks
|
// startWorkers creates new worker goroutines to run the given tasks
|
||||||
@@ -370,7 +410,7 @@ func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Launch worker
|
// Launch worker
|
||||||
go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
|
go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -435,7 +475,7 @@ func (p *WorkerPool) Group() *TaskGroup {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// worker launches a worker goroutine
|
// worker launches a worker goroutine
|
||||||
func worker(firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
|
func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// Decrement idle count
|
// Decrement idle count
|
||||||
@@ -452,20 +492,26 @@ func worker(firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitH
|
|||||||
// Increment idle count
|
// Increment idle count
|
||||||
atomic.AddInt32(idleWorkerCount, 1)
|
atomic.AddInt32(idleWorkerCount, 1)
|
||||||
|
|
||||||
for task := range tasks {
|
for {
|
||||||
if task == nil {
|
select {
|
||||||
// We have received a signal to quit
|
case <-context.Done():
|
||||||
|
// Pool context was cancelled, exit
|
||||||
return
|
return
|
||||||
|
case task, ok := <-tasks:
|
||||||
|
if task == nil || !ok {
|
||||||
|
// We have received a signal to quit
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decrement idle count
|
||||||
|
atomic.AddInt32(idleWorkerCount, -1)
|
||||||
|
|
||||||
|
// We have received a task, execute it
|
||||||
|
taskExecutor(task)
|
||||||
|
|
||||||
|
// Increment idle count
|
||||||
|
atomic.AddInt32(idleWorkerCount, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decrement idle count
|
|
||||||
atomic.AddInt32(idleWorkerCount, -1)
|
|
||||||
|
|
||||||
// We have received a task, execute it
|
|
||||||
taskExecutor(task)
|
|
||||||
|
|
||||||
// Increment idle count
|
|
||||||
atomic.AddInt32(idleWorkerCount, 1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+76
-4
@@ -1,6 +1,7 @@
|
|||||||
package pond_test
|
package pond_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -23,7 +24,7 @@ func assertNotEqual(t *testing.T, expected interface{}, actual interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSubmitAndStopWaiting(t *testing.T) {
|
func TestSubmitAndStopWait(t *testing.T) {
|
||||||
|
|
||||||
pool := pond.New(1, 5)
|
pool := pond.New(1, 5)
|
||||||
|
|
||||||
@@ -42,6 +43,42 @@ func TestSubmitAndStopWaiting(t *testing.T) {
|
|||||||
assertEqual(t, int32(17), atomic.LoadInt32(&doneCount))
|
assertEqual(t, int32(17), atomic.LoadInt32(&doneCount))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSubmitAndStopWaitFor(t *testing.T) {
|
||||||
|
|
||||||
|
pool := pond.New(1, 10)
|
||||||
|
|
||||||
|
// Submit a long running task
|
||||||
|
var doneCount int32
|
||||||
|
pool.Submit(func() {
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Wait 100ms for the task to complete
|
||||||
|
pool.StopAndWaitFor(50 * time.Millisecond)
|
||||||
|
|
||||||
|
assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubmitAndStopWaitForWithEnoughDeadline(t *testing.T) {
|
||||||
|
|
||||||
|
pool := pond.New(1, 10)
|
||||||
|
|
||||||
|
// Submit tasks
|
||||||
|
var doneCount int32
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
pool.Submit(func() {
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all submitted tasks complete
|
||||||
|
pool.StopAndWaitFor(1 * time.Second)
|
||||||
|
|
||||||
|
assertEqual(t, int32(10), atomic.LoadInt32(&doneCount))
|
||||||
|
}
|
||||||
|
|
||||||
func TestSubmitAndStopWaitingWithMoreWorkersThanTasks(t *testing.T) {
|
func TestSubmitAndStopWaitingWithMoreWorkersThanTasks(t *testing.T) {
|
||||||
|
|
||||||
pool := pond.New(18, 5)
|
pool := pond.New(18, 5)
|
||||||
@@ -146,16 +183,22 @@ func TestSubmitBefore(t *testing.T) {
|
|||||||
atomic.AddInt32(&doneCount, 1)
|
atomic.AddInt32(&doneCount, 1)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Submit a task that times out after 2ms
|
// Submit a task that times out after 5ms
|
||||||
pool.SubmitBefore(func() {
|
pool.SubmitBefore(func() {
|
||||||
time.Sleep(5 * time.Millisecond)
|
time.Sleep(5 * time.Millisecond)
|
||||||
atomic.AddInt32(&doneCount, 1)
|
atomic.AddInt32(&doneCount, 1)
|
||||||
}, 5*time.Millisecond)
|
}, 5*time.Millisecond)
|
||||||
|
|
||||||
|
// Submit a task that times out after 1s
|
||||||
|
pool.SubmitBefore(func() {
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
}, 1*time.Second)
|
||||||
|
|
||||||
pool.StopAndWait()
|
pool.StopAndWait()
|
||||||
|
|
||||||
// Only the first task must have executed
|
// Only 2 tasks must have executed
|
||||||
assertEqual(t, int32(1), atomic.LoadInt32(&doneCount))
|
assertEqual(t, int32(2), atomic.LoadInt32(&doneCount))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSubmitBeforeWithNilTask(t *testing.T) {
|
func TestSubmitBeforeWithNilTask(t *testing.T) {
|
||||||
@@ -496,3 +539,32 @@ func TestMetricsAndGetters(t *testing.T) {
|
|||||||
assertEqual(t, uint64(17), pool.CompletedTasks())
|
assertEqual(t, uint64(17), pool.CompletedTasks())
|
||||||
assertEqual(t, uint64(0), pool.WaitingTasks())
|
assertEqual(t, uint64(0), pool.WaitingTasks())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSubmitWithContext(t *testing.T) {
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
pool := pond.New(1, 5, pond.Context(ctx))
|
||||||
|
|
||||||
|
var doneCount, taskCount int32
|
||||||
|
|
||||||
|
// Submit a long-running, cancellable task
|
||||||
|
pool.Submit(func() {
|
||||||
|
atomic.AddInt32(&taskCount, 1)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(1 * time.Minute):
|
||||||
|
atomic.AddInt32(&doneCount, 1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Cancel the context
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
pool.StopAndWait()
|
||||||
|
|
||||||
|
assertEqual(t, int32(1), atomic.LoadInt32(&taskCount))
|
||||||
|
assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user