diff --git a/internal/circ/queue.go b/internal/circ/queue.go deleted file mode 100644 index 7147f47..0000000 --- a/internal/circ/queue.go +++ /dev/null @@ -1,56 +0,0 @@ -package circ - -type Queue[T any] struct { - arr []T - begin int - len int -} - -func NewQueue[T any](capacity int) *Queue[T] { - return &Queue[T]{ - // TODO: Do not preallocate whole capacity of Go upstream - // accepts this: https://github.com/golang/go/issues/55978 - arr: make([]T, capacity), - } -} - -func (q *Queue[T]) Cap() int { return len(q.arr) } -func (q *Queue[T]) Len() int { return q.len } - -func (q *Queue[T]) end() int { - e := q.begin + q.len - if l := len(q.arr); e >= l { - e -= l - } - - return e -} - -func (q *Queue[T]) Enqueue(elem T) { - if q.len == len(q.arr) { - panic("enqueue: queue is full") - } - - q.arr[q.end()] = elem - q.len++ -} - -func (q *Queue[T]) Dequeue() T { - if q.len < 1 { - panic("dequeue: queue is empty") - } - - elem := q.arr[q.begin] - - // Avoid memory leaks if T is pointer or contains pointers. - var zeroVal T - q.arr[q.begin] = zeroVal - - q.len-- - q.begin++ - if q.begin == len(q.arr) { - q.begin = 0 - } - - return elem -} diff --git a/internal/circ/queue_test.go b/internal/circ/queue_test.go deleted file mode 100644 index e169251..0000000 --- a/internal/circ/queue_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package circ_test - -import ( - "fmt" - "io" - "testing" - - "github.com/jackc/puddle/v2/internal/circ" - "github.com/stretchr/testify/require" -) - -func TestQueue_EnqueueDequeue(t *testing.T) { - r := require.New(t) - - q := circ.NewQueue[int](10) - r.Equal(10, q.Cap()) - - for i := 0; i < 10; i++ { - q.Enqueue(i) - r.Equal(i+1, q.Len()) - } - - r.Panics(func() { q.Enqueue(10) }) - r.Equal(10, q.Len()) - - for i := 0; i < 10; i++ { - j := q.Dequeue() - r.Equal(i, j) - - r.Equal(10-i-1, q.Len()) - } - - r.Panics(func() { q.Dequeue() }) - r.Equal(0, q.Len()) -} - -func TestQueue_EnqueueDequeueOverflow(t *testing.T) { - r := require.New(t) - - q := circ.NewQueue[int](10) - r.Equal(10, q.Cap()) - - for i := 0; i < 10; i++ { - q.Enqueue(i) - r.Equal(i+1, q.Len()) - } - - r.Panics(func() { q.Enqueue(10) }) - r.Equal(10, q.Len()) - - for i := 0; i < 5; i++ { - j := q.Dequeue() - r.Equal(i, j) - - r.Equal(10-i-1, q.Len()) - } - - for i := 10; i < 15; i++ { - q.Enqueue(i) - r.Equal(i-5+1, q.Len()) - } - - for i := 0; i < 10; i++ { - j := q.Dequeue() - r.Equal(i+5, j) - - r.Equal(10-i-1, q.Len()) - } - - r.Panics(func() { q.Dequeue() }) - r.Equal(0, q.Len()) -} - -func BenchmarkArrayAppend(b *testing.B) { - arr := make([]int, 0, b.N) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - arr = append(arr, i) - } - - // Make sure that the Go compiler doesn't optimize writes above. - b.StopTimer() - for i := 0; i < b.N; i++ { - fmt.Fprintf(io.Discard, "%d\n", arr[i]) - } -} - -func BenchmarkArrayWrite(b *testing.B) { - arr := make([]int, b.N) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - arr[i] = i - } - - // Make sure that the Go compiler doesn't optimize writes above. - b.StopTimer() - for i := 0; i < b.N; i++ { - fmt.Fprintf(io.Discard, "%d\n", arr[i]) - } -} - -func BenchmarkEnqueue(b *testing.B) { - q := circ.NewQueue[int](b.N) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - q.Enqueue(i) - } - - // Make sure that the Go compiler doesn't optimize writes above. - b.StopTimer() - for i := 0; i < b.N; i++ { - fmt.Fprintf(io.Discard, "%d\n", q.Dequeue()) - } -} - -func BenchmarkChanWrite(b *testing.B) { - // Chennels are another way how to represent a queue. - ch := make(chan int, b.N) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - ch <- i - } - - // Make sure that the Go compiler doesn't optimize writes above. - b.StopTimer() - for i := 0; i < b.N; i++ { - fmt.Fprintf(io.Discard, "%d\n", <-ch) - } -} - -func BenchmarkDequeue(b *testing.B) { - q := circ.NewQueue[int](b.N) - - for i := 0; i < b.N; i++ { - q.Enqueue(i) - } - - out := make([]int, b.N) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - out[i] = q.Dequeue() - } -} diff --git a/internal_test.go b/internal_test.go deleted file mode 100644 index a061b73..0000000 --- a/internal_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package puddle - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRemoveResourcePanicsWithBugReportIfResourceDoesNotExist(t *testing.T) { - s := []*Resource[any]{new(Resource[any]), new(Resource[any]), new(Resource[any])} - assert.PanicsWithValue(t, "BUG: removeResource could not find res in slice", func() { removeResource(s, new(Resource[any])) }) -} diff --git a/pool.go b/pool.go index ad24339..0a1cfee 100644 --- a/pool.go +++ b/pool.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/jackc/puddle/v2/internal/circ" "go.uber.org/atomic" "golang.org/x/sync/semaphore" ) @@ -120,8 +119,8 @@ type Pool[T any] struct { acquireSem *semaphore.Weighted destructWG sync.WaitGroup - allResources []*Resource[T] - idleResources *circ.Queue[*Resource[T]] + allResources resList[T] + idleResources resList[T] constructor Constructor[T] destructor Destructor[T] @@ -155,7 +154,6 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) { return &Pool[T]{ acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), - idleResources: circ.NewQueue[*Resource[T]](int(config.MaxSize)), maxSize: config.MaxSize, constructor: config.Constructor, destructor: config.Destructor, @@ -178,12 +176,11 @@ func (p *Pool[T]) Close() { p.closed = true p.cancelBaseAcquireCtx() - for p.idleResources.Len() > 0 { - res := p.idleResources.Dequeue() - p.allResources = removeResource(p.allResources, res) + for len(p.idleResources) > 0 { + res := p.idleResources.popBack() + p.allResources.remove(res) go p.destructResourceValue(res.value) } - p.idleResources = nil } // Stat is a snapshot of Pool statistics. @@ -283,11 +280,13 @@ func (p *Pool[T]) Stat() *Stat { // // WARNING: Caller of this method must hold the pool mutex! func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] { - if p.idleResources.Len() == 0 { + if len(p.idleResources) == 0 { return nil } - return p.idleResources.Dequeue() + res := p.idleResources.popBack() + res.status = resourceStatusAcquired + return res } // createNewResource creates a new resource and inserts it into list of pool @@ -303,7 +302,7 @@ func (p *Pool[T]) createNewResource() *Resource[T] { status: resourceStatusConstructing, } - p.allResources = append(p.allResources, res) + p.allResources.append(res) p.destructWG.Add(1) return res @@ -329,69 +328,108 @@ func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) { } // acquire is a continuation of Acquire function that doesn't check context -// validity. This function exists separatly only for benchmarking purposes. +// validity. +// +// This function exists solely only for benchmarking purposes. func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { startNano := nanotime() - var waitedForLock bool - if !p.acquireSem.TryAcquire(1) { - waitedForLock = true - err := p.acquireSem.Acquire(ctx, 1) + + for { + if !p.acquireSem.TryAcquire(1) { + waitedForLock = true + err := p.acquireSem.Acquire(ctx, 1) + if err != nil { + p.canceledAcquireCount.Add(1) + return nil, err + } + } + + p.mux.Lock() + if p.closed { + p.mux.Unlock() + p.acquireSem.Release(1) + return nil, ErrClosedPool + } + + // If a resource is available in the pool. + if res := p.tryAcquireIdleResource(); res != nil { + if waitedForLock { + p.emptyAcquireCount += 1 + } + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + p.mux.Unlock() + return res, nil + } + + if len(p.allResources) > int(p.maxSize) { + // Unreachable code. + p.mux.Unlock() + p.acquireSem.Release(1) + panic("bug: semaphore allowed more acquires than pool allows") + } + + // An idle resource could be "stolen" by AcquireAllIdle at the + // time we already held the semaphore token. In such a case we + // cannot create a new resource because the pool is already + // full. Consequently, we will have to try it once again. + // + // Naturally the same situation with AcquireAllIdle could happen + // even when the pool is not full. But in such a case we can + // just create a new resource. + if len(p.allResources) == int(p.maxSize) { + // We do not release the semaphore here because the + // connection is in reality held by the caller of + // AcquireAllIdle. + p.mux.Unlock() + continue + } + + // The resource is not idle, but there is enough space to create one. + res := p.createNewResource() + p.mux.Unlock() + + res, err := p.initResourceValue(ctx, res) if err != nil { - p.canceledAcquireCount.Add(1) return nil, err } - } - p.mux.Lock() - if p.closed { - p.mux.Unlock() - p.acquireSem.Release(1) - return nil, ErrClosedPool - } + p.mux.Lock() + defer p.mux.Unlock() - // If a resource is available in the pool. - if res := p.tryAcquireIdleResource(); res != nil { - res.status = resourceStatusAcquired - if waitedForLock { - p.emptyAcquireCount += 1 - } + p.emptyAcquireCount += 1 p.acquireCount += 1 p.acquireDuration += time.Duration(nanotime() - startNano) - p.mux.Unlock() + return res, nil } +} - if len(p.allResources) >= int(p.maxSize) { - p.mux.Unlock() - p.acquireSem.Release(1) - // Unreachable code. - panic("bug: semaphore allowed more acquires than pool allows") - } - - // The resource is not available, but there is enough space to create - // one. - res := p.createNewResource() - p.mux.Unlock() - - // Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling - // the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259 - constructErrCh := make(chan error) +func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) { + // Create the resource in a goroutine to immediately return from Acquire + // if ctx is canceled without also canceling the constructor. + // + // See: + // - https://github.com/jackc/pgx/issues/1287 + // - https://github.com/jackc/pgx/issues/1259 + constructErrChan := make(chan error) go func() { constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx) - value, err := p.constructResourceValue(constructorCtx) + value, err := p.constructor(constructorCtx) if err != nil { p.mux.Lock() - p.allResources = removeResource(p.allResources, res) + p.allResources.remove(res) p.destructWG.Done() p.mux.Unlock() - // We won't take the resource so we allow someone else - // to run Acquire. + // The resource won't be acquired because its + // construction failed. We have to allow someone else to + // take that resouce. p.acquireSem.Release(1) select { - case constructErrCh <- err: + case constructErrChan <- err: case <-ctx.Done(): // The caller is cancelled, so no-one awaits the // error. This branch avoid goroutine leak. @@ -402,13 +440,9 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { res.value = value res.status = resourceStatusAcquired + // This select works because the channel is unbuffered. select { - case constructErrCh <- nil: - p.mux.Lock() - p.emptyAcquireCount += 1 - p.acquireCount += 1 - p.acquireDuration += time.Duration(nanotime() - startNano) - p.mux.Unlock() + case constructErrChan <- nil: case <-ctx.Done(): p.releaseAcquiredResource(res, res.lastUsedNano) } @@ -418,7 +452,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { case <-ctx.Done(): p.canceledAcquireCount.Add(1) return nil, ctx.Err() - case err := <-constructErrCh: + case err := <-constructErrChan: if err != nil { return nil, err } @@ -445,18 +479,30 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { // If a resource is available now if res := p.tryAcquireIdleResource(); res != nil { p.acquireCount += 1 - res.status = resourceStatusAcquired return res, nil } - if len(p.allResources) >= int(p.maxSize) { + if len(p.allResources) > int(p.maxSize) { // Unreachable code. panic("bug: semaphore allowed more acquires than pool allows") } + // An idle resource could be "stolen" by AcquireAllIdle at the time we + // already held the semaphore token. In such a case we cannot create a + // new resource because the pool is already full. + // + // Naturally the same situation with AcquireAllIdle could happen even + // when the pool is not full. But in such a case we can just create a + // new resource. + if len(p.allResources) == int(p.maxSize) { + // We do not release the semaphore here because the connection + // is in reality held by the caller of AcquireAllIdle. + return nil, ErrNotAvailable + } + res := p.createNewResource() go func() { - value, err := p.constructResourceValue(ctx) + value, err := p.constructor(ctx) // We have to create the resource and only then release the // semaphore - For the time being there is no resource that // someone could acquire. @@ -465,14 +511,14 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { defer p.mux.Unlock() if err != nil { - p.allResources = removeResource(p.allResources, res) + p.allResources.remove(res) p.destructWG.Done() return } res.value = value res.status = resourceStatusIdle - p.idleResources.Enqueue(res) + p.idleResources.append(res) }() return nil, ErrNotAvailable @@ -484,10 +530,10 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { // // For the time being, semaphore doesn't allow to acquire all tokens atomically // (see https://github.com/golang/sync/pull/19). We simulate this by trying all -// powers of 2 that are less or equal to max. +// powers of 2 that are less or equal to num. // // For example, let's immagine we have 19 free tokens in the semaphore which in -// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then max +// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then num // is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1 tokens. // Out of those, the acquire of 16, 2 and 1 tokens will succeed. // @@ -497,57 +543,54 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { // the minimal number of tokens that has been present over the whole process // will be acquired. This is sufficient for the use-case we have in this // package. -func acquireSemAll[T ints](sem *semaphore.Weighted, max T) int { - var cnt int - for i := int(log2Int(max)); i >= 0; i-- { +func acquireSemAll(sem *semaphore.Weighted, num int) int { + if sem.TryAcquire(int64(num)) { + return num + } + + var acquired int + for i := int(log2Int(num)); i >= 0; i-- { val := int(1) << i if sem.TryAcquire(int64(val)) { - cnt += val + acquired += val } } - return cnt + return acquired } // AcquireAllIdle acquires all currently idle resources. Its intended use is for // health check and keep-alive functionality. It does not update pool // statistics. func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { - // TODO: Replace this with acquireSem.TryAcqireAll() if it gets to - // upstream. https://github.com/golang/sync/pull/19 - cnt := acquireSemAll(p.acquireSem, int64(p.maxSize)) - if cnt == 0 { - return nil - } - p.mux.Lock() defer p.mux.Unlock() if p.closed { - p.acquireSem.Release(int64(cnt)) return nil } - // Some resources from the maxSize limit do not have to exist (i.e. be - // idle). - if diff := cnt - p.idleResources.Len(); diff > 0 { - p.acquireSem.Release(int64(diff)) - cnt = p.idleResources.Len() + numIdle := len(p.idleResources) + if numIdle == 0 { + return nil } - // We are not guaranteed that idleResources are empty after this loop. - // But we are guaranteed that all resources remain in idleResources - // after this loop will (1) either be acquired soon (semaphore was - // already acquired for them) or (2) were released after start of this - // function. - resources := make([]*Resource[T], cnt) - for i := 0; i < cnt; i++ { - res := p.idleResources.Dequeue() + // In acquireSemAll we use only TryAcquire and not Acquire. Because + // TryAcquire cannot block, the fact that we hold mutex locked and try + // to acquire semaphore cannot result in dead-lock. + // + // TODO: Replace this with acquireSem.TryAcqireAll() if it gets to + // upstream. https://github.com/golang/sync/pull/19 + _ = acquireSemAll(p.acquireSem, numIdle) + + idle := make([]*Resource[T], numIdle) + for i := range idle { + res := p.idleResources.popBack() res.status = resourceStatusAcquired - resources[i] = res + idle[i] = res } - return resources + return idle } // CreateResource constructs a new resource without acquiring it. @@ -562,7 +605,7 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error { p.destructWG.Add(1) p.mux.Unlock() - value, err := p.constructResourceValue(ctx) + value, err := p.constructor(ctx) if err != nil { p.destructWG.Done() return err @@ -585,8 +628,8 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error { go p.destructResourceValue(res.value) return ErrClosedPool } - p.allResources = append(p.allResources, res) - p.idleResources.Enqueue(res) + p.allResources.append(res) + p.idleResources.append(res) return nil } @@ -602,9 +645,9 @@ func (p *Pool[T]) Reset() { p.resetCount++ - for p.idleResources.Len() > 0 { - res := p.idleResources.Dequeue() - p.allResources = removeResource(p.allResources, res) + for len(p.idleResources) > 0 { + res := p.idleResources.popBack() + p.allResources.remove(res) go p.destructResourceValue(res.value) } } @@ -616,12 +659,12 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) defer p.mux.Unlock() if p.closed || res.poolResetCount != p.resetCount { - p.allResources = removeResource(p.allResources, res) + p.allResources.remove(res) go p.destructResourceValue(res.value) } else { res.lastUsedNano = lastUsedNano res.status = resourceStatusIdle - p.idleResources.Enqueue(res) + p.idleResources.append(res) } } @@ -632,7 +675,7 @@ func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) { defer p.acquireSem.Release(1) p.mux.Lock() defer p.mux.Unlock() - p.allResources = removeResource(p.allResources, res) + p.allResources.remove(res) } func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) { @@ -640,27 +683,11 @@ func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) { p.mux.Lock() defer p.mux.Unlock() - p.allResources = removeResource(p.allResources, res) + p.allResources.remove(res) res.status = resourceStatusHijacked p.destructWG.Done() // not responsible for destructing hijacked resources } -func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] { - for i := range slice { - if slice[i] == res { - slice[i] = slice[len(slice)-1] - slice[len(slice)-1] = nil // Avoid memory leak - return slice[:len(slice)-1] - } - } - - panic("BUG: removeResource could not find res in slice") -} - -func (p *Pool[T]) constructResourceValue(ctx context.Context) (T, error) { - return p.constructor(ctx) -} - func (p *Pool[T]) destructResourceValue(value T) { p.destructor(value) p.destructWG.Done() diff --git a/resource_list.go b/resource_list.go new file mode 100644 index 0000000..9938bfb --- /dev/null +++ b/resource_list.go @@ -0,0 +1,27 @@ +package puddle + +type resList[T any] []*Resource[T] + +func (l *resList[T]) append(val *Resource[T]) { *l = append(*l, val) } + +func (l *resList[T]) popBack() *Resource[T] { + idx := len(*l) - 1 + val := (*l)[idx] + (*l)[idx] = nil // Avoid memory leak + *l = (*l)[:idx] + + return val +} + +func (l *resList[T]) remove(val *Resource[T]) { + for i, elem := range *l { + if elem == val { + (*l)[i] = (*l)[len(*l)-1] + (*l)[len(*l)-1] = nil // Avoid memory leak + (*l) = (*l)[:len(*l)-1] + return + } + } + + panic("BUG: removeResource could not find res in slice") +} diff --git a/resource_list_test.go b/resource_list_test.go new file mode 100644 index 0000000..f73b78d --- /dev/null +++ b/resource_list_test.go @@ -0,0 +1,21 @@ +package puddle + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResList_PanicsWithBugReportIfResourceDoesNotExist(t *testing.T) { + arr := []*Resource[any]{ + new(Resource[any]), + new(Resource[any]), + new(Resource[any]), + } + + list := resList[any](arr) + + assert.PanicsWithValue(t, "BUG: removeResource could not find res in slice", func() { + list.remove(new(Resource[any])) + }) +}