Use circular queue for idle connections
This commit is contained in:
committed by
Jack Christensen
parent
021588b93e
commit
2c35738882
@@ -0,0 +1,56 @@
|
|||||||
|
package circ
|
||||||
|
|
||||||
|
type Queue[T any] struct {
|
||||||
|
arr []T
|
||||||
|
begin int
|
||||||
|
len int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewQueue[T any](capacity int) *Queue[T] {
|
||||||
|
return &Queue[T]{
|
||||||
|
// TODO: Do not preallocate whole capacity of Go upstream
|
||||||
|
// accepts this: https://github.com/golang/go/issues/55978
|
||||||
|
arr: make([]T, capacity),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queue[T]) Cap() int { return len(q.arr) }
|
||||||
|
func (q *Queue[T]) Len() int { return q.len }
|
||||||
|
|
||||||
|
func (q *Queue[T]) end() int {
|
||||||
|
e := q.begin + q.len
|
||||||
|
if l := len(q.arr); e >= l {
|
||||||
|
e -= l
|
||||||
|
}
|
||||||
|
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queue[T]) Enqueue(elem T) {
|
||||||
|
if q.len == len(q.arr) {
|
||||||
|
panic("enqueue: queue is full")
|
||||||
|
}
|
||||||
|
|
||||||
|
q.arr[q.end()] = elem
|
||||||
|
q.len++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queue[T]) Dequeue() T {
|
||||||
|
if q.len < 1 {
|
||||||
|
panic("dequeue: queue is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
elem := q.arr[q.begin]
|
||||||
|
|
||||||
|
// Avoid memory leaks if T is pointer or contains pointers.
|
||||||
|
var zeroVal T
|
||||||
|
q.arr[q.begin] = zeroVal
|
||||||
|
|
||||||
|
q.len--
|
||||||
|
q.begin++
|
||||||
|
if q.begin == len(q.arr) {
|
||||||
|
q.begin = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return elem
|
||||||
|
}
|
||||||
@@ -0,0 +1,148 @@
|
|||||||
|
package circ_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/jackc/puddle/v2/internal/circ"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestQueue_EnqueueDequeue(t *testing.T) {
|
||||||
|
r := require.New(t)
|
||||||
|
|
||||||
|
q := circ.NewQueue[int](10)
|
||||||
|
r.Equal(10, q.Cap())
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
q.Enqueue(i)
|
||||||
|
r.Equal(i+1, q.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Panics(func() { q.Enqueue(10) })
|
||||||
|
r.Equal(10, q.Len())
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
j := q.Dequeue()
|
||||||
|
r.Equal(i, j)
|
||||||
|
|
||||||
|
r.Equal(10-i-1, q.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Panics(func() { q.Dequeue() })
|
||||||
|
r.Equal(0, q.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestQueue_EnqueueDequeueOverflow(t *testing.T) {
|
||||||
|
r := require.New(t)
|
||||||
|
|
||||||
|
q := circ.NewQueue[int](10)
|
||||||
|
r.Equal(10, q.Cap())
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
q.Enqueue(i)
|
||||||
|
r.Equal(i+1, q.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Panics(func() { q.Enqueue(10) })
|
||||||
|
r.Equal(10, q.Len())
|
||||||
|
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
j := q.Dequeue()
|
||||||
|
r.Equal(i, j)
|
||||||
|
|
||||||
|
r.Equal(10-i-1, q.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 10; i < 15; i++ {
|
||||||
|
q.Enqueue(i)
|
||||||
|
r.Equal(i-5+1, q.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
j := q.Dequeue()
|
||||||
|
r.Equal(i+5, j)
|
||||||
|
|
||||||
|
r.Equal(10-i-1, q.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Panics(func() { q.Dequeue() })
|
||||||
|
r.Equal(0, q.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkArrayAppend(b *testing.B) {
|
||||||
|
arr := make([]int, 0, b.N)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
arr = append(arr, i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that the Go compiler doesn't optimize writes above.
|
||||||
|
b.StopTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
fmt.Fprintf(io.Discard, "%d\n", arr[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkArrayWrite(b *testing.B) {
|
||||||
|
arr := make([]int, b.N)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
arr[i] = i
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that the Go compiler doesn't optimize writes above.
|
||||||
|
b.StopTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
fmt.Fprintf(io.Discard, "%d\n", arr[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkEnqueue(b *testing.B) {
|
||||||
|
q := circ.NewQueue[int](b.N)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Enqueue(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that the Go compiler doesn't optimize writes above.
|
||||||
|
b.StopTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
fmt.Fprintf(io.Discard, "%d\n", q.Dequeue())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkChanWrite(b *testing.B) {
|
||||||
|
// Chennels are another way how to represent a queue.
|
||||||
|
ch := make(chan int, b.N)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ch <- i
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that the Go compiler doesn't optimize writes above.
|
||||||
|
b.StopTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
fmt.Fprintf(io.Discard, "%d\n", <-ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkDequeue(b *testing.B) {
|
||||||
|
q := circ.NewQueue[int](b.N)
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
q.Enqueue(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
out := make([]int, b.N)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
out[i] = q.Dequeue()
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/puddle/v2/internal/circ"
|
||||||
"golang.org/x/sync/semaphore"
|
"golang.org/x/sync/semaphore"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -120,7 +121,7 @@ type Pool[T any] struct {
|
|||||||
destructWG sync.WaitGroup
|
destructWG sync.WaitGroup
|
||||||
|
|
||||||
allResources []*Resource[T]
|
allResources []*Resource[T]
|
||||||
idleResources []*Resource[T]
|
idleResources *circ.Queue[*Resource[T]]
|
||||||
|
|
||||||
constructor Constructor[T]
|
constructor Constructor[T]
|
||||||
destructor Destructor[T]
|
destructor Destructor[T]
|
||||||
@@ -154,6 +155,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
|
|||||||
|
|
||||||
return &Pool[T]{
|
return &Pool[T]{
|
||||||
acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
|
acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
|
||||||
|
idleResources: circ.NewQueue[*Resource[T]](int(config.MaxSize)),
|
||||||
maxSize: config.MaxSize,
|
maxSize: config.MaxSize,
|
||||||
constructor: config.Constructor,
|
constructor: config.Constructor,
|
||||||
destructor: config.Destructor,
|
destructor: config.Destructor,
|
||||||
@@ -176,7 +178,8 @@ func (p *Pool[T]) Close() {
|
|||||||
p.closed = true
|
p.closed = true
|
||||||
p.cancelBaseAcquireCtx()
|
p.cancelBaseAcquireCtx()
|
||||||
|
|
||||||
for _, res := range p.idleResources {
|
for p.idleResources.Len() > 0 {
|
||||||
|
res := p.idleResources.Dequeue()
|
||||||
p.allResources = removeResource(p.allResources, res)
|
p.allResources = removeResource(p.allResources, res)
|
||||||
go p.destructResourceValue(res.value)
|
go p.destructResourceValue(res.value)
|
||||||
}
|
}
|
||||||
@@ -280,15 +283,11 @@ func (p *Pool[T]) Stat() *Stat {
|
|||||||
//
|
//
|
||||||
// WARNING: Caller of this method must hold the pool mutex!
|
// WARNING: Caller of this method must hold the pool mutex!
|
||||||
func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
|
func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
|
||||||
if len(p.idleResources) == 0 {
|
if p.idleResources.Len() == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
res := p.idleResources[len(p.idleResources)-1]
|
return p.idleResources.Dequeue()
|
||||||
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
|
|
||||||
p.idleResources = p.idleResources[:len(p.idleResources)-1]
|
|
||||||
|
|
||||||
return res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// createNewResource creates a new resource and inserts it into list of pool
|
// createNewResource creates a new resource and inserts it into list of pool
|
||||||
@@ -471,7 +470,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
|||||||
|
|
||||||
res.value = value
|
res.value = value
|
||||||
res.status = resourceStatusIdle
|
res.status = resourceStatusIdle
|
||||||
p.idleResources = append(p.idleResources, res)
|
p.idleResources.Enqueue(res)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return nil, ErrNotAvailable
|
return nil, ErrNotAvailable
|
||||||
@@ -501,20 +500,21 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
|||||||
|
|
||||||
// Some resources from the maxSize limit do not have to exist (i.e. be
|
// Some resources from the maxSize limit do not have to exist (i.e. be
|
||||||
// idle).
|
// idle).
|
||||||
if diff := cnt - len(p.idleResources); diff > 0 {
|
if diff := cnt - p.idleResources.Len(); diff > 0 {
|
||||||
p.acquireSem.Release(int64(diff))
|
p.acquireSem.Release(int64(diff))
|
||||||
cnt = len(p.idleResources)
|
cnt = p.idleResources.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resources above cnt might be reserved by the semaphore.
|
// We are not guaranteed that idleResources are empty after this loop.
|
||||||
for i := len(p.idleResources) - cnt; i < len(p.idleResources); i++ {
|
// But we are guaranteed that all resources remain in idleResources
|
||||||
res := p.idleResources[i]
|
// after this loop will (1) either be acquired soon (semaphore was
|
||||||
p.idleResources[i] = nil // Avoid memory leak.
|
// already acquired for them) or (2) were released after start of this
|
||||||
|
// function.
|
||||||
|
for i := 0; i < cnt; i++ {
|
||||||
|
res := p.idleResources.Dequeue()
|
||||||
res.status = resourceStatusAcquired
|
res.status = resourceStatusAcquired
|
||||||
resources = append(resources, res)
|
resources = append(resources, res)
|
||||||
}
|
}
|
||||||
p.idleResources = p.idleResources[:len(p.idleResources)-cnt]
|
|
||||||
|
|
||||||
return resources
|
return resources
|
||||||
}
|
}
|
||||||
@@ -555,7 +555,7 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
|
|||||||
return ErrClosedPool
|
return ErrClosedPool
|
||||||
}
|
}
|
||||||
p.allResources = append(p.allResources, res)
|
p.allResources = append(p.allResources, res)
|
||||||
p.idleResources = append(p.idleResources, res)
|
p.idleResources.Enqueue(res)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -571,12 +571,11 @@ func (p *Pool[T]) Reset() {
|
|||||||
|
|
||||||
p.resetCount++
|
p.resetCount++
|
||||||
|
|
||||||
for i := range p.idleResources {
|
for p.idleResources.Len() > 0 {
|
||||||
p.allResources = removeResource(p.allResources, p.idleResources[i])
|
res := p.idleResources.Dequeue()
|
||||||
go p.destructResourceValue(p.idleResources[i].value)
|
p.allResources = removeResource(p.allResources, res)
|
||||||
p.idleResources[i] = nil
|
go p.destructResourceValue(res.value)
|
||||||
}
|
}
|
||||||
p.idleResources = p.idleResources[0:0]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// releaseAcquiredResource returns res to the the pool.
|
// releaseAcquiredResource returns res to the the pool.
|
||||||
@@ -591,7 +590,7 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64)
|
|||||||
} else {
|
} else {
|
||||||
res.lastUsedNano = lastUsedNano
|
res.lastUsedNano = lastUsedNano
|
||||||
res.status = resourceStatusIdle
|
res.status = resourceStatusIdle
|
||||||
p.idleResources = append(p.idleResources, res)
|
p.idleResources.Enqueue(res)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user