Improve README.md
This commit is contained in:
@@ -9,13 +9,17 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// defaultIdleTimeout defines the default idle timeout to use when not explicitly specified
|
||||
// via the IdleTimeout() option
|
||||
defaultIdleTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// defaultPanicHandler is the default panic handler
|
||||
func defaultPanicHandler(panic interface{}) {
|
||||
fmt.Printf("Worker exits from a panic: %v\nStack trace: %s\n", panic, string(debug.Stack()))
|
||||
}
|
||||
|
||||
// linearGrowthFn is a function that determines how many workers to create when backpressure is detected
|
||||
func linearGrowthFn(workerCount, minWorkers, maxWorkers int) int {
|
||||
if workerCount < minWorkers {
|
||||
return minWorkers
|
||||
@@ -26,7 +30,7 @@ func linearGrowthFn(workerCount, minWorkers, maxWorkers int) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Option represents an option that can be passed when building a worker pool to customize it
|
||||
// Option represents an option that can be passed when instantiating a worker pool to customize it
|
||||
type Option func(*WorkerPool)
|
||||
|
||||
// IdleTimeout allows to change the idle timeout for a worker pool
|
||||
@@ -66,18 +70,20 @@ type WorkerPool struct {
|
||||
growthFn func(int, int, int) int
|
||||
}
|
||||
|
||||
// New creates a worker pool with that can scale up to the given number of workers and capacity
|
||||
// New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
|
||||
// The maxCapacity parameter determines the number of tasks that can be submitted to this pool without blocking,
|
||||
// because it defines the size of the buffered channel used to receive tasks.
|
||||
// The options parameter can take a list of functions to customize configuration values on this worker pool.
|
||||
func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
|
||||
|
||||
// Instantiate the pool
|
||||
pool := &WorkerPool{
|
||||
maxWorkers: maxWorkers,
|
||||
maxCapacity: maxCapacity,
|
||||
idleTimeout: defaultIdleTimeout,
|
||||
tasks: make(chan func(), maxCapacity),
|
||||
dispatchedTasks: make(chan func(), maxWorkers),
|
||||
purgerQuit: make(chan struct{}),
|
||||
panicHandler: defaultPanicHandler,
|
||||
growthFn: linearGrowthFn,
|
||||
maxWorkers: maxWorkers,
|
||||
maxCapacity: maxCapacity,
|
||||
idleTimeout: defaultIdleTimeout,
|
||||
purgerQuit: make(chan struct{}),
|
||||
panicHandler: defaultPanicHandler,
|
||||
growthFn: linearGrowthFn,
|
||||
}
|
||||
|
||||
// Apply all options
|
||||
@@ -92,6 +98,16 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
|
||||
if pool.minWorkers > pool.maxWorkers {
|
||||
pool.minWorkers = pool.maxWorkers
|
||||
}
|
||||
if pool.maxCapacity < 0 {
|
||||
pool.maxCapacity = 0
|
||||
}
|
||||
if pool.idleTimeout < 0 {
|
||||
pool.idleTimeout = defaultIdleTimeout
|
||||
}
|
||||
|
||||
// Create channels
|
||||
pool.tasks = make(chan func(), pool.maxCapacity)
|
||||
pool.dispatchedTasks = make(chan func(), pool.maxWorkers)
|
||||
|
||||
// Start dispatcher goroutine
|
||||
pool.waitGroup.Add(1)
|
||||
@@ -123,7 +139,7 @@ func (p *WorkerPool) Running() int {
|
||||
}
|
||||
|
||||
// Submit sends a task to this worker pool for execution. If the queue is full,
|
||||
// it will wait until the task can be enqueued.
|
||||
// it will wait until the task can be enqueued
|
||||
func (p *WorkerPool) Submit(task func()) {
|
||||
if task == nil {
|
||||
return
|
||||
@@ -134,7 +150,7 @@ func (p *WorkerPool) Submit(task func()) {
|
||||
}
|
||||
|
||||
// SubmitAndWait sends a task to this worker pool for execution and waits for it to complete
|
||||
// before returning.
|
||||
// before returning
|
||||
func (p *WorkerPool) SubmitAndWait(task func()) {
|
||||
if task == nil {
|
||||
return
|
||||
@@ -148,40 +164,83 @@ func (p *WorkerPool) SubmitAndWait(task func()) {
|
||||
<-done
|
||||
}
|
||||
|
||||
// SubmitBefore attempts to send a task for execution to this worker pool but aborts it
|
||||
// if the task did not start before the given deadline
|
||||
func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
|
||||
if task == nil {
|
||||
return
|
||||
}
|
||||
|
||||
timer := time.NewTimer(deadline)
|
||||
p.Submit(func() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
// Deadline was reached, abort the task
|
||||
default:
|
||||
// Deadline not reached, execute the task
|
||||
defer timer.Stop()
|
||||
|
||||
task()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
|
||||
func (p *WorkerPool) Stop() {
|
||||
p.stop(false)
|
||||
p.stopOnce.Do(func() {
|
||||
// Close the tasks channel to prevent receiving new tasks
|
||||
close(p.tasks)
|
||||
})
|
||||
}
|
||||
|
||||
// StopAndWait causes this pool to stop accepting tasks, waiting for all tasks in the queue to complete
|
||||
func (p *WorkerPool) StopAndWait() {
|
||||
p.stop(true)
|
||||
p.Stop()
|
||||
|
||||
// Wait for all goroutines to exit
|
||||
p.waitGroup.Wait()
|
||||
}
|
||||
|
||||
// dispatch represents the work done by the dispatcher goroutine
|
||||
func (p *WorkerPool) dispatch() {
|
||||
|
||||
batchSize := p.maxWorkers
|
||||
batch := make([]func(), 0)
|
||||
|
||||
for task := range p.tasks {
|
||||
if task == nil {
|
||||
// Received the signal to exit gracefully
|
||||
break
|
||||
batch = append(batch, task)
|
||||
|
||||
// Read up to batchSize - 1 tasks without blocking
|
||||
BulkReceive:
|
||||
for i := 0; i < batchSize; i++ {
|
||||
select {
|
||||
case t := <-p.tasks:
|
||||
batch = append(batch, t)
|
||||
default:
|
||||
break BulkReceive
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
// Attempt to submit the task to a worker without blocking
|
||||
case p.dispatchedTasks <- task:
|
||||
if p.Running() == 0 {
|
||||
p.startWorkers()
|
||||
}
|
||||
default:
|
||||
// Create a new worker if we haven't reached the limit yet
|
||||
if p.Running() < p.maxWorkers {
|
||||
p.startWorkers()
|
||||
}
|
||||
for _, task := range batch {
|
||||
select {
|
||||
// Attempt to submit the task to a worker without blocking
|
||||
case p.dispatchedTasks <- task:
|
||||
if p.Running() == 0 {
|
||||
p.startWorkers()
|
||||
}
|
||||
default:
|
||||
// Create a new worker if we haven't reached the limit yet
|
||||
if p.Running() < p.maxWorkers {
|
||||
p.startWorkers()
|
||||
}
|
||||
|
||||
// Block until a worker accepts this task
|
||||
p.dispatchedTasks <- task
|
||||
// Block until a worker accepts this task
|
||||
p.dispatchedTasks <- task
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batch slice
|
||||
batch = nil
|
||||
}
|
||||
|
||||
// Send signal to stop all workers
|
||||
@@ -215,21 +274,17 @@ func (p *WorkerPool) purge() {
|
||||
}
|
||||
}
|
||||
|
||||
// startWorkers launches worker goroutines according to the growth function
|
||||
func (p *WorkerPool) startWorkers() {
|
||||
|
||||
count := p.growthFn(p.Running(), p.minWorkers, p.maxWorkers)
|
||||
|
||||
if count == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Increment worker count
|
||||
atomic.AddInt32(&p.workerCount, int32(count))
|
||||
|
||||
// Increment waiting group semaphore
|
||||
p.waitGroup.Add(count)
|
||||
|
||||
//go func() {
|
||||
for i := 0; i < count; i++ {
|
||||
worker(p.dispatchedTasks, func() {
|
||||
|
||||
@@ -241,28 +296,6 @@ func (p *WorkerPool) startWorkers() {
|
||||
|
||||
}, p.panicHandler)
|
||||
}
|
||||
//}()
|
||||
|
||||
}
|
||||
|
||||
// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
|
||||
func (p *WorkerPool) stop(wait bool) {
|
||||
|
||||
p.stopOnce.Do(func() {
|
||||
if wait {
|
||||
// Make sure all queued tasks complete before stopping the dispatcher
|
||||
p.tasks <- nil
|
||||
|
||||
// Close the tasks channel to prevent receiving new tasks
|
||||
close(p.tasks)
|
||||
|
||||
// Wait for all goroutines to exit
|
||||
p.waitGroup.Wait()
|
||||
} else {
|
||||
// Close the tasks channel to prevent receiving new tasks
|
||||
close(p.tasks)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Group creates a new task group
|
||||
@@ -307,7 +340,7 @@ type TaskGroup struct {
|
||||
waitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
// Submit adds a task to this group and sends it to the worker pool to be executed.
|
||||
// Submit adds a task to this group and sends it to the worker pool to be executed
|
||||
func (g *TaskGroup) Submit(task func()) {
|
||||
g.waitGroup.Add(1)
|
||||
g.pool.Submit(func() {
|
||||
@@ -316,8 +349,7 @@ func (g *TaskGroup) Submit(task func()) {
|
||||
})
|
||||
}
|
||||
|
||||
// Wait waits until all the tasks in this group have completed. It returns
|
||||
// a slice with all (non-nil) errors returned by tasks in this group.
|
||||
// Wait waits until all the tasks in this group have completed
|
||||
func (g *TaskGroup) Wait() {
|
||||
|
||||
// Wait for all tasks to complete
|
||||
|
||||
Reference in New Issue
Block a user