2
0

[pool] Replace queue with stack

This commit is contained in:
Jan Dubsky
2022-10-10 18:15:52 +02:00
committed by Jack Christensen
parent 30b91519a0
commit 89668fae42
6 changed files with 194 additions and 335 deletions
-56
View File
@@ -1,56 +0,0 @@
package circ
// Queue is a fixed-capacity FIFO ring buffer. The zero value is not
// usable; create instances with NewQueue.
type Queue[T any] struct {
	buf   []T // backing storage; len(buf) is the capacity
	head  int // index of the oldest element
	count int // number of elements currently stored
}

// NewQueue returns a queue that can hold up to capacity elements.
func NewQueue[T any](capacity int) *Queue[T] {
	// TODO: Do not preallocate the whole capacity if Go upstream
	// accepts this: https://github.com/golang/go/issues/55978
	return &Queue[T]{buf: make([]T, capacity)}
}

// Cap reports the maximum number of elements the queue can hold.
func (q *Queue[T]) Cap() int { return len(q.buf) }

// Len reports the number of elements currently in the queue.
func (q *Queue[T]) Len() int { return q.count }

// end returns the index one past the newest element, wrapping around
// the ring. Only called when the queue is non-full, so len(q.buf) > 0.
func (q *Queue[T]) end() int {
	return (q.head + q.count) % len(q.buf)
}

// Enqueue appends elem at the back of the queue. It panics when the
// queue is already full.
func (q *Queue[T]) Enqueue(elem T) {
	if q.count == len(q.buf) {
		panic("enqueue: queue is full")
	}
	q.buf[q.end()] = elem
	q.count++
}

