Use generational stack for idle connections
This commit is contained in:
committed by
Jack Christensen
parent
3009dbab62
commit
d970a39050
@@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/puddle/v2/internal/genstack"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
@@ -115,26 +116,21 @@ func (res *Resource[T]) IdleDuration() time.Duration {
|
||||
|
||||
// Pool is a concurrency-safe resource pool.
|
||||
type Pool[T any] struct {
|
||||
// Pool invariant is that semaphore is locked before mutex (doesn't
|
||||
// apply to TryAcquire in AcquireAllIdle). Another invariant is that
|
||||
// semaphore has to be released BEFORE unlock of mutex!
|
||||
|
||||
// mux is the pool internal lock. Any modification of shared state of
|
||||
// the pool (but Acquires of acquireSem) must be performed only by
|
||||
// holder of the lock. Long running operations are not allowed when mux
|
||||
// is held.
|
||||
mux sync.Mutex
|
||||
// acquireSem provides an allowance to TRY to acquire a resource. The
|
||||
// acquire of semaphore token doesn't guarantee that an attempt to
|
||||
// acquire the resource will succeed.
|
||||
// acquireSem provides an allowance to acquire a resource.
|
||||
//
|
||||
// Releases are allowed only when caller holds mux. Acquires have to
|
||||
// happen before mux is locked.
|
||||
// happen before mux is locked (doesn't apply to semaphore.TryAcquire in
|
||||
// AcquireAllIdle).
|
||||
acquireSem *semaphore.Weighted
|
||||
destructWG sync.WaitGroup
|
||||
|
||||
destructWG sync.WaitGroup
|
||||
allResources resList[T]
|
||||
idleResources resList[T]
|
||||
idleResources *genstack.GenStack[*Resource[T]]
|
||||
|
||||
constructor Constructor[T]
|
||||
destructor Destructor[T]
|
||||
@@ -168,6 +164,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
|
||||
|
||||
return &Pool[T]{
|
||||
acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
|
||||
idleResources: genstack.NewGenStack[*Resource[T]](),
|
||||
maxSize: config.MaxSize,
|
||||
constructor: config.Constructor,
|
||||
destructor: config.Destructor,
|
||||
@@ -190,8 +187,7 @@ func (p *Pool[T]) Close() {
|
||||
p.closed = true
|
||||
p.cancelBaseAcquireCtx()
|
||||
|
||||
for len(p.idleResources) > 0 {
|
||||
res := p.idleResources.popBack()
|
||||
for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
|
||||
p.allResources.remove(res)
|
||||
go p.destructResourceValue(res.value)
|
||||
}
|
||||
@@ -294,11 +290,11 @@ func (p *Pool[T]) Stat() *Stat {
|
||||
//
|
||||
// WARNING: Caller of this method must hold the pool mutex!
|
||||
func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
|
||||
if len(p.idleResources) == 0 {
|
||||
res, ok := p.idleResources.Pop()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
res := p.idleResources.popBack()
|
||||
res.status = resourceStatusAcquired
|
||||
return res
|
||||
}
|
||||
@@ -347,75 +343,57 @@ func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
|
||||
// This function exists solely only for benchmarking purposes.
|
||||
func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
|
||||
startNano := nanotime()
|
||||
|
||||
var waitedForLock bool
|
||||
|
||||
for {
|
||||
if !p.acquireSem.TryAcquire(1) {
|
||||
waitedForLock = true
|
||||
err := p.acquireSem.Acquire(ctx, 1)
|
||||
if err != nil {
|
||||
p.canceledAcquireCount.Add(1)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
p.mux.Lock()
|
||||
if p.closed {
|
||||
p.acquireSem.Release(1)
|
||||
p.mux.Unlock()
|
||||
return nil, ErrClosedPool
|
||||
}
|
||||
|
||||
// If a resource is available in the pool.
|
||||
if res := p.tryAcquireIdleResource(); res != nil {
|
||||
if waitedForLock {
|
||||
p.emptyAcquireCount += 1
|
||||
}
|
||||
p.acquireCount += 1
|
||||
p.acquireDuration += time.Duration(nanotime() - startNano)
|
||||
p.mux.Unlock()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
if len(p.allResources) > int(p.maxSize) {
|
||||
// Unreachable code.
|
||||
panic("bug: semaphore allowed more acquires than pool allows")
|
||||
}
|
||||
|
||||
// An idle resource could be "stolen" by AcquireAllIdle at the
|
||||
// time we already held the semaphore token. In such a case we
|
||||
// cannot create a new resource because the pool is already
|
||||
// full. Consequently, we will have to try it once again.
|
||||
//
|
||||
// Naturally the same situation with AcquireAllIdle could happen
|
||||
// even when the pool is not full. But in such a case we can
|
||||
// just create a new resource.
|
||||
if len(p.allResources) == int(p.maxSize) {
|
||||
// We do not release the semaphore here because the
|
||||
// connection is in reality held by the caller of
|
||||
// AcquireAllIdle.
|
||||
p.mux.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// The resource is not idle, but there is enough space to create one.
|
||||
res := p.createNewResource()
|
||||
p.mux.Unlock()
|
||||
|
||||
res, err := p.initResourceValue(ctx, res)
|
||||
if !p.acquireSem.TryAcquire(1) {
|
||||
waitedForLock = true
|
||||
err := p.acquireSem.Acquire(ctx, 1)
|
||||
if err != nil {
|
||||
p.canceledAcquireCount.Add(1)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
p.mux.Lock()
|
||||
if p.closed {
|
||||
p.acquireSem.Release(1)
|
||||
p.mux.Unlock()
|
||||
return nil, ErrClosedPool
|
||||
}
|
||||
|
||||
p.emptyAcquireCount += 1
|
||||
// If a resource is available in the pool.
|
||||
if res := p.tryAcquireIdleResource(); res != nil {
|
||||
if waitedForLock {
|
||||
p.emptyAcquireCount += 1
|
||||
}
|
||||
p.acquireCount += 1
|
||||
p.acquireDuration += time.Duration(nanotime() - startNano)
|
||||
|
||||
p.mux.Unlock()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
if len(p.allResources) >= int(p.maxSize) {
|
||||
// Unreachable code.
|
||||
panic("bug: semaphore allowed more acquires than pool allows")
|
||||
}
|
||||
|
||||
// The resource is not idle, but there is enough space to create one.
|
||||
res := p.createNewResource()
|
||||
p.mux.Unlock()
|
||||
|
||||
res, err := p.initResourceValue(ctx, res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
|
||||
p.emptyAcquireCount += 1
|
||||
p.acquireCount += 1
|
||||
p.acquireDuration += time.Duration(nanotime() - startNano)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) {
|
||||
@@ -494,24 +472,11 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
if len(p.allResources) > int(p.maxSize) {
|
||||
if len(p.allResources) >= int(p.maxSize) {
|
||||
// Unreachable code.
|
||||
panic("bug: semaphore allowed more acquires than pool allows")
|
||||
}
|
||||
|
||||
// An idle resource could be "stolen" by AcquireAllIdle at the time we
|
||||
// already held the semaphore token. In such a case we cannot create a
|
||||
// new resource because the pool is already full.
|
||||
//
|
||||
// Naturally the same situation with AcquireAllIdle could happen even
|
||||
// when the pool is not full. But in such a case we can just create a
|
||||
// new resource.
|
||||
if len(p.allResources) == int(p.maxSize) {
|
||||
// We do not release the semaphore here because the connection
|
||||
// is in reality held by the caller of AcquireAllIdle.
|
||||
return nil, ErrNotAvailable
|
||||
}
|
||||
|
||||
res := p.createNewResource()
|
||||
go func() {
|
||||
value, err := p.constructor(ctx)
|
||||
@@ -531,7 +496,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
||||
|
||||
res.value = value
|
||||
res.status = resourceStatusIdle
|
||||
p.idleResources.append(res)
|
||||
p.idleResources.Push(res)
|
||||
}()
|
||||
|
||||
return nil, ErrNotAvailable
|
||||
@@ -556,6 +521,9 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
||||
// the minimal number of tokens that has been present over the whole process
|
||||
// will be acquired. This is sufficient for the use-case we have in this
|
||||
// package.
|
||||
//
|
||||
// TODO: Replace this with acquireSem.TryAcquireAll() if it gets to
|
||||
// upstream. https://github.com/golang/sync/pull/19
|
||||
func acquireSemAll(sem *semaphore.Weighted, num int) int {
|
||||
if sem.TryAcquire(int64(num)) {
|
||||
return num
|
||||
@@ -583,7 +551,7 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
||||
return nil
|
||||
}
|
||||
|
||||
numIdle := len(p.idleResources)
|
||||
numIdle := p.idleResources.Len()
|
||||
if numIdle == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -592,17 +560,27 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
||||
// TryAcquire cannot block, the fact that we hold mutex locked and try
|
||||
// to acquire semaphore cannot result in dead-lock.
|
||||
//
|
||||
// TODO: Replace this with acquireSem.TryAcquireAll() if it gets to
|
||||
// upstream. https://github.com/golang/sync/pull/19
|
||||
_ = acquireSemAll(p.acquireSem, numIdle)
|
||||
// Because the mutex is locked, no parallel Release can run. This
|
||||
// implies that the number of tokens can only decrease because some
|
||||
// Acquire/TryAcquire call can consume the semaphore token. Consequently
|
||||
// acquired is always less or equal to numIdle. Moreover if acquired <
|
||||
// numIdle, then there are some parallel Acquire/TryAcquire calls that
|
||||
// will take the remaining idle connections.
|
||||
acquired := acquireSemAll(p.acquireSem, numIdle)
|
||||
|
||||
idle := make([]*Resource[T], numIdle)
|
||||
idle := make([]*Resource[T], acquired)
|
||||
for i := range idle {
|
||||
res := p.idleResources.popBack()
|
||||
res, _ := p.idleResources.Pop()
|
||||
res.status = resourceStatusAcquired
|
||||
idle[i] = res
|
||||
}
|
||||
|
||||
// We have to bump the generation to ensure that Acquire/TryAcquire
|
||||
// calls running in parallel (those which caused acquired < numIdle)
|
||||
// will consume old connections and not freshly released connections
|
||||
// instead.
|
||||
p.idleResources.NextGen()
|
||||
|
||||
return idle
|
||||
}
|
||||
|
||||
@@ -642,7 +620,7 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
|
||||
return ErrClosedPool
|
||||
}
|
||||
p.allResources.append(res)
|
||||
p.idleResources.append(res)
|
||||
p.idleResources.Push(res)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -658,8 +636,7 @@ func (p *Pool[T]) Reset() {
|
||||
|
||||
p.resetCount++
|
||||
|
||||
for len(p.idleResources) > 0 {
|
||||
res := p.idleResources.popBack()
|
||||
for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
|
||||
p.allResources.remove(res)
|
||||
go p.destructResourceValue(res.value)
|
||||
}
|
||||
@@ -677,7 +654,7 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64)
|
||||
} else {
|
||||
res.lastUsedNano = lastUsedNano
|
||||
res.status = resourceStatusIdle
|
||||
p.idleResources.append(res)
|
||||
p.idleResources.Push(res)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -685,9 +662,11 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64)
|
||||
// pool Remove will panic.
|
||||
func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
|
||||
p.destructResourceValue(res.value)
|
||||
|
||||
p.mux.Lock()
|
||||
defer p.mux.Unlock()
|
||||
defer p.acquireSem.Release(1)
|
||||
|
||||
p.allResources.remove(res)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user