2
0

Move main logic to package and use generics for API with backwards compatibility

This commit is contained in:
Столяров Владимир Алексеевич
2022-03-18 18:20:15 +03:00
committed by Jack Christensen
parent 314357b795
commit 4e95984946
9 changed files with 663 additions and 635 deletions
+5 -5
View File
@@ -22,15 +22,15 @@ own.
## Example Usage
```go
constructor := func(context.Context) (interface{}, error) {
constructor := func(context.Context) (net.Conn, error) {
return net.Dial("tcp", "127.0.0.1:8080")
}
destructor := func(value interface{}) {
value.(net.Conn).Close()
destructor := func(value net.Conn) {
value.Close()
}
maxPoolSize := 10
pool := puddle.NewPool(constructor, destructor, maxPoolSize)
pool := puddleg.NewPool(constructor, destructor, maxPoolSize)
// Acquire resource from the pool.
res, err := pool.Acquire(context.Background())
@@ -39,7 +39,7 @@ if err != nil {
}
// Use resource.
_, err = res.Value().(net.Conn).Write([]byte{1})
_, err = res.Value().Write([]byte{1})
if err != nil {
// ...
}
+6 -1
View File
@@ -1,5 +1,10 @@
module github.com/jackc/puddle
go 1.12
go 1.18
require github.com/stretchr/testify v1.3.0
require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
)
+26 -523
View File
@@ -1,532 +1,35 @@
package puddle
import (
"context"
"errors"
"sync"
"time"
import "github.com/jackc/puddle/puddleg"
var (
// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
// or a pool that is closed while the acquire is waiting.
ErrClosedPool = puddleg.ErrClosedPool
// ErrNotAvailable occurs on an attempt to acquire a resource from a pool
// that is at maximum capacity and has no available resources.
ErrNotAvailable = puddleg.ErrNotAvailable
)
const (
resourceStatusConstructing = 0
resourceStatusIdle = iota
resourceStatusAcquired = iota
resourceStatusHijacked = iota
type (
// Constructor is a function called by the pool to construct a resource.
Constructor = puddleg.Constructor[any]
// Destructor is a function called by the pool to destroy a resource.
Destructor = puddleg.Destructor[any]
// Resource is the resource handle returned by acquiring from the pool.
Resource = puddleg.Resource[any]
// Pool is a concurrency-safe resource pool.
Pool = puddleg.Pool[any]
// Stat is a snapshot of Pool statistics.
Stat = puddleg.Stat
)
// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
// or a pool that is closed while the acquire is waiting.
var ErrClosedPool = errors.New("closed pool")
// ErrNotAvailable occurs on an attempt to acquire a resource from a pool
// that is at maximum capacity and has no available resources.
var ErrNotAvailable = errors.New("resource not available")
// Constructor is a function called by the pool to construct a resource.
type Constructor func(ctx context.Context) (res interface{}, err error)
// Destructor is a function called by the pool to destroy a resource.
type Destructor func(res interface{})
// Resource is the resource handle returned by acquiring from the pool.
type Resource struct {
value interface{}
pool *Pool
creationTime time.Time
lastUsedNano int64
status byte
}
// Value returns the resource value.
func (res *Resource) Value() interface{} {
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return res.value
}
// Release returns the resource to the pool. res must not be subsequently used.
func (res *Resource) Release() {
if res.status != resourceStatusAcquired {
panic("tried to release resource that is not acquired")
}
res.pool.releaseAcquiredResource(res, nanotime())
}
// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime
// will not change. res must not be subsequently used.
func (res *Resource) ReleaseUnused() {
if res.status != resourceStatusAcquired {
panic("tried to release resource that is not acquired")
}
res.pool.releaseAcquiredResource(res, res.lastUsedNano)
}
// Destroy returns the resource to the pool for destruction. res must not be
// subsequently used.
func (res *Resource) Destroy() {
if res.status != resourceStatusAcquired {
panic("tried to destroy resource that is not acquired")
}
go res.pool.destroyAcquiredResource(res)
}
// Hijack assumes ownership of the resource from the pool. Caller is responsible
// for cleanup of resource value.
func (res *Resource) Hijack() {
if res.status != resourceStatusAcquired {
panic("tried to hijack resource that is not acquired")
}
res.pool.hijackAcquiredResource(res)
}
// CreationTime returns when the resource was created by the pool.
func (res *Resource) CreationTime() time.Time {
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return res.creationTime
}
// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time
// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with
// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead.
func (res *Resource) LastUsedNanotime() int64 {
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return res.lastUsedNano
}
// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting
// LastUsedNanotime to the current nanotime.
func (res *Resource) IdleDuration() time.Duration {
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return time.Duration(nanotime() - res.lastUsedNano)
}
// Pool is a concurrency-safe resource pool.
type Pool struct {
cond *sync.Cond
destructWG *sync.WaitGroup
allResources []*Resource
idleResources []*Resource
constructor Constructor
destructor Destructor
maxSize int32
acquireCount int64
acquireDuration time.Duration
emptyAcquireCount int64
canceledAcquireCount int64
closed bool
}
// NewPool creates a new pool. Panics if maxSize is less than 1.
func NewPool(constructor Constructor, destructor Destructor, maxSize int32) *Pool {
if maxSize < 1 {
panic("maxSize is less than 1")
}
return &Pool{
cond: sync.NewCond(new(sync.Mutex)),
destructWG: &sync.WaitGroup{},
maxSize: maxSize,
constructor: constructor,
destructor: destructor,
}
}
// Close destroys all resources in the pool and rejects future Acquire calls.
// Blocks until all resources are returned to pool and destroyed.
func (p *Pool) Close() {
p.cond.L.Lock()
if p.closed {
p.cond.L.Unlock()
return
}
p.closed = true
for _, res := range p.idleResources {
p.allResources = removeResource(p.allResources, res)
go p.destructResourceValue(res.value)
}
p.idleResources = nil
p.cond.L.Unlock()
// Wake up all go routines waiting for a resource to be returned so they can terminate.
p.cond.Broadcast()
p.destructWG.Wait()
}
// Stat is a snapshot of Pool statistics.
type Stat struct {
constructingResources int32
acquiredResources int32
idleResources int32
maxResources int32
acquireCount int64
acquireDuration time.Duration
emptyAcquireCount int64
canceledAcquireCount int64
}
// TotalResource returns the total number of resources currently in the pool.
// The value is the sum of ConstructingResources, AcquiredResources, and
// IdleResources.
func (s *Stat) TotalResources() int32 {
return s.constructingResources + s.acquiredResources + s.idleResources
}
// ConstructingResources returns the number of resources with construction in progress in
// the pool.
func (s *Stat) ConstructingResources() int32 {
return s.constructingResources
}
// AcquiredResources returns the number of currently acquired resources in the pool.
func (s *Stat) AcquiredResources() int32 {
return s.acquiredResources
}
// IdleResources returns the number of currently idle resources in the pool.
func (s *Stat) IdleResources() int32 {
return s.idleResources
}
// MaxResources returns the maximum size of the pool.
func (s *Stat) MaxResources() int32 {
return s.maxResources
}
// AcquireCount returns the cumulative count of successful acquires from the pool.
func (s *Stat) AcquireCount() int64 {
return s.acquireCount
}
// AcquireDuration returns the total duration of all successful acquires from
// the pool.
func (s *Stat) AcquireDuration() time.Duration {
return s.acquireDuration
}
// EmptyAcquireCount returns the cumulative count of successful acquires from the pool
// that waited for a resource to be released or constructed because the pool was
// empty.
func (s *Stat) EmptyAcquireCount() int64 {
return s.emptyAcquireCount
}
// CanceledAcquireCount returns the cumulative count of acquires from the pool
// that were canceled by a context.
func (s *Stat) CanceledAcquireCount() int64 {
return s.canceledAcquireCount
}
// Stat returns the current pool statistics.
func (p *Pool) Stat() *Stat {
p.cond.L.Lock()
s := &Stat{
maxResources: p.maxSize,
acquireCount: p.acquireCount,
emptyAcquireCount: p.emptyAcquireCount,
canceledAcquireCount: p.canceledAcquireCount,
acquireDuration: p.acquireDuration,
}
for _, res := range p.allResources {
switch res.status {
case resourceStatusConstructing:
s.constructingResources += 1
case resourceStatusIdle:
s.idleResources += 1
case resourceStatusAcquired:
s.acquiredResources += 1
}
}
p.cond.L.Unlock()
return s
}
// Acquire gets a resource from the pool. If no resources are available and the pool
// is not at maximum capacity it will create a new resource. If the pool is at
// maximum capacity it will block until a resource is available. ctx can be used
// to cancel the Acquire.
func (p *Pool) Acquire(ctx context.Context) (*Resource, error) {
startNano := nanotime()
if doneChan := ctx.Done(); doneChan != nil {
select {
case <-ctx.Done():
p.cond.L.Lock()
p.canceledAcquireCount += 1
p.cond.L.Unlock()
return nil, ctx.Err()
default:
}
}
p.cond.L.Lock()
emptyAcquire := false
for {
if p.closed {
p.cond.L.Unlock()
return nil, ErrClosedPool
}
// If a resource is available now
if len(p.idleResources) > 0 {
res := p.idleResources[len(p.idleResources)-1]
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
p.idleResources = p.idleResources[:len(p.idleResources)-1]
res.status = resourceStatusAcquired
if emptyAcquire {
p.emptyAcquireCount += 1
}
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
p.cond.L.Unlock()
return res, nil
}
emptyAcquire = true
// If there is room to create a resource do so
if len(p.allResources) < int(p.maxSize) {
res := &Resource{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
p.allResources = append(p.allResources, res)
p.destructWG.Add(1)
p.cond.L.Unlock()
value, err := p.constructResourceValue(ctx)
p.cond.L.Lock()
if err != nil {
p.allResources = removeResource(p.allResources, res)
p.destructWG.Done()
select {
case <-ctx.Done():
if err == ctx.Err() {
p.canceledAcquireCount += 1
}
default:
}
p.cond.L.Unlock()
p.cond.Signal()
return nil, err
}
res.value = value
res.status = resourceStatusAcquired
p.emptyAcquireCount += 1
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
p.cond.L.Unlock()
return res, nil
}
if ctx.Done() == nil {
p.cond.Wait()
} else {
// Convert p.cond.Wait into a channel
waitChan := make(chan struct{}, 1)
go func() {
p.cond.Wait()
waitChan <- struct{}{}
}()
select {
case <-ctx.Done():
// Allow goroutine waiting for signal to exit. Re-signal since we couldn't
// do anything with it. Another goroutine might be waiting.
go func() {
<-waitChan
p.cond.Signal()
p.cond.L.Unlock()
}()
p.cond.L.Lock()
p.canceledAcquireCount += 1
p.cond.L.Unlock()
return nil, ctx.Err()
case <-waitChan:
}
}
}
}
// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
// used to cancel the background creation.
func (p *Pool) TryAcquire(ctx context.Context) (*Resource, error) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
if p.closed {
return nil, ErrClosedPool
}
// If a resource is available now
if len(p.idleResources) > 0 {
res := p.idleResources[len(p.idleResources)-1]
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
p.idleResources = p.idleResources[:len(p.idleResources)-1]
p.acquireCount += 1
res.status = resourceStatusAcquired
return res, nil
}
if len(p.allResources) < int(p.maxSize) {
res := &Resource{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
p.allResources = append(p.allResources, res)
p.destructWG.Add(1)
go func() {
value, err := p.constructResourceValue(ctx)
defer p.cond.Signal()
p.cond.L.Lock()
defer p.cond.L.Unlock()
if err != nil {
p.allResources = removeResource(p.allResources, res)
p.destructWG.Done()
return
}
res.value = value
res.status = resourceStatusIdle
p.idleResources = append(p.idleResources, res)
}()
}
return nil, ErrNotAvailable
}
// AcquireAllIdle atomically acquires all currently idle resources. Its intended
// use is for health check and keep-alive functionality. It does not update pool
// statistics.
func (p *Pool) AcquireAllIdle() []*Resource {
p.cond.L.Lock()
if p.closed {
p.cond.L.Unlock()
return nil
}
for _, res := range p.idleResources {
res.status = resourceStatusAcquired
}
resources := p.idleResources // Swap out current slice
p.idleResources = nil
p.cond.L.Unlock()
return resources
}
// CreateResource constructs a new resource without acquiring it.
// It goes straight in the IdlePool. It does not check against maxSize.
// It can be useful to maintain warm resources under little load.
func (p *Pool) CreateResource(ctx context.Context) error {
p.cond.L.Lock()
if p.closed {
p.cond.L.Unlock()
return ErrClosedPool
}
p.cond.L.Unlock()
value, err := p.constructResourceValue(ctx)
if err != nil {
return err
}
res := &Resource{
pool: p,
creationTime: time.Now(),
status: resourceStatusIdle,
value: value,
lastUsedNano: nanotime(),
}
p.destructWG.Add(1)
p.cond.L.Lock()
// If closed while constructing resource then destroy it and return an error
if p.closed {
go p.destructResourceValue(res.value)
p.cond.L.Unlock()
return ErrClosedPool
}
p.allResources = append(p.allResources, res)
p.idleResources = append(p.idleResources, res)
p.cond.L.Unlock()
return nil
}
// releaseAcquiredResource returns res to the the pool.
func (p *Pool) releaseAcquiredResource(res *Resource, lastUsedNano int64) {
p.cond.L.Lock()
if !p.closed {
res.lastUsedNano = lastUsedNano
res.status = resourceStatusIdle
p.idleResources = append(p.idleResources, res)
} else {
p.allResources = removeResource(p.allResources, res)
go p.destructResourceValue(res.value)
}
p.cond.L.Unlock()
p.cond.Signal()
}
// Remove removes res from the pool and closes it. If res is not part of the
// pool Remove will panic.
func (p *Pool) destroyAcquiredResource(res *Resource) {
p.destructResourceValue(res.value)
p.cond.L.Lock()
p.allResources = removeResource(p.allResources, res)
p.cond.L.Unlock()
p.cond.Signal()
}
func (p *Pool) hijackAcquiredResource(res *Resource) {
p.cond.L.Lock()
p.allResources = removeResource(p.allResources, res)
res.status = resourceStatusHijacked
p.destructWG.Done() // not responsible for destructing hijacked resources
p.cond.L.Unlock()
p.cond.Signal()
}
func removeResource(slice []*Resource, res *Resource) []*Resource {
for i := range slice {
if slice[i] == res {
slice[i] = slice[len(slice)-1]
slice[len(slice)-1] = nil // Avoid memory leak
return slice[:len(slice)-1]
}
}
panic("BUG: removeResource could not find res in slice")
}
func (p *Pool) constructResourceValue(ctx context.Context) (interface{}, error) {
return p.constructor(ctx)
}
func (p *Pool) destructResourceValue(value interface{}) {
p.destructor(value)
p.destructWG.Done()
return puddleg.NewPool(constructor, destructor, maxSize)
}
+11
View File
@@ -0,0 +1,11 @@
// Package puddleg is a generic resource pool with type-parametrized api.
/*
Puddle is a tiny generic resource pool library for Go that uses the standard
context library to signal cancellation of acquires. It is designed to contain
the minimum functionality a resource pool needs that cannot be implemented
without concurrency concerns. For example, a database connection pool may use
puddle internally and implement health checks and keep-alive behavior without
needing to implement any concurrent code of its own.
*/
package puddleg
@@ -1,4 +1,4 @@
package puddle
package puddleg
import (
"testing"
@@ -7,6 +7,6 @@ import (
)
func TestRemoveResourcePanicsWithBugReportIfResourceDoesNotExist(t *testing.T) {
s := []*Resource{new(Resource), new(Resource), new(Resource)}
assert.PanicsWithValue(t, "BUG: removeResource could not find res in slice", func() { removeResource(s, new(Resource)) })
s := []*Resource[any]{new(Resource[any]), new(Resource[any]), new(Resource[any])}
assert.PanicsWithValue(t, "BUG: removeResource could not find res in slice", func() { removeResource(s, new(Resource[any])) })
}
@@ -1,8 +1,8 @@
// +build purego appengine js
//go:build purego || appengine || js
// This file contains the safe implementation of nanotime using time.Now().
package puddle
package puddleg
import (
"time"
@@ -1,8 +1,8 @@
// +build !purego,!appengine,!js
//go:build !purego && !appengine && !js
// This file contains the implementation of nanotime using runtime.nanotime.
package puddle
package puddleg
import "unsafe"
+532
View File
@@ -0,0 +1,532 @@
package puddleg
import (
"context"
"errors"
"sync"
"time"
)
const (
resourceStatusConstructing = 0
resourceStatusIdle = iota
resourceStatusAcquired = iota
resourceStatusHijacked = iota
)
// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
// or a pool that is closed while the acquire is waiting.
var ErrClosedPool = errors.New("closed pool")
// ErrNotAvailable occurs on an attempt to acquire a resource from a pool
// that is at maximum capacity and has no available resources.
var ErrNotAvailable = errors.New("resource not available")
// Constructor is a function called by the pool to construct a resource.
type Constructor[T any] func(ctx context.Context) (res T, err error)
// Destructor is a function called by the pool to destroy a resource.
type Destructor[T any] func(res T)
// Resource is the resource handle returned by acquiring from the pool.
type Resource[T any] struct {
value T
pool *Pool[T]
creationTime time.Time
lastUsedNano int64
status byte
}
// Value returns the resource value.
func (res *Resource[T]) Value() T {
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return res.value
}
// Release returns the resource to the pool. res must not be subsequently used.
func (res *Resource[T]) Release() {
if res.status != resourceStatusAcquired {
panic("tried to release resource that is not acquired")
}
res.pool.releaseAcquiredResource(res, nanotime())
}
// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime
// will not change. res must not be subsequently used.
func (res *Resource[T]) ReleaseUnused() {
if res.status != resourceStatusAcquired {
panic("tried to release resource that is not acquired")
}
res.pool.releaseAcquiredResource(res, res.lastUsedNano)
}
// Destroy returns the resource to the pool for destruction. res must not be
// subsequently used.
func (res *Resource[T]) Destroy() {
if res.status != resourceStatusAcquired {
panic("tried to destroy resource that is not acquired")
}
go res.pool.destroyAcquiredResource(res)
}
// Hijack assumes ownership of the resource from the pool. Caller is responsible
// for cleanup of resource value.
func (res *Resource[T]) Hijack() {
if res.status != resourceStatusAcquired {
panic("tried to hijack resource that is not acquired")
}
res.pool.hijackAcquiredResource(res)
}
// CreationTime returns when the resource was created by the pool.
func (res *Resource[T]) CreationTime() time.Time {
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return res.creationTime
}
// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time
// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with
// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead.
func (res *Resource[T]) LastUsedNanotime() int64 {
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return res.lastUsedNano
}
// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting
// LastUsedNanotime to the current nanotime.
func (res *Resource[T]) IdleDuration() time.Duration {
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
panic("tried to access resource that is not acquired or hijacked")
}
return time.Duration(nanotime() - res.lastUsedNano)
}
// Pool is a concurrency-safe resource pool.
type Pool[T any] struct {
cond *sync.Cond
destructWG *sync.WaitGroup
allResources []*Resource[T]
idleResources []*Resource[T]
constructor Constructor[T]
destructor Destructor[T]
maxSize int32
acquireCount int64
acquireDuration time.Duration
emptyAcquireCount int64
canceledAcquireCount int64
closed bool
}
// NewPool creates a new pool. Panics if maxSize is less than 1.
func NewPool[T any](constructor Constructor[T], destructor Destructor[T], maxSize int32) *Pool[T] {
if maxSize < 1 {
panic("maxSize is less than 1")
}
return &Pool[T]{
cond: sync.NewCond(new(sync.Mutex)),
destructWG: &sync.WaitGroup{},
maxSize: maxSize,
constructor: constructor,
destructor: destructor,
}
}
// Close destroys all resources in the pool and rejects future Acquire calls.
// Blocks until all resources are returned to pool and destroyed.
func (p *Pool[T]) Close() {
p.cond.L.Lock()
if p.closed {
p.cond.L.Unlock()
return
}
p.closed = true
for _, res := range p.idleResources {
p.allResources = removeResource(p.allResources, res)
go p.destructResourceValue(res.value)
}
p.idleResources = nil
p.cond.L.Unlock()
// Wake up all go routines waiting for a resource to be returned so they can terminate.
p.cond.Broadcast()
p.destructWG.Wait()
}
// Stat is a snapshot of Pool statistics.
type Stat struct {
constructingResources int32
acquiredResources int32
idleResources int32
maxResources int32
acquireCount int64
acquireDuration time.Duration
emptyAcquireCount int64
canceledAcquireCount int64
}
// TotalResources returns the total number of resources currently in the pool.
// The value is the sum of ConstructingResources, AcquiredResources, and
// IdleResources.
func (s *Stat) TotalResources() int32 {
return s.constructingResources + s.acquiredResources + s.idleResources
}
// ConstructingResources returns the number of resources with construction in progress in
// the pool.
func (s *Stat) ConstructingResources() int32 {
return s.constructingResources
}
// AcquiredResources returns the number of currently acquired resources in the pool.
func (s *Stat) AcquiredResources() int32 {
return s.acquiredResources
}
// IdleResources returns the number of currently idle resources in the pool.
func (s *Stat) IdleResources() int32 {
return s.idleResources
}
// MaxResources returns the maximum size of the pool.
func (s *Stat) MaxResources() int32 {
return s.maxResources
}
// AcquireCount returns the cumulative count of successful acquires from the pool.
func (s *Stat) AcquireCount() int64 {
return s.acquireCount
}
// AcquireDuration returns the total duration of all successful acquires from
// the pool.
func (s *Stat) AcquireDuration() time.Duration {
return s.acquireDuration
}
// EmptyAcquireCount returns the cumulative count of successful acquires from the pool
// that waited for a resource to be released or constructed because the pool was
// empty.
func (s *Stat) EmptyAcquireCount() int64 {
return s.emptyAcquireCount
}
// CanceledAcquireCount returns the cumulative count of acquires from the pool
// that were canceled by a context.
func (s *Stat) CanceledAcquireCount() int64 {
return s.canceledAcquireCount
}
// Stat returns the current pool statistics.
func (p *Pool[T]) Stat() *Stat {
p.cond.L.Lock()
s := &Stat{
maxResources: p.maxSize,
acquireCount: p.acquireCount,
emptyAcquireCount: p.emptyAcquireCount,
canceledAcquireCount: p.canceledAcquireCount,
acquireDuration: p.acquireDuration,
}
for _, res := range p.allResources {
switch res.status {
case resourceStatusConstructing:
s.constructingResources += 1
case resourceStatusIdle:
s.idleResources += 1
case resourceStatusAcquired:
s.acquiredResources += 1
}
}
p.cond.L.Unlock()
return s
}
// Acquire gets a resource from the pool. If no resources are available and the pool
// is not at maximum capacity it will create a new resource. If the pool is at
// maximum capacity it will block until a resource is available. ctx can be used
// to cancel the Acquire.
func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
startNano := nanotime()
if doneChan := ctx.Done(); doneChan != nil {
select {
case <-ctx.Done():
p.cond.L.Lock()
p.canceledAcquireCount += 1
p.cond.L.Unlock()
return nil, ctx.Err()
default:
}
}
p.cond.L.Lock()
emptyAcquire := false
for {
if p.closed {
p.cond.L.Unlock()
return nil, ErrClosedPool
}
// If a resource is available now
if len(p.idleResources) > 0 {
res := p.idleResources[len(p.idleResources)-1]
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
p.idleResources = p.idleResources[:len(p.idleResources)-1]
res.status = resourceStatusAcquired
if emptyAcquire {
p.emptyAcquireCount += 1
}
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
p.cond.L.Unlock()
return res, nil
}
emptyAcquire = true
// If there is room to create a resource do so
if len(p.allResources) < int(p.maxSize) {
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
p.allResources = append(p.allResources, res)
p.destructWG.Add(1)
p.cond.L.Unlock()
value, err := p.constructResourceValue(ctx)
p.cond.L.Lock()
if err != nil {
p.allResources = removeResource(p.allResources, res)
p.destructWG.Done()
select {
case <-ctx.Done():
if err == ctx.Err() {
p.canceledAcquireCount += 1
}
default:
}
p.cond.L.Unlock()
p.cond.Signal()
return nil, err
}
res.value = value
res.status = resourceStatusAcquired
p.emptyAcquireCount += 1
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
p.cond.L.Unlock()
return res, nil
}
if ctx.Done() == nil {
p.cond.Wait()
} else {
// Convert p.cond.Wait into a channel
waitChan := make(chan struct{}, 1)
go func() {
p.cond.Wait()
waitChan <- struct{}{}
}()
select {
case <-ctx.Done():
// Allow goroutine waiting for signal to exit. Re-signal since we couldn't
// do anything with it. Another goroutine might be waiting.
go func() {
<-waitChan
p.cond.Signal()
p.cond.L.Unlock()
}()
p.cond.L.Lock()
p.canceledAcquireCount += 1
p.cond.L.Unlock()
return nil, ctx.Err()
case <-waitChan:
}
}
}
}
// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
// used to cancel the background creation.
func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
if p.closed {
return nil, ErrClosedPool
}
// If a resource is available now
if len(p.idleResources) > 0 {
res := p.idleResources[len(p.idleResources)-1]
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
p.idleResources = p.idleResources[:len(p.idleResources)-1]
p.acquireCount += 1
res.status = resourceStatusAcquired
return res, nil
}
if len(p.allResources) < int(p.maxSize) {
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
p.allResources = append(p.allResources, res)
p.destructWG.Add(1)
go func() {
value, err := p.constructResourceValue(ctx)
defer p.cond.Signal()
p.cond.L.Lock()
defer p.cond.L.Unlock()
if err != nil {
p.allResources = removeResource(p.allResources, res)
p.destructWG.Done()
return
}
res.value = value
res.status = resourceStatusIdle
p.idleResources = append(p.idleResources, res)
}()
}
return nil, ErrNotAvailable
}
// AcquireAllIdle atomically acquires all currently idle resources. Its intended
// use is for health check and keep-alive functionality. It does not update pool
// statistics.
func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
p.cond.L.Lock()
if p.closed {
p.cond.L.Unlock()
return nil
}
for _, res := range p.idleResources {
res.status = resourceStatusAcquired
}
resources := p.idleResources // Swap out current slice
p.idleResources = nil
p.cond.L.Unlock()
return resources
}
// CreateResource constructs a new resource without acquiring it.
// It goes straight in the IdlePool. It does not check against maxSize.
// It can be useful to maintain warm resources under little load.
func (p *Pool[T]) CreateResource(ctx context.Context) error {
p.cond.L.Lock()
if p.closed {
p.cond.L.Unlock()
return ErrClosedPool
}
p.cond.L.Unlock()
value, err := p.constructResourceValue(ctx)
if err != nil {
return err
}
res := &Resource[T]{
pool: p,
creationTime: time.Now(),
status: resourceStatusIdle,
value: value,
lastUsedNano: nanotime(),
}
p.destructWG.Add(1)
p.cond.L.Lock()
// If closed while constructing resource then destroy it and return an error
if p.closed {
go p.destructResourceValue(res.value)
p.cond.L.Unlock()
return ErrClosedPool
}
p.allResources = append(p.allResources, res)
p.idleResources = append(p.idleResources, res)
p.cond.L.Unlock()
return nil
}
// releaseAcquiredResource returns res to the the pool.
func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
p.cond.L.Lock()
if !p.closed {
res.lastUsedNano = lastUsedNano
res.status = resourceStatusIdle
p.idleResources = append(p.idleResources, res)
} else {
p.allResources = removeResource(p.allResources, res)
go p.destructResourceValue(res.value)
}
p.cond.L.Unlock()
p.cond.Signal()
}
// Remove removes res from the pool and closes it. If res is not part of the
// pool Remove will panic.
func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
p.destructResourceValue(res.value)
p.cond.L.Lock()
p.allResources = removeResource(p.allResources, res)
p.cond.L.Unlock()
p.cond.Signal()
}
func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
p.cond.L.Lock()
p.allResources = removeResource(p.allResources, res)
res.status = resourceStatusHijacked
p.destructWG.Done() // not responsible for destructing hijacked resources
p.cond.L.Unlock()
p.cond.Signal()
}
func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] {
for i := range slice {
if slice[i] == res {
slice[i] = slice[len(slice)-1]
slice[len(slice)-1] = nil // Avoid memory leak
return slice[:len(slice)-1]
}
}
panic("BUG: removeResource could not find res in slice")
}
func (p *Pool[T]) constructResourceValue(ctx context.Context) (T, error) {
return p.constructor(ctx)
}
func (p *Pool[T]) destructResourceValue(value T) {
p.destructor(value)
p.destructWG.Done()
}
+76 -99
View File
@@ -1,4 +1,4 @@
package puddle_test
package puddleg_test
import (
"context"
@@ -14,7 +14,7 @@ import (
"testing"
"time"
"github.com/jackc/puddle"
"github.com/jackc/puddle/puddleg"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -41,48 +41,25 @@ func (c *Counter) Value() int {
return n
}
func createConstructor() (puddle.Constructor, *Counter) {
func createConstructor() (puddleg.Constructor[int], *Counter) {
var c Counter
f := func(ctx context.Context) (interface{}, error) {
f := func(ctx context.Context) (int, error) {
return c.Next(), nil
}
return f, &c
}
func createConstructorWithNotifierChan() (puddle.Constructor, *Counter, chan int) {
ch := make(chan int)
var c Counter
f := func(ctx context.Context) (interface{}, error) {
n := c.Next()
// Because the tests will not read from ch until after the create function f returns.
go func() { ch <- n }()
return n, nil
}
return f, &c, ch
}
func stubDestructor(interface{}) {}
func waitForRead(ch chan int) bool {
select {
case <-ch:
return true
case <-time.NewTimer(time.Second).C:
return false
}
}
func stubDestructor(int) {}
func TestNewPoolRequiresMaxSizeGreaterThan0(t *testing.T) {
constructor, _ := createConstructor()
assert.Panics(t, func() { puddle.NewPool(constructor, stubDestructor, -1) })
assert.Panics(t, func() { puddle.NewPool(constructor, stubDestructor, 0) })
assert.Panics(t, func() { puddleg.NewPool(constructor, stubDestructor, -1) })
assert.Panics(t, func() { puddleg.NewPool(constructor, stubDestructor, 0) })
}
func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
defer pool.Close()
res, err := pool.Acquire(context.Background())
@@ -94,7 +71,7 @@ func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) {
func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
constructor, createCounter := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 1)
pool := puddleg.NewPool(constructor, stubDestructor, 1)
wg := &sync.WaitGroup{}
@@ -119,7 +96,7 @@ func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T)
func TestPoolAcquireWithCancellableContext(t *testing.T) {
constructor, createCounter := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 1)
pool := puddleg.NewPool(constructor, stubDestructor, 1)
wg := &sync.WaitGroup{}
@@ -146,10 +123,10 @@ func TestPoolAcquireWithCancellableContext(t *testing.T) {
func TestPoolAcquireReturnsErrorFromFailedResourceCreate(t *testing.T) {
errCreateFailed := errors.New("create failed")
constructor := func(ctx context.Context) (interface{}, error) {
return nil, errCreateFailed
constructor := func(ctx context.Context) (int, error) {
return 0, errCreateFailed
}
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
res, err := pool.Acquire(context.Background())
assert.Equal(t, errCreateFailed, err)
@@ -158,7 +135,7 @@ func TestPoolAcquireReturnsErrorFromFailedResourceCreate(t *testing.T) {
func TestPoolAcquireReusesResources(t *testing.T) {
constructor, createCounter := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
res, err := pool.Acquire(context.Background())
require.NoError(t, err)
@@ -177,11 +154,11 @@ func TestPoolAcquireReusesResources(t *testing.T) {
func TestPoolTryAcquire(t *testing.T) {
constructor, createCounter := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 1)
pool := puddleg.NewPool(constructor, stubDestructor, 1)
// Pool is initially empty so TryAcquire fails but starts construction of resource in the background.
res, err := pool.TryAcquire(context.Background())
require.EqualError(t, err, puddle.ErrNotAvailable.Error())
require.EqualError(t, err, puddleg.ErrNotAvailable.Error())
assert.Nil(t, res)
// Wait for background creation to complete.
@@ -193,7 +170,7 @@ func TestPoolTryAcquire(t *testing.T) {
defer res.Release()
res, err = pool.TryAcquire(context.Background())
require.EqualError(t, err, puddle.ErrNotAvailable.Error())
require.EqualError(t, err, puddleg.ErrNotAvailable.Error())
assert.Nil(t, res)
assert.Equal(t, 1, createCounter.Value())
@@ -201,29 +178,29 @@ func TestPoolTryAcquire(t *testing.T) {
func TestPoolTryAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
pool.Close()
res, err := pool.TryAcquire(context.Background())
assert.Equal(t, puddle.ErrClosedPool, err)
assert.Equal(t, puddleg.ErrClosedPool, err)
assert.Nil(t, res)
}
func TestPoolTryAcquireWithFailedResourceCreate(t *testing.T) {
errCreateFailed := errors.New("create failed")
constructor := func(ctx context.Context) (interface{}, error) {
return nil, errCreateFailed
constructor := func(ctx context.Context) (int, error) {
return 0, errCreateFailed
}
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
res, err := pool.TryAcquire(context.Background())
require.EqualError(t, err, puddle.ErrNotAvailable.Error())
require.EqualError(t, err, puddleg.ErrNotAvailable.Error())
assert.Nil(t, res)
}
func TestPoolAcquireNilContextDoesNotLeavePoolLocked(t *testing.T) {
constructor, createCounter := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
assert.Panics(t, func() { pool.Acquire(nil) })
@@ -236,10 +213,10 @@ func TestPoolAcquireNilContextDoesNotLeavePoolLocked(t *testing.T) {
}
func TestPoolAcquireContextAlreadyCanceled(t *testing.T) {
constructor := func(ctx context.Context) (interface{}, error) {
constructor := func(ctx context.Context) (int, error) {
panic("should never be called")
}
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
ctx, cancel := context.WithCancel(context.Background())
cancel()
@@ -254,15 +231,15 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
timeoutChan := time.After(1 * time.Second)
var constructorCalls Counter
constructor := func(ctx context.Context) (interface{}, error) {
constructor := func(ctx context.Context) (int, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
return 0, ctx.Err()
case <-timeoutChan:
}
return constructorCalls.Next(), nil
}
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
res, err := pool.Acquire(ctx)
assert.Equal(t, context.Canceled, err)
@@ -271,10 +248,10 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
func TestPoolAcquireAllIdle(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
defer pool.Close()
resources := make([]*puddle.Resource, 4)
resources := make([]*puddleg.Resource[int], 4)
var err error
resources[0], err = pool.Acquire(context.Background())
@@ -291,7 +268,7 @@ func TestPoolAcquireAllIdle(t *testing.T) {
resources[0].Release()
resources[3].Release()
assert.ElementsMatch(t, []*puddle.Resource{resources[0], resources[3]}, pool.AcquireAllIdle())
assert.ElementsMatch(t, []*puddleg.Resource[int]{resources[0], resources[3]}, pool.AcquireAllIdle())
resources[0].Release()
resources[3].Release()
@@ -308,14 +285,14 @@ func TestPoolAcquireAllIdle(t *testing.T) {
func TestPoolAcquireAllIdleWhenClosedIsNil(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
pool.Close()
assert.Nil(t, pool.AcquireAllIdle())
}
func TestPoolCreateResource(t *testing.T) {
constructor, counter := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
defer pool.Close()
var err error
@@ -339,10 +316,10 @@ func TestPoolCreateResource(t *testing.T) {
func TestPoolCreateResourceReturnsErrorFromFailedResourceCreate(t *testing.T) {
errCreateFailed := errors.New("create failed")
constructor := func(ctx context.Context) (interface{}, error) {
return nil, errCreateFailed
constructor := func(ctx context.Context) (int, error) {
return 0, errCreateFailed
}
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
err := pool.CreateResource(context.Background())
assert.Equal(t, errCreateFailed, err)
@@ -350,20 +327,20 @@ func TestPoolCreateResourceReturnsErrorFromFailedResourceCreate(t *testing.T) {
func TestPoolCreateResourceReturnsErrorWhenAlreadyClosed(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
pool.Close()
err := pool.CreateResource(context.Background())
assert.Equal(t, puddle.ErrClosedPool, err)
assert.Equal(t, puddleg.ErrClosedPool, err)
}
func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testing.T) {
// There is no way to guarantee the correct order of the pool being closed while the resource is being constructed.
// But these sleeps should make it extremely likely. (Ah, the lengths we go for 100% test coverage...)
constructor := func(ctx context.Context) (interface{}, error) {
constructor := func(ctx context.Context) (int, error) {
time.Sleep(500 * time.Millisecond)
return "abc", nil
return 123, nil
}
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
acquireErrChan := make(chan error)
go func() {
@@ -375,20 +352,20 @@ func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testin
pool.Close()
err := <-acquireErrChan
assert.Equal(t, puddle.ErrClosedPool, err)
assert.Equal(t, puddleg.ErrClosedPool, err)
}
func TestPoolCloseClosesAllIdleResources(t *testing.T) {
constructor, _ := createConstructor()
var destructorCalls Counter
destructor := func(interface{}) {
destructor := func(int) {
destructorCalls.Next()
}
p := puddle.NewPool(constructor, destructor, 10)
p := puddleg.NewPool(constructor, destructor, 10)
resources := make([]*puddle.Resource, 4)
resources := make([]*puddleg.Resource[int], 4)
for i := range resources {
var err error
resources[i], err = p.Acquire(context.Background())
@@ -407,13 +384,13 @@ func TestPoolCloseClosesAllIdleResources(t *testing.T) {
func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
constructor, _ := createConstructor()
var destructorCalls Counter
destructor := func(interface{}) {
destructor := func(int) {
destructorCalls.Next()
}
p := puddle.NewPool(constructor, destructor, 10)
p := puddleg.NewPool(constructor, destructor, 10)
resources := make([]*puddle.Resource, 4)
resources := make([]*puddleg.Resource[int], 4)
for i := range resources {
var err error
resources[i], err = p.Acquire(context.Background())
@@ -421,7 +398,7 @@ func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
}
for _, res := range resources {
go func(res *puddle.Resource) {
go func(res *puddleg.Resource[int]) {
time.Sleep(100 * time.Millisecond)
res.Release()
}(res)
@@ -434,7 +411,7 @@ func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
func TestPoolCloseIsSafeToCallMultipleTimes(t *testing.T) {
constructor, _ := createConstructor()
p := puddle.NewPool(constructor, stubDestructor, 10)
p := puddleg.NewPool(constructor, stubDestructor, 10)
p.Close()
p.Close()
@@ -446,7 +423,7 @@ func TestPoolStatResources(t *testing.T) {
endWaitChan := make(chan struct{})
var constructorCalls Counter
constructor := func(ctx context.Context) (interface{}, error) {
constructor := func(ctx context.Context) (int, error) {
select {
case <-startWaitChan:
close(waitingChan)
@@ -456,7 +433,7 @@ func TestPoolStatResources(t *testing.T) {
return constructorCalls.Next(), nil
}
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
defer pool.Close()
resAcquired, err := pool.Acquire(context.Background())
@@ -491,7 +468,7 @@ func TestPoolStatResources(t *testing.T) {
func TestPoolStatSuccessfulAcquireCounters(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 1)
pool := puddleg.NewPool(constructor, stubDestructor, 1)
defer pool.Close()
res, err := pool.Acquire(context.Background())
@@ -537,7 +514,7 @@ func TestPoolStatSuccessfulAcquireCounters(t *testing.T) {
func TestPoolStatCanceledAcquireBeforeStart(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 1)
pool := puddleg.NewPool(constructor, stubDestructor, 1)
defer pool.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -551,12 +528,12 @@ func TestPoolStatCanceledAcquireBeforeStart(t *testing.T) {
}
func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) {
constructor := func(ctx context.Context) (interface{}, error) {
constructor := func(ctx context.Context) (int, error) {
<-ctx.Done()
return nil, ctx.Err()
return 0, ctx.Err()
}
pool := puddle.NewPool(constructor, stubDestructor, 1)
pool := puddleg.NewPool(constructor, stubDestructor, 1)
defer pool.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -571,7 +548,7 @@ func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) {
func TestPoolStatCanceledAcquireDuringWait(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 1)
pool := puddleg.NewPool(constructor, stubDestructor, 1)
defer pool.Close()
res, err := pool.Acquire(context.Background())
@@ -592,11 +569,11 @@ func TestPoolStatCanceledAcquireDuringWait(t *testing.T) {
func TestResourceHijackRemovesResourceFromPoolButDoesNotDestroy(t *testing.T) {
constructor, _ := createConstructor()
var destructorCalls Counter
destructor := func(interface{}) {
destructor := func(int) {
destructorCalls.Next()
}
pool := puddle.NewPool(constructor, destructor, 10)
pool := puddleg.NewPool(constructor, destructor, 10)
res, err := pool.Acquire(context.Background())
require.NoError(t, err)
@@ -615,7 +592,7 @@ func TestResourceHijackRemovesResourceFromPoolButDoesNotDestroy(t *testing.T) {
func TestResourceDestroyRemovesResourceFromPool(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
res, err := pool.Acquire(context.Background())
require.NoError(t, err)
@@ -635,7 +612,7 @@ func TestResourceDestroyRemovesResourceFromPool(t *testing.T) {
func TestResourceLastUsageTimeTracking(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 1)
pool := puddleg.NewPool(constructor, stubDestructor, 1)
res, err := pool.Acquire(context.Background())
require.NoError(t, err)
@@ -669,7 +646,7 @@ func TestResourceLastUsageTimeTracking(t *testing.T) {
func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
res, err := pool.Acquire(context.Background())
require.NoError(t, err)
@@ -687,11 +664,11 @@ func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) {
func TestPoolAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, 10)
pool := puddleg.NewPool(constructor, stubDestructor, 10)
pool.Close()
res, err := pool.Acquire(context.Background())
assert.Equal(t, puddle.ErrClosedPool, err)
assert.Equal(t, puddleg.ErrClosedPool, err)
assert.Nil(t, res)
}
@@ -705,7 +682,7 @@ func TestSignalIsSentWhenResourceFailedToCreate(t *testing.T) {
}
destructor := func(value interface{}) {}
pool := puddle.NewPool(constructor, destructor, 1)
pool := puddleg.NewPool(constructor, destructor, 1)
res1, err := pool.Acquire(context.Background())
require.NoError(t, err)
@@ -728,7 +705,7 @@ func TestSignalIsSentWhenResourceFailedToCreate(t *testing.T) {
func TestStress(t *testing.T) {
constructor, _ := createConstructor()
var destructorCalls Counter
destructor := func(interface{}) {
destructor := func(int) {
destructorCalls.Next()
}
@@ -737,16 +714,16 @@ func TestStress(t *testing.T) {
poolSize = 4
}
pool := puddle.NewPool(constructor, destructor, int32(poolSize))
pool := puddleg.NewPool(constructor, destructor, int32(poolSize))
finishChan := make(chan struct{})
wg := &sync.WaitGroup{}
releaseOrDestroyOrHijack := func(res *puddle.Resource) {
releaseOrDestroyOrHijack := func(res *puddleg.Resource[int]) {
n := rand.Intn(100)
if n < 5 {
res.Hijack()
destructor(res)
destructor(res.Value())
} else if n < 10 {
res.Destroy()
} else {
@@ -759,7 +736,7 @@ func TestStress(t *testing.T) {
func() {
res, err := pool.Acquire(context.Background())
if err != nil {
if err != puddle.ErrClosedPool {
if err != puddleg.ErrClosedPool {
assert.Failf(t, "stress acquire", "pool.Acquire returned unexpected err: %v", err)
}
return
@@ -774,7 +751,7 @@ func TestStress(t *testing.T) {
defer cancel()
res, err := pool.Acquire(ctx)
if err != nil {
if err != puddle.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded {
if err != puddleg.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded {
assert.Failf(t, "stress acquire possibly canceled by context", "pool.Acquire returned unexpected err: %v", err)
}
return
@@ -787,7 +764,7 @@ func TestStress(t *testing.T) {
func() {
res, err := pool.TryAcquire(context.Background())
if err != nil {
if err != puddle.ErrClosedPool && err != puddle.ErrNotAvailable {
if err != puddleg.ErrClosedPool && err != puddleg.ErrNotAvailable {
assert.Failf(t, "stress TryAcquire", "pool.TryAcquire returned unexpected err: %v", err)
}
return
@@ -873,7 +850,7 @@ func ExamplePool() {
}
maxPoolSize := int32(10)
pool := puddle.NewPool(constructor, destructor, maxPoolSize)
pool := puddleg.NewPool(constructor, destructor, maxPoolSize)
// Use pool multiple times
for i := 0; i < 10; i++ {
@@ -972,7 +949,7 @@ func BenchmarkPoolAcquireAndRelease(b *testing.B) {
wg := &sync.WaitGroup{}
constructor, _ := createConstructor()
pool := puddle.NewPool(constructor, stubDestructor, bm.poolSize)
pool := puddleg.NewPool(constructor, stubDestructor, bm.poolSize)
for i := 0; i < bm.clientCount; i++ {
wg.Add(1)