Fix race with background Acquire creation
This commit is contained in:
committed by
Jack Christensen
parent
da694536ff
commit
f63192c063
@@ -323,49 +323,34 @@ func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
|
|||||||
p.allResources = removeResource(p.allResources, res)
|
p.allResources = removeResource(p.allResources, res)
|
||||||
p.destructWG.Done()
|
p.destructWG.Done()
|
||||||
|
|
||||||
select {
|
// we can't use default here in case we get here before the caller is
|
||||||
case <-ctx.Done():
|
// in the select
|
||||||
if err == ctx.Err() {
|
|
||||||
p.canceledAcquireCount += 1
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
p.cond.Signal()
|
|
||||||
|
|
||||||
// try to notify the caller that we failed
|
|
||||||
select {
|
select {
|
||||||
case constructErrCh <- err:
|
case constructErrCh <- err:
|
||||||
default:
|
case <-ctx.Done():
|
||||||
|
p.canceledAcquireCount += 1
|
||||||
}
|
}
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
p.cond.Signal()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
res.value = value
|
res.value = value
|
||||||
|
|
||||||
// check the context now so we don't increment the metrics when the caller
|
// assume that we will acquire it
|
||||||
// has already been cancelled
|
res.status = resourceStatusAcquired
|
||||||
|
// we can't use default here in case we get here before the caller is
|
||||||
|
// in the select
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case constructErrCh <- nil:
|
||||||
p.cond.L.Unlock()
|
|
||||||
default:
|
|
||||||
// assume that we will acquire it
|
|
||||||
res.status = resourceStatusAcquired
|
|
||||||
// we have to increment these BEFORE the next select because otherwise
|
|
||||||
// they could run after Acquire has returned and that will mess up
|
|
||||||
// tests but this also means that these could be incremented even if
|
|
||||||
// the acquire times out, but we just checked that so the chances are
|
|
||||||
// slim
|
|
||||||
p.emptyAcquireCount += 1
|
p.emptyAcquireCount += 1
|
||||||
p.acquireCount += 1
|
p.acquireCount += 1
|
||||||
p.acquireDuration += time.Duration(nanotime() - startNano)
|
p.acquireDuration += time.Duration(nanotime() - startNano)
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
|
// we don't call Signal here we didn't change any of the resource pools
|
||||||
|
case <-ctx.Done():
|
||||||
|
p.canceledAcquireCount += 1
|
||||||
|
p.cond.L.Unlock()
|
||||||
// we don't call Signal here we didn't change any of the resopurce pools
|
// we don't call Signal here we didn't change any of the resopurce pools
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case constructErrCh <- nil:
|
|
||||||
default:
|
|
||||||
// since we couldn't send the constructed resource to the acquire
|
// since we couldn't send the constructed resource to the acquire
|
||||||
// function that means the caller has stopped waiting and we should
|
// function that means the caller has stopped waiting and we should
|
||||||
// just put this resource back in the pool
|
// just put this resource back in the pool
|
||||||
|
|||||||
@@ -648,6 +648,9 @@ func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) {
|
|||||||
_, err := pool.Acquire(ctx)
|
_, err := pool.Acquire(ctx)
|
||||||
require.Equal(t, context.Canceled, err)
|
require.Equal(t, context.Canceled, err)
|
||||||
|
|
||||||
|
// sleep to give the constructor goroutine time to mark cancelled
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
stat := pool.Stat()
|
stat := pool.Stat()
|
||||||
assert.Equal(t, int64(0), stat.AcquireCount())
|
assert.Equal(t, int64(0), stat.AcquireCount())
|
||||||
assert.Equal(t, int64(1), stat.CanceledAcquireCount())
|
assert.Equal(t, int64(1), stat.CanceledAcquireCount())
|
||||||
|
|||||||
Reference in New Issue
Block a user