Remove compat layer and use generics directly
Backwards compatibility is not required since moving to v2.
This commit is contained in:
@@ -30,7 +30,7 @@ destructor := func(value net.Conn) {
|
|||||||
}
|
}
|
||||||
maxPoolSize := 10
|
maxPoolSize := 10
|
||||||
|
|
||||||
pool := puddleg.NewPool(constructor, destructor, maxPoolSize)
|
pool := puddle.NewPool(constructor, destructor, maxPoolSize)
|
||||||
|
|
||||||
// Acquire resource from the pool.
|
// Acquire resource from the pool.
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
// Package puddle is a generic resource pool.
|
// Package puddle is a generic resource pool with type-parametrized api.
|
||||||
/*
|
/*
|
||||||
|
|
||||||
Puddle is a tiny generic resource pool library for Go that uses the standard
|
Puddle is a tiny generic resource pool library for Go that uses the standard
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package puddleg
|
package puddle
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
// This file contains the safe implementation of nanotime using time.Now().
|
// This file contains the safe implementation of nanotime using time.Now().
|
||||||
|
|
||||||
package puddleg
|
package puddle
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
// This file contains the implementation of nanotime using runtime.nanotime.
|
// This file contains the implementation of nanotime using runtime.nanotime.
|
||||||
|
|
||||||
package puddleg
|
package puddle
|
||||||
|
|
||||||
import "unsafe"
|
import "unsafe"
|
||||||
|
|
||||||
@@ -1,35 +1,532 @@
|
|||||||
package puddle
|
package puddle
|
||||||
|
|
||||||
import "github.com/jackc/puddle/puddleg"
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
resourceStatusConstructing = 0
|
||||||
|
resourceStatusIdle = iota
|
||||||
|
resourceStatusAcquired = iota
|
||||||
|
resourceStatusHijacked = iota
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
|
// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
|
||||||
// or a pool that is closed while the acquire is waiting.
|
// or a pool that is closed while the acquire is waiting.
|
||||||
ErrClosedPool = puddleg.ErrClosedPool
|
var ErrClosedPool = errors.New("closed pool")
|
||||||
|
|
||||||
// ErrNotAvailable occurs on an attempt to acquire a resource from a pool
|
// ErrNotAvailable occurs on an attempt to acquire a resource from a pool
|
||||||
// that is at maximum capacity and has no available resources.
|
// that is at maximum capacity and has no available resources.
|
||||||
ErrNotAvailable = puddleg.ErrNotAvailable
|
var ErrNotAvailable = errors.New("resource not available")
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
// Constructor is a function called by the pool to construct a resource.
|
// Constructor is a function called by the pool to construct a resource.
|
||||||
Constructor = puddleg.Constructor[any]
|
type Constructor[T any] func(ctx context.Context) (res T, err error)
|
||||||
|
|
||||||
// Destructor is a function called by the pool to destroy a resource.
|
// Destructor is a function called by the pool to destroy a resource.
|
||||||
Destructor = puddleg.Destructor[any]
|
type Destructor[T any] func(res T)
|
||||||
|
|
||||||
// Resource is the resource handle returned by acquiring from the pool.
|
// Resource is the resource handle returned by acquiring from the pool.
|
||||||
Resource = puddleg.Resource[any]
|
type Resource[T any] struct {
|
||||||
|
value T
|
||||||
|
pool *Pool[T]
|
||||||
|
creationTime time.Time
|
||||||
|
lastUsedNano int64
|
||||||
|
status byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Value returns the resource value.
|
||||||
|
func (res *Resource[T]) Value() T {
|
||||||
|
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
|
||||||
|
panic("tried to access resource that is not acquired or hijacked")
|
||||||
|
}
|
||||||
|
return res.value
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release returns the resource to the pool. res must not be subsequently used.
|
||||||
|
func (res *Resource[T]) Release() {
|
||||||
|
if res.status != resourceStatusAcquired {
|
||||||
|
panic("tried to release resource that is not acquired")
|
||||||
|
}
|
||||||
|
res.pool.releaseAcquiredResource(res, nanotime())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime
|
||||||
|
// will not change. res must not be subsequently used.
|
||||||
|
func (res *Resource[T]) ReleaseUnused() {
|
||||||
|
if res.status != resourceStatusAcquired {
|
||||||
|
panic("tried to release resource that is not acquired")
|
||||||
|
}
|
||||||
|
res.pool.releaseAcquiredResource(res, res.lastUsedNano)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroy returns the resource to the pool for destruction. res must not be
|
||||||
|
// subsequently used.
|
||||||
|
func (res *Resource[T]) Destroy() {
|
||||||
|
if res.status != resourceStatusAcquired {
|
||||||
|
panic("tried to destroy resource that is not acquired")
|
||||||
|
}
|
||||||
|
go res.pool.destroyAcquiredResource(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hijack assumes ownership of the resource from the pool. Caller is responsible
|
||||||
|
// for cleanup of resource value.
|
||||||
|
func (res *Resource[T]) Hijack() {
|
||||||
|
if res.status != resourceStatusAcquired {
|
||||||
|
panic("tried to hijack resource that is not acquired")
|
||||||
|
}
|
||||||
|
res.pool.hijackAcquiredResource(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreationTime returns when the resource was created by the pool.
|
||||||
|
func (res *Resource[T]) CreationTime() time.Time {
|
||||||
|
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
|
||||||
|
panic("tried to access resource that is not acquired or hijacked")
|
||||||
|
}
|
||||||
|
return res.creationTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time
|
||||||
|
// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with
|
||||||
|
// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead.
|
||||||
|
func (res *Resource[T]) LastUsedNanotime() int64 {
|
||||||
|
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
|
||||||
|
panic("tried to access resource that is not acquired or hijacked")
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.lastUsedNano
|
||||||
|
}
|
||||||
|
|
||||||
|
// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting
|
||||||
|
// LastUsedNanotime to the current nanotime.
|
||||||
|
func (res *Resource[T]) IdleDuration() time.Duration {
|
||||||
|
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
|
||||||
|
panic("tried to access resource that is not acquired or hijacked")
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Duration(nanotime() - res.lastUsedNano)
|
||||||
|
}
|
||||||
|
|
||||||
// Pool is a concurrency-safe resource pool.
|
// Pool is a concurrency-safe resource pool.
|
||||||
Pool = puddleg.Pool[any]
|
type Pool[T any] struct {
|
||||||
|
cond *sync.Cond
|
||||||
|
destructWG *sync.WaitGroup
|
||||||
|
|
||||||
// Stat is a snapshot of Pool statistics.
|
allResources []*Resource[T]
|
||||||
Stat = puddleg.Stat
|
idleResources []*Resource[T]
|
||||||
)
|
|
||||||
|
constructor Constructor[T]
|
||||||
|
destructor Destructor[T]
|
||||||
|
maxSize int32
|
||||||
|
|
||||||
|
acquireCount int64
|
||||||
|
acquireDuration time.Duration
|
||||||
|
emptyAcquireCount int64
|
||||||
|
canceledAcquireCount int64
|
||||||
|
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
// NewPool creates a new pool. Panics if maxSize is less than 1.
|
// NewPool creates a new pool. Panics if maxSize is less than 1.
|
||||||
func NewPool(constructor Constructor, destructor Destructor, maxSize int32) *Pool {
|
func NewPool[T any](constructor Constructor[T], destructor Destructor[T], maxSize int32) *Pool[T] {
|
||||||
return puddleg.NewPool(constructor, destructor, maxSize)
|
if maxSize < 1 {
|
||||||
|
panic("maxSize is less than 1")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Pool[T]{
|
||||||
|
cond: sync.NewCond(new(sync.Mutex)),
|
||||||
|
destructWG: &sync.WaitGroup{},
|
||||||
|
maxSize: maxSize,
|
||||||
|
constructor: constructor,
|
||||||
|
destructor: destructor,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close destroys all resources in the pool and rejects future Acquire calls.
|
||||||
|
// Blocks until all resources are returned to pool and destroyed.
|
||||||
|
func (p *Pool[T]) Close() {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
if p.closed {
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.closed = true
|
||||||
|
|
||||||
|
for _, res := range p.idleResources {
|
||||||
|
p.allResources = removeResource(p.allResources, res)
|
||||||
|
go p.destructResourceValue(res.value)
|
||||||
|
}
|
||||||
|
p.idleResources = nil
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
|
||||||
|
// Wake up all go routines waiting for a resource to be returned so they can terminate.
|
||||||
|
p.cond.Broadcast()
|
||||||
|
|
||||||
|
p.destructWG.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat is a snapshot of Pool statistics.
|
||||||
|
type Stat struct {
|
||||||
|
constructingResources int32
|
||||||
|
acquiredResources int32
|
||||||
|
idleResources int32
|
||||||
|
maxResources int32
|
||||||
|
acquireCount int64
|
||||||
|
acquireDuration time.Duration
|
||||||
|
emptyAcquireCount int64
|
||||||
|
canceledAcquireCount int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// TotalResources returns the total number of resources currently in the pool.
|
||||||
|
// The value is the sum of ConstructingResources, AcquiredResources, and
|
||||||
|
// IdleResources.
|
||||||
|
func (s *Stat) TotalResources() int32 {
|
||||||
|
return s.constructingResources + s.acquiredResources + s.idleResources
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConstructingResources returns the number of resources with construction in progress in
|
||||||
|
// the pool.
|
||||||
|
func (s *Stat) ConstructingResources() int32 {
|
||||||
|
return s.constructingResources
|
||||||
|
}
|
||||||
|
|
||||||
|
// AcquiredResources returns the number of currently acquired resources in the pool.
|
||||||
|
func (s *Stat) AcquiredResources() int32 {
|
||||||
|
return s.acquiredResources
|
||||||
|
}
|
||||||
|
|
||||||
|
// IdleResources returns the number of currently idle resources in the pool.
|
||||||
|
func (s *Stat) IdleResources() int32 {
|
||||||
|
return s.idleResources
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxResources returns the maximum size of the pool.
|
||||||
|
func (s *Stat) MaxResources() int32 {
|
||||||
|
return s.maxResources
|
||||||
|
}
|
||||||
|
|
||||||
|
// AcquireCount returns the cumulative count of successful acquires from the pool.
|
||||||
|
func (s *Stat) AcquireCount() int64 {
|
||||||
|
return s.acquireCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// AcquireDuration returns the total duration of all successful acquires from
|
||||||
|
// the pool.
|
||||||
|
func (s *Stat) AcquireDuration() time.Duration {
|
||||||
|
return s.acquireDuration
|
||||||
|
}
|
||||||
|
|
||||||
|
// EmptyAcquireCount returns the cumulative count of successful acquires from the pool
|
||||||
|
// that waited for a resource to be released or constructed because the pool was
|
||||||
|
// empty.
|
||||||
|
func (s *Stat) EmptyAcquireCount() int64 {
|
||||||
|
return s.emptyAcquireCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanceledAcquireCount returns the cumulative count of acquires from the pool
|
||||||
|
// that were canceled by a context.
|
||||||
|
func (s *Stat) CanceledAcquireCount() int64 {
|
||||||
|
return s.canceledAcquireCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat returns the current pool statistics.
|
||||||
|
func (p *Pool[T]) Stat() *Stat {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
s := &Stat{
|
||||||
|
maxResources: p.maxSize,
|
||||||
|
acquireCount: p.acquireCount,
|
||||||
|
emptyAcquireCount: p.emptyAcquireCount,
|
||||||
|
canceledAcquireCount: p.canceledAcquireCount,
|
||||||
|
acquireDuration: p.acquireDuration,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, res := range p.allResources {
|
||||||
|
switch res.status {
|
||||||
|
case resourceStatusConstructing:
|
||||||
|
s.constructingResources += 1
|
||||||
|
case resourceStatusIdle:
|
||||||
|
s.idleResources += 1
|
||||||
|
case resourceStatusAcquired:
|
||||||
|
s.acquiredResources += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Acquire gets a resource from the pool. If no resources are available and the pool
|
||||||
|
// is not at maximum capacity it will create a new resource. If the pool is at
|
||||||
|
// maximum capacity it will block until a resource is available. ctx can be used
|
||||||
|
// to cancel the Acquire.
|
||||||
|
func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
|
||||||
|
startNano := nanotime()
|
||||||
|
if doneChan := ctx.Done(); doneChan != nil {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
p.cond.L.Lock()
|
||||||
|
p.canceledAcquireCount += 1
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.cond.L.Lock()
|
||||||
|
|
||||||
|
emptyAcquire := false
|
||||||
|
|
||||||
|
for {
|
||||||
|
if p.closed {
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return nil, ErrClosedPool
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a resource is available now
|
||||||
|
if len(p.idleResources) > 0 {
|
||||||
|
res := p.idleResources[len(p.idleResources)-1]
|
||||||
|
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
|
||||||
|
p.idleResources = p.idleResources[:len(p.idleResources)-1]
|
||||||
|
res.status = resourceStatusAcquired
|
||||||
|
if emptyAcquire {
|
||||||
|
p.emptyAcquireCount += 1
|
||||||
|
}
|
||||||
|
p.acquireCount += 1
|
||||||
|
p.acquireDuration += time.Duration(nanotime() - startNano)
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
emptyAcquire = true
|
||||||
|
|
||||||
|
// If there is room to create a resource do so
|
||||||
|
if len(p.allResources) < int(p.maxSize) {
|
||||||
|
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
|
||||||
|
p.allResources = append(p.allResources, res)
|
||||||
|
p.destructWG.Add(1)
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
|
||||||
|
value, err := p.constructResourceValue(ctx)
|
||||||
|
p.cond.L.Lock()
|
||||||
|
if err != nil {
|
||||||
|
p.allResources = removeResource(p.allResources, res)
|
||||||
|
p.destructWG.Done()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
if err == ctx.Err() {
|
||||||
|
p.canceledAcquireCount += 1
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
p.cond.Signal()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res.value = value
|
||||||
|
res.status = resourceStatusAcquired
|
||||||
|
p.emptyAcquireCount += 1
|
||||||
|
p.acquireCount += 1
|
||||||
|
p.acquireDuration += time.Duration(nanotime() - startNano)
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx.Done() == nil {
|
||||||
|
p.cond.Wait()
|
||||||
|
} else {
|
||||||
|
// Convert p.cond.Wait into a channel
|
||||||
|
waitChan := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
p.cond.Wait()
|
||||||
|
waitChan <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// Allow goroutine waiting for signal to exit. Re-signal since we couldn't
|
||||||
|
// do anything with it. Another goroutine might be waiting.
|
||||||
|
go func() {
|
||||||
|
<-waitChan
|
||||||
|
p.cond.Signal()
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
p.cond.L.Lock()
|
||||||
|
p.canceledAcquireCount += 1
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-waitChan:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
|
||||||
|
// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
|
||||||
|
// used to cancel the background creation.
|
||||||
|
func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
defer p.cond.L.Unlock()
|
||||||
|
|
||||||
|
if p.closed {
|
||||||
|
return nil, ErrClosedPool
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a resource is available now
|
||||||
|
if len(p.idleResources) > 0 {
|
||||||
|
res := p.idleResources[len(p.idleResources)-1]
|
||||||
|
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
|
||||||
|
p.idleResources = p.idleResources[:len(p.idleResources)-1]
|
||||||
|
p.acquireCount += 1
|
||||||
|
res.status = resourceStatusAcquired
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(p.allResources) < int(p.maxSize) {
|
||||||
|
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
|
||||||
|
p.allResources = append(p.allResources, res)
|
||||||
|
p.destructWG.Add(1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
value, err := p.constructResourceValue(ctx)
|
||||||
|
defer p.cond.Signal()
|
||||||
|
p.cond.L.Lock()
|
||||||
|
defer p.cond.L.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
p.allResources = removeResource(p.allResources, res)
|
||||||
|
p.destructWG.Done()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res.value = value
|
||||||
|
res.status = resourceStatusIdle
|
||||||
|
p.idleResources = append(p.idleResources, res)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, ErrNotAvailable
|
||||||
|
}
|
||||||
|
|
||||||
|
// AcquireAllIdle atomically acquires all currently idle resources. Its intended
|
||||||
|
// use is for health check and keep-alive functionality. It does not update pool
|
||||||
|
// statistics.
|
||||||
|
func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
if p.closed {
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, res := range p.idleResources {
|
||||||
|
res.status = resourceStatusAcquired
|
||||||
|
}
|
||||||
|
resources := p.idleResources // Swap out current slice
|
||||||
|
p.idleResources = nil
|
||||||
|
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return resources
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateResource constructs a new resource without acquiring it.
|
||||||
|
// It goes straight in the IdlePool. It does not check against maxSize.
|
||||||
|
// It can be useful to maintain warm resources under little load.
|
||||||
|
func (p *Pool[T]) CreateResource(ctx context.Context) error {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
if p.closed {
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return ErrClosedPool
|
||||||
|
}
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
|
||||||
|
value, err := p.constructResourceValue(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
res := &Resource[T]{
|
||||||
|
pool: p,
|
||||||
|
creationTime: time.Now(),
|
||||||
|
status: resourceStatusIdle,
|
||||||
|
value: value,
|
||||||
|
lastUsedNano: nanotime(),
|
||||||
|
}
|
||||||
|
p.destructWG.Add(1)
|
||||||
|
|
||||||
|
p.cond.L.Lock()
|
||||||
|
// If closed while constructing resource then destroy it and return an error
|
||||||
|
if p.closed {
|
||||||
|
go p.destructResourceValue(res.value)
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return ErrClosedPool
|
||||||
|
}
|
||||||
|
p.allResources = append(p.allResources, res)
|
||||||
|
p.idleResources = append(p.idleResources, res)
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// releaseAcquiredResource returns res to the the pool.
|
||||||
|
func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
|
||||||
|
if !p.closed {
|
||||||
|
res.lastUsedNano = lastUsedNano
|
||||||
|
res.status = resourceStatusIdle
|
||||||
|
p.idleResources = append(p.idleResources, res)
|
||||||
|
} else {
|
||||||
|
p.allResources = removeResource(p.allResources, res)
|
||||||
|
go p.destructResourceValue(res.value)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
p.cond.Signal()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove removes res from the pool and closes it. If res is not part of the
|
||||||
|
// pool Remove will panic.
|
||||||
|
func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
|
||||||
|
p.destructResourceValue(res.value)
|
||||||
|
p.cond.L.Lock()
|
||||||
|
p.allResources = removeResource(p.allResources, res)
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
p.cond.Signal()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
|
||||||
|
p.allResources = removeResource(p.allResources, res)
|
||||||
|
res.status = resourceStatusHijacked
|
||||||
|
p.destructWG.Done() // not responsible for destructing hijacked resources
|
||||||
|
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
p.cond.Signal()
|
||||||
|
}
|
||||||
|
|
||||||
|
func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] {
|
||||||
|
for i := range slice {
|
||||||
|
if slice[i] == res {
|
||||||
|
slice[i] = slice[len(slice)-1]
|
||||||
|
slice[len(slice)-1] = nil // Avoid memory leak
|
||||||
|
return slice[:len(slice)-1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
panic("BUG: removeResource could not find res in slice")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool[T]) constructResourceValue(ctx context.Context) (T, error) {
|
||||||
|
return p.constructor(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool[T]) destructResourceValue(value T) {
|
||||||
|
p.destructor(value)
|
||||||
|
p.destructWG.Done()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package puddleg_test
|
package puddle_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -14,7 +14,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/puddle/puddleg"
|
"github.com/jackc/puddle"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
@@ -41,7 +41,7 @@ func (c *Counter) Value() int {
|
|||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
func createConstructor() (puddleg.Constructor[int], *Counter) {
|
func createConstructor() (puddle.Constructor[int], *Counter) {
|
||||||
var c Counter
|
var c Counter
|
||||||
f := func(ctx context.Context) (int, error) {
|
f := func(ctx context.Context) (int, error) {
|
||||||
return c.Next(), nil
|
return c.Next(), nil
|
||||||
@@ -53,13 +53,13 @@ func stubDestructor(int) {}
|
|||||||
|
|
||||||
func TestNewPoolRequiresMaxSizeGreaterThan0(t *testing.T) {
|
func TestNewPoolRequiresMaxSizeGreaterThan0(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
assert.Panics(t, func() { puddleg.NewPool(constructor, stubDestructor, -1) })
|
assert.Panics(t, func() { puddle.NewPool(constructor, stubDestructor, -1) })
|
||||||
assert.Panics(t, func() { puddleg.NewPool(constructor, stubDestructor, 0) })
|
assert.Panics(t, func() { puddle.NewPool(constructor, stubDestructor, 0) })
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) {
|
func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
@@ -71,7 +71,7 @@ func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
|
func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
|
||||||
constructor, createCounter := createConstructor()
|
constructor, createCounter := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 1)
|
pool := puddle.NewPool(constructor, stubDestructor, 1)
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
@@ -96,7 +96,7 @@ func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T)
|
|||||||
|
|
||||||
func TestPoolAcquireWithCancellableContext(t *testing.T) {
|
func TestPoolAcquireWithCancellableContext(t *testing.T) {
|
||||||
constructor, createCounter := createConstructor()
|
constructor, createCounter := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 1)
|
pool := puddle.NewPool(constructor, stubDestructor, 1)
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
@@ -126,7 +126,7 @@ func TestPoolAcquireReturnsErrorFromFailedResourceCreate(t *testing.T) {
|
|||||||
constructor := func(ctx context.Context) (int, error) {
|
constructor := func(ctx context.Context) (int, error) {
|
||||||
return 0, errCreateFailed
|
return 0, errCreateFailed
|
||||||
}
|
}
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
assert.Equal(t, errCreateFailed, err)
|
assert.Equal(t, errCreateFailed, err)
|
||||||
@@ -135,7 +135,7 @@ func TestPoolAcquireReturnsErrorFromFailedResourceCreate(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolAcquireReusesResources(t *testing.T) {
|
func TestPoolAcquireReusesResources(t *testing.T) {
|
||||||
constructor, createCounter := createConstructor()
|
constructor, createCounter := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -154,11 +154,11 @@ func TestPoolAcquireReusesResources(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolTryAcquire(t *testing.T) {
|
func TestPoolTryAcquire(t *testing.T) {
|
||||||
constructor, createCounter := createConstructor()
|
constructor, createCounter := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 1)
|
pool := puddle.NewPool(constructor, stubDestructor, 1)
|
||||||
|
|
||||||
// Pool is initially empty so TryAcquire fails but starts construction of resource in the background.
|
// Pool is initially empty so TryAcquire fails but starts construction of resource in the background.
|
||||||
res, err := pool.TryAcquire(context.Background())
|
res, err := pool.TryAcquire(context.Background())
|
||||||
require.EqualError(t, err, puddleg.ErrNotAvailable.Error())
|
require.EqualError(t, err, puddle.ErrNotAvailable.Error())
|
||||||
assert.Nil(t, res)
|
assert.Nil(t, res)
|
||||||
|
|
||||||
// Wait for background creation to complete.
|
// Wait for background creation to complete.
|
||||||
@@ -170,7 +170,7 @@ func TestPoolTryAcquire(t *testing.T) {
|
|||||||
defer res.Release()
|
defer res.Release()
|
||||||
|
|
||||||
res, err = pool.TryAcquire(context.Background())
|
res, err = pool.TryAcquire(context.Background())
|
||||||
require.EqualError(t, err, puddleg.ErrNotAvailable.Error())
|
require.EqualError(t, err, puddle.ErrNotAvailable.Error())
|
||||||
assert.Nil(t, res)
|
assert.Nil(t, res)
|
||||||
|
|
||||||
assert.Equal(t, 1, createCounter.Value())
|
assert.Equal(t, 1, createCounter.Value())
|
||||||
@@ -178,11 +178,11 @@ func TestPoolTryAcquire(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolTryAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
|
func TestPoolTryAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
pool.Close()
|
pool.Close()
|
||||||
|
|
||||||
res, err := pool.TryAcquire(context.Background())
|
res, err := pool.TryAcquire(context.Background())
|
||||||
assert.Equal(t, puddleg.ErrClosedPool, err)
|
assert.Equal(t, puddle.ErrClosedPool, err)
|
||||||
assert.Nil(t, res)
|
assert.Nil(t, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,16 +191,16 @@ func TestPoolTryAcquireWithFailedResourceCreate(t *testing.T) {
|
|||||||
constructor := func(ctx context.Context) (int, error) {
|
constructor := func(ctx context.Context) (int, error) {
|
||||||
return 0, errCreateFailed
|
return 0, errCreateFailed
|
||||||
}
|
}
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
res, err := pool.TryAcquire(context.Background())
|
res, err := pool.TryAcquire(context.Background())
|
||||||
require.EqualError(t, err, puddleg.ErrNotAvailable.Error())
|
require.EqualError(t, err, puddle.ErrNotAvailable.Error())
|
||||||
assert.Nil(t, res)
|
assert.Nil(t, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolAcquireNilContextDoesNotLeavePoolLocked(t *testing.T) {
|
func TestPoolAcquireNilContextDoesNotLeavePoolLocked(t *testing.T) {
|
||||||
constructor, createCounter := createConstructor()
|
constructor, createCounter := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
assert.Panics(t, func() { pool.Acquire(nil) })
|
assert.Panics(t, func() { pool.Acquire(nil) })
|
||||||
|
|
||||||
@@ -216,7 +216,7 @@ func TestPoolAcquireContextAlreadyCanceled(t *testing.T) {
|
|||||||
constructor := func(ctx context.Context) (int, error) {
|
constructor := func(ctx context.Context) (int, error) {
|
||||||
panic("should never be called")
|
panic("should never be called")
|
||||||
}
|
}
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
cancel()
|
cancel()
|
||||||
@@ -239,7 +239,7 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
return constructorCalls.Next(), nil
|
return constructorCalls.Next(), nil
|
||||||
}
|
}
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
res, err := pool.Acquire(ctx)
|
res, err := pool.Acquire(ctx)
|
||||||
assert.Equal(t, context.Canceled, err)
|
assert.Equal(t, context.Canceled, err)
|
||||||
@@ -248,10 +248,10 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolAcquireAllIdle(t *testing.T) {
|
func TestPoolAcquireAllIdle(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
resources := make([]*puddleg.Resource[int], 4)
|
resources := make([]*puddle.Resource[int], 4)
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
resources[0], err = pool.Acquire(context.Background())
|
resources[0], err = pool.Acquire(context.Background())
|
||||||
@@ -268,7 +268,7 @@ func TestPoolAcquireAllIdle(t *testing.T) {
|
|||||||
resources[0].Release()
|
resources[0].Release()
|
||||||
resources[3].Release()
|
resources[3].Release()
|
||||||
|
|
||||||
assert.ElementsMatch(t, []*puddleg.Resource[int]{resources[0], resources[3]}, pool.AcquireAllIdle())
|
assert.ElementsMatch(t, []*puddle.Resource[int]{resources[0], resources[3]}, pool.AcquireAllIdle())
|
||||||
|
|
||||||
resources[0].Release()
|
resources[0].Release()
|
||||||
resources[3].Release()
|
resources[3].Release()
|
||||||
@@ -285,14 +285,14 @@ func TestPoolAcquireAllIdle(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolAcquireAllIdleWhenClosedIsNil(t *testing.T) {
|
func TestPoolAcquireAllIdleWhenClosedIsNil(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
pool.Close()
|
pool.Close()
|
||||||
assert.Nil(t, pool.AcquireAllIdle())
|
assert.Nil(t, pool.AcquireAllIdle())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolCreateResource(t *testing.T) {
|
func TestPoolCreateResource(t *testing.T) {
|
||||||
constructor, counter := createConstructor()
|
constructor, counter := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
@@ -319,7 +319,7 @@ func TestPoolCreateResourceReturnsErrorFromFailedResourceCreate(t *testing.T) {
|
|||||||
constructor := func(ctx context.Context) (int, error) {
|
constructor := func(ctx context.Context) (int, error) {
|
||||||
return 0, errCreateFailed
|
return 0, errCreateFailed
|
||||||
}
|
}
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
err := pool.CreateResource(context.Background())
|
err := pool.CreateResource(context.Background())
|
||||||
assert.Equal(t, errCreateFailed, err)
|
assert.Equal(t, errCreateFailed, err)
|
||||||
@@ -327,10 +327,10 @@ func TestPoolCreateResourceReturnsErrorFromFailedResourceCreate(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolCreateResourceReturnsErrorWhenAlreadyClosed(t *testing.T) {
|
func TestPoolCreateResourceReturnsErrorWhenAlreadyClosed(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
pool.Close()
|
pool.Close()
|
||||||
err := pool.CreateResource(context.Background())
|
err := pool.CreateResource(context.Background())
|
||||||
assert.Equal(t, puddleg.ErrClosedPool, err)
|
assert.Equal(t, puddle.ErrClosedPool, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testing.T) {
|
func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testing.T) {
|
||||||
@@ -340,7 +340,7 @@ func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testin
|
|||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
return 123, nil
|
return 123, nil
|
||||||
}
|
}
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
acquireErrChan := make(chan error)
|
acquireErrChan := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
@@ -352,7 +352,7 @@ func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testin
|
|||||||
pool.Close()
|
pool.Close()
|
||||||
|
|
||||||
err := <-acquireErrChan
|
err := <-acquireErrChan
|
||||||
assert.Equal(t, puddleg.ErrClosedPool, err)
|
assert.Equal(t, puddle.ErrClosedPool, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolCloseClosesAllIdleResources(t *testing.T) {
|
func TestPoolCloseClosesAllIdleResources(t *testing.T) {
|
||||||
@@ -363,9 +363,9 @@ func TestPoolCloseClosesAllIdleResources(t *testing.T) {
|
|||||||
destructorCalls.Next()
|
destructorCalls.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
p := puddleg.NewPool(constructor, destructor, 10)
|
p := puddle.NewPool(constructor, destructor, 10)
|
||||||
|
|
||||||
resources := make([]*puddleg.Resource[int], 4)
|
resources := make([]*puddle.Resource[int], 4)
|
||||||
for i := range resources {
|
for i := range resources {
|
||||||
var err error
|
var err error
|
||||||
resources[i], err = p.Acquire(context.Background())
|
resources[i], err = p.Acquire(context.Background())
|
||||||
@@ -388,9 +388,9 @@ func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
|
|||||||
destructorCalls.Next()
|
destructorCalls.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
p := puddleg.NewPool(constructor, destructor, 10)
|
p := puddle.NewPool(constructor, destructor, 10)
|
||||||
|
|
||||||
resources := make([]*puddleg.Resource[int], 4)
|
resources := make([]*puddle.Resource[int], 4)
|
||||||
for i := range resources {
|
for i := range resources {
|
||||||
var err error
|
var err error
|
||||||
resources[i], err = p.Acquire(context.Background())
|
resources[i], err = p.Acquire(context.Background())
|
||||||
@@ -398,7 +398,7 @@ func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, res := range resources {
|
for _, res := range resources {
|
||||||
go func(res *puddleg.Resource[int]) {
|
go func(res *puddle.Resource[int]) {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
res.Release()
|
res.Release()
|
||||||
}(res)
|
}(res)
|
||||||
@@ -411,7 +411,7 @@ func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
|
|||||||
func TestPoolCloseIsSafeToCallMultipleTimes(t *testing.T) {
|
func TestPoolCloseIsSafeToCallMultipleTimes(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
|
|
||||||
p := puddleg.NewPool(constructor, stubDestructor, 10)
|
p := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
p.Close()
|
p.Close()
|
||||||
p.Close()
|
p.Close()
|
||||||
@@ -433,7 +433,7 @@ func TestPoolStatResources(t *testing.T) {
|
|||||||
|
|
||||||
return constructorCalls.Next(), nil
|
return constructorCalls.Next(), nil
|
||||||
}
|
}
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
resAcquired, err := pool.Acquire(context.Background())
|
resAcquired, err := pool.Acquire(context.Background())
|
||||||
@@ -468,7 +468,7 @@ func TestPoolStatResources(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolStatSuccessfulAcquireCounters(t *testing.T) {
|
func TestPoolStatSuccessfulAcquireCounters(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 1)
|
pool := puddle.NewPool(constructor, stubDestructor, 1)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
@@ -514,7 +514,7 @@ func TestPoolStatSuccessfulAcquireCounters(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolStatCanceledAcquireBeforeStart(t *testing.T) {
|
func TestPoolStatCanceledAcquireBeforeStart(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 1)
|
pool := puddle.NewPool(constructor, stubDestructor, 1)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
@@ -533,7 +533,7 @@ func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) {
|
|||||||
return 0, ctx.Err()
|
return 0, ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 1)
|
pool := puddle.NewPool(constructor, stubDestructor, 1)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
@@ -548,7 +548,7 @@ func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolStatCanceledAcquireDuringWait(t *testing.T) {
|
func TestPoolStatCanceledAcquireDuringWait(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 1)
|
pool := puddle.NewPool(constructor, stubDestructor, 1)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
@@ -573,7 +573,7 @@ func TestResourceHijackRemovesResourceFromPoolButDoesNotDestroy(t *testing.T) {
|
|||||||
destructorCalls.Next()
|
destructorCalls.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
pool := puddleg.NewPool(constructor, destructor, 10)
|
pool := puddle.NewPool(constructor, destructor, 10)
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -592,7 +592,7 @@ func TestResourceHijackRemovesResourceFromPoolButDoesNotDestroy(t *testing.T) {
|
|||||||
|
|
||||||
func TestResourceDestroyRemovesResourceFromPool(t *testing.T) {
|
func TestResourceDestroyRemovesResourceFromPool(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -612,7 +612,7 @@ func TestResourceDestroyRemovesResourceFromPool(t *testing.T) {
|
|||||||
|
|
||||||
func TestResourceLastUsageTimeTracking(t *testing.T) {
|
func TestResourceLastUsageTimeTracking(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 1)
|
pool := puddle.NewPool(constructor, stubDestructor, 1)
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -646,7 +646,7 @@ func TestResourceLastUsageTimeTracking(t *testing.T) {
|
|||||||
|
|
||||||
func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) {
|
func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -664,11 +664,11 @@ func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
|
func TestPoolAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, 10)
|
pool := puddle.NewPool(constructor, stubDestructor, 10)
|
||||||
pool.Close()
|
pool.Close()
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
assert.Equal(t, puddleg.ErrClosedPool, err)
|
assert.Equal(t, puddle.ErrClosedPool, err)
|
||||||
assert.Nil(t, res)
|
assert.Nil(t, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -682,7 +682,7 @@ func TestSignalIsSentWhenResourceFailedToCreate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
destructor := func(value interface{}) {}
|
destructor := func(value interface{}) {}
|
||||||
|
|
||||||
pool := puddleg.NewPool(constructor, destructor, 1)
|
pool := puddle.NewPool(constructor, destructor, 1)
|
||||||
|
|
||||||
res1, err := pool.Acquire(context.Background())
|
res1, err := pool.Acquire(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -714,12 +714,12 @@ func TestStress(t *testing.T) {
|
|||||||
poolSize = 4
|
poolSize = 4
|
||||||
}
|
}
|
||||||
|
|
||||||
pool := puddleg.NewPool(constructor, destructor, int32(poolSize))
|
pool := puddle.NewPool(constructor, destructor, int32(poolSize))
|
||||||
|
|
||||||
finishChan := make(chan struct{})
|
finishChan := make(chan struct{})
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
releaseOrDestroyOrHijack := func(res *puddleg.Resource[int]) {
|
releaseOrDestroyOrHijack := func(res *puddle.Resource[int]) {
|
||||||
n := rand.Intn(100)
|
n := rand.Intn(100)
|
||||||
if n < 5 {
|
if n < 5 {
|
||||||
res.Hijack()
|
res.Hijack()
|
||||||
@@ -736,7 +736,7 @@ func TestStress(t *testing.T) {
|
|||||||
func() {
|
func() {
|
||||||
res, err := pool.Acquire(context.Background())
|
res, err := pool.Acquire(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != puddleg.ErrClosedPool {
|
if err != puddle.ErrClosedPool {
|
||||||
assert.Failf(t, "stress acquire", "pool.Acquire returned unexpected err: %v", err)
|
assert.Failf(t, "stress acquire", "pool.Acquire returned unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -751,7 +751,7 @@ func TestStress(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
res, err := pool.Acquire(ctx)
|
res, err := pool.Acquire(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != puddleg.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded {
|
if err != puddle.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded {
|
||||||
assert.Failf(t, "stress acquire possibly canceled by context", "pool.Acquire returned unexpected err: %v", err)
|
assert.Failf(t, "stress acquire possibly canceled by context", "pool.Acquire returned unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -764,7 +764,7 @@ func TestStress(t *testing.T) {
|
|||||||
func() {
|
func() {
|
||||||
res, err := pool.TryAcquire(context.Background())
|
res, err := pool.TryAcquire(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != puddleg.ErrClosedPool && err != puddleg.ErrNotAvailable {
|
if err != puddle.ErrClosedPool && err != puddle.ErrNotAvailable {
|
||||||
assert.Failf(t, "stress TryAcquire", "pool.TryAcquire returned unexpected err: %v", err)
|
assert.Failf(t, "stress TryAcquire", "pool.TryAcquire returned unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -850,7 +850,7 @@ func ExamplePool() {
|
|||||||
}
|
}
|
||||||
maxPoolSize := int32(10)
|
maxPoolSize := int32(10)
|
||||||
|
|
||||||
pool := puddleg.NewPool(constructor, destructor, maxPoolSize)
|
pool := puddle.NewPool(constructor, destructor, maxPoolSize)
|
||||||
|
|
||||||
// Use pool multiple times
|
// Use pool multiple times
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@@ -949,7 +949,7 @@ func BenchmarkPoolAcquireAndRelease(b *testing.B) {
|
|||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
constructor, _ := createConstructor()
|
constructor, _ := createConstructor()
|
||||||
pool := puddleg.NewPool(constructor, stubDestructor, bm.poolSize)
|
pool := puddle.NewPool(constructor, stubDestructor, bm.poolSize)
|
||||||
|
|
||||||
for i := 0; i < bm.clientCount; i++ {
|
for i := 0; i < bm.clientCount; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@@ -1,11 +0,0 @@
|
|||||||
// Package puddleg is a generic resource pool with type-parametrized api.
|
|
||||||
/*
|
|
||||||
|
|
||||||
Puddle is a tiny generic resource pool library for Go that uses the standard
|
|
||||||
context library to signal cancellation of acquires. It is designed to contain
|
|
||||||
the minimum functionality a resource pool needs that cannot be implemented
|
|
||||||
without concurrency concerns. For example, a database connection pool may use
|
|
||||||
puddle internally and implement health checks and keep-alive behavior without
|
|
||||||
needing to implement any concurrent code of its own.
|
|
||||||
*/
|
|
||||||
package puddleg
|
|
||||||
-532
@@ -1,532 +0,0 @@
|
|||||||
package puddleg
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
resourceStatusConstructing = 0
|
|
||||||
resourceStatusIdle = iota
|
|
||||||
resourceStatusAcquired = iota
|
|
||||||
resourceStatusHijacked = iota
|
|
||||||
)
|
|
||||||
|
|
||||||
// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
|
|
||||||
// or a pool that is closed while the acquire is waiting.
|
|
||||||
var ErrClosedPool = errors.New("closed pool")
|
|
||||||
|
|
||||||
// ErrNotAvailable occurs on an attempt to acquire a resource from a pool
|
|
||||||
// that is at maximum capacity and has no available resources.
|
|
||||||
var ErrNotAvailable = errors.New("resource not available")
|
|
||||||
|
|
||||||
// Constructor is a function called by the pool to construct a resource.
|
|
||||||
type Constructor[T any] func(ctx context.Context) (res T, err error)
|
|
||||||
|
|
||||||
// Destructor is a function called by the pool to destroy a resource.
|
|
||||||
type Destructor[T any] func(res T)
|
|
||||||
|
|
||||||
// Resource is the resource handle returned by acquiring from the pool.
|
|
||||||
type Resource[T any] struct {
|
|
||||||
value T
|
|
||||||
pool *Pool[T]
|
|
||||||
creationTime time.Time
|
|
||||||
lastUsedNano int64
|
|
||||||
status byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// Value returns the resource value.
|
|
||||||
func (res *Resource[T]) Value() T {
|
|
||||||
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
|
|
||||||
panic("tried to access resource that is not acquired or hijacked")
|
|
||||||
}
|
|
||||||
return res.value
|
|
||||||
}
|
|
||||||
|
|
||||||
// Release returns the resource to the pool. res must not be subsequently used.
|
|
||||||
func (res *Resource[T]) Release() {
|
|
||||||
if res.status != resourceStatusAcquired {
|
|
||||||
panic("tried to release resource that is not acquired")
|
|
||||||
}
|
|
||||||
res.pool.releaseAcquiredResource(res, nanotime())
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime
|
|
||||||
// will not change. res must not be subsequently used.
|
|
||||||
func (res *Resource[T]) ReleaseUnused() {
|
|
||||||
if res.status != resourceStatusAcquired {
|
|
||||||
panic("tried to release resource that is not acquired")
|
|
||||||
}
|
|
||||||
res.pool.releaseAcquiredResource(res, res.lastUsedNano)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Destroy returns the resource to the pool for destruction. res must not be
|
|
||||||
// subsequently used.
|
|
||||||
func (res *Resource[T]) Destroy() {
|
|
||||||
if res.status != resourceStatusAcquired {
|
|
||||||
panic("tried to destroy resource that is not acquired")
|
|
||||||
}
|
|
||||||
go res.pool.destroyAcquiredResource(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Hijack assumes ownership of the resource from the pool. Caller is responsible
|
|
||||||
// for cleanup of resource value.
|
|
||||||
func (res *Resource[T]) Hijack() {
|
|
||||||
if res.status != resourceStatusAcquired {
|
|
||||||
panic("tried to hijack resource that is not acquired")
|
|
||||||
}
|
|
||||||
res.pool.hijackAcquiredResource(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreationTime returns when the resource was created by the pool.
|
|
||||||
func (res *Resource[T]) CreationTime() time.Time {
|
|
||||||
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
|
|
||||||
panic("tried to access resource that is not acquired or hijacked")
|
|
||||||
}
|
|
||||||
return res.creationTime
|
|
||||||
}
|
|
||||||
|
|
||||||
// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time
|
|
||||||
// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with
|
|
||||||
// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead.
|
|
||||||
func (res *Resource[T]) LastUsedNanotime() int64 {
|
|
||||||
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
|
|
||||||
panic("tried to access resource that is not acquired or hijacked")
|
|
||||||
}
|
|
||||||
|
|
||||||
return res.lastUsedNano
|
|
||||||
}
|
|
||||||
|
|
||||||
// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting
|
|
||||||
// LastUsedNanotime to the current nanotime.
|
|
||||||
func (res *Resource[T]) IdleDuration() time.Duration {
|
|
||||||
if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
|
|
||||||
panic("tried to access resource that is not acquired or hijacked")
|
|
||||||
}
|
|
||||||
|
|
||||||
return time.Duration(nanotime() - res.lastUsedNano)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pool is a concurrency-safe resource pool.
|
|
||||||
type Pool[T any] struct {
|
|
||||||
cond *sync.Cond
|
|
||||||
destructWG *sync.WaitGroup
|
|
||||||
|
|
||||||
allResources []*Resource[T]
|
|
||||||
idleResources []*Resource[T]
|
|
||||||
|
|
||||||
constructor Constructor[T]
|
|
||||||
destructor Destructor[T]
|
|
||||||
maxSize int32
|
|
||||||
|
|
||||||
acquireCount int64
|
|
||||||
acquireDuration time.Duration
|
|
||||||
emptyAcquireCount int64
|
|
||||||
canceledAcquireCount int64
|
|
||||||
|
|
||||||
closed bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPool creates a new pool. Panics if maxSize is less than 1.
|
|
||||||
func NewPool[T any](constructor Constructor[T], destructor Destructor[T], maxSize int32) *Pool[T] {
|
|
||||||
if maxSize < 1 {
|
|
||||||
panic("maxSize is less than 1")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Pool[T]{
|
|
||||||
cond: sync.NewCond(new(sync.Mutex)),
|
|
||||||
destructWG: &sync.WaitGroup{},
|
|
||||||
maxSize: maxSize,
|
|
||||||
constructor: constructor,
|
|
||||||
destructor: destructor,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close destroys all resources in the pool and rejects future Acquire calls.
|
|
||||||
// Blocks until all resources are returned to pool and destroyed.
|
|
||||||
func (p *Pool[T]) Close() {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
if p.closed {
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
p.closed = true
|
|
||||||
|
|
||||||
for _, res := range p.idleResources {
|
|
||||||
p.allResources = removeResource(p.allResources, res)
|
|
||||||
go p.destructResourceValue(res.value)
|
|
||||||
}
|
|
||||||
p.idleResources = nil
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
|
|
||||||
// Wake up all go routines waiting for a resource to be returned so they can terminate.
|
|
||||||
p.cond.Broadcast()
|
|
||||||
|
|
||||||
p.destructWG.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stat is a snapshot of Pool statistics.
|
|
||||||
type Stat struct {
|
|
||||||
constructingResources int32
|
|
||||||
acquiredResources int32
|
|
||||||
idleResources int32
|
|
||||||
maxResources int32
|
|
||||||
acquireCount int64
|
|
||||||
acquireDuration time.Duration
|
|
||||||
emptyAcquireCount int64
|
|
||||||
canceledAcquireCount int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// TotalResources returns the total number of resources currently in the pool.
|
|
||||||
// The value is the sum of ConstructingResources, AcquiredResources, and
|
|
||||||
// IdleResources.
|
|
||||||
func (s *Stat) TotalResources() int32 {
|
|
||||||
return s.constructingResources + s.acquiredResources + s.idleResources
|
|
||||||
}
|
|
||||||
|
|
||||||
// ConstructingResources returns the number of resources with construction in progress in
|
|
||||||
// the pool.
|
|
||||||
func (s *Stat) ConstructingResources() int32 {
|
|
||||||
return s.constructingResources
|
|
||||||
}
|
|
||||||
|
|
||||||
// AcquiredResources returns the number of currently acquired resources in the pool.
|
|
||||||
func (s *Stat) AcquiredResources() int32 {
|
|
||||||
return s.acquiredResources
|
|
||||||
}
|
|
||||||
|
|
||||||
// IdleResources returns the number of currently idle resources in the pool.
|
|
||||||
func (s *Stat) IdleResources() int32 {
|
|
||||||
return s.idleResources
|
|
||||||
}
|
|
||||||
|
|
||||||
// MaxResources returns the maximum size of the pool.
|
|
||||||
func (s *Stat) MaxResources() int32 {
|
|
||||||
return s.maxResources
|
|
||||||
}
|
|
||||||
|
|
||||||
// AcquireCount returns the cumulative count of successful acquires from the pool.
|
|
||||||
func (s *Stat) AcquireCount() int64 {
|
|
||||||
return s.acquireCount
|
|
||||||
}
|
|
||||||
|
|
||||||
// AcquireDuration returns the total duration of all successful acquires from
|
|
||||||
// the pool.
|
|
||||||
func (s *Stat) AcquireDuration() time.Duration {
|
|
||||||
return s.acquireDuration
|
|
||||||
}
|
|
||||||
|
|
||||||
// EmptyAcquireCount returns the cumulative count of successful acquires from the pool
|
|
||||||
// that waited for a resource to be released or constructed because the pool was
|
|
||||||
// empty.
|
|
||||||
func (s *Stat) EmptyAcquireCount() int64 {
|
|
||||||
return s.emptyAcquireCount
|
|
||||||
}
|
|
||||||
|
|
||||||
// CanceledAcquireCount returns the cumulative count of acquires from the pool
|
|
||||||
// that were canceled by a context.
|
|
||||||
func (s *Stat) CanceledAcquireCount() int64 {
|
|
||||||
return s.canceledAcquireCount
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stat returns the current pool statistics.
|
|
||||||
func (p *Pool[T]) Stat() *Stat {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
s := &Stat{
|
|
||||||
maxResources: p.maxSize,
|
|
||||||
acquireCount: p.acquireCount,
|
|
||||||
emptyAcquireCount: p.emptyAcquireCount,
|
|
||||||
canceledAcquireCount: p.canceledAcquireCount,
|
|
||||||
acquireDuration: p.acquireDuration,
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, res := range p.allResources {
|
|
||||||
switch res.status {
|
|
||||||
case resourceStatusConstructing:
|
|
||||||
s.constructingResources += 1
|
|
||||||
case resourceStatusIdle:
|
|
||||||
s.idleResources += 1
|
|
||||||
case resourceStatusAcquired:
|
|
||||||
s.acquiredResources += 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// Acquire gets a resource from the pool. If no resources are available and the pool
|
|
||||||
// is not at maximum capacity it will create a new resource. If the pool is at
|
|
||||||
// maximum capacity it will block until a resource is available. ctx can be used
|
|
||||||
// to cancel the Acquire.
|
|
||||||
func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
|
|
||||||
startNano := nanotime()
|
|
||||||
if doneChan := ctx.Done(); doneChan != nil {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
p.cond.L.Lock()
|
|
||||||
p.canceledAcquireCount += 1
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return nil, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
p.cond.L.Lock()
|
|
||||||
|
|
||||||
emptyAcquire := false
|
|
||||||
|
|
||||||
for {
|
|
||||||
if p.closed {
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return nil, ErrClosedPool
|
|
||||||
}
|
|
||||||
|
|
||||||
// If a resource is available now
|
|
||||||
if len(p.idleResources) > 0 {
|
|
||||||
res := p.idleResources[len(p.idleResources)-1]
|
|
||||||
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
|
|
||||||
p.idleResources = p.idleResources[:len(p.idleResources)-1]
|
|
||||||
res.status = resourceStatusAcquired
|
|
||||||
if emptyAcquire {
|
|
||||||
p.emptyAcquireCount += 1
|
|
||||||
}
|
|
||||||
p.acquireCount += 1
|
|
||||||
p.acquireDuration += time.Duration(nanotime() - startNano)
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
emptyAcquire = true
|
|
||||||
|
|
||||||
// If there is room to create a resource do so
|
|
||||||
if len(p.allResources) < int(p.maxSize) {
|
|
||||||
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
|
|
||||||
p.allResources = append(p.allResources, res)
|
|
||||||
p.destructWG.Add(1)
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
|
|
||||||
value, err := p.constructResourceValue(ctx)
|
|
||||||
p.cond.L.Lock()
|
|
||||||
if err != nil {
|
|
||||||
p.allResources = removeResource(p.allResources, res)
|
|
||||||
p.destructWG.Done()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
if err == ctx.Err() {
|
|
||||||
p.canceledAcquireCount += 1
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
p.cond.Signal()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res.value = value
|
|
||||||
res.status = resourceStatusAcquired
|
|
||||||
p.emptyAcquireCount += 1
|
|
||||||
p.acquireCount += 1
|
|
||||||
p.acquireDuration += time.Duration(nanotime() - startNano)
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if ctx.Done() == nil {
|
|
||||||
p.cond.Wait()
|
|
||||||
} else {
|
|
||||||
// Convert p.cond.Wait into a channel
|
|
||||||
waitChan := make(chan struct{}, 1)
|
|
||||||
go func() {
|
|
||||||
p.cond.Wait()
|
|
||||||
waitChan <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
// Allow goroutine waiting for signal to exit. Re-signal since we couldn't
|
|
||||||
// do anything with it. Another goroutine might be waiting.
|
|
||||||
go func() {
|
|
||||||
<-waitChan
|
|
||||||
p.cond.Signal()
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
p.cond.L.Lock()
|
|
||||||
p.canceledAcquireCount += 1
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return nil, ctx.Err()
|
|
||||||
case <-waitChan:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
|
|
||||||
// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
|
|
||||||
// used to cancel the background creation.
|
|
||||||
func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
defer p.cond.L.Unlock()
|
|
||||||
|
|
||||||
if p.closed {
|
|
||||||
return nil, ErrClosedPool
|
|
||||||
}
|
|
||||||
|
|
||||||
// If a resource is available now
|
|
||||||
if len(p.idleResources) > 0 {
|
|
||||||
res := p.idleResources[len(p.idleResources)-1]
|
|
||||||
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
|
|
||||||
p.idleResources = p.idleResources[:len(p.idleResources)-1]
|
|
||||||
p.acquireCount += 1
|
|
||||||
res.status = resourceStatusAcquired
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(p.allResources) < int(p.maxSize) {
|
|
||||||
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
|
|
||||||
p.allResources = append(p.allResources, res)
|
|
||||||
p.destructWG.Add(1)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
value, err := p.constructResourceValue(ctx)
|
|
||||||
defer p.cond.Signal()
|
|
||||||
p.cond.L.Lock()
|
|
||||||
defer p.cond.L.Unlock()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
p.allResources = removeResource(p.allResources, res)
|
|
||||||
p.destructWG.Done()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
res.value = value
|
|
||||||
res.status = resourceStatusIdle
|
|
||||||
p.idleResources = append(p.idleResources, res)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, ErrNotAvailable
|
|
||||||
}
|
|
||||||
|
|
||||||
// AcquireAllIdle atomically acquires all currently idle resources. Its intended
|
|
||||||
// use is for health check and keep-alive functionality. It does not update pool
|
|
||||||
// statistics.
|
|
||||||
func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
if p.closed {
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, res := range p.idleResources {
|
|
||||||
res.status = resourceStatusAcquired
|
|
||||||
}
|
|
||||||
resources := p.idleResources // Swap out current slice
|
|
||||||
p.idleResources = nil
|
|
||||||
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return resources
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateResource constructs a new resource without acquiring it.
|
|
||||||
// It goes straight in the IdlePool. It does not check against maxSize.
|
|
||||||
// It can be useful to maintain warm resources under little load.
|
|
||||||
func (p *Pool[T]) CreateResource(ctx context.Context) error {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
if p.closed {
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return ErrClosedPool
|
|
||||||
}
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
|
|
||||||
value, err := p.constructResourceValue(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
res := &Resource[T]{
|
|
||||||
pool: p,
|
|
||||||
creationTime: time.Now(),
|
|
||||||
status: resourceStatusIdle,
|
|
||||||
value: value,
|
|
||||||
lastUsedNano: nanotime(),
|
|
||||||
}
|
|
||||||
p.destructWG.Add(1)
|
|
||||||
|
|
||||||
p.cond.L.Lock()
|
|
||||||
// If closed while constructing resource then destroy it and return an error
|
|
||||||
if p.closed {
|
|
||||||
go p.destructResourceValue(res.value)
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return ErrClosedPool
|
|
||||||
}
|
|
||||||
p.allResources = append(p.allResources, res)
|
|
||||||
p.idleResources = append(p.idleResources, res)
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// releaseAcquiredResource returns res to the the pool.
|
|
||||||
func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
|
|
||||||
if !p.closed {
|
|
||||||
res.lastUsedNano = lastUsedNano
|
|
||||||
res.status = resourceStatusIdle
|
|
||||||
p.idleResources = append(p.idleResources, res)
|
|
||||||
} else {
|
|
||||||
p.allResources = removeResource(p.allResources, res)
|
|
||||||
go p.destructResourceValue(res.value)
|
|
||||||
}
|
|
||||||
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
p.cond.Signal()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove removes res from the pool and closes it. If res is not part of the
|
|
||||||
// pool Remove will panic.
|
|
||||||
func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
|
|
||||||
p.destructResourceValue(res.value)
|
|
||||||
p.cond.L.Lock()
|
|
||||||
p.allResources = removeResource(p.allResources, res)
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
p.cond.Signal()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
|
|
||||||
p.allResources = removeResource(p.allResources, res)
|
|
||||||
res.status = resourceStatusHijacked
|
|
||||||
p.destructWG.Done() // not responsible for destructing hijacked resources
|
|
||||||
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
p.cond.Signal()
|
|
||||||
}
|
|
||||||
|
|
||||||
func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] {
|
|
||||||
for i := range slice {
|
|
||||||
if slice[i] == res {
|
|
||||||
slice[i] = slice[len(slice)-1]
|
|
||||||
slice[len(slice)-1] = nil // Avoid memory leak
|
|
||||||
return slice[:len(slice)-1]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
panic("BUG: removeResource could not find res in slice")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool[T]) constructResourceValue(ctx context.Context) (T, error) {
|
|
||||||
return p.constructor(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool[T]) destructResourceValue(value T) {
|
|
||||||
p.destructor(value)
|
|
||||||
p.destructWG.Done()
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user