merge upstream

This commit is contained in:
2024-06-18 22:26:11 +03:00
parent 37ea64509a
commit 8d3e4a8232
11 changed files with 82 additions and 19 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
module git.company.lan/gopkg/pond/examples/dynamic_size module git.company.lan/gopkg/pond/examples/dynamic_size
go 1.19 go 1.18
require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000 require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000
+1 -1
View File
@@ -1,6 +1,6 @@
module git.company.lan/gopkg/pond/examples/fixed_size module git.company.lan/gopkg/pond/examples/fixed_size
go 1.19 go 1.18
require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000 require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000
+1 -1
View File
@@ -1,6 +1,6 @@
module git.company.lan/gopkg/pond/examples/group_context module git.company.lan/gopkg/pond/examples/group_context
go 1.19 go 1.18
require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000 require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000
+1 -1
View File
@@ -1,6 +1,6 @@
module git.company.lan/gopkg/pond/examples/pool_context module git.company.lan/gopkg/pond/examples/pool_context
go 1.19 go 1.18
require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000 require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000
+1 -1
View File
@@ -1,6 +1,6 @@
module git.company.lan/gopkg/pond/examples/fixed_size module git.company.lan/gopkg/pond/examples/fixed_size
go 1.19 go 1.18
require ( require (
git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000 git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000
+1 -1
View File
@@ -1,6 +1,6 @@
module git.company.lan/gopkg/pond/examples/task_group module git.company.lan/gopkg/pond/examples/task_group
go 1.19 go 1.18
require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000 require git.company.lan/gopkg/pond v0.0.0-00010101000000-000000000000
+1 -1
View File
@@ -1,3 +1,3 @@
module git.company.lan/gopkg/pond module git.company.lan/gopkg/pond
go 1.19 go 1.18
+4 -4
View File
@@ -364,13 +364,13 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
// Terminate all workers & purger goroutine // Terminate all workers & purger goroutine
p.contextCancel() p.contextCancel()
// Wait for all workers & purger goroutine to exit
p.workersWaitGroup.Wait()
// close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made) // close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made)
p.tasksCloseOnce.Do(func() { p.tasksCloseOnce.Do(func() {
close(p.tasks) close(p.tasks)
}) })
// Wait for all workers & purger goroutine to exit
p.workersWaitGroup.Wait()
} }
// purge represents the work done by the purger goroutine // purge represents the work done by the purger goroutine
@@ -420,7 +420,7 @@ func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
} }
// Launch worker goroutine // Launch worker goroutine
go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask) go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask, &p.tasksWaitGroup)
return true return true
} }
+41
View File
@@ -542,6 +542,47 @@ func TestSubmitWithContext(t *testing.T) {
assertEqual(t, int32(0), atomic.LoadInt32(&doneCount)) assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
} }
func TestSubmitWithContextCancelWithIdleTasks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
pool := pond.New(1, 5, pond.Context(ctx))
var doneCount, taskCount int32
// Submit a long-running, cancellable task
pool.Submit(func() {
atomic.AddInt32(&taskCount, 1)
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Minute):
atomic.AddInt32(&doneCount, 1)
return
}
})
// Submit a long-running, cancellable task
pool.Submit(func() {
atomic.AddInt32(&taskCount, 1)
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Minute):
atomic.AddInt32(&doneCount, 1)
return
}
})
// Cancel the context
cancel()
pool.StopAndWait()
assertEqual(t, int32(1), atomic.LoadInt32(&taskCount))
assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
}
func TestConcurrentStopAndWait(t *testing.T) { func TestConcurrentStopAndWait(t *testing.T) {
pool := pond.New(1, 5) pool := pond.New(1, 5)
+5
View File
@@ -39,6 +39,9 @@ func TestPurgeAfterPoolStopped(t *testing.T) {
pool.SubmitAndWait(func() { pool.SubmitAndWait(func() {
atomic.AddInt32(&doneCount, 1) atomic.AddInt32(&doneCount, 1)
}) })
time.Sleep(10 * time.Millisecond)
assertEqual(t, int32(1), atomic.LoadInt32(&doneCount)) assertEqual(t, int32(1), atomic.LoadInt32(&doneCount))
assertEqual(t, 1, pool.RunningWorkers()) assertEqual(t, 1, pool.RunningWorkers())
@@ -59,6 +62,8 @@ func TestPurgeDuringSubmit(t *testing.T) {
atomic.AddInt32(&doneCount, 1) atomic.AddInt32(&doneCount, 1)
}) })
time.Sleep(10 * time.Millisecond)
assertEqual(t, 1, pool.IdleWorkers()) assertEqual(t, 1, pool.IdleWorkers())
// Stop an idle worker right before submitting another task // Stop an idle worker right before submitting another task
+25 -8
View File
@@ -6,7 +6,7 @@ import (
) )
// worker represents a worker goroutine // worker represents a worker goroutine
func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool)) { func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool), taskWaitGroup *sync.WaitGroup) {
// If provided, execute the first task immediately, before listening to the tasks channel // If provided, execute the first task immediately, before listening to the tasks channel
if firstTask != nil { if firstTask != nil {
@@ -20,16 +20,33 @@ func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func()
for { for {
select { select {
case <-context.Done(): case <-context.Done():
// Pool context was cancelled, exit // Pool context was cancelled, empty tasks channel and exit
drainTasks(tasks, taskWaitGroup)
return return
case task, ok := <-tasks: case task, ok := <-tasks:
if task == nil || !ok { // Prioritize context.Done statement (https://stackoverflow.com/questions/46200343/force-priority-of-go-select-statement)
// We have received a signal to exit select {
return case <-context.Done():
} if task != nil && ok {
// We have received a task, ignore it
taskWaitGroup.Done()
}
default:
if task == nil || !ok {
// We have received a signal to exit
return
}
// We have received a task, execute it // We have received a task, execute it
taskExecutor(task, false) taskExecutor(task, false)
}
} }
} }
} }
// drainPendingTasks discards queued tasks and decrements the corresponding wait group
func drainTasks(tasks <-chan func(), tasksWaitGroup *sync.WaitGroup) {
for _ = range tasks {
tasksWaitGroup.Done()
}
}