Merge branch 'mtharp-deadlock-race'
This commit is contained in:
@@ -303,10 +303,6 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
p.cond.L.Lock()
|
|
||||||
p.canceledAcquireCount += 1
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
|
|
||||||
// Allow goroutine waiting for signal to exit. Re-signal since we couldn't
|
// Allow goroutine waiting for signal to exit. Re-signal since we couldn't
|
||||||
// do anything with it. Another goroutine might be waiting.
|
// do anything with it. Another goroutine might be waiting.
|
||||||
go func() {
|
go func() {
|
||||||
@@ -315,6 +311,9 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) {
|
|||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
p.cond.L.Lock()
|
||||||
|
p.canceledAcquireCount += 1
|
||||||
|
p.cond.L.Unlock()
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
case <-waitChan:
|
case <-waitChan:
|
||||||
}
|
}
|
||||||
|
|||||||
+12
-4
@@ -8,6 +8,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -514,7 +515,12 @@ func TestStress(t *testing.T) {
|
|||||||
destructorCalls.Next()
|
destructorCalls.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
pool := puddle.NewPool(constructor, destructor, 10)
|
poolSize := runtime.NumCPU()
|
||||||
|
if poolSize < 4 {
|
||||||
|
poolSize = 4
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := puddle.NewPool(constructor, destructor, int32(poolSize))
|
||||||
|
|
||||||
finishChan := make(chan struct{})
|
finishChan := make(chan struct{})
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
@@ -547,7 +553,7 @@ func TestStress(t *testing.T) {
|
|||||||
},
|
},
|
||||||
// Acquire possibly canceled by context
|
// Acquire possibly canceled by context
|
||||||
func() {
|
func() {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(rand.Int63n(200))*time.Millisecond)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(rand.Int63n(2000))*time.Nanosecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
res, err := pool.Acquire(ctx)
|
res, err := pool.Acquire(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -557,7 +563,7 @@ func TestStress(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
|
time.Sleep(time.Duration(rand.Int63n(2000)) * time.Nanosecond)
|
||||||
releaseOrDestroyOrHijack(res)
|
releaseOrDestroyOrHijack(res)
|
||||||
},
|
},
|
||||||
// AcquireAllIdle (though under heavy load this will almost certainly always get an empty slice)
|
// AcquireAllIdle (though under heavy load this will almost certainly always get an empty slice)
|
||||||
@@ -569,7 +575,9 @@ func TestStress(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < 100; i++ {
|
workerCount := int(poolSize) * 2
|
||||||
|
|
||||||
|
for i := 0; i < workerCount; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
|||||||
Reference in New Issue
Block a user