From 11c9fbfb3c97b27bff10eecb7d2d39c1786f3505 Mon Sep 17 00:00:00 2001 From: Jan Dubsky Date: Fri, 30 Sep 2022 13:39:13 +0200 Subject: [PATCH] Perform logarithmic number of steps in AcquireAllIdle --- log.go | 32 ++++++++++++++++++++++++++++++++ log_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ pool.go | 47 ++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 118 insertions(+), 9 deletions(-) create mode 100644 log.go create mode 100644 log_test.go diff --git a/log.go b/log.go new file mode 100644 index 0000000..b21b946 --- /dev/null +++ b/log.go @@ -0,0 +1,32 @@ +package puddle + +import "unsafe" + +type ints interface { + int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 +} + +// log2Int returns log2 of an integer. This function panics if val < 0. For val +// == 0, returns 0. +func log2Int[T ints](val T) uint8 { + if val <= 0 { + panic("log2 of non-positive number does not exist") + } + + return log2IntRange(val, 0, uint8(8*unsafe.Sizeof(val))) +} + +func log2IntRange[T ints](val T, begin, end uint8) uint8 { + length := end - begin + if length == 1 { + return begin + } + + delim := begin + length/2 + mask := T(1) << delim + if mask > val { + return log2IntRange(val, begin, delim) + } else { + return log2IntRange(val, delim, end) + } +} diff --git a/log_test.go b/log_test.go new file mode 100644 index 0000000..a3127cb --- /dev/null +++ b/log_test.go @@ -0,0 +1,48 @@ +package puddle + +import ( + "math" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestLog2Uint(t *testing.T) { + r := require.New(t) + + r.Equal(uint8(0), log2Int[uint64](1)) + r.Equal(uint8(1), log2Int[uint32](2)) + r.Equal(uint8(7), log2Int[uint8](math.MaxUint8)) + r.Equal(uint8(15), log2Int[uint16](math.MaxUint16)) + r.Equal(uint8(31), log2Int[uint32](math.MaxUint32)) + r.Equal(uint8(63), log2Int[uint64](math.MaxUint64)) + + r.Panics(func() { log2Int[uint64](0) }) + r.Panics(func() { log2Int[int64](-1) }) +} + +func FuzzLog2Uint(f *testing.F) { + const cnt = 1000 + + rand := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < cnt; i++ { + val := uint64(rand.Int63()) + // val + 1 not to test val == 0. + f.Add(val + 1) + } + + f.Fuzz(func(t *testing.T, val uint64) { + var mx uint8 + for i := 63; i >= 0; i-- { + mask := uint64(1) << i + if mask&val != 0 { + mx = uint8(i) + break + } + } + + require.Equal(t, mx, log2Int(val)) + }) +} diff --git a/pool.go b/pool.go index b1ec61d..7705bc0 100644 --- a/pool.go +++ b/pool.go @@ -476,20 +476,48 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { return nil, ErrNotAvailable } -// AcquireAllIdle atomically acquires all currently idle resources. Its intended -// use is for health check and keep-alive functionality. It does not update pool +// acquireSemAll acquires all free tokens from sem. This function is guaranteed +// to acquire at least the lowest number of tokens that has been available in +// the semaphore during runtime of this function. +// +// For the time being, semaphore doesn't allow to acquire all tokens atomically +// (see https://github.com/golang/sync/pull/19). We simulate this by trying all +// powers of 2 that are less or equal to max. +// +// For example, let's immagine we have 19 free tokens in the semaphore which in +// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then max +// is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1 tokens. +// Out of those, the acquire of 16, 2 and 1 tokens will succeed. +// +// Naturally, Acquires and Releases of the semaphore might take place +// concurrently. For this reason, it's not guaranteed that absolutely all free +// tokens in the semaphore will be acquired. But it's guaranteed that at least +// the minimal number of tokens that has been present over the whole process +// will be acquired. This is sufficient for the use-case we have in this +// package. +func acquireSemAll[T ints](sem *semaphore.Weighted, max T) int { + var cnt int + for i := int(log2Int(max)); i >= 0; i-- { + val := int(1) << i + if sem.TryAcquire(int64(val)) { + cnt += val + } + } + + return cnt +} + +// AcquireAllIdle acquires all currently idle resources. Its intended use is for +// health check and keep-alive functionality. It does not update pool // statistics. func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { - var cnt int - for p.acquireSem.TryAcquire(1) { - cnt++ - } + // TODO: Replace this with acquireSem.TryAcqireAll() if it gets to + // upstream. https://github.com/golang/sync/pull/19 + cnt := acquireSemAll(p.acquireSem, int64(p.maxSize)) if cnt == 0 { return nil } - resources := make([]*Resource[T], 0, cnt) - p.mux.Lock() defer p.mux.Unlock() @@ -510,10 +538,11 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { // after this loop will (1) either be acquired soon (semaphore was // already acquired for them) or (2) were released after start of this // function. + resources := make([]*Resource[T], cnt) for i := 0; i < cnt; i++ { res := p.idleResources.Dequeue() res.status = resourceStatusAcquired - resources = append(resources, res) + resources[i] = res } return resources