[pool] Increase test coverage
This commit is contained in:
committed by
Jack Christensen
parent
89668fae42
commit
3009dbab62
@@ -115,10 +115,24 @@ func (res *Resource[T]) IdleDuration() time.Duration {
|
||||
|
||||
// Pool is a concurrency-safe resource pool.
|
||||
type Pool[T any] struct {
|
||||
mux sync.Mutex
|
||||
acquireSem *semaphore.Weighted
|
||||
destructWG sync.WaitGroup
|
||||
// Pool invariant is that semaphore is locked before mutex (doesn't
|
||||
// apply to TryAcquire in AcquireAllIdle). Another invariant is that
|
||||
// semaphore has to be released BEFORE unlock of mutex!
|
||||
|
||||
// mux is the pool internal lock. Any modification of shared state of
|
||||
// the pool (but Acquires of acquireSem) must be performed only by
|
||||
// holder of the lock. Long running operations are not allowed when mux
|
||||
// is held.
|
||||
mux sync.Mutex
|
||||
// acquireSem provides an allowance to TRY to acquire a resource. The
|
||||
// acquire of semaphore token doesn't guarantee that an attempt to
|
||||
// acquire the resource will succeed.
|
||||
//
|
||||
// Releases are allowed only when caller holds mux. Acquires have to
|
||||
// happen before mux is locked.
|
||||
acquireSem *semaphore.Weighted
|
||||
|
||||
destructWG sync.WaitGroup
|
||||
allResources resList[T]
|
||||
idleResources resList[T]
|
||||
|
||||
@@ -347,8 +361,8 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
|
||||
|
||||
p.mux.Lock()
|
||||
if p.closed {
|
||||
p.mux.Unlock()
|
||||
p.acquireSem.Release(1)
|
||||
p.mux.Unlock()
|
||||
return nil, ErrClosedPool
|
||||
}
|
||||
|
||||
@@ -365,8 +379,6 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
|
||||
|
||||
if len(p.allResources) > int(p.maxSize) {
|
||||
// Unreachable code.
|
||||
p.mux.Unlock()
|
||||
p.acquireSem.Release(1)
|
||||
panic("bug: semaphore allowed more acquires than pool allows")
|
||||
}
|
||||
|
||||
@@ -422,11 +434,11 @@ func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Res
|
||||
p.allResources.remove(res)
|
||||
p.destructWG.Done()
|
||||
|
||||
p.mux.Unlock()
|
||||
// The resource won't be acquired because its
|
||||
// construction failed. We have to allow someone else to
|
||||
// take that resouce.
|
||||
p.acquireSem.Release(1)
|
||||
p.mux.Unlock()
|
||||
|
||||
select {
|
||||
case constructErrChan <- err:
|
||||
@@ -503,12 +515,13 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
||||
res := p.createNewResource()
|
||||
go func() {
|
||||
value, err := p.constructor(ctx)
|
||||
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
// We have to create the resource and only then release the
|
||||
// semaphore - For the time being there is no resource that
|
||||
// someone could acquire.
|
||||
defer p.acquireSem.Release(1)
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
|
||||
if err != nil {
|
||||
p.allResources.remove(res)
|
||||
@@ -524,18 +537,18 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
||||
return nil, ErrNotAvailable
|
||||
}
|
||||
|
||||
// acquireSemAll acquires all free tokens from sem. This function is guaranteed
|
||||
// to acquire at least the lowest number of tokens that has been available in
|
||||
// the semaphore during runtime of this function.
|
||||
// acquireSemAll tries to acquire num free tokens from sem. This function is
|
||||
// guaranteed to acquire at least the lowest number of tokens that has been
|
||||
// available in the semaphore during runtime of this function.
|
||||
//
|
||||
// For the time being, semaphore doesn't allow to acquire all tokens atomically
|
||||
// (see https://github.com/golang/sync/pull/19). We simulate this by trying all
|
||||
// powers of 2 that are less or equal to num.
|
||||
//
|
||||
// For example, let's immagine we have 19 free tokens in the semaphore which in
|
||||
// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then num
|
||||
// is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1 tokens.
|
||||
// Out of those, the acquire of 16, 2 and 1 tokens will succeed.
|
||||
// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then if
|
||||
// num is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1
|
||||
// tokens. Out of those, the acquire of 16, 2 and 1 tokens will succeed.
|
||||
//
|
||||
// Naturally, Acquires and Releases of the semaphore might take place
|
||||
// concurrently. For this reason, it's not guaranteed that absolutely all free
|
||||
@@ -550,7 +563,7 @@ func acquireSemAll(sem *semaphore.Weighted, num int) int {
|
||||
|
||||
var acquired int
|
||||
for i := int(log2Int(num)); i >= 0; i-- {
|
||||
val := int(1) << i
|
||||
val := 1 << i
|
||||
if sem.TryAcquire(int64(val)) {
|
||||
acquired += val
|
||||
}
|
||||
@@ -579,7 +592,7 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
||||
// TryAcquire cannot block, the fact that we hold mutex locked and try
|
||||
// to acquire semaphore cannot result in dead-lock.
|
||||
//
|
||||
// TODO: Replace this with acquireSem.TryAcqireAll() if it gets to
|
||||
// TODO: Replace this with acquireSem.TryAcquireAll() if it gets to
|
||||
// upstream. https://github.com/golang/sync/pull/19
|
||||
_ = acquireSemAll(p.acquireSem, numIdle)
|
||||
|
||||
@@ -654,9 +667,9 @@ func (p *Pool[T]) Reset() {
|
||||
|
||||
// releaseAcquiredResource returns res to the the pool.
|
||||
func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
|
||||
defer p.acquireSem.Release(1)
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
defer p.acquireSem.Release(1)
|
||||
|
||||
if p.closed || res.poolResetCount != p.resetCount {
|
||||
p.allResources.remove(res)
|
||||
@@ -672,16 +685,16 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64)
|
||||
// pool Remove will panic.
|
||||
func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
|
||||
p.destructResourceValue(res.value)
|
||||
defer p.acquireSem.Release(1)
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
defer p.acquireSem.Release(1)
|
||||
p.allResources.remove(res)
|
||||
}
|
||||
|
||||
func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
|
||||
defer p.acquireSem.Release(1)
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
defer p.acquireSem.Release(1)
|
||||
|
||||
p.allResources.remove(res)
|
||||
res.status = resourceStatusHijacked
|
||||
|
||||
Reference in New Issue
Block a user