From 33a5f62a2eba1731dd17af28418afb6d929b0a51 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Wed, 26 Dec 2018 16:30:39 -0600 Subject: [PATCH] Add stress test and fix race it detected --- pool.go | 2 ++ pool_test.go | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/pool.go b/pool.go index d3e76c2..39c57df 100644 --- a/pool.go +++ b/pool.go @@ -285,7 +285,9 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) { select { case <-ctx.Done(): + p.cond.L.Lock() p.canceledAcquireCount += 1 + p.cond.L.Unlock() // Allow goroutine waiting for signal to exit. Re-signal since we couldn't // do anything with it. Another goroutine might be waiting. diff --git a/pool_test.go b/pool_test.go index 226c018..f4cad11 100644 --- a/pool_test.go +++ b/pool_test.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "math/rand" + "os" "sync" "testing" "time" @@ -478,6 +480,96 @@ func TestPoolAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) { assert.Nil(t, res) } +func TestStress(t *testing.T) { + constructor, _ := createConstructor() + var destructorCalls Counter + destructor := func(interface{}) { + destructorCalls.Next() + } + + pool := puddle.NewPool(constructor, destructor, 10) + + finishChan := make(chan struct{}) + wg := &sync.WaitGroup{} + + releaseOrDestroyOrHijack := func(res *puddle.Resource) { + n := rand.Intn(100) + if n < 5 { + res.Hijack() + destructor(res) + } else if n < 10 { + res.Destroy() + } else { + res.Release() + } + } + + actions := []func(){ + // Acquire + func() { + res, err := pool.Acquire(context.Background()) + if err != nil { + if err != puddle.ErrClosedPool { + assert.Failf(t, "stress acquire", "pool.Acquire returned unexpected err: %v", err) + } + return + } + + time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond) + releaseOrDestroyOrHijack(res) + }, + // Acquire possibly canceled by context + func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(rand.Int63n(200))*time.Millisecond) + defer cancel() + res, err := pool.Acquire(ctx) + if err != nil { + if err != puddle.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded { + assert.Failf(t, "stress acquire possibly canceled by context", "pool.Acquire returned unexpected err: %v", err) + } + return + } + + time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond) + releaseOrDestroyOrHijack(res) + }, + // AcquireAllIdle (though under heavy load this will almost certainly always get an empty slice) + func() { + resources := pool.AcquireAllIdle() + for _, res := range resources { + res.Release() + } + }, + } + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + for { + select { + case <-finishChan: + wg.Done() + return + default: + } + + actions[rand.Intn(len(actions))]() + } + }() + } + + s := os.Getenv("STRESS_TEST_DURATION") + if s == "" { + s = "1s" + } + testDuration, err := time.ParseDuration(s) + require.Nil(t, err) + time.AfterFunc(testDuration, func() { close(finishChan) }) + wg.Wait() + + pool.Close() +} + func BenchmarkPoolAcquireAndRelease(b *testing.B) { benchmarks := []struct { poolSize int