Add getters for metrics and configuration options

This commit is contained in:
alitto
2020-12-31 12:31:23 -03:00
parent 5f7cd69f18
commit bb00d6603a
5 changed files with 656 additions and 58 deletions
+123 -38
View File
@@ -65,8 +65,12 @@ type WorkerPool struct {
strategy ResizingStrategy
panicHandler func(interface{})
// Atomic counters
workerCount int32
idleWorkerCount int32
workerCount int32
idleWorkerCount int32
waitingTaskCount uint64
submittedTaskCount uint64
successfulTaskCount uint64
failedTaskCount uint64
// Private properties
tasks chan func()
purgerQuit chan struct{}
@@ -131,16 +135,64 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
return pool
}
// Running returns the number of running workers
func (p *WorkerPool) Running() int {
// RunningWorkers returns the number of running workers
func (p *WorkerPool) RunningWorkers() int {
return int(atomic.LoadInt32(&p.workerCount))
}
// Idle returns the number of idle workers
func (p *WorkerPool) Idle() int {
// IdleWorkers returns the number of idle workers
func (p *WorkerPool) IdleWorkers() int {
return int(atomic.LoadInt32(&p.idleWorkerCount))
}
// MinWorkers returns minimum number of worker goroutines
func (p *WorkerPool) MinWorkers() int {
return p.minWorkers
}
// MaxWorkers returns maximum number of worker goroutines
func (p *WorkerPool) MaxWorkers() int {
return p.maxWorkers
}
// MaxCapacity returns the maximum number of tasks that can be waiting in the queue
// at any given time (queue size)
func (p *WorkerPool) MaxCapacity() int {
return p.maxCapacity
}
// Strategy returns the configured pool resizing strategy
func (p *WorkerPool) Strategy() ResizingStrategy {
return p.strategy
}
// SubmittedTasks returns the number of tasks submitted since the pool was created
func (p *WorkerPool) SubmittedTasks() uint64 {
return atomic.LoadUint64(&p.submittedTaskCount)
}
// WaitingTasks returns the current number of tasks in the queue that are waiting to be executed
func (p *WorkerPool) WaitingTasks() uint64 {
return atomic.LoadUint64(&p.waitingTaskCount)
}
// SuccessfulTasks returns the number of tasks that have successfully completed their exection
// since the pool was created
func (p *WorkerPool) SuccessfulTasks() uint64 {
return atomic.LoadUint64(&p.successfulTaskCount)
}
// FailedTasks returns the number of tasks that completed with panic since the pool was created
func (p *WorkerPool) FailedTasks() uint64 {
return atomic.LoadUint64(&p.failedTaskCount)
}
// CompletedTasks returns the number of tasks that have completed their exection either successfully
// or with panic since the pool was created
func (p *WorkerPool) CompletedTasks() uint64 {
return p.SuccessfulTasks() + p.FailedTasks()
}
// Submit sends a task to this worker pool for execution. If the queue is full,
// it will wait until the task is dispatched to a worker goroutine.
func (p *WorkerPool) Submit(task func()) {
@@ -154,38 +206,58 @@ func (p *WorkerPool) TrySubmit(task func()) bool {
return p.submit(task, false)
}
func (p *WorkerPool) submit(task func(), waitForIdle bool) bool {
func (p *WorkerPool) submit(task func(), canWaitForIdleWorker bool) (submitted bool) {
if task == nil {
return false
}
runningWorkerCount := p.Running()
defer func() {
if submitted {
// Increment submitted task count
atomic.AddUint64(&p.submittedTaskCount, 1)
// Increment waiting task count
atomic.AddUint64(&p.waitingTaskCount, 1)
}
}()
runningWorkerCount := p.RunningWorkers()
// Attempt to dispatch to an idle worker without blocking
if runningWorkerCount > 0 && p.Idle() > 0 {
if runningWorkerCount > 0 && p.IdleWorkers() > 0 {
select {
case p.tasks <- task:
return true
submitted = true
return
default:
// No idle worker available, continue
}
}
maxWorkersReached := runningWorkerCount >= p.maxWorkers
// Exit if we have reached the max. number of workers and can't wait for an idle worker
if maxWorkersReached && !waitForIdle {
return false
// Start a worker as long as we haven't reached the limit
if runningWorkerCount < p.maxWorkers {
if ok := p.maybeStartWorker(task); ok {
submitted = true
return
}
}
// Start a worker as long as we haven't reached the limit
if ok := p.maybeStartWorker(task); ok {
return true
if !canWaitForIdleWorker {
select {
case p.tasks <- task:
submitted = true
return
default:
// Channel is full and can't wait for an idle worker, so need to exit
submitted = false
return
}
}
// Submit the task to the tasks channel and wait for it to be picked up by a worker
p.tasks <- task
return true
submitted = true
return
}
// SubmitAndWait sends a task to this worker pool for execution and waits for it to complete
@@ -251,7 +323,7 @@ Purge:
select {
// Timed out waiting for any activity to happen, attempt to kill an idle worker
case <-idleTicker.C:
if p.Idle() > 0 && p.Running() > p.minWorkers {
if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers {
p.tasks <- nil
}
case <-p.purgerQuit:
@@ -273,16 +345,38 @@ func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
}
// Launch worker
go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.panicHandler)
go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
return true
}
// executeTask executes the given task and updates task-related counters
func (p *WorkerPool) executeTask(task func()) {
defer func() {
if panic := recover(); panic != nil {
// Increment failed task count
atomic.AddUint64(&p.failedTaskCount, 1)
// Invoke panic handler
p.panicHandler(panic)
}
}()
// Decrement waiting task count
atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))
task()
// Increment successful task count
atomic.AddUint64(&p.successfulTaskCount, 1)
}
func (p *WorkerPool) incrementWorkerCount() bool {
// Attempt to increment worker count
p.mutex.Lock()
runningWorkerCount := p.Running()
runningWorkerCount := p.RunningWorkers()
// Execute the resizing strategy to determine if we can create more workers
if !p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) || runningWorkerCount >= p.maxWorkers {
p.mutex.Unlock()
@@ -316,28 +410,19 @@ func (p *WorkerPool) Group() *TaskGroup {
}
// worker launches a worker goroutine
func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHandler func(), panicHandler func(interface{})) {
func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
defer func() {
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)
if panic := recover(); panic != nil {
// Handle panic
panicHandler(panic)
// Restart goroutine
go worker(nil, tasks, idleWorkerCount, exitHandler, panicHandler)
} else {
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)
// Handle normal exit
exitHandler()
}
// Handle normal exit
exitHandler()
}()
// We have received a task, execute it
if firstTask != nil {
firstTask()
taskExecutor(firstTask)
}
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
@@ -352,7 +437,7 @@ func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHan
atomic.AddInt32(idleWorkerCount, -1)
// We have received a task, execute it
task()
taskExecutor(task)
// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)