Add max resource uses
This commit is contained in:
@@ -29,9 +29,10 @@ type CloseFunc func(res interface{}) (err error)
|
|||||||
type BackgroundErrorHandler func(err error)
|
type BackgroundErrorHandler func(err error)
|
||||||
|
|
||||||
type resourceWrapper struct {
|
type resourceWrapper struct {
|
||||||
resource interface{}
|
resource interface{}
|
||||||
creationTime time.Time
|
creationTime time.Time
|
||||||
status byte
|
checkoutCount uint64
|
||||||
|
status byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pool is a thread-safe resource pool.
|
// Pool is a thread-safe resource pool.
|
||||||
@@ -43,6 +44,7 @@ type Pool struct {
|
|||||||
minSize int
|
minSize int
|
||||||
maxSize int
|
maxSize int
|
||||||
maxResourceDuration time.Duration
|
maxResourceDuration time.Duration
|
||||||
|
maxResourceUses uint64
|
||||||
closed bool
|
closed bool
|
||||||
|
|
||||||
createRes CreateFunc
|
createRes CreateFunc
|
||||||
@@ -56,6 +58,7 @@ func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool {
|
|||||||
allResources: make(map[interface{}]*resourceWrapper),
|
allResources: make(map[interface{}]*resourceWrapper),
|
||||||
maxSize: maxInt,
|
maxSize: maxInt,
|
||||||
maxResourceDuration: math.MaxInt64,
|
maxResourceDuration: math.MaxInt64,
|
||||||
|
maxResourceUses: math.MaxUint64,
|
||||||
createRes: createRes,
|
createRes: createRes,
|
||||||
closeRes: closeRes,
|
closeRes: closeRes,
|
||||||
backgroundErrorHandler: func(error) {},
|
backgroundErrorHandler: func(error) {},
|
||||||
@@ -147,6 +150,24 @@ func (p *Pool) SetMaxResourceDuration(d time.Duration) {
|
|||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MaxResourceUses returns the current maximum uses per resource of the pool.
|
||||||
|
func (p *Pool) MaxResourceUses() uint64 {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
n := p.maxResourceUses
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetMaxResourceUses sets the maximum maximum resource duration of the pool. It panics if n < 1.
|
||||||
|
func (p *Pool) SetMaxResourceUses(n uint64) {
|
||||||
|
if n < 0 {
|
||||||
|
panic("pool MaxResourceUses cannot be < 1")
|
||||||
|
}
|
||||||
|
p.cond.L.Lock()
|
||||||
|
p.maxResourceUses = n
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// SetBackgroundErrorHandler assigns a handler for errors that have no other
|
// SetBackgroundErrorHandler assigns a handler for errors that have no other
|
||||||
// place to be reported. For example, Get is called when no resources are
|
// place to be reported. For example, Get is called when no resources are
|
||||||
// available. Get begins creating a new resource (in a goroutine). Before the
|
// available. Get begins creating a new resource (in a goroutine). Before the
|
||||||
@@ -252,6 +273,7 @@ func (p *Pool) lockedAvailableGet() interface{} {
|
|||||||
panic("BUG: unavailable resource gotten from availableResources")
|
panic("BUG: unavailable resource gotten from availableResources")
|
||||||
}
|
}
|
||||||
rw.status = resourceStatusBorrowed
|
rw.status = resourceStatusBorrowed
|
||||||
|
rw.checkoutCount += 1
|
||||||
return rw.resource
|
return rw.resource
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -277,7 +299,7 @@ func (p *Pool) startCreate() (resChan chan interface{}, errChan chan error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rw := &resourceWrapper{resource: res, creationTime: startTime, status: resourceStatusBorrowed}
|
rw := &resourceWrapper{resource: res, creationTime: startTime, status: resourceStatusBorrowed, checkoutCount: 1}
|
||||||
p.allResources[res] = rw
|
p.allResources[res] = rw
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
resChan <- res
|
resChan <- res
|
||||||
@@ -326,8 +348,16 @@ func (p *Pool) Return(res interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
closeResource := true
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if now.Sub(rw.creationTime) > p.maxResourceDuration {
|
if now.Sub(rw.creationTime) > p.maxResourceDuration {
|
||||||
|
} else if p.maxResourceUses <= rw.checkoutCount { // use <= instead of == as maxResourceUses may be lowered while pool is in use
|
||||||
|
} else {
|
||||||
|
closeResource = false
|
||||||
|
}
|
||||||
|
|
||||||
|
if closeResource {
|
||||||
delete(p.allResources, rw.resource)
|
delete(p.allResources, rw.resource)
|
||||||
p.ensureMinResources()
|
p.ensureMinResources()
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
|
|||||||
@@ -186,6 +186,43 @@ func TestPoolReturnClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T)
|
|||||||
assert.Equal(t, 0, pool.Size())
|
assert.Equal(t, 0, pool.Size())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPoolReturnClosesAndRemovesResourceIfMoreUsesThanMaxResourceUses(t *testing.T) {
|
||||||
|
closeCallsChan := make(chan int, 4)
|
||||||
|
|
||||||
|
waitForRead := func(ch chan int) bool {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
return true
|
||||||
|
case <-time.NewTimer(time.Second).C:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var createCalls Counter
|
||||||
|
createFunc := func() (interface{}, error) {
|
||||||
|
return createCalls.Next(), nil
|
||||||
|
}
|
||||||
|
var closeCalls Counter
|
||||||
|
closeFunc := func(interface{}) error {
|
||||||
|
n := closeCalls.Next()
|
||||||
|
closeCallsChan <- n
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := puddle.NewPool(createFunc, closeFunc)
|
||||||
|
pool.SetMaxResourceUses(1)
|
||||||
|
|
||||||
|
res, err := pool.Get(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pool.Return(res)
|
||||||
|
|
||||||
|
waitForRead(closeCallsChan)
|
||||||
|
|
||||||
|
assert.Equal(t, 1, closeCalls.Value())
|
||||||
|
assert.Equal(t, 0, pool.Size())
|
||||||
|
}
|
||||||
|
|
||||||
func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
|
func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
|
||||||
var createCalls Counter
|
var createCalls Counter
|
||||||
createFunc := func() (interface{}, error) {
|
createFunc := func() (interface{}, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user