Perform logarithmic number of steps in AcquireAllIdle
This commit is contained in:
committed by
Jack Christensen
parent
2c35738882
commit
11c9fbfb3c
@@ -0,0 +1,32 @@
|
|||||||
|
package puddle
|
||||||
|
|
||||||
|
import "unsafe"
|
||||||
|
|
||||||
|
type ints interface {
|
||||||
|
int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// log2Int returns log2 of an integer. This function panics if val < 0. For val
|
||||||
|
// == 0, returns 0.
|
||||||
|
func log2Int[T ints](val T) uint8 {
|
||||||
|
if val <= 0 {
|
||||||
|
panic("log2 of non-positive number does not exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
return log2IntRange(val, 0, uint8(8*unsafe.Sizeof(val)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func log2IntRange[T ints](val T, begin, end uint8) uint8 {
|
||||||
|
length := end - begin
|
||||||
|
if length == 1 {
|
||||||
|
return begin
|
||||||
|
}
|
||||||
|
|
||||||
|
delim := begin + length/2
|
||||||
|
mask := T(1) << delim
|
||||||
|
if mask > val {
|
||||||
|
return log2IntRange(val, begin, delim)
|
||||||
|
} else {
|
||||||
|
return log2IntRange(val, delim, end)
|
||||||
|
}
|
||||||
|
}
|
||||||
+48
@@ -0,0 +1,48 @@
|
|||||||
|
package puddle
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLog2Uint(t *testing.T) {
|
||||||
|
r := require.New(t)
|
||||||
|
|
||||||
|
r.Equal(uint8(0), log2Int[uint64](1))
|
||||||
|
r.Equal(uint8(1), log2Int[uint32](2))
|
||||||
|
r.Equal(uint8(7), log2Int[uint8](math.MaxUint8))
|
||||||
|
r.Equal(uint8(15), log2Int[uint16](math.MaxUint16))
|
||||||
|
r.Equal(uint8(31), log2Int[uint32](math.MaxUint32))
|
||||||
|
r.Equal(uint8(63), log2Int[uint64](math.MaxUint64))
|
||||||
|
|
||||||
|
r.Panics(func() { log2Int[uint64](0) })
|
||||||
|
r.Panics(func() { log2Int[int64](-1) })
|
||||||
|
}
|
||||||
|
|
||||||
|
func FuzzLog2Uint(f *testing.F) {
|
||||||
|
const cnt = 1000
|
||||||
|
|
||||||
|
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
for i := 0; i < cnt; i++ {
|
||||||
|
val := uint64(rand.Int63())
|
||||||
|
// val + 1 not to test val == 0.
|
||||||
|
f.Add(val + 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
f.Fuzz(func(t *testing.T, val uint64) {
|
||||||
|
var mx uint8
|
||||||
|
for i := 63; i >= 0; i-- {
|
||||||
|
mask := uint64(1) << i
|
||||||
|
if mask&val != 0 {
|
||||||
|
mx = uint8(i)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, mx, log2Int(val))
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -476,20 +476,48 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
|||||||
return nil, ErrNotAvailable
|
return nil, ErrNotAvailable
|
||||||
}
|
}
|
||||||
|
|
||||||
// AcquireAllIdle atomically acquires all currently idle resources. Its intended
|
// acquireSemAll acquires all free tokens from sem. This function is guaranteed
|
||||||
// use is for health check and keep-alive functionality. It does not update pool
|
// to acquire at least the lowest number of tokens that has been available in
|
||||||
|
// the semaphore during runtime of this function.
|
||||||
|
//
|
||||||
|
// For the time being, semaphore doesn't allow to acquire all tokens atomically
|
||||||
|
// (see https://github.com/golang/sync/pull/19). We simulate this by trying all
|
||||||
|
// powers of 2 that are less or equal to max.
|
||||||
|
//
|
||||||
|
// For example, let's immagine we have 19 free tokens in the semaphore which in
|
||||||
|
// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then max
|
||||||
|
// is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1 tokens.
|
||||||
|
// Out of those, the acquire of 16, 2 and 1 tokens will succeed.
|
||||||
|
//
|
||||||
|
// Naturally, Acquires and Releases of the semaphore might take place
|
||||||
|
// concurrently. For this reason, it's not guaranteed that absolutely all free
|
||||||
|
// tokens in the semaphore will be acquired. But it's guaranteed that at least
|
||||||
|
// the minimal number of tokens that has been present over the whole process
|
||||||
|
// will be acquired. This is sufficient for the use-case we have in this
|
||||||
|
// package.
|
||||||
|
func acquireSemAll[T ints](sem *semaphore.Weighted, max T) int {
|
||||||
|
var cnt int
|
||||||
|
for i := int(log2Int(max)); i >= 0; i-- {
|
||||||
|
val := int(1) << i
|
||||||
|
if sem.TryAcquire(int64(val)) {
|
||||||
|
cnt += val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return cnt
|
||||||
|
}
|
||||||
|
|
||||||
|
// AcquireAllIdle acquires all currently idle resources. Its intended use is for
|
||||||
|
// health check and keep-alive functionality. It does not update pool
|
||||||
// statistics.
|
// statistics.
|
||||||
func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
||||||
var cnt int
|
// TODO: Replace this with acquireSem.TryAcqireAll() if it gets to
|
||||||
for p.acquireSem.TryAcquire(1) {
|
// upstream. https://github.com/golang/sync/pull/19
|
||||||
cnt++
|
cnt := acquireSemAll(p.acquireSem, int64(p.maxSize))
|
||||||
}
|
|
||||||
if cnt == 0 {
|
if cnt == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
resources := make([]*Resource[T], 0, cnt)
|
|
||||||
|
|
||||||
p.mux.Lock()
|
p.mux.Lock()
|
||||||
defer p.mux.Unlock()
|
defer p.mux.Unlock()
|
||||||
|
|
||||||
@@ -510,10 +538,11 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
|||||||
// after this loop will (1) either be acquired soon (semaphore was
|
// after this loop will (1) either be acquired soon (semaphore was
|
||||||
// already acquired for them) or (2) were released after start of this
|
// already acquired for them) or (2) were released after start of this
|
||||||
// function.
|
// function.
|
||||||
|
resources := make([]*Resource[T], cnt)
|
||||||
for i := 0; i < cnt; i++ {
|
for i := 0; i < cnt; i++ {
|
||||||
res := p.idleResources.Dequeue()
|
res := p.idleResources.Dequeue()
|
||||||
res.status = resourceStatusAcquired
|
res.status = resourceStatusAcquired
|
||||||
resources = append(resources, res)
|
resources[i] = res
|
||||||
}
|
}
|
||||||
|
|
||||||
return resources
|
return resources
|
||||||
|
|||||||
Reference in New Issue
Block a user