diff --git a/internal/genstack/gen_stack.go b/internal/genstack/gen_stack.go new file mode 100644 index 0000000..7e4660c --- /dev/null +++ b/internal/genstack/gen_stack.go @@ -0,0 +1,85 @@ +package genstack + +// GenStack implements a generational stack. +// +// GenStack works as common stack except for the fact that all elements in the +// older generation are guaranteed to be popped before any element in the newer +// generation. New elements are always pushed to the current (newest) +// generation. +// +// We could also say that GenStack behaves as a stack in case of a single +// generation, but it behaves as a queue of individual generation stacks. +type GenStack[T any] struct { + // We can represent arbitrary number of generations using 2 stacks. The + // new stack stores all new pushes and the old stack serves all reads. + // Old stack can represent multiple generations. If old == new, then all + // elements pushed in previous (not current) generations have already + // been popped. + + old *stack[T] + new *stack[T] +} + +// NewGenStack creates a new empty GenStack. +func NewGenStack[T any]() *GenStack[T] { + s := &stack[T]{} + return &GenStack[T]{ + old: s, + new: s, + } +} + +func (s *GenStack[T]) Pop() (T, bool) { + // Pushes always append to the new stack, so if the old once becomes + // empty, it will remail empty forever. + if s.old.len() == 0 && s.old != s.new { + s.old = s.new + } + + if s.old.len() == 0 { + var zero T + return zero, false + } + + return s.old.pop(), true +} + +// Push pushes a new element at the top of the stack. +func (s *GenStack[T]) Push(v T) { s.new.push(v) } + +// NextGen starts a new stack generation. +func (s *GenStack[T]) NextGen() { + if s.old == s.new { + s.new = &stack[T]{} + return + } + + // We need to pop from the old stack to the top of the new stack. Let's + // have an example: + // + // Old: 4 3 2 1 + // New: 8 7 6 5 + // PopOrder: 1 2 3 4 5 6 7 8 + // + // + // To preserve pop order, we have to take all elements from the old + // stack and push them to the top of new stack: + // + // New: 8 7 6 5 4 3 2 1 + // + s.new.push(s.old.takeAll()...) + + // We have the old stack allocated and empty, so why not to reuse it as + // new new stack. + s.old, s.new = s.new, s.old +} + +// Len returns number of elements in the stack. +func (s *GenStack[T]) Len() int { + l := s.old.len() + if s.old != s.new { + l += s.new.len() + } + + return l +} diff --git a/internal/genstack/gen_stack_test.go b/internal/genstack/gen_stack_test.go new file mode 100644 index 0000000..519bd3b --- /dev/null +++ b/internal/genstack/gen_stack_test.go @@ -0,0 +1,90 @@ +package genstack + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func requirePopEmpty[T any](t testing.TB, s *GenStack[T]) { + v, ok := s.Pop() + require.False(t, ok) + require.Zero(t, v) +} + +func requirePop[T any](t testing.TB, s *GenStack[T], expected T) { + v, ok := s.Pop() + require.True(t, ok) + require.Equal(t, expected, v) +} + +func TestGenStack_Empty(t *testing.T) { + s := NewGenStack[int]() + requirePopEmpty(t, s) +} + +func TestGenStack_SingleGen(t *testing.T) { + r := require.New(t) + s := NewGenStack[int]() + + s.Push(1) + s.Push(2) + r.Equal(2, s.Len()) + + requirePop(t, s, 2) + requirePop(t, s, 1) + requirePopEmpty(t, s) +} + +func TestGenStack_TwoGen(t *testing.T) { + r := require.New(t) + s := NewGenStack[int]() + + s.Push(3) + s.Push(4) + s.Push(5) + r.Equal(3, s.Len()) + s.NextGen() + r.Equal(3, s.Len()) + s.Push(6) + s.Push(7) + r.Equal(5, s.Len()) + + requirePop(t, s, 5) + requirePop(t, s, 4) + requirePop(t, s, 3) + requirePop(t, s, 7) + requirePop(t, s, 6) + requirePopEmpty(t, s) +} + +func TestGenStack_MuptiGen(t *testing.T) { + r := require.New(t) + s := NewGenStack[int]() + + s.Push(10) + s.Push(11) + s.Push(12) + r.Equal(3, s.Len()) + s.NextGen() + r.Equal(3, s.Len()) + s.Push(13) + s.Push(14) + r.Equal(5, s.Len()) + s.NextGen() + r.Equal(5, s.Len()) + s.Push(15) + s.Push(16) + s.Push(17) + r.Equal(8, s.Len()) + + requirePop(t, s, 12) + requirePop(t, s, 11) + requirePop(t, s, 10) + requirePop(t, s, 14) + requirePop(t, s, 13) + requirePop(t, s, 17) + requirePop(t, s, 16) + requirePop(t, s, 15) + requirePopEmpty(t, s) +} diff --git a/internal/genstack/stack.go b/internal/genstack/stack.go new file mode 100644 index 0000000..dbced0c --- /dev/null +++ b/internal/genstack/stack.go @@ -0,0 +1,39 @@ +package genstack + +// stack is a wrapper around an array implementing a stack. +// +// We cannot use slice to represent the stack because append might change the +// pointer value of the slice. That would be an issue in GenStack +// implementation. +type stack[T any] struct { + arr []T +} + +// push pushes a new element at the top of a stack. +func (s *stack[T]) push(vs ...T) { s.arr = append(s.arr, vs...) } + +// pop pops the stack top-most element. +// +// If stack length is zero, this method panics. +func (s *stack[T]) pop() T { + idx := s.len() - 1 + val := s.arr[idx] + + // Avoid memory leak + var zero T + s.arr[idx] = zero + + s.arr = s.arr[:idx] + return val +} + +// takeAll returns all elements in the stack in order as they are stored - i.e. +// the top-most stack element is the last one. +func (s *stack[T]) takeAll() []T { + arr := s.arr + s.arr = nil + return arr +} + +// len returns number of elements in the stack. +func (s *stack[T]) len() int { return len(s.arr) } diff --git a/pool.go b/pool.go index bdbca59..bd16c37 100644 --- a/pool.go +++ b/pool.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/jackc/puddle/v2/internal/genstack" "go.uber.org/atomic" "golang.org/x/sync/semaphore" ) @@ -115,26 +116,21 @@ func (res *Resource[T]) IdleDuration() time.Duration { // Pool is a concurrency-safe resource pool. type Pool[T any] struct { - // Pool invariant is that semaphore is locked before mutex (doesn't - // apply to TryAcquire in AcquireAllIdle). Another invariant is that - // semaphore has to be released BEFORE unlock of mutex! - // mux is the pool internal lock. Any modification of shared state of // the pool (but Acquires of acquireSem) must be performed only by // holder of the lock. Long running operations are not allowed when mux // is held. mux sync.Mutex - // acquireSem provides an allowance to TRY to acquire a resource. The - // acquire of semaphore token doesn't guarantee that an attempt to - // acquire the resource will succeed. + // acquireSem provides an allowance to acquire a resource. // // Releases are allowed only when caller holds mux. Acquires have to - // happen before mux is locked. + // happen before mux is locked (doesn't apply to semaphore.TryAcquire in + // AcquireAllIdle). acquireSem *semaphore.Weighted + destructWG sync.WaitGroup - destructWG sync.WaitGroup allResources resList[T] - idleResources resList[T] + idleResources *genstack.GenStack[*Resource[T]] constructor Constructor[T] destructor Destructor[T] @@ -168,6 +164,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) { return &Pool[T]{ acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), + idleResources: genstack.NewGenStack[*Resource[T]](), maxSize: config.MaxSize, constructor: config.Constructor, destructor: config.Destructor, @@ -190,8 +187,7 @@ func (p *Pool[T]) Close() { p.closed = true p.cancelBaseAcquireCtx() - for len(p.idleResources) > 0 { - res := p.idleResources.popBack() + for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() { p.allResources.remove(res) go p.destructResourceValue(res.value) } @@ -294,11 +290,11 @@ func (p *Pool[T]) Stat() *Stat { // // WARNING: Caller of this method must hold the pool mutex! func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] { - if len(p.idleResources) == 0 { + res, ok := p.idleResources.Pop() + if !ok { return nil } - res := p.idleResources.popBack() res.status = resourceStatusAcquired return res } @@ -347,75 +343,57 @@ func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) { // This function exists solely only for benchmarking purposes. func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { startNano := nanotime() + var waitedForLock bool - - for { - if !p.acquireSem.TryAcquire(1) { - waitedForLock = true - err := p.acquireSem.Acquire(ctx, 1) - if err != nil { - p.canceledAcquireCount.Add(1) - return nil, err - } - } - - p.mux.Lock() - if p.closed { - p.acquireSem.Release(1) - p.mux.Unlock() - return nil, ErrClosedPool - } - - // If a resource is available in the pool. - if res := p.tryAcquireIdleResource(); res != nil { - if waitedForLock { - p.emptyAcquireCount += 1 - } - p.acquireCount += 1 - p.acquireDuration += time.Duration(nanotime() - startNano) - p.mux.Unlock() - return res, nil - } - - if len(p.allResources) > int(p.maxSize) { - // Unreachable code. - panic("bug: semaphore allowed more acquires than pool allows") - } - - // An idle resource could be "stolen" by AcquireAllIdle at the - // time we already held the semaphore token. In such a case we - // cannot create a new resource because the pool is already - // full. Consequently, we will have to try it once again. - // - // Naturally the same situation with AcquireAllIdle could happen - // even when the pool is not full. But in such a case we can - // just create a new resource. - if len(p.allResources) == int(p.maxSize) { - // We do not release the semaphore here because the - // connection is in reality held by the caller of - // AcquireAllIdle. - p.mux.Unlock() - continue - } - - // The resource is not idle, but there is enough space to create one. - res := p.createNewResource() - p.mux.Unlock() - - res, err := p.initResourceValue(ctx, res) + if !p.acquireSem.TryAcquire(1) { + waitedForLock = true + err := p.acquireSem.Acquire(ctx, 1) if err != nil { + p.canceledAcquireCount.Add(1) return nil, err } + } - p.mux.Lock() - defer p.mux.Unlock() + p.mux.Lock() + if p.closed { + p.acquireSem.Release(1) + p.mux.Unlock() + return nil, ErrClosedPool + } - p.emptyAcquireCount += 1 + // If a resource is available in the pool. + if res := p.tryAcquireIdleResource(); res != nil { + if waitedForLock { + p.emptyAcquireCount += 1 + } p.acquireCount += 1 p.acquireDuration += time.Duration(nanotime() - startNano) - + p.mux.Unlock() return res, nil } + + if len(p.allResources) >= int(p.maxSize) { + // Unreachable code. + panic("bug: semaphore allowed more acquires than pool allows") + } + + // The resource is not idle, but there is enough space to create one. + res := p.createNewResource() + p.mux.Unlock() + + res, err := p.initResourceValue(ctx, res) + if err != nil { + return nil, err + } + + p.mux.Lock() + defer p.mux.Unlock() + + p.emptyAcquireCount += 1 + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + + return res, nil } func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) { @@ -494,24 +472,11 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { return res, nil } - if len(p.allResources) > int(p.maxSize) { + if len(p.allResources) >= int(p.maxSize) { // Unreachable code. panic("bug: semaphore allowed more acquires than pool allows") } - // An idle resource could be "stolen" by AcquireAllIdle at the time we - // already held the semaphore token. In such a case we cannot create a - // new resource because the pool is already full. - // - // Naturally the same situation with AcquireAllIdle could happen even - // when the pool is not full. But in such a case we can just create a - // new resource. - if len(p.allResources) == int(p.maxSize) { - // We do not release the semaphore here because the connection - // is in reality held by the caller of AcquireAllIdle. - return nil, ErrNotAvailable - } - res := p.createNewResource() go func() { value, err := p.constructor(ctx) @@ -531,7 +496,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { res.value = value res.status = resourceStatusIdle - p.idleResources.append(res) + p.idleResources.Push(res) }() return nil, ErrNotAvailable @@ -556,6 +521,9 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { // the minimal number of tokens that has been present over the whole process // will be acquired. This is sufficient for the use-case we have in this // package. +// +// TODO: Replace this with acquireSem.TryAcquireAll() if it gets to +// upstream. https://github.com/golang/sync/pull/19 func acquireSemAll(sem *semaphore.Weighted, num int) int { if sem.TryAcquire(int64(num)) { return num @@ -583,7 +551,7 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { return nil } - numIdle := len(p.idleResources) + numIdle := p.idleResources.Len() if numIdle == 0 { return nil } @@ -592,17 +560,27 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { // TryAcquire cannot block, the fact that we hold mutex locked and try // to acquire semaphore cannot result in dead-lock. // - // TODO: Replace this with acquireSem.TryAcquireAll() if it gets to - // upstream. https://github.com/golang/sync/pull/19 - _ = acquireSemAll(p.acquireSem, numIdle) + // Because the mutex is locked, no parallel Release can run. This + // implies that the number of tokens can only decrease because some + // Acquire/TryAcquire call can consume the semaphore token. Consequently + // acquired is always less or equal to numIdle. Moreover if acquired < + // numIdle, then there are some parallel Acquire/TryAcquire calls that + // will take the remaining idle connections. + acquired := acquireSemAll(p.acquireSem, numIdle) - idle := make([]*Resource[T], numIdle) + idle := make([]*Resource[T], acquired) for i := range idle { - res := p.idleResources.popBack() + res, _ := p.idleResources.Pop() res.status = resourceStatusAcquired idle[i] = res } + // We have to bump the generation to ensure that Acquire/TryAcquire + // calls running in parallel (those which caused acquired < numIdle) + // will consume old connections and not freshly released connections + // instead. + p.idleResources.NextGen() + return idle } @@ -642,7 +620,7 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error { return ErrClosedPool } p.allResources.append(res) - p.idleResources.append(res) + p.idleResources.Push(res) return nil } @@ -658,8 +636,7 @@ func (p *Pool[T]) Reset() { p.resetCount++ - for len(p.idleResources) > 0 { - res := p.idleResources.popBack() + for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() { p.allResources.remove(res) go p.destructResourceValue(res.value) } @@ -677,7 +654,7 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) } else { res.lastUsedNano = lastUsedNano res.status = resourceStatusIdle - p.idleResources.append(res) + p.idleResources.Push(res) } } @@ -685,9 +662,11 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) // pool Remove will panic. func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) { p.destructResourceValue(res.value) + p.mux.Lock() defer p.mux.Unlock() defer p.acquireSem.Release(1) + p.allResources.remove(res) }