Do not cancel resource construction when Acquire is canceled
https://github.com/jackc/pgx/issues/1287 https://github.com/jackc/pgx/issues/1259
This commit is contained in:
@@ -129,7 +129,9 @@ type Pool[T any] struct {
|
|||||||
|
|
||||||
resetCount int
|
resetCount int
|
||||||
|
|
||||||
closed bool
|
baseAcquireCtx context.Context
|
||||||
|
cancelBaseAcquireCtx func()
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config[T any] struct {
|
type Config[T any] struct {
|
||||||
@@ -144,12 +146,16 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
|
|||||||
return nil, errors.New("MaxSize must be >= 1")
|
return nil, errors.New("MaxSize must be >= 1")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background())
|
||||||
|
|
||||||
return &Pool[T]{
|
return &Pool[T]{
|
||||||
cond: sync.NewCond(new(sync.Mutex)),
|
cond: sync.NewCond(new(sync.Mutex)),
|
||||||
destructWG: &sync.WaitGroup{},
|
destructWG: &sync.WaitGroup{},
|
||||||
maxSize: config.MaxSize,
|
maxSize: config.MaxSize,
|
||||||
constructor: config.Constructor,
|
constructor: config.Constructor,
|
||||||
destructor: config.Destructor,
|
destructor: config.Destructor,
|
||||||
|
baseAcquireCtx: baseAcquireCtx,
|
||||||
|
cancelBaseAcquireCtx: cancelBaseAcquireCtx,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -162,6 +168,7 @@ func (p *Pool[T]) Close() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.closed = true
|
p.closed = true
|
||||||
|
p.cancelBaseAcquireCtx()
|
||||||
|
|
||||||
for _, res := range p.idleResources {
|
for _, res := range p.idleResources {
|
||||||
p.allResources = removeResource(p.allResources, res)
|
p.allResources = removeResource(p.allResources, res)
|
||||||
@@ -266,10 +273,25 @@ func (p *Pool[T]) Stat() *Stat {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Acquire gets a resource from the pool. If no resources are available and the pool
|
// valueCancelCtx combines two contexts into one. One context is used for values and the other is used for cancellation.
|
||||||
// is not at maximum capacity it will create a new resource. If the pool is at
|
type valueCancelCtx struct {
|
||||||
// maximum capacity it will block until a resource is available. ctx can be used
|
valueCtx context.Context
|
||||||
// to cancel the Acquire.
|
cancelCtx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctx *valueCancelCtx) Deadline() (time.Time, bool) { return ctx.cancelCtx.Deadline() }
|
||||||
|
func (ctx *valueCancelCtx) Done() <-chan struct{} { return ctx.cancelCtx.Done() }
|
||||||
|
func (ctx *valueCancelCtx) Err() error { return ctx.cancelCtx.Err() }
|
||||||
|
func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Value(key) }
|
||||||
|
|
||||||
|
// Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will
|
||||||
|
// create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be
|
||||||
|
// used to cancel the Acquire.
|
||||||
|
//
|
||||||
|
// If Acquire creates a new resource the resource constructor function will receive a context that delegates Value() to
|
||||||
|
// ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids
|
||||||
|
// the problem of it being impossible to create resources when the time to create a resource is greater than any one
|
||||||
|
// caller of Acquire is willing to wait.
|
||||||
func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
|
func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
|
||||||
startNano := nanotime()
|
startNano := nanotime()
|
||||||
if doneChan := ctx.Done(); doneChan != nil {
|
if doneChan := ctx.Done(); doneChan != nil {
|
||||||
@@ -317,55 +339,46 @@ func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
|
|||||||
p.destructWG.Add(1)
|
p.destructWG.Add(1)
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
|
|
||||||
// we create the resource in the background because the constructor might
|
// Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling
|
||||||
// outlive the context and we want to continue constructing it as long as
|
// the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259
|
||||||
// necessary but the acquire should be cancelled when the context is cancelled
|
|
||||||
// see: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259
|
|
||||||
constructErrCh := make(chan error)
|
constructErrCh := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
value, err := p.constructResourceValue(ctx)
|
constructorCtx := &valueCancelCtx{valueCtx: ctx, cancelCtx: p.baseAcquireCtx}
|
||||||
|
value, err := p.constructResourceValue(constructorCtx)
|
||||||
p.cond.L.Lock()
|
p.cond.L.Lock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.allResources = removeResource(p.allResources, res)
|
p.allResources = removeResource(p.allResources, res)
|
||||||
p.destructWG.Done()
|
p.destructWG.Done()
|
||||||
|
|
||||||
// we can't use default here in case we get here before the caller is
|
constructErrCh <- err
|
||||||
// in the select
|
|
||||||
select {
|
|
||||||
case constructErrCh <- err:
|
|
||||||
case <-ctx.Done():
|
|
||||||
p.canceledAcquireCount += 1
|
|
||||||
}
|
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
p.cond.Signal()
|
p.cond.Signal()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
res.value = value
|
|
||||||
|
|
||||||
// assume that we will acquire it
|
res.value = value
|
||||||
res.status = resourceStatusAcquired
|
res.status = resourceStatusAcquired
|
||||||
// we can't use default here in case we get here before the caller is
|
|
||||||
// in the select
|
|
||||||
select {
|
select {
|
||||||
case constructErrCh <- nil:
|
case constructErrCh <- nil:
|
||||||
p.emptyAcquireCount += 1
|
p.emptyAcquireCount += 1
|
||||||
p.acquireCount += 1
|
p.acquireCount += 1
|
||||||
p.acquireDuration += time.Duration(nanotime() - startNano)
|
p.acquireDuration += time.Duration(nanotime() - startNano)
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
// we don't call Signal here we didn't change any of the resource pools
|
// No need to call Signal as this new resource was immediately acquired and did not change availability for
|
||||||
|
// any waiting Acquire calls.
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
p.canceledAcquireCount += 1
|
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
// we don't call Signal here we didn't change any of the resopurce pools
|
|
||||||
// since we couldn't send the constructed resource to the acquire
|
|
||||||
// function that means the caller has stopped waiting and we should
|
|
||||||
// just put this resource back in the pool
|
|
||||||
p.releaseAcquiredResource(res, res.lastUsedNano)
|
p.releaseAcquiredResource(res, res.lastUsedNano)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
p.cond.L.Lock()
|
||||||
|
p.canceledAcquireCount += 1
|
||||||
|
p.cond.L.Unlock()
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
case err := <-constructErrCh:
|
case err := <-constructErrCh:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -75,6 +75,52 @@ func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) {
|
|||||||
res.Release()
|
res.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPoolAcquireCallsConstructorWithAcquireContextValuesButNotDeadline(t *testing.T) {
|
||||||
|
constructor := func(ctx context.Context) (int, error) {
|
||||||
|
if ctx.Value("test") != "from Acquire" {
|
||||||
|
return 0, errors.New("did not get value from Acquire")
|
||||||
|
}
|
||||||
|
if _, ok := ctx.Deadline(); ok {
|
||||||
|
return 0, errors.New("should not have gotten deadline from Acquire")
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1, nil
|
||||||
|
}
|
||||||
|
pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer pool.Close()
|
||||||
|
|
||||||
|
ctx := context.WithValue(context.Background(), "test", "from Acquire")
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
res, err := pool.Acquire(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, res.Value())
|
||||||
|
assert.WithinDuration(t, time.Now(), res.CreationTime(), time.Second)
|
||||||
|
res.Release()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPoolAcquireCalledConstructorIsNotCanceledByAcquireCancellation(t *testing.T) {
|
||||||
|
constructor := func(ctx context.Context) (int, error) {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
return 1, nil
|
||||||
|
}
|
||||||
|
pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer pool.Close()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
res, err := pool.Acquire(ctx)
|
||||||
|
assert.Nil(t, res)
|
||||||
|
assert.Equal(t, context.DeadlineExceeded, err)
|
||||||
|
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
assert.EqualValues(t, 1, pool.Stat().TotalResources())
|
||||||
|
assert.EqualValues(t, 1, pool.Stat().CanceledAcquireCount())
|
||||||
|
}
|
||||||
|
|
||||||
func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
|
func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
|
||||||
constructor, createCounter := createConstructor()
|
constructor, createCounter := createConstructor()
|
||||||
pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
|
pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
|
||||||
|
|||||||
Reference in New Issue
Block a user