Remove resource limits
Acquire health check will be able to take over this functionality.
This commit is contained in:
@@ -3,9 +3,7 @@ package puddle
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"math"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -24,11 +22,9 @@ type CreateFunc func(ctx context.Context) (res interface{}, err error)
|
|||||||
type CloseFunc func(res interface{})
|
type CloseFunc func(res interface{})
|
||||||
|
|
||||||
type Resource struct {
|
type Resource struct {
|
||||||
value interface{}
|
value interface{}
|
||||||
pool *Pool
|
pool *Pool
|
||||||
creationTime time.Time
|
status byte
|
||||||
checkoutCount uint64
|
|
||||||
status byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (res *Resource) Value() interface{} {
|
func (res *Resource) Value() interface{} {
|
||||||
@@ -47,13 +43,11 @@ func (res *Resource) Destroy() {
|
|||||||
type Pool struct {
|
type Pool struct {
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
|
|
||||||
allResources map[interface{}]*Resource
|
allResources map[interface{}]*Resource
|
||||||
availableResources []*Resource
|
availableResources []*Resource
|
||||||
minSize int
|
minSize int
|
||||||
maxSize int
|
maxSize int
|
||||||
maxResourceDuration time.Duration
|
closed bool
|
||||||
maxResourceCheckouts uint64
|
|
||||||
closed bool
|
|
||||||
|
|
||||||
createRes CreateFunc
|
createRes CreateFunc
|
||||||
closeRes CloseFunc
|
closeRes CloseFunc
|
||||||
@@ -61,13 +55,11 @@ type Pool struct {
|
|||||||
|
|
||||||
func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool {
|
func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool {
|
||||||
return &Pool{
|
return &Pool{
|
||||||
cond: sync.NewCond(new(sync.Mutex)),
|
cond: sync.NewCond(new(sync.Mutex)),
|
||||||
allResources: make(map[interface{}]*Resource),
|
allResources: make(map[interface{}]*Resource),
|
||||||
maxSize: maxInt,
|
maxSize: maxInt,
|
||||||
maxResourceDuration: math.MaxInt64,
|
createRes: createRes,
|
||||||
maxResourceCheckouts: math.MaxUint64,
|
closeRes: closeRes,
|
||||||
createRes: createRes,
|
|
||||||
closeRes: closeRes,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -133,42 +125,6 @@ func (p *Pool) SetMaxSize(n int) {
|
|||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxResourceDuration returns the current maximum resource duration of the pool.
|
|
||||||
func (p *Pool) MaxResourceDuration() time.Duration {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
n := p.maxResourceDuration
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetMaxResourceDuration sets the maximum maximum resource duration of the pool. It panics if n < 1.
|
|
||||||
func (p *Pool) SetMaxResourceDuration(d time.Duration) {
|
|
||||||
if d < 0 {
|
|
||||||
panic("pool MaxResourceDuration cannot be < 0")
|
|
||||||
}
|
|
||||||
p.cond.L.Lock()
|
|
||||||
p.maxResourceDuration = d
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// MaxResourceCheckouts returns the current maximum uses per resource of the pool.
|
|
||||||
func (p *Pool) MaxResourceCheckouts() uint64 {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
n := p.maxResourceCheckouts
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetMaxResourceCheckouts sets the maximum maximum resource duration of the pool. It panics if n < 1.
|
|
||||||
func (p *Pool) SetMaxResourceCheckouts(n uint64) {
|
|
||||||
if n < 0 {
|
|
||||||
panic("pool MaxResourceCheckouts cannot be < 1")
|
|
||||||
}
|
|
||||||
p.cond.L.Lock()
|
|
||||||
p.maxResourceCheckouts = n
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Acquire gets a resource from the pool. If no resources are available and the pool
|
// Acquire gets a resource from the pool. If no resources are available and the pool
|
||||||
// is not at maximum capacity it will create a new resource. If the pool is at
|
// is not at maximum capacity it will create a new resource. If the pool is at
|
||||||
// maximum capacity it will block until a resource is available. ctx can be used
|
// maximum capacity it will block until a resource is available. ctx can be used
|
||||||
@@ -200,8 +156,7 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) {
|
|||||||
if len(p.allResources) < p.maxSize {
|
if len(p.allResources) < p.maxSize {
|
||||||
var localVal int
|
var localVal int
|
||||||
placeholder := &localVal
|
placeholder := &localVal
|
||||||
startTime := time.Now()
|
p.allResources[placeholder] = &Resource{value: placeholder, status: resourceStatusCreating}
|
||||||
p.allResources[placeholder] = &Resource{value: placeholder, creationTime: startTime, status: resourceStatusCreating}
|
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
|
|
||||||
res, err := p.createRes(ctx)
|
res, err := p.createRes(ctx)
|
||||||
@@ -212,7 +167,7 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rw := &Resource{pool: p, value: res, creationTime: startTime, status: resourceStatusBorrowed, checkoutCount: 1}
|
rw := &Resource{pool: p, value: res, status: resourceStatusBorrowed}
|
||||||
p.allResources[res] = rw
|
p.allResources[res] = rw
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
return rw, nil
|
return rw, nil
|
||||||
@@ -259,7 +214,6 @@ func (p *Pool) lockedAvailableAcquire() *Resource {
|
|||||||
panic("BUG: unavailable resource gotten from availableResources")
|
panic("BUG: unavailable resource gotten from availableResources")
|
||||||
}
|
}
|
||||||
rw.status = resourceStatusBorrowed
|
rw.status = resourceStatusBorrowed
|
||||||
rw.checkoutCount += 1
|
|
||||||
return rw
|
return rw
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -269,17 +223,7 @@ func (p *Pool) releaseBorrowedResource(res interface{}) {
|
|||||||
|
|
||||||
rw := p.allResources[res]
|
rw := p.allResources[res]
|
||||||
|
|
||||||
closeResource := true
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
if p.closed {
|
if p.closed {
|
||||||
} else if now.Sub(rw.creationTime) > p.maxResourceDuration {
|
|
||||||
} else if p.maxResourceCheckouts <= rw.checkoutCount { // use <= instead of == as maxResourceCheckouts may be lowered while pool is in use
|
|
||||||
} else {
|
|
||||||
closeResource = false
|
|
||||||
}
|
|
||||||
|
|
||||||
if closeResource {
|
|
||||||
delete(p.allResources, rw.value)
|
delete(p.allResources, rw.value)
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
go p.closeRes(rw.value)
|
go p.closeRes(rw.value)
|
||||||
|
|||||||
@@ -182,44 +182,6 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
|
|||||||
assert.Nil(t, res)
|
assert.Nil(t, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestResourceReleaseClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T) {
|
|
||||||
createFunc, _ := createCreateResourceFunc()
|
|
||||||
closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan()
|
|
||||||
|
|
||||||
pool := puddle.NewPool(createFunc, closeFunc)
|
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, 1, pool.Size())
|
|
||||||
|
|
||||||
pool.SetMaxResourceDuration(time.Nanosecond)
|
|
||||||
time.Sleep(2 * time.Nanosecond)
|
|
||||||
res.Release()
|
|
||||||
|
|
||||||
waitForRead(closeCallsChan)
|
|
||||||
assert.Equal(t, 0, pool.Size())
|
|
||||||
assert.Equal(t, 1, closeCalls.Value())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestResourceReleaseClosesAndRemovesResourceWhenResourceCheckoutCountIsMaxResourceCheckouts(t *testing.T) {
|
|
||||||
createFunc, _ := createCreateResourceFunc()
|
|
||||||
closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan()
|
|
||||||
|
|
||||||
pool := puddle.NewPool(createFunc, closeFunc)
|
|
||||||
pool.SetMaxResourceCheckouts(1)
|
|
||||||
|
|
||||||
res, err := pool.Acquire(context.Background())
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
res.Release()
|
|
||||||
|
|
||||||
waitForRead(closeCallsChan)
|
|
||||||
|
|
||||||
assert.Equal(t, 1, closeCalls.Value())
|
|
||||||
assert.Equal(t, 0, pool.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
|
func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
|
||||||
createFunc, _ := createCreateResourceFunc()
|
createFunc, _ := createCreateResourceFunc()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user