diff --git a/README.md b/README.md index 8ece49f..9361786 100644 --- a/README.md +++ b/README.md @@ -22,15 +22,15 @@ own. ## Example Usage ```go -constructor := func(context.Context) (interface{}, error) { +constructor := func(context.Context) (net.Conn, error) { return net.Dial("tcp", "127.0.0.1:8080") } -destructor := func(value interface{}) { - value.(net.Conn).Close() +destructor := func(value net.Conn) { + value.Close() } maxPoolSize := 10 -pool := puddle.NewPool(constructor, destructor, maxPoolSize) +pool := puddleg.NewPool(constructor, destructor, maxPoolSize) // Acquire resource from the pool. res, err := pool.Acquire(context.Background()) @@ -39,7 +39,7 @@ if err != nil { } // Use resource. -_, err = res.Value().(net.Conn).Write([]byte{1}) +_, err = res.Value().Write([]byte{1}) if err != nil { // ... } diff --git a/go.mod b/go.mod index 96005a6..463f87d 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,10 @@ module github.com/jackc/puddle -go 1.12 +go 1.18 require github.com/stretchr/testify v1.3.0 + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect +) diff --git a/pool.go b/pool.go index 1baf88c..8ec74ff 100644 --- a/pool.go +++ b/pool.go @@ -1,532 +1,35 @@ package puddle -import ( - "context" - "errors" - "sync" - "time" +import "github.com/jackc/puddle/puddleg" + +var ( + // ErrClosedPool occurs on an attempt to acquire a connection from a closed pool + // or a pool that is closed while the acquire is waiting. + ErrClosedPool = puddleg.ErrClosedPool + + // ErrNotAvailable occurs on an attempt to acquire a resource from a pool + // that is at maximum capacity and has no available resources. + ErrNotAvailable = puddleg.ErrNotAvailable ) -const ( - resourceStatusConstructing = 0 - resourceStatusIdle = iota - resourceStatusAcquired = iota - resourceStatusHijacked = iota +type ( + // Constructor is a function called by the pool to construct a resource. + Constructor = puddleg.Constructor[any] + + // Destructor is a function called by the pool to destroy a resource. + Destructor = puddleg.Destructor[any] + + // Resource is the resource handle returned by acquiring from the pool. + Resource = puddleg.Resource[any] + + // Pool is a concurrency-safe resource pool. + Pool = puddleg.Pool[any] + + // Stat is a snapshot of Pool statistics. + Stat = puddleg.Stat ) -// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool -// or a pool that is closed while the acquire is waiting. -var ErrClosedPool = errors.New("closed pool") - -// ErrNotAvailable occurs on an attempt to acquire a resource from a pool -// that is at maximum capacity and has no available resources. -var ErrNotAvailable = errors.New("resource not available") - -// Constructor is a function called by the pool to construct a resource. -type Constructor func(ctx context.Context) (res interface{}, err error) - -// Destructor is a function called by the pool to destroy a resource. -type Destructor func(res interface{}) - -// Resource is the resource handle returned by acquiring from the pool. -type Resource struct { - value interface{} - pool *Pool - creationTime time.Time - lastUsedNano int64 - status byte -} - -// Value returns the resource value. -func (res *Resource) Value() interface{} { - if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { - panic("tried to access resource that is not acquired or hijacked") - } - return res.value -} - -// Release returns the resource to the pool. res must not be subsequently used. -func (res *Resource) Release() { - if res.status != resourceStatusAcquired { - panic("tried to release resource that is not acquired") - } - res.pool.releaseAcquiredResource(res, nanotime()) -} - -// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime -// will not change. res must not be subsequently used. -func (res *Resource) ReleaseUnused() { - if res.status != resourceStatusAcquired { - panic("tried to release resource that is not acquired") - } - res.pool.releaseAcquiredResource(res, res.lastUsedNano) -} - -// Destroy returns the resource to the pool for destruction. res must not be -// subsequently used. -func (res *Resource) Destroy() { - if res.status != resourceStatusAcquired { - panic("tried to destroy resource that is not acquired") - } - go res.pool.destroyAcquiredResource(res) -} - -// Hijack assumes ownership of the resource from the pool. Caller is responsible -// for cleanup of resource value. -func (res *Resource) Hijack() { - if res.status != resourceStatusAcquired { - panic("tried to hijack resource that is not acquired") - } - res.pool.hijackAcquiredResource(res) -} - -// CreationTime returns when the resource was created by the pool. -func (res *Resource) CreationTime() time.Time { - if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { - panic("tried to access resource that is not acquired or hijacked") - } - return res.creationTime -} - -// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time -// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with -// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead. -func (res *Resource) LastUsedNanotime() int64 { - if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { - panic("tried to access resource that is not acquired or hijacked") - } - - return res.lastUsedNano -} - -// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting -// LastUsedNanotime to the current nanotime. -func (res *Resource) IdleDuration() time.Duration { - if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { - panic("tried to access resource that is not acquired or hijacked") - } - - return time.Duration(nanotime() - res.lastUsedNano) -} - -// Pool is a concurrency-safe resource pool. -type Pool struct { - cond *sync.Cond - destructWG *sync.WaitGroup - - allResources []*Resource - idleResources []*Resource - - constructor Constructor - destructor Destructor - maxSize int32 - - acquireCount int64 - acquireDuration time.Duration - emptyAcquireCount int64 - canceledAcquireCount int64 - - closed bool -} - // NewPool creates a new pool. Panics if maxSize is less than 1. func NewPool(constructor Constructor, destructor Destructor, maxSize int32) *Pool { - if maxSize < 1 { - panic("maxSize is less than 1") - } - - return &Pool{ - cond: sync.NewCond(new(sync.Mutex)), - destructWG: &sync.WaitGroup{}, - maxSize: maxSize, - constructor: constructor, - destructor: destructor, - } -} - -// Close destroys all resources in the pool and rejects future Acquire calls. -// Blocks until all resources are returned to pool and destroyed. -func (p *Pool) Close() { - p.cond.L.Lock() - if p.closed { - p.cond.L.Unlock() - return - } - p.closed = true - - for _, res := range p.idleResources { - p.allResources = removeResource(p.allResources, res) - go p.destructResourceValue(res.value) - } - p.idleResources = nil - p.cond.L.Unlock() - - // Wake up all go routines waiting for a resource to be returned so they can terminate. - p.cond.Broadcast() - - p.destructWG.Wait() -} - -// Stat is a snapshot of Pool statistics. -type Stat struct { - constructingResources int32 - acquiredResources int32 - idleResources int32 - maxResources int32 - acquireCount int64 - acquireDuration time.Duration - emptyAcquireCount int64 - canceledAcquireCount int64 -} - -// TotalResource returns the total number of resources currently in the pool. -// The value is the sum of ConstructingResources, AcquiredResources, and -// IdleResources. -func (s *Stat) TotalResources() int32 { - return s.constructingResources + s.acquiredResources + s.idleResources -} - -// ConstructingResources returns the number of resources with construction in progress in -// the pool. -func (s *Stat) ConstructingResources() int32 { - return s.constructingResources -} - -// AcquiredResources returns the number of currently acquired resources in the pool. -func (s *Stat) AcquiredResources() int32 { - return s.acquiredResources -} - -// IdleResources returns the number of currently idle resources in the pool. -func (s *Stat) IdleResources() int32 { - return s.idleResources -} - -// MaxResources returns the maximum size of the pool. -func (s *Stat) MaxResources() int32 { - return s.maxResources -} - -// AcquireCount returns the cumulative count of successful acquires from the pool. -func (s *Stat) AcquireCount() int64 { - return s.acquireCount -} - -// AcquireDuration returns the total duration of all successful acquires from -// the pool. -func (s *Stat) AcquireDuration() time.Duration { - return s.acquireDuration -} - -// EmptyAcquireCount returns the cumulative count of successful acquires from the pool -// that waited for a resource to be released or constructed because the pool was -// empty. -func (s *Stat) EmptyAcquireCount() int64 { - return s.emptyAcquireCount -} - -// CanceledAcquireCount returns the cumulative count of acquires from the pool -// that were canceled by a context. -func (s *Stat) CanceledAcquireCount() int64 { - return s.canceledAcquireCount -} - -// Stat returns the current pool statistics. -func (p *Pool) Stat() *Stat { - p.cond.L.Lock() - s := &Stat{ - maxResources: p.maxSize, - acquireCount: p.acquireCount, - emptyAcquireCount: p.emptyAcquireCount, - canceledAcquireCount: p.canceledAcquireCount, - acquireDuration: p.acquireDuration, - } - - for _, res := range p.allResources { - switch res.status { - case resourceStatusConstructing: - s.constructingResources += 1 - case resourceStatusIdle: - s.idleResources += 1 - case resourceStatusAcquired: - s.acquiredResources += 1 - } - } - - p.cond.L.Unlock() - return s -} - -// Acquire gets a resource from the pool. If no resources are available and the pool -// is not at maximum capacity it will create a new resource. If the pool is at -// maximum capacity it will block until a resource is available. ctx can be used -// to cancel the Acquire. -func (p *Pool) Acquire(ctx context.Context) (*Resource, error) { - startNano := nanotime() - if doneChan := ctx.Done(); doneChan != nil { - select { - case <-ctx.Done(): - p.cond.L.Lock() - p.canceledAcquireCount += 1 - p.cond.L.Unlock() - return nil, ctx.Err() - default: - } - } - - p.cond.L.Lock() - - emptyAcquire := false - - for { - if p.closed { - p.cond.L.Unlock() - return nil, ErrClosedPool - } - - // If a resource is available now - if len(p.idleResources) > 0 { - res := p.idleResources[len(p.idleResources)-1] - p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak - p.idleResources = p.idleResources[:len(p.idleResources)-1] - res.status = resourceStatusAcquired - if emptyAcquire { - p.emptyAcquireCount += 1 - } - p.acquireCount += 1 - p.acquireDuration += time.Duration(nanotime() - startNano) - p.cond.L.Unlock() - return res, nil - } - - emptyAcquire = true - - // If there is room to create a resource do so - if len(p.allResources) < int(p.maxSize) { - res := &Resource{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing} - p.allResources = append(p.allResources, res) - p.destructWG.Add(1) - p.cond.L.Unlock() - - value, err := p.constructResourceValue(ctx) - p.cond.L.Lock() - if err != nil { - p.allResources = removeResource(p.allResources, res) - p.destructWG.Done() - - select { - case <-ctx.Done(): - if err == ctx.Err() { - p.canceledAcquireCount += 1 - } - default: - } - - p.cond.L.Unlock() - p.cond.Signal() - return nil, err - } - - res.value = value - res.status = resourceStatusAcquired - p.emptyAcquireCount += 1 - p.acquireCount += 1 - p.acquireDuration += time.Duration(nanotime() - startNano) - p.cond.L.Unlock() - return res, nil - } - - if ctx.Done() == nil { - p.cond.Wait() - } else { - // Convert p.cond.Wait into a channel - waitChan := make(chan struct{}, 1) - go func() { - p.cond.Wait() - waitChan <- struct{}{} - }() - - select { - case <-ctx.Done(): - // Allow goroutine waiting for signal to exit. Re-signal since we couldn't - // do anything with it. Another goroutine might be waiting. - go func() { - <-waitChan - p.cond.Signal() - p.cond.L.Unlock() - }() - - p.cond.L.Lock() - p.canceledAcquireCount += 1 - p.cond.L.Unlock() - return nil, ctx.Err() - case <-waitChan: - } - } - } -} - -// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no -// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only -// used to cancel the background creation. -func (p *Pool) TryAcquire(ctx context.Context) (*Resource, error) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - if p.closed { - return nil, ErrClosedPool - } - - // If a resource is available now - if len(p.idleResources) > 0 { - res := p.idleResources[len(p.idleResources)-1] - p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak - p.idleResources = p.idleResources[:len(p.idleResources)-1] - p.acquireCount += 1 - res.status = resourceStatusAcquired - return res, nil - } - - if len(p.allResources) < int(p.maxSize) { - res := &Resource{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing} - p.allResources = append(p.allResources, res) - p.destructWG.Add(1) - - go func() { - value, err := p.constructResourceValue(ctx) - defer p.cond.Signal() - p.cond.L.Lock() - defer p.cond.L.Unlock() - - if err != nil { - p.allResources = removeResource(p.allResources, res) - p.destructWG.Done() - return - } - - res.value = value - res.status = resourceStatusIdle - p.idleResources = append(p.idleResources, res) - }() - } - - return nil, ErrNotAvailable -} - -// AcquireAllIdle atomically acquires all currently idle resources. Its intended -// use is for health check and keep-alive functionality. It does not update pool -// statistics. -func (p *Pool) AcquireAllIdle() []*Resource { - p.cond.L.Lock() - if p.closed { - p.cond.L.Unlock() - return nil - } - - for _, res := range p.idleResources { - res.status = resourceStatusAcquired - } - resources := p.idleResources // Swap out current slice - p.idleResources = nil - - p.cond.L.Unlock() - return resources -} - -// CreateResource constructs a new resource without acquiring it. -// It goes straight in the IdlePool. It does not check against maxSize. -// It can be useful to maintain warm resources under little load. -func (p *Pool) CreateResource(ctx context.Context) error { - p.cond.L.Lock() - if p.closed { - p.cond.L.Unlock() - return ErrClosedPool - } - p.cond.L.Unlock() - - value, err := p.constructResourceValue(ctx) - if err != nil { - return err - } - - res := &Resource{ - pool: p, - creationTime: time.Now(), - status: resourceStatusIdle, - value: value, - lastUsedNano: nanotime(), - } - p.destructWG.Add(1) - - p.cond.L.Lock() - // If closed while constructing resource then destroy it and return an error - if p.closed { - go p.destructResourceValue(res.value) - p.cond.L.Unlock() - return ErrClosedPool - } - p.allResources = append(p.allResources, res) - p.idleResources = append(p.idleResources, res) - p.cond.L.Unlock() - - return nil -} - -// releaseAcquiredResource returns res to the the pool. -func (p *Pool) releaseAcquiredResource(res *Resource, lastUsedNano int64) { - p.cond.L.Lock() - - if !p.closed { - res.lastUsedNano = lastUsedNano - res.status = resourceStatusIdle - p.idleResources = append(p.idleResources, res) - } else { - p.allResources = removeResource(p.allResources, res) - go p.destructResourceValue(res.value) - } - - p.cond.L.Unlock() - p.cond.Signal() -} - -// Remove removes res from the pool and closes it. If res is not part of the -// pool Remove will panic. -func (p *Pool) destroyAcquiredResource(res *Resource) { - p.destructResourceValue(res.value) - p.cond.L.Lock() - p.allResources = removeResource(p.allResources, res) - p.cond.L.Unlock() - p.cond.Signal() -} - -func (p *Pool) hijackAcquiredResource(res *Resource) { - p.cond.L.Lock() - - p.allResources = removeResource(p.allResources, res) - res.status = resourceStatusHijacked - p.destructWG.Done() // not responsible for destructing hijacked resources - - p.cond.L.Unlock() - p.cond.Signal() -} - -func removeResource(slice []*Resource, res *Resource) []*Resource { - for i := range slice { - if slice[i] == res { - slice[i] = slice[len(slice)-1] - slice[len(slice)-1] = nil // Avoid memory leak - return slice[:len(slice)-1] - } - } - - panic("BUG: removeResource could not find res in slice") -} - -func (p *Pool) constructResourceValue(ctx context.Context) (interface{}, error) { - return p.constructor(ctx) -} - -func (p *Pool) destructResourceValue(value interface{}) { - p.destructor(value) - p.destructWG.Done() + return puddleg.NewPool(constructor, destructor, maxSize) } diff --git a/puddleg/doc.go b/puddleg/doc.go new file mode 100644 index 0000000..a30beec --- /dev/null +++ b/puddleg/doc.go @@ -0,0 +1,11 @@ +// Package puddleg is a generic resource pool with type-parametrized api. +/* + +Puddle is a tiny generic resource pool library for Go that uses the standard +context library to signal cancellation of acquires. It is designed to contain +the minimum functionality a resource pool needs that cannot be implemented +without concurrency concerns. For example, a database connection pool may use +puddle internally and implement health checks and keep-alive behavior without +needing to implement any concurrent code of its own. +*/ +package puddleg diff --git a/internal_test.go b/puddleg/internal_test.go similarity index 56% rename from internal_test.go rename to puddleg/internal_test.go index cd8bc78..bd01925 100644 --- a/internal_test.go +++ b/puddleg/internal_test.go @@ -1,4 +1,4 @@ -package puddle +package puddleg import ( "testing" @@ -7,6 +7,6 @@ import ( ) func TestRemoveResourcePanicsWithBugReportIfResourceDoesNotExist(t *testing.T) { - s := []*Resource{new(Resource), new(Resource), new(Resource)} - assert.PanicsWithValue(t, "BUG: removeResource could not find res in slice", func() { removeResource(s, new(Resource)) }) + s := []*Resource[any]{new(Resource[any]), new(Resource[any]), new(Resource[any])} + assert.PanicsWithValue(t, "BUG: removeResource could not find res in slice", func() { removeResource(s, new(Resource[any])) }) } diff --git a/nanotime_time.go b/puddleg/nanotime_time.go similarity index 74% rename from nanotime_time.go rename to puddleg/nanotime_time.go index 2bca251..12d6f62 100644 --- a/nanotime_time.go +++ b/puddleg/nanotime_time.go @@ -1,8 +1,8 @@ -// +build purego appengine js +//go:build purego || appengine || js // This file contains the safe implementation of nanotime using time.Now(). -package puddle +package puddleg import ( "time" diff --git a/nanotime_unsafe.go b/puddleg/nanotime_unsafe.go similarity index 76% rename from nanotime_unsafe.go rename to puddleg/nanotime_unsafe.go index 99d80ee..edaf500 100644 --- a/nanotime_unsafe.go +++ b/puddleg/nanotime_unsafe.go @@ -1,8 +1,8 @@ -// +build !purego,!appengine,!js +//go:build !purego && !appengine && !js // This file contains the implementation of nanotime using runtime.nanotime. -package puddle +package puddleg import "unsafe" diff --git a/puddleg/pool.go b/puddleg/pool.go new file mode 100644 index 0000000..ac761c6 --- /dev/null +++ b/puddleg/pool.go @@ -0,0 +1,532 @@ +package puddleg + +import ( + "context" + "errors" + "sync" + "time" +) + +const ( + resourceStatusConstructing = 0 + resourceStatusIdle = iota + resourceStatusAcquired = iota + resourceStatusHijacked = iota +) + +// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool +// or a pool that is closed while the acquire is waiting. +var ErrClosedPool = errors.New("closed pool") + +// ErrNotAvailable occurs on an attempt to acquire a resource from a pool +// that is at maximum capacity and has no available resources. +var ErrNotAvailable = errors.New("resource not available") + +// Constructor is a function called by the pool to construct a resource. +type Constructor[T any] func(ctx context.Context) (res T, err error) + +// Destructor is a function called by the pool to destroy a resource. +type Destructor[T any] func(res T) + +// Resource is the resource handle returned by acquiring from the pool. +type Resource[T any] struct { + value T + pool *Pool[T] + creationTime time.Time + lastUsedNano int64 + status byte +} + +// Value returns the resource value. +func (res *Resource[T]) Value() T { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + return res.value +} + +// Release returns the resource to the pool. res must not be subsequently used. +func (res *Resource[T]) Release() { + if res.status != resourceStatusAcquired { + panic("tried to release resource that is not acquired") + } + res.pool.releaseAcquiredResource(res, nanotime()) +} + +// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime +// will not change. res must not be subsequently used. +func (res *Resource[T]) ReleaseUnused() { + if res.status != resourceStatusAcquired { + panic("tried to release resource that is not acquired") + } + res.pool.releaseAcquiredResource(res, res.lastUsedNano) +} + +// Destroy returns the resource to the pool for destruction. res must not be +// subsequently used. +func (res *Resource[T]) Destroy() { + if res.status != resourceStatusAcquired { + panic("tried to destroy resource that is not acquired") + } + go res.pool.destroyAcquiredResource(res) +} + +// Hijack assumes ownership of the resource from the pool. Caller is responsible +// for cleanup of resource value. +func (res *Resource[T]) Hijack() { + if res.status != resourceStatusAcquired { + panic("tried to hijack resource that is not acquired") + } + res.pool.hijackAcquiredResource(res) +} + +// CreationTime returns when the resource was created by the pool. +func (res *Resource[T]) CreationTime() time.Time { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + return res.creationTime +} + +// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time +// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with +// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead. +func (res *Resource[T]) LastUsedNanotime() int64 { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + + return res.lastUsedNano +} + +// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting +// LastUsedNanotime to the current nanotime. +func (res *Resource[T]) IdleDuration() time.Duration { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + + return time.Duration(nanotime() - res.lastUsedNano) +} + +// Pool is a concurrency-safe resource pool. +type Pool[T any] struct { + cond *sync.Cond + destructWG *sync.WaitGroup + + allResources []*Resource[T] + idleResources []*Resource[T] + + constructor Constructor[T] + destructor Destructor[T] + maxSize int32 + + acquireCount int64 + acquireDuration time.Duration + emptyAcquireCount int64 + canceledAcquireCount int64 + + closed bool +} + +// NewPool creates a new pool. Panics if maxSize is less than 1. +func NewPool[T any](constructor Constructor[T], destructor Destructor[T], maxSize int32) *Pool[T] { + if maxSize < 1 { + panic("maxSize is less than 1") + } + + return &Pool[T]{ + cond: sync.NewCond(new(sync.Mutex)), + destructWG: &sync.WaitGroup{}, + maxSize: maxSize, + constructor: constructor, + destructor: destructor, + } +} + +// Close destroys all resources in the pool and rejects future Acquire calls. +// Blocks until all resources are returned to pool and destroyed. +func (p *Pool[T]) Close() { + p.cond.L.Lock() + if p.closed { + p.cond.L.Unlock() + return + } + p.closed = true + + for _, res := range p.idleResources { + p.allResources = removeResource(p.allResources, res) + go p.destructResourceValue(res.value) + } + p.idleResources = nil + p.cond.L.Unlock() + + // Wake up all go routines waiting for a resource to be returned so they can terminate. + p.cond.Broadcast() + + p.destructWG.Wait() +} + +// Stat is a snapshot of Pool statistics. +type Stat struct { + constructingResources int32 + acquiredResources int32 + idleResources int32 + maxResources int32 + acquireCount int64 + acquireDuration time.Duration + emptyAcquireCount int64 + canceledAcquireCount int64 +} + +// TotalResources returns the total number of resources currently in the pool. +// The value is the sum of ConstructingResources, AcquiredResources, and +// IdleResources. +func (s *Stat) TotalResources() int32 { + return s.constructingResources + s.acquiredResources + s.idleResources +} + +// ConstructingResources returns the number of resources with construction in progress in +// the pool. +func (s *Stat) ConstructingResources() int32 { + return s.constructingResources +} + +// AcquiredResources returns the number of currently acquired resources in the pool. +func (s *Stat) AcquiredResources() int32 { + return s.acquiredResources +} + +// IdleResources returns the number of currently idle resources in the pool. +func (s *Stat) IdleResources() int32 { + return s.idleResources +} + +// MaxResources returns the maximum size of the pool. +func (s *Stat) MaxResources() int32 { + return s.maxResources +} + +// AcquireCount returns the cumulative count of successful acquires from the pool. +func (s *Stat) AcquireCount() int64 { + return s.acquireCount +} + +// AcquireDuration returns the total duration of all successful acquires from +// the pool. +func (s *Stat) AcquireDuration() time.Duration { + return s.acquireDuration +} + +// EmptyAcquireCount returns the cumulative count of successful acquires from the pool +// that waited for a resource to be released or constructed because the pool was +// empty. +func (s *Stat) EmptyAcquireCount() int64 { + return s.emptyAcquireCount +} + +// CanceledAcquireCount returns the cumulative count of acquires from the pool +// that were canceled by a context. +func (s *Stat) CanceledAcquireCount() int64 { + return s.canceledAcquireCount +} + +// Stat returns the current pool statistics. +func (p *Pool[T]) Stat() *Stat { + p.cond.L.Lock() + s := &Stat{ + maxResources: p.maxSize, + acquireCount: p.acquireCount, + emptyAcquireCount: p.emptyAcquireCount, + canceledAcquireCount: p.canceledAcquireCount, + acquireDuration: p.acquireDuration, + } + + for _, res := range p.allResources { + switch res.status { + case resourceStatusConstructing: + s.constructingResources += 1 + case resourceStatusIdle: + s.idleResources += 1 + case resourceStatusAcquired: + s.acquiredResources += 1 + } + } + + p.cond.L.Unlock() + return s +} + +// Acquire gets a resource from the pool. If no resources are available and the pool +// is not at maximum capacity it will create a new resource. If the pool is at +// maximum capacity it will block until a resource is available. ctx can be used +// to cancel the Acquire. +func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { + startNano := nanotime() + if doneChan := ctx.Done(); doneChan != nil { + select { + case <-ctx.Done(): + p.cond.L.Lock() + p.canceledAcquireCount += 1 + p.cond.L.Unlock() + return nil, ctx.Err() + default: + } + } + + p.cond.L.Lock() + + emptyAcquire := false + + for { + if p.closed { + p.cond.L.Unlock() + return nil, ErrClosedPool + } + + // If a resource is available now + if len(p.idleResources) > 0 { + res := p.idleResources[len(p.idleResources)-1] + p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak + p.idleResources = p.idleResources[:len(p.idleResources)-1] + res.status = resourceStatusAcquired + if emptyAcquire { + p.emptyAcquireCount += 1 + } + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + p.cond.L.Unlock() + return res, nil + } + + emptyAcquire = true + + // If there is room to create a resource do so + if len(p.allResources) < int(p.maxSize) { + res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing} + p.allResources = append(p.allResources, res) + p.destructWG.Add(1) + p.cond.L.Unlock() + + value, err := p.constructResourceValue(ctx) + p.cond.L.Lock() + if err != nil { + p.allResources = removeResource(p.allResources, res) + p.destructWG.Done() + + select { + case <-ctx.Done(): + if err == ctx.Err() { + p.canceledAcquireCount += 1 + } + default: + } + + p.cond.L.Unlock() + p.cond.Signal() + return nil, err + } + + res.value = value + res.status = resourceStatusAcquired + p.emptyAcquireCount += 1 + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + p.cond.L.Unlock() + return res, nil + } + + if ctx.Done() == nil { + p.cond.Wait() + } else { + // Convert p.cond.Wait into a channel + waitChan := make(chan struct{}, 1) + go func() { + p.cond.Wait() + waitChan <- struct{}{} + }() + + select { + case <-ctx.Done(): + // Allow goroutine waiting for signal to exit. Re-signal since we couldn't + // do anything with it. Another goroutine might be waiting. + go func() { + <-waitChan + p.cond.Signal() + p.cond.L.Unlock() + }() + + p.cond.L.Lock() + p.canceledAcquireCount += 1 + p.cond.L.Unlock() + return nil, ctx.Err() + case <-waitChan: + } + } + } +} + +// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no +// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only +// used to cancel the background creation. +func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + if p.closed { + return nil, ErrClosedPool + } + + // If a resource is available now + if len(p.idleResources) > 0 { + res := p.idleResources[len(p.idleResources)-1] + p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak + p.idleResources = p.idleResources[:len(p.idleResources)-1] + p.acquireCount += 1 + res.status = resourceStatusAcquired + return res, nil + } + + if len(p.allResources) < int(p.maxSize) { + res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing} + p.allResources = append(p.allResources, res) + p.destructWG.Add(1) + + go func() { + value, err := p.constructResourceValue(ctx) + defer p.cond.Signal() + p.cond.L.Lock() + defer p.cond.L.Unlock() + + if err != nil { + p.allResources = removeResource(p.allResources, res) + p.destructWG.Done() + return + } + + res.value = value + res.status = resourceStatusIdle + p.idleResources = append(p.idleResources, res) + }() + } + + return nil, ErrNotAvailable +} + +// AcquireAllIdle atomically acquires all currently idle resources. Its intended +// use is for health check and keep-alive functionality. It does not update pool +// statistics. +func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { + p.cond.L.Lock() + if p.closed { + p.cond.L.Unlock() + return nil + } + + for _, res := range p.idleResources { + res.status = resourceStatusAcquired + } + resources := p.idleResources // Swap out current slice + p.idleResources = nil + + p.cond.L.Unlock() + return resources +} + +// CreateResource constructs a new resource without acquiring it. +// It goes straight in the IdlePool. It does not check against maxSize. +// It can be useful to maintain warm resources under little load. +func (p *Pool[T]) CreateResource(ctx context.Context) error { + p.cond.L.Lock() + if p.closed { + p.cond.L.Unlock() + return ErrClosedPool + } + p.cond.L.Unlock() + + value, err := p.constructResourceValue(ctx) + if err != nil { + return err + } + + res := &Resource[T]{ + pool: p, + creationTime: time.Now(), + status: resourceStatusIdle, + value: value, + lastUsedNano: nanotime(), + } + p.destructWG.Add(1) + + p.cond.L.Lock() + // If closed while constructing resource then destroy it and return an error + if p.closed { + go p.destructResourceValue(res.value) + p.cond.L.Unlock() + return ErrClosedPool + } + p.allResources = append(p.allResources, res) + p.idleResources = append(p.idleResources, res) + p.cond.L.Unlock() + + return nil +} + +// releaseAcquiredResource returns res to the the pool. +func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) { + p.cond.L.Lock() + + if !p.closed { + res.lastUsedNano = lastUsedNano + res.status = resourceStatusIdle + p.idleResources = append(p.idleResources, res) + } else { + p.allResources = removeResource(p.allResources, res) + go p.destructResourceValue(res.value) + } + + p.cond.L.Unlock() + p.cond.Signal() +} + +// Remove removes res from the pool and closes it. If res is not part of the +// pool Remove will panic. +func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) { + p.destructResourceValue(res.value) + p.cond.L.Lock() + p.allResources = removeResource(p.allResources, res) + p.cond.L.Unlock() + p.cond.Signal() +} + +func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) { + p.cond.L.Lock() + + p.allResources = removeResource(p.allResources, res) + res.status = resourceStatusHijacked + p.destructWG.Done() // not responsible for destructing hijacked resources + + p.cond.L.Unlock() + p.cond.Signal() +} + +func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] { + for i := range slice { + if slice[i] == res { + slice[i] = slice[len(slice)-1] + slice[len(slice)-1] = nil // Avoid memory leak + return slice[:len(slice)-1] + } + } + + panic("BUG: removeResource could not find res in slice") +} + +func (p *Pool[T]) constructResourceValue(ctx context.Context) (T, error) { + return p.constructor(ctx) +} + +func (p *Pool[T]) destructResourceValue(value T) { + p.destructor(value) + p.destructWG.Done() +} diff --git a/pool_test.go b/puddleg/pool_test.go similarity index 82% rename from pool_test.go rename to puddleg/pool_test.go index b0d07c4..9bfbcd7 100644 --- a/pool_test.go +++ b/puddleg/pool_test.go @@ -1,4 +1,4 @@ -package puddle_test +package puddleg_test import ( "context" @@ -14,7 +14,7 @@ import ( "testing" "time" - "github.com/jackc/puddle" + "github.com/jackc/puddle/puddleg" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -41,48 +41,25 @@ func (c *Counter) Value() int { return n } -func createConstructor() (puddle.Constructor, *Counter) { +func createConstructor() (puddleg.Constructor[int], *Counter) { var c Counter - f := func(ctx context.Context) (interface{}, error) { + f := func(ctx context.Context) (int, error) { return c.Next(), nil } return f, &c } -func createConstructorWithNotifierChan() (puddle.Constructor, *Counter, chan int) { - ch := make(chan int) - var c Counter - f := func(ctx context.Context) (interface{}, error) { - n := c.Next() - - // Because the tests will not read from ch until after the create function f returns. - go func() { ch <- n }() - - return n, nil - } - return f, &c, ch -} - -func stubDestructor(interface{}) {} - -func waitForRead(ch chan int) bool { - select { - case <-ch: - return true - case <-time.NewTimer(time.Second).C: - return false - } -} +func stubDestructor(int) {} func TestNewPoolRequiresMaxSizeGreaterThan0(t *testing.T) { constructor, _ := createConstructor() - assert.Panics(t, func() { puddle.NewPool(constructor, stubDestructor, -1) }) - assert.Panics(t, func() { puddle.NewPool(constructor, stubDestructor, 0) }) + assert.Panics(t, func() { puddleg.NewPool(constructor, stubDestructor, -1) }) + assert.Panics(t, func() { puddleg.NewPool(constructor, stubDestructor, 0) }) } func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) defer pool.Close() res, err := pool.Acquire(context.Background()) @@ -94,7 +71,7 @@ func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) { func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) { constructor, createCounter := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 1) + pool := puddleg.NewPool(constructor, stubDestructor, 1) wg := &sync.WaitGroup{} @@ -119,7 +96,7 @@ func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) func TestPoolAcquireWithCancellableContext(t *testing.T) { constructor, createCounter := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 1) + pool := puddleg.NewPool(constructor, stubDestructor, 1) wg := &sync.WaitGroup{} @@ -146,10 +123,10 @@ func TestPoolAcquireWithCancellableContext(t *testing.T) { func TestPoolAcquireReturnsErrorFromFailedResourceCreate(t *testing.T) { errCreateFailed := errors.New("create failed") - constructor := func(ctx context.Context) (interface{}, error) { - return nil, errCreateFailed + constructor := func(ctx context.Context) (int, error) { + return 0, errCreateFailed } - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) res, err := pool.Acquire(context.Background()) assert.Equal(t, errCreateFailed, err) @@ -158,7 +135,7 @@ func TestPoolAcquireReturnsErrorFromFailedResourceCreate(t *testing.T) { func TestPoolAcquireReusesResources(t *testing.T) { constructor, createCounter := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) res, err := pool.Acquire(context.Background()) require.NoError(t, err) @@ -177,11 +154,11 @@ func TestPoolAcquireReusesResources(t *testing.T) { func TestPoolTryAcquire(t *testing.T) { constructor, createCounter := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 1) + pool := puddleg.NewPool(constructor, stubDestructor, 1) // Pool is initially empty so TryAcquire fails but starts construction of resource in the background. res, err := pool.TryAcquire(context.Background()) - require.EqualError(t, err, puddle.ErrNotAvailable.Error()) + require.EqualError(t, err, puddleg.ErrNotAvailable.Error()) assert.Nil(t, res) // Wait for background creation to complete. @@ -193,7 +170,7 @@ func TestPoolTryAcquire(t *testing.T) { defer res.Release() res, err = pool.TryAcquire(context.Background()) - require.EqualError(t, err, puddle.ErrNotAvailable.Error()) + require.EqualError(t, err, puddleg.ErrNotAvailable.Error()) assert.Nil(t, res) assert.Equal(t, 1, createCounter.Value()) @@ -201,29 +178,29 @@ func TestPoolTryAcquire(t *testing.T) { func TestPoolTryAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) pool.Close() res, err := pool.TryAcquire(context.Background()) - assert.Equal(t, puddle.ErrClosedPool, err) + assert.Equal(t, puddleg.ErrClosedPool, err) assert.Nil(t, res) } func TestPoolTryAcquireWithFailedResourceCreate(t *testing.T) { errCreateFailed := errors.New("create failed") - constructor := func(ctx context.Context) (interface{}, error) { - return nil, errCreateFailed + constructor := func(ctx context.Context) (int, error) { + return 0, errCreateFailed } - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) res, err := pool.TryAcquire(context.Background()) - require.EqualError(t, err, puddle.ErrNotAvailable.Error()) + require.EqualError(t, err, puddleg.ErrNotAvailable.Error()) assert.Nil(t, res) } func TestPoolAcquireNilContextDoesNotLeavePoolLocked(t *testing.T) { constructor, createCounter := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) assert.Panics(t, func() { pool.Acquire(nil) }) @@ -236,10 +213,10 @@ func TestPoolAcquireNilContextDoesNotLeavePoolLocked(t *testing.T) { } func TestPoolAcquireContextAlreadyCanceled(t *testing.T) { - constructor := func(ctx context.Context) (interface{}, error) { + constructor := func(ctx context.Context) (int, error) { panic("should never be called") } - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -254,15 +231,15 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) { timeoutChan := time.After(1 * time.Second) var constructorCalls Counter - constructor := func(ctx context.Context) (interface{}, error) { + constructor := func(ctx context.Context) (int, error) { select { case <-ctx.Done(): - return nil, ctx.Err() + return 0, ctx.Err() case <-timeoutChan: } return constructorCalls.Next(), nil } - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) res, err := pool.Acquire(ctx) assert.Equal(t, context.Canceled, err) @@ -271,10 +248,10 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) { func TestPoolAcquireAllIdle(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) defer pool.Close() - resources := make([]*puddle.Resource, 4) + resources := make([]*puddleg.Resource[int], 4) var err error resources[0], err = pool.Acquire(context.Background()) @@ -291,7 +268,7 @@ func TestPoolAcquireAllIdle(t *testing.T) { resources[0].Release() resources[3].Release() - assert.ElementsMatch(t, []*puddle.Resource{resources[0], resources[3]}, pool.AcquireAllIdle()) + assert.ElementsMatch(t, []*puddleg.Resource[int]{resources[0], resources[3]}, pool.AcquireAllIdle()) resources[0].Release() resources[3].Release() @@ -308,14 +285,14 @@ func TestPoolAcquireAllIdle(t *testing.T) { func TestPoolAcquireAllIdleWhenClosedIsNil(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) pool.Close() assert.Nil(t, pool.AcquireAllIdle()) } func TestPoolCreateResource(t *testing.T) { constructor, counter := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) defer pool.Close() var err error @@ -339,10 +316,10 @@ func TestPoolCreateResource(t *testing.T) { func TestPoolCreateResourceReturnsErrorFromFailedResourceCreate(t *testing.T) { errCreateFailed := errors.New("create failed") - constructor := func(ctx context.Context) (interface{}, error) { - return nil, errCreateFailed + constructor := func(ctx context.Context) (int, error) { + return 0, errCreateFailed } - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) err := pool.CreateResource(context.Background()) assert.Equal(t, errCreateFailed, err) @@ -350,20 +327,20 @@ func TestPoolCreateResourceReturnsErrorFromFailedResourceCreate(t *testing.T) { func TestPoolCreateResourceReturnsErrorWhenAlreadyClosed(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) pool.Close() err := pool.CreateResource(context.Background()) - assert.Equal(t, puddle.ErrClosedPool, err) + assert.Equal(t, puddleg.ErrClosedPool, err) } func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testing.T) { // There is no way to guarantee the correct order of the pool being closed while the resource is being constructed. // But these sleeps should make it extremely likely. (Ah, the lengths we go for 100% test coverage...) - constructor := func(ctx context.Context) (interface{}, error) { + constructor := func(ctx context.Context) (int, error) { time.Sleep(500 * time.Millisecond) - return "abc", nil + return 123, nil } - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) acquireErrChan := make(chan error) go func() { @@ -375,20 +352,20 @@ func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testin pool.Close() err := <-acquireErrChan - assert.Equal(t, puddle.ErrClosedPool, err) + assert.Equal(t, puddleg.ErrClosedPool, err) } func TestPoolCloseClosesAllIdleResources(t *testing.T) { constructor, _ := createConstructor() var destructorCalls Counter - destructor := func(interface{}) { + destructor := func(int) { destructorCalls.Next() } - p := puddle.NewPool(constructor, destructor, 10) + p := puddleg.NewPool(constructor, destructor, 10) - resources := make([]*puddle.Resource, 4) + resources := make([]*puddleg.Resource[int], 4) for i := range resources { var err error resources[i], err = p.Acquire(context.Background()) @@ -407,13 +384,13 @@ func TestPoolCloseClosesAllIdleResources(t *testing.T) { func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) { constructor, _ := createConstructor() var destructorCalls Counter - destructor := func(interface{}) { + destructor := func(int) { destructorCalls.Next() } - p := puddle.NewPool(constructor, destructor, 10) + p := puddleg.NewPool(constructor, destructor, 10) - resources := make([]*puddle.Resource, 4) + resources := make([]*puddleg.Resource[int], 4) for i := range resources { var err error resources[i], err = p.Acquire(context.Background()) @@ -421,7 +398,7 @@ func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) { } for _, res := range resources { - go func(res *puddle.Resource) { + go func(res *puddleg.Resource[int]) { time.Sleep(100 * time.Millisecond) res.Release() }(res) @@ -434,7 +411,7 @@ func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) { func TestPoolCloseIsSafeToCallMultipleTimes(t *testing.T) { constructor, _ := createConstructor() - p := puddle.NewPool(constructor, stubDestructor, 10) + p := puddleg.NewPool(constructor, stubDestructor, 10) p.Close() p.Close() @@ -446,7 +423,7 @@ func TestPoolStatResources(t *testing.T) { endWaitChan := make(chan struct{}) var constructorCalls Counter - constructor := func(ctx context.Context) (interface{}, error) { + constructor := func(ctx context.Context) (int, error) { select { case <-startWaitChan: close(waitingChan) @@ -456,7 +433,7 @@ func TestPoolStatResources(t *testing.T) { return constructorCalls.Next(), nil } - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) defer pool.Close() resAcquired, err := pool.Acquire(context.Background()) @@ -491,7 +468,7 @@ func TestPoolStatResources(t *testing.T) { func TestPoolStatSuccessfulAcquireCounters(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 1) + pool := puddleg.NewPool(constructor, stubDestructor, 1) defer pool.Close() res, err := pool.Acquire(context.Background()) @@ -537,7 +514,7 @@ func TestPoolStatSuccessfulAcquireCounters(t *testing.T) { func TestPoolStatCanceledAcquireBeforeStart(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 1) + pool := puddleg.NewPool(constructor, stubDestructor, 1) defer pool.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -551,12 +528,12 @@ func TestPoolStatCanceledAcquireBeforeStart(t *testing.T) { } func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) { - constructor := func(ctx context.Context) (interface{}, error) { + constructor := func(ctx context.Context) (int, error) { <-ctx.Done() - return nil, ctx.Err() + return 0, ctx.Err() } - pool := puddle.NewPool(constructor, stubDestructor, 1) + pool := puddleg.NewPool(constructor, stubDestructor, 1) defer pool.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -571,7 +548,7 @@ func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) { func TestPoolStatCanceledAcquireDuringWait(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 1) + pool := puddleg.NewPool(constructor, stubDestructor, 1) defer pool.Close() res, err := pool.Acquire(context.Background()) @@ -592,11 +569,11 @@ func TestPoolStatCanceledAcquireDuringWait(t *testing.T) { func TestResourceHijackRemovesResourceFromPoolButDoesNotDestroy(t *testing.T) { constructor, _ := createConstructor() var destructorCalls Counter - destructor := func(interface{}) { + destructor := func(int) { destructorCalls.Next() } - pool := puddle.NewPool(constructor, destructor, 10) + pool := puddleg.NewPool(constructor, destructor, 10) res, err := pool.Acquire(context.Background()) require.NoError(t, err) @@ -615,7 +592,7 @@ func TestResourceHijackRemovesResourceFromPoolButDoesNotDestroy(t *testing.T) { func TestResourceDestroyRemovesResourceFromPool(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) res, err := pool.Acquire(context.Background()) require.NoError(t, err) @@ -635,7 +612,7 @@ func TestResourceDestroyRemovesResourceFromPool(t *testing.T) { func TestResourceLastUsageTimeTracking(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 1) + pool := puddleg.NewPool(constructor, stubDestructor, 1) res, err := pool.Acquire(context.Background()) require.NoError(t, err) @@ -669,7 +646,7 @@ func TestResourceLastUsageTimeTracking(t *testing.T) { func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) res, err := pool.Acquire(context.Background()) require.NoError(t, err) @@ -687,11 +664,11 @@ func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) { func TestPoolAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) { constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, 10) + pool := puddleg.NewPool(constructor, stubDestructor, 10) pool.Close() res, err := pool.Acquire(context.Background()) - assert.Equal(t, puddle.ErrClosedPool, err) + assert.Equal(t, puddleg.ErrClosedPool, err) assert.Nil(t, res) } @@ -705,7 +682,7 @@ func TestSignalIsSentWhenResourceFailedToCreate(t *testing.T) { } destructor := func(value interface{}) {} - pool := puddle.NewPool(constructor, destructor, 1) + pool := puddleg.NewPool(constructor, destructor, 1) res1, err := pool.Acquire(context.Background()) require.NoError(t, err) @@ -728,7 +705,7 @@ func TestSignalIsSentWhenResourceFailedToCreate(t *testing.T) { func TestStress(t *testing.T) { constructor, _ := createConstructor() var destructorCalls Counter - destructor := func(interface{}) { + destructor := func(int) { destructorCalls.Next() } @@ -737,16 +714,16 @@ func TestStress(t *testing.T) { poolSize = 4 } - pool := puddle.NewPool(constructor, destructor, int32(poolSize)) + pool := puddleg.NewPool(constructor, destructor, int32(poolSize)) finishChan := make(chan struct{}) wg := &sync.WaitGroup{} - releaseOrDestroyOrHijack := func(res *puddle.Resource) { + releaseOrDestroyOrHijack := func(res *puddleg.Resource[int]) { n := rand.Intn(100) if n < 5 { res.Hijack() - destructor(res) + destructor(res.Value()) } else if n < 10 { res.Destroy() } else { @@ -759,7 +736,7 @@ func TestStress(t *testing.T) { func() { res, err := pool.Acquire(context.Background()) if err != nil { - if err != puddle.ErrClosedPool { + if err != puddleg.ErrClosedPool { assert.Failf(t, "stress acquire", "pool.Acquire returned unexpected err: %v", err) } return @@ -774,7 +751,7 @@ func TestStress(t *testing.T) { defer cancel() res, err := pool.Acquire(ctx) if err != nil { - if err != puddle.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded { + if err != puddleg.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded { assert.Failf(t, "stress acquire possibly canceled by context", "pool.Acquire returned unexpected err: %v", err) } return @@ -787,7 +764,7 @@ func TestStress(t *testing.T) { func() { res, err := pool.TryAcquire(context.Background()) if err != nil { - if err != puddle.ErrClosedPool && err != puddle.ErrNotAvailable { + if err != puddleg.ErrClosedPool && err != puddleg.ErrNotAvailable { assert.Failf(t, "stress TryAcquire", "pool.TryAcquire returned unexpected err: %v", err) } return @@ -873,7 +850,7 @@ func ExamplePool() { } maxPoolSize := int32(10) - pool := puddle.NewPool(constructor, destructor, maxPoolSize) + pool := puddleg.NewPool(constructor, destructor, maxPoolSize) // Use pool multiple times for i := 0; i < 10; i++ { @@ -972,7 +949,7 @@ func BenchmarkPoolAcquireAndRelease(b *testing.B) { wg := &sync.WaitGroup{} constructor, _ := createConstructor() - pool := puddle.NewPool(constructor, stubDestructor, bm.poolSize) + pool := puddleg.NewPool(constructor, stubDestructor, bm.poolSize) for i := 0; i < bm.clientCount; i++ { wg.Add(1)