From fdc2381cbe4012cb43d887f8f5cfa0357da82409 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 3 Sep 2022 13:08:12 -0500 Subject: [PATCH] Do not cancel resource construction when Acquire is canceled https://github.com/jackc/pgx/issues/1287 https://github.com/jackc/pgx/issues/1259 --- pool.go | 77 ++++++++++++++++++++++++++++++---------------------- pool_test.go | 46 +++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 32 deletions(-) diff --git a/pool.go b/pool.go index bad7627..9c87f58 100644 --- a/pool.go +++ b/pool.go @@ -129,7 +129,9 @@ type Pool[T any] struct { resetCount int - closed bool + baseAcquireCtx context.Context + cancelBaseAcquireCtx func() + closed bool } type Config[T any] struct { @@ -144,12 +146,16 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) { return nil, errors.New("MaxSize must be >= 1") } + baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background()) + return &Pool[T]{ - cond: sync.NewCond(new(sync.Mutex)), - destructWG: &sync.WaitGroup{}, - maxSize: config.MaxSize, - constructor: config.Constructor, - destructor: config.Destructor, + cond: sync.NewCond(new(sync.Mutex)), + destructWG: &sync.WaitGroup{}, + maxSize: config.MaxSize, + constructor: config.Constructor, + destructor: config.Destructor, + baseAcquireCtx: baseAcquireCtx, + cancelBaseAcquireCtx: cancelBaseAcquireCtx, }, nil } @@ -162,6 +168,7 @@ func (p *Pool[T]) Close() { return } p.closed = true + p.cancelBaseAcquireCtx() for _, res := range p.idleResources { p.allResources = removeResource(p.allResources, res) @@ -266,10 +273,25 @@ func (p *Pool[T]) Stat() *Stat { return s } -// Acquire gets a resource from the pool. If no resources are available and the pool -// is not at maximum capacity it will create a new resource. If the pool is at -// maximum capacity it will block until a resource is available. ctx can be used -// to cancel the Acquire. +// valueCancelCtx combines two contexts into one. One context is used for values and the other is used for cancellation. +type valueCancelCtx struct { + valueCtx context.Context + cancelCtx context.Context +} + +func (ctx *valueCancelCtx) Deadline() (time.Time, bool) { return ctx.cancelCtx.Deadline() } +func (ctx *valueCancelCtx) Done() <-chan struct{} { return ctx.cancelCtx.Done() } +func (ctx *valueCancelCtx) Err() error { return ctx.cancelCtx.Err() } +func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Value(key) } + +// Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will +// create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be +// used to cancel the Acquire. +// +// If Acquire creates a new resource the resource constructor function will receive a context that delegates Value() to +// ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids +// the problem of it being impossible to create resources when the time to create a resource is greater than any one +// caller of Acquire is willing to wait. func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { startNano := nanotime() if doneChan := ctx.Done(); doneChan != nil { @@ -317,55 +339,46 @@ func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { p.destructWG.Add(1) p.cond.L.Unlock() - // we create the resource in the background because the constructor might - // outlive the context and we want to continue constructing it as long as - // necessary but the acquire should be cancelled when the context is cancelled - // see: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259 + // Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling + // the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259 constructErrCh := make(chan error) go func() { - value, err := p.constructResourceValue(ctx) + constructorCtx := &valueCancelCtx{valueCtx: ctx, cancelCtx: p.baseAcquireCtx} + value, err := p.constructResourceValue(constructorCtx) p.cond.L.Lock() if err != nil { p.allResources = removeResource(p.allResources, res) p.destructWG.Done() - // we can't use default here in case we get here before the caller is - // in the select - select { - case constructErrCh <- err: - case <-ctx.Done(): - p.canceledAcquireCount += 1 - } + constructErrCh <- err + p.cond.L.Unlock() p.cond.Signal() return } - res.value = value - // assume that we will acquire it + res.value = value res.status = resourceStatusAcquired - // we can't use default here in case we get here before the caller is - // in the select + select { case constructErrCh <- nil: p.emptyAcquireCount += 1 p.acquireCount += 1 p.acquireDuration += time.Duration(nanotime() - startNano) p.cond.L.Unlock() - // we don't call Signal here we didn't change any of the resource pools + // No need to call Signal as this new resource was immediately acquired and did not change availability for + // any waiting Acquire calls. case <-ctx.Done(): - p.canceledAcquireCount += 1 p.cond.L.Unlock() - // we don't call Signal here we didn't change any of the resopurce pools - // since we couldn't send the constructed resource to the acquire - // function that means the caller has stopped waiting and we should - // just put this resource back in the pool p.releaseAcquiredResource(res, res.lastUsedNano) } }() select { case <-ctx.Done(): + p.cond.L.Lock() + p.canceledAcquireCount += 1 + p.cond.L.Unlock() return nil, ctx.Err() case err := <-constructErrCh: if err != nil { diff --git a/pool_test.go b/pool_test.go index caa9258..b6c573e 100644 --- a/pool_test.go +++ b/pool_test.go @@ -75,6 +75,52 @@ func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) { res.Release() } +func TestPoolAcquireCallsConstructorWithAcquireContextValuesButNotDeadline(t *testing.T) { + constructor := func(ctx context.Context) (int, error) { + if ctx.Value("test") != "from Acquire" { + return 0, errors.New("did not get value from Acquire") + } + if _, ok := ctx.Deadline(); ok { + return 0, errors.New("should not have gotten deadline from Acquire") + } + + return 1, nil + } + pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10}) + require.NoError(t, err) + defer pool.Close() + + ctx := context.WithValue(context.Background(), "test", "from Acquire") + ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + res, err := pool.Acquire(ctx) + require.NoError(t, err) + assert.Equal(t, 1, res.Value()) + assert.WithinDuration(t, time.Now(), res.CreationTime(), time.Second) + res.Release() +} + +func TestPoolAcquireCalledConstructorIsNotCanceledByAcquireCancellation(t *testing.T) { + constructor := func(ctx context.Context) (int, error) { + time.Sleep(100 * time.Millisecond) + return 1, nil + } + pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10}) + require.NoError(t, err) + defer pool.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond) + defer cancel() + res, err := pool.Acquire(ctx) + assert.Nil(t, res) + assert.Equal(t, context.DeadlineExceeded, err) + + time.Sleep(200 * time.Millisecond) + + assert.EqualValues(t, 1, pool.Stat().TotalResources()) + assert.EqualValues(t, 1, pool.Stat().CanceledAcquireCount()) +} + func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) { constructor, createCounter := createConstructor() pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})