diff --git a/export_test.go b/export_test.go new file mode 100644 index 0000000..e977402 --- /dev/null +++ b/export_test.go @@ -0,0 +1,7 @@ +package puddle + +import "context" + +func (p *Pool[T]) AcquireRaw(ctx context.Context) (*Resource[T], error) { + return p.acquire(ctx) +} diff --git a/pool.go b/pool.go index 3708ebe..bd6f71b 100644 --- a/pool.go +++ b/pool.go @@ -293,7 +293,6 @@ func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Va // the problem of it being impossible to create resources when the time to create a resource is greater than any one // caller of Acquire is willing to wait. func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { - startNano := nanotime() if doneChan := ctx.Done(); doneChan != nil { select { case <-ctx.Done(): @@ -305,6 +304,13 @@ func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { } } + return p.acquire(ctx) +} + +// acquire is a continuation of Acquire function that doesn't check context +// validity. This function exists separatly only for benchmarking purposes. +func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { + startNano := nanotime() p.cond.L.Lock() emptyAcquire := false diff --git a/pool_test.go b/pool_test.go index 10ac50f..9e22d37 100644 --- a/pool_test.go +++ b/pool_test.go @@ -1226,26 +1226,33 @@ func BenchmarkAcquire_ReleaseAfterAcquire(b *testing.B) { } } -func BenchmarkAcquire_ReleaseAfterAcquireWithCPUOverload(b *testing.B) { - r := require.New(b) - ctx := context.Background() - pool := benchmarkPool[int32](b) - releaseChan := releaser[int32](b) - +func withCPULoad() { // Multiply by 2 to similate overload of the system. numGoroutines := runtime.NumCPU() * 2 + + var wg sync.WaitGroup for i := 0; i < numGoroutines; i++ { - startChan := make(chan struct{}) + wg.Add(1) go func() { - close(startChan) + wg.Done() // Similate computationally intensive task. for j := 0; true; j++ { } }() - <-startChan } + wg.Wait() +} + +func BenchmarkAcquire_ReleaseAfterAcquireWithCPULoad(b *testing.B) { + r := require.New(b) + ctx := context.Background() + pool := benchmarkPool[int32](b) + releaseChan := releaser[int32](b) + + withCPULoad() + res, err := pool.Acquire(ctx) r.NoError(err) // We need to release the last connection. Otherwise the pool.Close() @@ -1259,3 +1266,65 @@ func BenchmarkAcquire_ReleaseAfterAcquireWithCPUOverload(b *testing.B) { r.NoError(err) } } + +func BenchmarkAcquire_MultipleCancelled(b *testing.B) { + const cancelCnt = 64 + + r := require.New(b) + ctx := context.Background() + pool := benchmarkPool[int32](b) + releaseChan := releaser[int32](b) + + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + + res, err := pool.Acquire(ctx) + r.NoError(err) + // We need to release the last connection. Otherwise the pool.Close() + // method will block and this function will never return. + defer func() { res.Release() }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := 0; j < cancelCnt; j++ { + _, err = pool.AcquireRaw(cancelCtx) + r.Equal(context.Canceled, err) + } + + releaseChan <- res + res, err = pool.Acquire(ctx) + r.NoError(err) + } +} + +func BenchmarkAcquire_MultipleCancelledWithCPULoad(b *testing.B) { + const cancelCnt = 3 + + r := require.New(b) + ctx := context.Background() + pool := benchmarkPool[int32](b) + releaseChan := releaser[int32](b) + + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + + withCPULoad() + + res, err := pool.Acquire(ctx) + r.NoError(err) + // We need to release the last connection. Otherwise the pool.Close() + // method will block and this function will never return. + defer func() { res.Release() }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := 0; j < cancelCnt; j++ { + _, err = pool.AcquireRaw(cancelCtx) + r.Equal(context.Canceled, err) + } + + releaseChan <- res + res, err = pool.Acquire(ctx) + r.NoError(err) + } +}