diff --git a/internal/circ/queue.go b/internal/circ/queue.go new file mode 100644 index 0000000..7147f47 --- /dev/null +++ b/internal/circ/queue.go @@ -0,0 +1,56 @@ +package circ + +type Queue[T any] struct { + arr []T + begin int + len int +} + +func NewQueue[T any](capacity int) *Queue[T] { + return &Queue[T]{ + // TODO: Do not preallocate whole capacity of Go upstream + // accepts this: https://github.com/golang/go/issues/55978 + arr: make([]T, capacity), + } +} + +func (q *Queue[T]) Cap() int { return len(q.arr) } +func (q *Queue[T]) Len() int { return q.len } + +func (q *Queue[T]) end() int { + e := q.begin + q.len + if l := len(q.arr); e >= l { + e -= l + } + + return e +} + +func (q *Queue[T]) Enqueue(elem T) { + if q.len == len(q.arr) { + panic("enqueue: queue is full") + } + + q.arr[q.end()] = elem + q.len++ +} + +func (q *Queue[T]) Dequeue() T { + if q.len < 1 { + panic("dequeue: queue is empty") + } + + elem := q.arr[q.begin] + + // Avoid memory leaks if T is pointer or contains pointers. + var zeroVal T + q.arr[q.begin] = zeroVal + + q.len-- + q.begin++ + if q.begin == len(q.arr) { + q.begin = 0 + } + + return elem +} diff --git a/internal/circ/queue_test.go b/internal/circ/queue_test.go new file mode 100644 index 0000000..e169251 --- /dev/null +++ b/internal/circ/queue_test.go @@ -0,0 +1,148 @@ +package circ_test + +import ( + "fmt" + "io" + "testing" + + "github.com/jackc/puddle/v2/internal/circ" + "github.com/stretchr/testify/require" +) + +func TestQueue_EnqueueDequeue(t *testing.T) { + r := require.New(t) + + q := circ.NewQueue[int](10) + r.Equal(10, q.Cap()) + + for i := 0; i < 10; i++ { + q.Enqueue(i) + r.Equal(i+1, q.Len()) + } + + r.Panics(func() { q.Enqueue(10) }) + r.Equal(10, q.Len()) + + for i := 0; i < 10; i++ { + j := q.Dequeue() + r.Equal(i, j) + + r.Equal(10-i-1, q.Len()) + } + + r.Panics(func() { q.Dequeue() }) + r.Equal(0, q.Len()) +} + +func TestQueue_EnqueueDequeueOverflow(t *testing.T) { + r := require.New(t) + + q := circ.NewQueue[int](10) + r.Equal(10, q.Cap()) + + for i := 0; i < 10; i++ { + q.Enqueue(i) + r.Equal(i+1, q.Len()) + } + + r.Panics(func() { q.Enqueue(10) }) + r.Equal(10, q.Len()) + + for i := 0; i < 5; i++ { + j := q.Dequeue() + r.Equal(i, j) + + r.Equal(10-i-1, q.Len()) + } + + for i := 10; i < 15; i++ { + q.Enqueue(i) + r.Equal(i-5+1, q.Len()) + } + + for i := 0; i < 10; i++ { + j := q.Dequeue() + r.Equal(i+5, j) + + r.Equal(10-i-1, q.Len()) + } + + r.Panics(func() { q.Dequeue() }) + r.Equal(0, q.Len()) +} + +func BenchmarkArrayAppend(b *testing.B) { + arr := make([]int, 0, b.N) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + arr = append(arr, i) + } + + // Make sure that the Go compiler doesn't optimize writes above. + b.StopTimer() + for i := 0; i < b.N; i++ { + fmt.Fprintf(io.Discard, "%d\n", arr[i]) + } +} + +func BenchmarkArrayWrite(b *testing.B) { + arr := make([]int, b.N) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + arr[i] = i + } + + // Make sure that the Go compiler doesn't optimize writes above. + b.StopTimer() + for i := 0; i < b.N; i++ { + fmt.Fprintf(io.Discard, "%d\n", arr[i]) + } +} + +func BenchmarkEnqueue(b *testing.B) { + q := circ.NewQueue[int](b.N) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Enqueue(i) + } + + // Make sure that the Go compiler doesn't optimize writes above. + b.StopTimer() + for i := 0; i < b.N; i++ { + fmt.Fprintf(io.Discard, "%d\n", q.Dequeue()) + } +} + +func BenchmarkChanWrite(b *testing.B) { + // Chennels are another way how to represent a queue. + ch := make(chan int, b.N) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ch <- i + } + + // Make sure that the Go compiler doesn't optimize writes above. + b.StopTimer() + for i := 0; i < b.N; i++ { + fmt.Fprintf(io.Discard, "%d\n", <-ch) + } +} + +func BenchmarkDequeue(b *testing.B) { + q := circ.NewQueue[int](b.N) + + for i := 0; i < b.N; i++ { + q.Enqueue(i) + } + + out := make([]int, b.N) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + out[i] = q.Dequeue() + } +} diff --git a/pool.go b/pool.go index c60fc5b..b1ec61d 100644 --- a/pool.go +++ b/pool.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/jackc/puddle/v2/internal/circ" "golang.org/x/sync/semaphore" ) @@ -120,7 +121,7 @@ type Pool[T any] struct { destructWG sync.WaitGroup allResources []*Resource[T] - idleResources []*Resource[T] + idleResources *circ.Queue[*Resource[T]] constructor Constructor[T] destructor Destructor[T] @@ -154,6 +155,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) { return &Pool[T]{ acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), + idleResources: circ.NewQueue[*Resource[T]](int(config.MaxSize)), maxSize: config.MaxSize, constructor: config.Constructor, destructor: config.Destructor, @@ -176,7 +178,8 @@ func (p *Pool[T]) Close() { p.closed = true p.cancelBaseAcquireCtx() - for _, res := range p.idleResources { + for p.idleResources.Len() > 0 { + res := p.idleResources.Dequeue() p.allResources = removeResource(p.allResources, res) go p.destructResourceValue(res.value) } @@ -280,15 +283,11 @@ func (p *Pool[T]) Stat() *Stat { // // WARNING: Caller of this method must hold the pool mutex! func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] { - if len(p.idleResources) == 0 { + if p.idleResources.Len() == 0 { return nil } - res := p.idleResources[len(p.idleResources)-1] - p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak - p.idleResources = p.idleResources[:len(p.idleResources)-1] - - return res + return p.idleResources.Dequeue() } // createNewResource creates a new resource and inserts it into list of pool @@ -471,7 +470,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { res.value = value res.status = resourceStatusIdle - p.idleResources = append(p.idleResources, res) + p.idleResources.Enqueue(res) }() return nil, ErrNotAvailable @@ -501,20 +500,21 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { // Some resources from the maxSize limit do not have to exist (i.e. be // idle). - if diff := cnt - len(p.idleResources); diff > 0 { + if diff := cnt - p.idleResources.Len(); diff > 0 { p.acquireSem.Release(int64(diff)) - cnt = len(p.idleResources) + cnt = p.idleResources.Len() } - // Resources above cnt might be reserved by the semaphore. - for i := len(p.idleResources) - cnt; i < len(p.idleResources); i++ { - res := p.idleResources[i] - p.idleResources[i] = nil // Avoid memory leak. - + // We are not guaranteed that idleResources are empty after this loop. + // But we are guaranteed that all resources remain in idleResources + // after this loop will (1) either be acquired soon (semaphore was + // already acquired for them) or (2) were released after start of this + // function. + for i := 0; i < cnt; i++ { + res := p.idleResources.Dequeue() res.status = resourceStatusAcquired resources = append(resources, res) } - p.idleResources = p.idleResources[:len(p.idleResources)-cnt] return resources } @@ -555,7 +555,7 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error { return ErrClosedPool } p.allResources = append(p.allResources, res) - p.idleResources = append(p.idleResources, res) + p.idleResources.Enqueue(res) return nil } @@ -571,12 +571,11 @@ func (p *Pool[T]) Reset() { p.resetCount++ - for i := range p.idleResources { - p.allResources = removeResource(p.allResources, p.idleResources[i]) - go p.destructResourceValue(p.idleResources[i].value) - p.idleResources[i] = nil + for p.idleResources.Len() > 0 { + res := p.idleResources.Dequeue() + p.allResources = removeResource(p.allResources, res) + go p.destructResourceValue(res.value) } - p.idleResources = p.idleResources[0:0] } // releaseAcquiredResource returns res to the the pool. @@ -591,7 +590,7 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) } else { res.lastUsedNano = lastUsedNano res.status = resourceStatusIdle - p.idleResources = append(p.idleResources, res) + p.idleResources.Enqueue(res) } }