// Dequeue removes and returns the element at the front of the queue.
// It panics when the queue is empty.
func (q *Queue[T]) Dequeue() T {
	if q.count == 0 {
		panic("dequeue: queue is empty")
	}
	elem := q.buf[q.head]
	// Drop the stored reference so values of T that are (or contain)
	// pointers do not leak.
	var zero T
	q.buf[q.head] = zero
	q.head = (q.head + 1) % len(q.buf)
	q.count--
	return elem
}
-148
View File
@@ -1,148 +0,0 @@
package circ_test
import (
"fmt"
"io"
"testing"
"github.com/jackc/puddle/v2/internal/circ"
"github.com/stretchr/testify/require"
)
// TestQueue_EnqueueDequeue fills the queue to capacity and then drains
// it, checking FIFO ordering and length bookkeeping at every step.
func TestQueue_EnqueueDequeue(t *testing.T) {
	r := require.New(t)

	const capacity = 10
	q := circ.NewQueue[int](capacity)
	r.Equal(capacity, q.Cap())

	for i := 0; i < capacity; i++ {
		q.Enqueue(i)
		r.Equal(i+1, q.Len())
	}

	// The queue is full; one more element must not fit.
	r.Panics(func() { q.Enqueue(capacity) })
	r.Equal(capacity, q.Len())

	for i := 0; i < capacity; i++ {
		r.Equal(i, q.Dequeue())
		r.Equal(capacity-i-1, q.Len())
	}

	// The queue is empty again; draining further must panic.
	r.Panics(func() { q.Dequeue() })
	r.Equal(0, q.Len())
}
// TestQueue_EnqueueDequeueOverflow exercises ring-buffer wrap-around:
// the queue is filled, half-drained, refilled past the end of the
// backing array, and then fully drained.
func TestQueue_EnqueueDequeueOverflow(t *testing.T) {
	r := require.New(t)

	const capacity = 10
	q := circ.NewQueue[int](capacity)
	r.Equal(capacity, q.Cap())

	for i := 0; i < capacity; i++ {
		q.Enqueue(i)
		r.Equal(i+1, q.Len())
	}

	r.Panics(func() { q.Enqueue(capacity) })
	r.Equal(capacity, q.Len())

	// Remove the first half so that subsequent writes wrap around the
	// end of the backing array.
	for i := 0; i < 5; i++ {
		r.Equal(i, q.Dequeue())
		r.Equal(capacity-i-1, q.Len())
	}

	for i := 10; i < 15; i++ {
		q.Enqueue(i)
		r.Equal(i-5+1, q.Len())
	}

	for i := 0; i < capacity; i++ {
		r.Equal(i+5, q.Dequeue())
		r.Equal(capacity-i-1, q.Len())
	}

	r.Panics(func() { q.Dequeue() })
	r.Equal(0, q.Len())
}
// BenchmarkArrayAppend measures appending into a slice preallocated
// with sufficient capacity; it is a baseline for the queue benchmarks.
func BenchmarkArrayAppend(b *testing.B) {
	arr := make([]int, 0, b.N)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		arr = append(arr, i)
	}
	b.StopTimer()

	// Consume the results so the Go compiler cannot elide the writes.
	for _, v := range arr {
		fmt.Fprintf(io.Discard, "%d\n", v)
	}
}
// BenchmarkArrayWrite measures indexed writes into a presized slice;
// it is a baseline for the queue benchmarks.
func BenchmarkArrayWrite(b *testing.B) {
	arr := make([]int, b.N)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		arr[i] = i
	}
	b.StopTimer()

	// Consume the results so the Go compiler cannot elide the writes.
	for _, v := range arr {
		fmt.Fprintf(io.Discard, "%d\n", v)
	}
}
// BenchmarkEnqueue measures Enqueue into a queue sized to hold every
// benchmark iteration.
func BenchmarkEnqueue(b *testing.B) {
	q := circ.NewQueue[int](b.N)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		q.Enqueue(i)
	}
	b.StopTimer()

	// Drain and consume so the Go compiler cannot elide the writes.
	for i := 0; i < b.N; i++ {
		fmt.Fprintf(io.Discard, "%d\n", q.Dequeue())
	}
}
// BenchmarkChanWrite measures buffered channel sends. Channels are
// another way to represent a queue, so this is a comparison baseline.
func BenchmarkChanWrite(b *testing.B) {
	ch := make(chan int, b.N)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ch <- i
	}
	b.StopTimer()

	// Drain and consume so the Go compiler cannot elide the sends.
	for i := 0; i < b.N; i++ {
		fmt.Fprintf(io.Discard, "%d\n", <-ch)
	}
}
// BenchmarkDequeue measures Dequeue from a queue pre-filled outside
// the timed region; results are stored into a slice.
func BenchmarkDequeue(b *testing.B) {
	q := circ.NewQueue[int](b.N)
	for i := 0; i < b.N; i++ {
		q.Enqueue(i)
	}
	out := make([]int, b.N)

	b.ResetTimer()
	for i := range out {
		out[i] = q.Dequeue()
	}
}
-12
View File
@@ -1,12 +0,0 @@
package puddle
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestRemoveResourcePanicsWithBugReportIfResourceDoesNotExist verifies
// that removeResource panics with a bug-report message when asked to
// remove a resource that is not present in the slice.
func TestRemoveResourcePanicsWithBugReportIfResourceDoesNotExist(t *testing.T) {
	existing := []*Resource[any]{
		new(Resource[any]),
		new(Resource[any]),
		new(Resource[any]),
	}
	unknown := new(Resource[any])

	assert.PanicsWithValue(t,
		"BUG: removeResource could not find res in slice",
		func() { removeResource(existing, unknown) },
	)
}
+146 -119
View File
@@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/jackc/puddle/v2/internal/circ"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
)
@@ -120,8 +119,8 @@ type Pool[T any] struct {
acquireSem *semaphore.Weighted
destructWG sync.WaitGroup
allResources []*Resource[T]
idleResources *circ.Queue[*Resource[T]]
allResources resList[T]
idleResources resList[T]
constructor Constructor[T]
destructor Destructor[T]
@@ -155,7 +154,6 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
return &Pool[T]{
acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
idleResources: circ.NewQueue[*Resource[T]](int(config.MaxSize)),
maxSize: config.MaxSize,
constructor: config.Constructor,
destructor: config.Destructor,
@@ -178,12 +176,11 @@ func (p *Pool[T]) Close() {
p.closed = true
p.cancelBaseAcquireCtx()
for p.idleResources.Len() > 0 {
res := p.idleResources.Dequeue()
p.allResources = removeResource(p.allResources, res)
for len(p.idleResources) > 0 {
res := p.idleResources.popBack()
p.allResources.remove(res)
go p.destructResourceValue(res.value)
}
p.idleResources = nil
}
// Stat is a snapshot of Pool statistics.
@@ -283,11 +280,13 @@ func (p *Pool[T]) Stat() *Stat {
//
// WARNING: Caller of this method must hold the pool mutex!
func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
if p.idleResources.Len() == 0 {
if len(p.idleResources) == 0 {
return nil
}
return p.idleResources.Dequeue()
res := p.idleResources.popBack()
res.status = resourceStatusAcquired
return res
}
// createNewResource creates a new resource and inserts it into list of pool
@@ -303,7 +302,7 @@ func (p *Pool[T]) createNewResource() *Resource[T] {
status: resourceStatusConstructing,
}
p.allResources = append(p.allResources, res)
p.allResources.append(res)
p.destructWG.Add(1)
return res
@@ -329,69 +328,108 @@ func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
}
// acquire is a continuation of Acquire function that doesn't check context
// validity. This function exists separatly only for benchmarking purposes.
// validity.
//
// This function exists solely for benchmarking purposes.
func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
startNano := nanotime()
var waitedForLock bool
if !p.acquireSem.TryAcquire(1) {
waitedForLock = true
err := p.acquireSem.Acquire(ctx, 1)
for {
if !p.acquireSem.TryAcquire(1) {
waitedForLock = true
err := p.acquireSem.Acquire(ctx, 1)
if err != nil {
p.canceledAcquireCount.Add(1)
return nil, err
}
}
p.mux.Lock()
if p.closed {
p.mux.Unlock()
p.acquireSem.Release(1)
return nil, ErrClosedPool
}
// If a resource is available in the pool.
if res := p.tryAcquireIdleResource(); res != nil {
if waitedForLock {
p.emptyAcquireCount += 1
}
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
p.mux.Unlock()
return res, nil
}
if len(p.allResources) > int(p.maxSize) {
// Unreachable code.
p.mux.Unlock()
p.acquireSem.Release(1)
panic("bug: semaphore allowed more acquires than pool allows")
}
// An idle resource could be "stolen" by AcquireAllIdle at the
// time we already held the semaphore token. In such a case we
// cannot create a new resource because the pool is already
// full. Consequently, we will have to try it once again.
//
// Naturally the same situation with AcquireAllIdle could happen
// even when the pool is not full. But in such a case we can
// just create a new resource.
if len(p.allResources) == int(p.maxSize) {
// We do not release the semaphore here because the
// connection is in reality held by the caller of
// AcquireAllIdle.
p.mux.Unlock()
continue
}
// The resource is not idle, but there is enough space to create one.
res := p.createNewResource()
p.mux.Unlock()
res, err := p.initResourceValue(ctx, res)
if err != nil {
p.canceledAcquireCount.Add(1)
return nil, err
}
}
p.mux.Lock()
if p.closed {
p.mux.Unlock()
p.acquireSem.Release(1)
return nil, ErrClosedPool
}
p.mux.Lock()
defer p.mux.Unlock()
// If a resource is available in the pool.
if res := p.tryAcquireIdleResource(); res != nil {
res.status = resourceStatusAcquired
if waitedForLock {
p.emptyAcquireCount += 1
}
p.emptyAcquireCount += 1
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
p.mux.Unlock()
return res, nil
}
}
if len(p.allResources) >= int(p.maxSize) {
p.mux.Unlock()
p.acquireSem.Release(1)
// Unreachable code.
panic("bug: semaphore allowed more acquires than pool allows")
}
// The resource is not available, but there is enough space to create
// one.
res := p.createNewResource()
p.mux.Unlock()
// Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling
// the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259
constructErrCh := make(chan error)
func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) {
// Create the resource in a goroutine to immediately return from Acquire
// if ctx is canceled without also canceling the constructor.
//
// See:
// - https://github.com/jackc/pgx/issues/1287
// - https://github.com/jackc/pgx/issues/1259
constructErrChan := make(chan error)
go func() {
constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx)
value, err := p.constructResourceValue(constructorCtx)
value, err := p.constructor(constructorCtx)
if err != nil {
p.mux.Lock()
p.allResources = removeResource(p.allResources, res)
p.allResources.remove(res)
p.destructWG.Done()
p.mux.Unlock()
// We won't take the resource so we allow someone else
// to run Acquire.
// The resource won't be acquired because its
// construction failed. We have to allow someone else to
// take that resource.
p.acquireSem.Release(1)
select {
case constructErrCh <- err:
case constructErrChan <- err:
case <-ctx.Done():
// The caller is cancelled, so no-one awaits the
// error. This branch avoids a goroutine leak.
@@ -402,13 +440,9 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
res.value = value
res.status = resourceStatusAcquired
// This select works because the channel is unbuffered.
select {
case constructErrCh <- nil:
p.mux.Lock()
p.emptyAcquireCount += 1
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
p.mux.Unlock()
case constructErrChan <- nil:
case <-ctx.Done():
p.releaseAcquiredResource(res, res.lastUsedNano)
}
@@ -418,7 +452,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
case <-ctx.Done():
p.canceledAcquireCount.Add(1)
return nil, ctx.Err()
case err := <-constructErrCh:
case err := <-constructErrChan:
if err != nil {
return nil, err
}
@@ -445,18 +479,30 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
// If a resource is available now
if res := p.tryAcquireIdleResource(); res != nil {
p.acquireCount += 1
res.status = resourceStatusAcquired
return res, nil
}
if len(p.allResources) >= int(p.maxSize) {
if len(p.allResources) > int(p.maxSize) {
// Unreachable code.
panic("bug: semaphore allowed more acquires than pool allows")
}
// An idle resource could be "stolen" by AcquireAllIdle at the time we
// already held the semaphore token. In such a case we cannot create a
// new resource because the pool is already full.
//
// Naturally the same situation with AcquireAllIdle could happen even
// when the pool is not full. But in such a case we can just create a
// new resource.
if len(p.allResources) == int(p.maxSize) {
// We do not release the semaphore here because the connection
// is in reality held by the caller of AcquireAllIdle.
return nil, ErrNotAvailable
}
res := p.createNewResource()
go func() {
value, err := p.constructResourceValue(ctx)
value, err := p.constructor(ctx)
// We have to create the resource and only then release the
// semaphore - For the time being there is no resource that
// someone could acquire.
@@ -465,14 +511,14 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
defer p.mux.Unlock()
if err != nil {
p.allResources = removeResource(p.allResources, res)
p.allResources.remove(res)
p.destructWG.Done()
return
}
res.value = value
res.status = resourceStatusIdle
p.idleResources.Enqueue(res)
p.idleResources.append(res)
}()
return nil, ErrNotAvailable
@@ -484,10 +530,10 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
//
// For the time being, semaphore doesn't allow to acquire all tokens atomically
// (see https://github.com/golang/sync/pull/19). We simulate this by trying all
// powers of 2 that are less or equal to max.
// powers of 2 that are less or equal to num.
//
// For example, let's imagine we have 19 free tokens in the semaphore which in
// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then max
// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then num
// is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1 tokens.
// Out of those, the acquire of 16, 2 and 1 tokens will succeed.
//
@@ -497,57 +543,54 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
// the minimal number of tokens that has been present over the whole process
// will be acquired. This is sufficient for the use-case we have in this
// package.
func acquireSemAll[T ints](sem *semaphore.Weighted, max T) int {
var cnt int
for i := int(log2Int(max)); i >= 0; i-- {
func acquireSemAll(sem *semaphore.Weighted, num int) int {
if sem.TryAcquire(int64(num)) {
return num
}
var acquired int
for i := int(log2Int(num)); i >= 0; i-- {
val := int(1) << i
if sem.TryAcquire(int64(val)) {
cnt += val
acquired += val
}
}
return cnt
return acquired
}
// AcquireAllIdle acquires all currently idle resources. Its intended use is for
// health check and keep-alive functionality. It does not update pool
// statistics.
func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
// TODO: Replace this with acquireSem.TryAcquireAll() if it gets
// accepted upstream. https://github.com/golang/sync/pull/19
cnt := acquireSemAll(p.acquireSem, int64(p.maxSize))
if cnt == 0 {
return nil
}
p.mux.Lock()
defer p.mux.Unlock()
if p.closed {
p.acquireSem.Release(int64(cnt))
return nil
}
// Some resources from the maxSize limit do not have to exist (i.e. be
// idle).
if diff := cnt - p.idleResources.Len(); diff > 0 {
p.acquireSem.Release(int64(diff))
cnt = p.idleResources.Len()
numIdle := len(p.idleResources)
if numIdle == 0 {
return nil
}
// We are not guaranteed that idleResources are empty after this loop.
// But we are guaranteed that all resources remain in idleResources
// after this loop will (1) either be acquired soon (semaphore was
// already acquired for them) or (2) were released after start of this
// function.
resources := make([]*Resource[T], cnt)
for i := 0; i < cnt; i++ {
res := p.idleResources.Dequeue()
// In acquireSemAll we use only TryAcquire and not Acquire. Because
// TryAcquire cannot block, the fact that we hold mutex locked and try
// to acquire semaphore cannot result in dead-lock.
//
// TODO: Replace this with acquireSem.TryAcquireAll() if it gets
// accepted upstream. https://github.com/golang/sync/pull/19
_ = acquireSemAll(p.acquireSem, numIdle)
idle := make([]*Resource[T], numIdle)
for i := range idle {
res := p.idleResources.popBack()
res.status = resourceStatusAcquired
resources[i] = res
idle[i] = res
}
return resources
return idle
}
// CreateResource constructs a new resource without acquiring it.
@@ -562,7 +605,7 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
p.destructWG.Add(1)
p.mux.Unlock()
value, err := p.constructResourceValue(ctx)
value, err := p.constructor(ctx)
if err != nil {
p.destructWG.Done()
return err
@@ -585,8 +628,8 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
go p.destructResourceValue(res.value)
return ErrClosedPool
}
p.allResources = append(p.allResources, res)
p.idleResources.Enqueue(res)
p.allResources.append(res)
p.idleResources.append(res)
return nil
}
@@ -602,9 +645,9 @@ func (p *Pool[T]) Reset() {
p.resetCount++
for p.idleResources.Len() > 0 {
res := p.idleResources.Dequeue()
p.allResources = removeResource(p.allResources, res)
for len(p.idleResources) > 0 {
res := p.idleResources.popBack()
p.allResources.remove(res)
go p.destructResourceValue(res.value)
}
}
@@ -616,12 +659,12 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64)
defer p.mux.Unlock()
if p.closed || res.poolResetCount != p.resetCount {
p.allResources = removeResource(p.allResources, res)
p.allResources.remove(res)
go p.destructResourceValue(res.value)
} else {
res.lastUsedNano = lastUsedNano
res.status = resourceStatusIdle
p.idleResources.Enqueue(res)
p.idleResources.append(res)
}
}
@@ -632,7 +675,7 @@ func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
defer p.acquireSem.Release(1)
p.mux.Lock()
defer p.mux.Unlock()
p.allResources = removeResource(p.allResources, res)
p.allResources.remove(res)
}
func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
@@ -640,27 +683,11 @@ func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
p.mux.Lock()
defer p.mux.Unlock()
p.allResources = removeResource(p.allResources, res)
p.allResources.remove(res)
res.status = resourceStatusHijacked
p.destructWG.Done() // not responsible for destructing hijacked resources
}
func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] {
for i := range slice {
if slice[i] == res {
slice[i] = slice[len(slice)-1]
slice[len(slice)-1] = nil // Avoid memory leak
return slice[:len(slice)-1]
}
}
panic("BUG: removeResource could not find res in slice")
}
func (p *Pool[T]) constructResourceValue(ctx context.Context) (T, error) {
return p.constructor(ctx)
}
func (p *Pool[T]) destructResourceValue(value T) {
p.destructor(value)
p.destructWG.Done()
+27
View File
@@ -0,0 +1,27 @@
package puddle
// resList is a slice of pool resources with helpers for stack-style
// access (append/popBack) and unordered removal.
type resList[T any] []*Resource[T]

// append adds val to the end of the list.
func (l *resList[T]) append(val *Resource[T]) { *l = append(*l, val) }

// popBack removes and returns the last element of the list. It panics
// when the list is empty.
func (l *resList[T]) popBack() *Resource[T] {
	s := *l
	last := len(s) - 1
	val := s[last]
	s[last] = nil // Avoid memory leak
	*l = s[:last]
	return val
}

// remove deletes val from the list by swapping it with the last
// element and truncating; element order is not preserved. It panics
// when val is not present, since that indicates a pool bug.
func (l *resList[T]) remove(val *Resource[T]) {
	s := *l
	for i, elem := range s {
		if elem != val {
			continue
		}
		last := len(s) - 1
		s[i] = s[last]
		s[last] = nil // Avoid memory leak
		*l = s[:last]
		return
	}

	panic("BUG: removeResource could not find res in slice")
}
+21
View File
@@ -0,0 +1,21 @@
package puddle
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestResList_PanicsWithBugReportIfResourceDoesNotExist(t *testing.T) {
arr := []*Resource[any]{
new(Resource[any]),
new(Resource[any]),
new(Resource[any]),
}
list := resList[any](arr)
assert.PanicsWithValue(t, "BUG: removeResource could not find res in slice", func() {
list.remove(new(Resource[any]))
})
}