Group context

This commit is contained in:
Alejandro Durante
2022-05-09 19:06:05 -03:00
parent d4c09d4973
commit 3968f4003c
14 changed files with 338 additions and 66 deletions
+1 -1
View File
@@ -10,7 +10,7 @@ jobs:
name: Test
strategy:
matrix:
go-version: [1.15.x, 1.16.x, 1.17.x]
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
+59 -4
View File
@@ -25,7 +25,8 @@ Some common scenarios include:
- Submitting tasks to a pool in a fire-and-forget fashion
- Submitting tasks to a pool and waiting for them to complete
- Submitting tasks to a pool with a deadline
- Submitting a group of related tasks and waiting for them to complete
- Submitting a group of tasks and waiting for them to complete
- Submitting a group of tasks associated to a Context
- Getting the number of running workers (goroutines)
- Stopping a worker pool
- Task panics are handled gracefully (configurable panic handler)
@@ -104,7 +105,7 @@ func main() {
}
```
### Submitting groups of related tasks
### Submitting a group of tasks
``` go
package main
@@ -124,7 +125,7 @@ func main() {
// Create a task group
group := pool.Group()
// Submit a group of related tasks
// Submit a group of tasks
for i := 0; i < 20; i++ {
n := i
group.Submit(func() {
@@ -137,6 +138,59 @@ func main() {
}
```
### Submitting a group of tasks associated to a context (**since v1.8.0**)
This feature provides synchronization, error propagation, and Context cancelation for subtasks of a common task. Similar to `errgroup.Group` from [`golang.org/x/sync/errgroup`](https://pkg.go.dev/golang.org/x/sync/errgroup) package with concurrency bounded by the worker pool.
``` go
package main
import (
"context"
"fmt"
"net/http"
"github.com/alitto/pond"
)
func main() {
// Create a worker pool
pool := pond.New(10, 1000)
defer pool.StopAndWait()
// Create a task group associated to a context
group, ctx := pool.GroupContext(context.Background())
var urls = []string{
"https://www.golang.org/",
"https://www.google.com/",
"https://www.github.com/",
}
// Submit tasks to fetch each URL
for _, url := range urls {
url := url
group.Submit(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
}
return err
})
}
// Wait for all HTTP requests to complete.
err := group.Wait()
if err != nil {
fmt.Printf("Failed to fetch URLs: %v", err)
} else {
fmt.Println("Successfully fetched all URLs")
}
}
```
### Pool Configuration Options
- **MinWorkers**: Specifies the minimum number of worker goroutines that must be running at any given time. These goroutines are started when the pool is created. The default value is 0. Example:
@@ -213,7 +267,8 @@ In our [Prometheus example](./examples/prometheus/prometheus.go) we showcase how
- [Creating a worker pool with fixed size](./examples/fixed_size/fixed_size.go)
- [Creating a worker pool with a Context](./examples/pool_context/pool_context.go)
- [Exporting worker pool metrics to Prometheus](./examples/prometheus/prometheus.go)
- [Submitting groups of related tasks](./examples/task_group/task_group.go)
- [Submitting a group of tasks](./examples/task_group/task_group.go)
- [Submitting a group of tasks associated to a context](./examples/group_context/group_context.go)
## API Reference
+2 -2
View File
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/dynamic_size
go 1.17
go 1.18
require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)
replace github.com/alitto/pond => ../../
+2 -2
View File
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/fixed_size
go 1.17
go 1.18
require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)
replace github.com/alitto/pond => ../../
+9
View File
@@ -0,0 +1,9 @@
module github.com/alitto/pond/examples/group_context
go 1.18
require (
github.com/alitto/pond v1.7.1
)
replace github.com/alitto/pond => ../../
+46
View File
@@ -0,0 +1,46 @@
package main
import (
"context"
"fmt"
"net/http"
"github.com/alitto/pond"
)
func main() {
// Create a worker pool
pool := pond.New(10, 1000)
defer pool.StopAndWait()
// Create a task group associated to a context
group, ctx := pool.GroupContext(context.Background())
var urls = []string{
"https://www.golang.org/",
"https://www.google.com/",
"https://www.github.com/",
}
// Submit tasks to fetch each URL
for _, url := range urls {
url := url
group.Submit(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
resp, err := http.DefaultClient.Do(req)
if err == nil {
resp.Body.Close()
}
return err
})
}
// Wait for all HTTP requests to complete.
err := group.Wait()
if err != nil {
fmt.Printf("Failed to fetch URLs: %v", err)
} else {
fmt.Println("Successfully fetched all URLs")
}
}
+2 -2
View File
@@ -1,7 +1,7 @@
module github.com/alitto/pond/examples/pool_context
go 1.17
go 1.18
require github.com/alitto/pond v1.7.0
require github.com/alitto/pond v1.7.1
replace github.com/alitto/pond => ../../
+2 -2
View File
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/fixed_size
go 1.17
go 1.18
require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
github.com/prometheus/client_golang v1.9.0
)
+2 -2
View File
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/task_group
go 1.17
go 1.18
require (
github.com/alitto/pond v1.7.0
github.com/alitto/pond v1.7.1
)
replace github.com/alitto/pond => ../../
+1 -1
View File
@@ -1,3 +1,3 @@
module github.com/alitto/pond
go 1.17
go 1.18
+90
View File
@@ -0,0 +1,90 @@
package pond
import (
"context"
"sync"
)
// TaskGroup represents a group of related tasks
type TaskGroup struct {
pool *WorkerPool
waitGroup sync.WaitGroup
}
// Submit adds a task to this group and sends it to the worker pool to be executed
func (g *TaskGroup) Submit(task func()) {
g.waitGroup.Add(1)
g.pool.Submit(func() {
defer g.waitGroup.Done()
task()
})
}
// Wait waits until all the tasks in this group have completed
func (g *TaskGroup) Wait() {
// Wait for all tasks to complete
g.waitGroup.Wait()
}
// TaskGroup represents a group of related tasks associated to a context
type TaskGroupWithContext struct {
TaskGroup
ctx context.Context
cancel context.CancelFunc
errOnce sync.Once
err error
}
// Submit adds a task to this group and sends it to the worker pool to be executed
func (g *TaskGroupWithContext) Submit(task func() error) {
g.waitGroup.Add(1)
g.pool.Submit(func() {
defer g.waitGroup.Done()
// If context has already been cancelled, skip task execution
if g.ctx != nil {
select {
case <-g.ctx.Done():
return
default:
}
}
// don't actually ignore errors
err := task()
if err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
})
}
// Wait blocks until either all the tasks submitted to this group have completed,
// one of them returned a non-nil error or the context associated to this group
// was canceled.
func (g *TaskGroupWithContext) Wait() error {
// Wait for all tasks to complete
tasksCompleted := make(chan struct{})
go func() {
g.waitGroup.Wait()
tasksCompleted <- struct{}{}
}()
select {
case <-tasksCompleted:
// If context was provided, cancel it to signal all running tasks to stop
g.cancel()
case <-g.ctx.Done():
}
return g.err
}
+107
View File
@@ -0,0 +1,107 @@
package pond_test
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/alitto/pond"
)
func TestGroupSubmit(t *testing.T) {
pool := pond.New(5, 1000)
assertEqual(t, 0, pool.RunningWorkers())
// Submit groups of tasks
var doneCount, taskCount int32
var groups []*pond.TaskGroup
for i := 0; i < 5; i++ {
group := pool.Group()
for j := 0; j < i+5; j++ {
group.Submit(func() {
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&doneCount, 1)
})
taskCount++
}
groups = append(groups, group)
}
// Wait for all groups to complete
for _, group := range groups {
group.Wait()
}
assertEqual(t, int32(taskCount), atomic.LoadInt32(&doneCount))
}
func TestGroupContext(t *testing.T) {
pool := pond.New(3, 100)
assertEqual(t, 0, pool.RunningWorkers())
// Submit a group of tasks
var doneCount, startedCount int32
group, ctx := pool.GroupContext(context.Background())
for i := 0; i < 10; i++ {
group.Submit(func() error {
atomic.AddInt32(&startedCount, 1)
select {
case <-time.After(5 * time.Millisecond):
atomic.AddInt32(&doneCount, 1)
case <-ctx.Done():
}
return nil
})
}
err := group.Wait()
assertEqual(t, nil, err)
assertEqual(t, int32(10), atomic.LoadInt32(&startedCount))
assertEqual(t, int32(10), atomic.LoadInt32(&doneCount))
}
func TestGroupContextWithError(t *testing.T) {
pool := pond.New(1, 100)
assertEqual(t, 0, pool.RunningWorkers())
expectedErr := errors.New("Something went wrong")
// Submit a group of tasks
var doneCount, startedCount int32
group, ctx := pool.GroupContext(context.Background())
for i := 0; i < 10; i++ {
n := i
group.Submit(func() error {
atomic.AddInt32(&startedCount, 1)
// Task number 5 fails
if n == 4 {
time.Sleep(10 * time.Millisecond)
return expectedErr
}
select {
case <-time.After(5 * time.Millisecond):
atomic.AddInt32(&doneCount, 1)
case <-ctx.Done():
}
return nil
})
}
err := group.Wait()
assertEqual(t, expectedErr, err)
pool.StopAndWait()
assertEqual(t, int32(5), atomic.LoadInt32(&startedCount))
assertEqual(t, int32(4), atomic.LoadInt32(&doneCount))
}
+15 -22
View File
@@ -487,6 +487,21 @@ func (p *WorkerPool) Group() *TaskGroup {
}
}
// GroupContext creates a new task group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function submitted to the group
// returns a non-nil error or the first time Wait returns, whichever occurs first.
func (p *WorkerPool) GroupContext(ctx context.Context) (*TaskGroupWithContext, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &TaskGroupWithContext{
TaskGroup: TaskGroup{
pool: p,
},
ctx: ctx,
cancel: cancel,
}, ctx
}
// worker launches a worker goroutine
func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
@@ -528,25 +543,3 @@ func worker(context context.Context, firstTask func(), tasks <-chan func(), idle
}
}
}
// TaskGroup represents a group of related tasks
type TaskGroup struct {
pool *WorkerPool
waitGroup sync.WaitGroup
}
// Submit adds a task to this group and sends it to the worker pool to be executed
func (g *TaskGroup) Submit(task func()) {
g.waitGroup.Add(1)
g.pool.Submit(func() {
defer g.waitGroup.Done()
task()
})
}
// Wait waits until all the tasks in this group have completed
func (g *TaskGroup) Wait() {
// Wait for all tasks to complete
g.waitGroup.Wait()
}
-28
View File
@@ -450,34 +450,6 @@ func TestPoolWithCustomMinWorkers(t *testing.T) {
assertEqual(t, 0, pool.RunningWorkers())
}
func TestGroupSubmit(t *testing.T) {
pool := pond.New(5, 1000)
assertEqual(t, 0, pool.RunningWorkers())
// Submit groups of tasks
var doneCount, taskCount int32
var groups []*pond.TaskGroup
for i := 0; i < 5; i++ {
group := pool.Group()
for j := 0; j < i+5; j++ {
group.Submit(func() {
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&doneCount, 1)
})
taskCount++
}
groups = append(groups, group)
}
// Wait for all groups to complete
for _, group := range groups {
group.Wait()
}
assertEqual(t, int32(taskCount), atomic.LoadInt32(&doneCount))
}
func TestPoolWithCustomStrategy(t *testing.T) {
pool := pond.New(3, 3, pond.Strategy(pond.RatedResizer(2)))