2
0

Add max resource lifetime checking on return

This commit is contained in:
Jack Christensen
2018-12-23 21:28:18 -06:00
parent 778ac737e5
commit 76b0c06b8b
3 changed files with 109 additions and 25 deletions
+2
View File
@@ -12,3 +12,5 @@ Puddle is a generic resource pool library for Go.
* Reset pool * Reset pool
* Shrink pool * Shrink pool
* Stress test * Stress test
* Stat - supercede Size, include available resources, checked out resources, get count, slow get count, slow get wait time, total create count, total background error count
* Refactor createCalls and closeCalls in tests
+63 -22
View File
@@ -3,7 +3,9 @@ package puddle
import ( import (
"context" "context"
"errors" "errors"
"math"
"sync" "sync"
"time"
) )
const ( const (
@@ -27,31 +29,34 @@ type CloseFunc func(res interface{}) (err error)
type BackgroundErrorHandler func(err error) type BackgroundErrorHandler func(err error)
type resourceWrapper struct { type resourceWrapper struct {
resource interface{} resource interface{}
status byte creationTime time.Time
status byte
} }
// Pool is a thread-safe resource pool. // Pool is a thread-safe resource pool.
type Pool struct { type Pool struct {
cond *sync.Cond cond *sync.Cond
allResources map[interface{}]*resourceWrapper allResources map[interface{}]*resourceWrapper
availableResources []*resourceWrapper availableResources []*resourceWrapper
minSize int minSize int
maxSize int maxSize int
closed bool maxResourceDuration time.Duration
closed bool
create CreateFunc createRes CreateFunc
closeRes CloseFunc closeRes CloseFunc
backgroundErrorHandler BackgroundErrorHandler backgroundErrorHandler BackgroundErrorHandler
} }
func NewPool(create CreateFunc, closeRes CloseFunc) *Pool { func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool {
return &Pool{ return &Pool{
cond: sync.NewCond(new(sync.Mutex)), cond: sync.NewCond(new(sync.Mutex)),
allResources: make(map[interface{}]*resourceWrapper), allResources: make(map[interface{}]*resourceWrapper),
maxSize: maxInt, maxSize: maxInt,
create: create, maxResourceDuration: math.MaxInt64,
createRes: createRes,
closeRes: closeRes, closeRes: closeRes,
backgroundErrorHandler: func(error) {}, backgroundErrorHandler: func(error) {},
} }
@@ -101,10 +106,7 @@ func (p *Pool) SetMinSize(n int) {
p.cond.L.Lock() p.cond.L.Lock()
p.minSize = n p.minSize = n
for len(p.allResources) < p.minSize { p.ensureMinResources()
createResChan, createErrChan := p.startCreate()
p.backgroundFinishCreate(createResChan, createErrChan)
}
p.cond.L.Unlock() p.cond.L.Unlock()
} }
@@ -127,6 +129,24 @@ func (p *Pool) SetMaxSize(n int) {
p.cond.L.Unlock() p.cond.L.Unlock()
} }
// MaxResourceDuration returns the current maximum resource duration of the pool.
func (p *Pool) MaxResourceDuration() time.Duration {
p.cond.L.Lock()
n := p.maxResourceDuration
p.cond.L.Unlock()
return n
}
// SetMaxResourceDuration sets the maximum maximum resource duration of the pool. It panics if n < 1.
func (p *Pool) SetMaxResourceDuration(d time.Duration) {
if d < 0 {
panic("pool MaxResourceDuration cannot be < 0")
}
p.cond.L.Lock()
p.maxResourceDuration = d
p.cond.L.Unlock()
}
// SetBackgroundErrorHandler assigns a handler for errors that have no other // SetBackgroundErrorHandler assigns a handler for errors that have no other
// place to be reported. For example, Get is called when no resources are // place to be reported. For example, Get is called when no resources are
// available. Get begins creating a new resource (in a goroutine). Before the // available. Get begins creating a new resource (in a goroutine). Before the
@@ -244,10 +264,11 @@ func (p *Pool) startCreate() (resChan chan interface{}, errChan chan error) {
var localVal int var localVal int
placeholder := &localVal placeholder := &localVal
p.allResources[placeholder] = &resourceWrapper{resource: placeholder, status: resourceStatusCreating} startTime := time.Now()
p.allResources[placeholder] = &resourceWrapper{resource: placeholder, creationTime: startTime, status: resourceStatusCreating}
go func() { go func() {
res, err := p.create() res, err := p.createRes()
p.cond.L.Lock() p.cond.L.Lock()
delete(p.allResources, placeholder) delete(p.allResources, placeholder)
if err != nil { if err != nil {
@@ -256,7 +277,7 @@ func (p *Pool) startCreate() (resChan chan interface{}, errChan chan error) {
return return
} }
rw := &resourceWrapper{resource: res, status: resourceStatusBorrowed} rw := &resourceWrapper{resource: res, creationTime: startTime, status: resourceStatusBorrowed}
p.allResources[res] = rw p.allResources[res] = rw
p.cond.L.Unlock() p.cond.L.Unlock()
resChan <- res resChan <- res
@@ -278,6 +299,15 @@ func (p *Pool) backgroundFinishCreate(resChan chan interface{}, errChan chan err
}() }()
} }
func (p *Pool) backgroundClose(res interface{}) {
go func() {
err := p.closeRes(res)
if err != nil {
p.backgroundErrorHandler(err)
}
}()
}
// Return returns res to the the pool. If res is not part of the pool Return // Return returns res to the the pool. If res is not part of the pool Return
// will panic. // will panic.
func (p *Pool) Return(res interface{}) { func (p *Pool) Return(res interface{}) {
@@ -290,12 +320,18 @@ func (p *Pool) Return(res interface{}) {
} }
if p.closed { if p.closed {
err := p.closeRes(rw.resource)
if err != nil {
p.backgroundErrorHandler(err)
}
delete(p.allResources, rw.resource) delete(p.allResources, rw.resource)
p.cond.L.Unlock() p.cond.L.Unlock()
p.backgroundClose(rw.resource)
return
}
now := time.Now()
if now.Sub(rw.creationTime) > p.maxResourceDuration {
delete(p.allResources, rw.resource)
p.ensureMinResources()
p.cond.L.Unlock()
p.backgroundClose(rw.resource)
return return
} }
@@ -329,7 +365,12 @@ func (p *Pool) Remove(res interface{}) {
} }
}() }()
// Maintain min pool size (unless pool is already closed) p.ensureMinResources()
}
// ensureMinResources creates new resources if necessary to get pool up to min size.
// If pool is closed does nothing. p.cond.L must already be locked.
func (p *Pool) ensureMinResources() {
if !p.closed { if !p.closed {
for len(p.allResources) < p.minSize { for len(p.allResources) < p.minSize {
createResChan, createErrChan := p.startCreate() createResChan, createErrChan := p.startCreate()
+44 -3
View File
@@ -162,6 +162,30 @@ func TestPoolReturnPanicsIfResourceNotPartOfPool(t *testing.T) {
assert.Panics(t, func() { pool.Return(42) }) assert.Panics(t, func() { pool.Return(42) })
} }
func TestPoolReturnClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T) {
var createCalls Counter
createFunc := func() (interface{}, error) {
return createCalls.Next(), nil
}
var closeCalls Counter
closeFunc := func(interface{}) error {
closeCalls.Next()
return nil
}
pool := puddle.NewPool(createFunc, closeFunc)
res, err := pool.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, pool.Size())
pool.SetMaxResourceDuration(time.Nanosecond)
time.Sleep(time.Nanosecond)
pool.Return(res)
assert.Equal(t, 0, pool.Size())
}
func TestPoolCloseClosesAllAvailableResources(t *testing.T) { func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
var createCalls Counter var createCalls Counter
createFunc := func() (interface{}, error) { createFunc := func() (interface{}, error) {
@@ -193,6 +217,17 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
} }
func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) { func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) {
closeCallsChan := make(chan int, 4)
waitForRead := func(ch chan int) bool {
select {
case <-ch:
return true
case <-time.NewTimer(time.Second).C:
return false
}
}
var createCalls Counter var createCalls Counter
createFunc := func() (interface{}, error) { createFunc := func() (interface{}, error) {
return createCalls.Next(), nil return createCalls.Next(), nil
@@ -200,7 +235,8 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) {
var closeCalls Counter var closeCalls Counter
closeFunc := func(interface{}) error { closeFunc := func(interface{}) error {
closeCalls.Next() n := closeCalls.Next()
closeCallsChan <- n
return nil return nil
} }
@@ -220,6 +256,11 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) {
p.Return(res) p.Return(res)
} }
waitForRead(closeCallsChan)
waitForRead(closeCallsChan)
waitForRead(closeCallsChan)
waitForRead(closeCallsChan)
assert.Equal(t, len(resources), closeCalls.Value()) assert.Equal(t, len(resources), closeCalls.Value())
} }
@@ -483,8 +524,8 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosedErrorIsReported(t *testing.T
select { select {
case err = <-asyncErrChan: case err = <-asyncErrChan:
assert.Equal(t, errCloseFailed, err) assert.Equal(t, errCloseFailed, err)
default: case <-time.NewTimer(time.Second).C:
t.Fatal("error not reported") t.Fatal("timed out waiting for async error")
} }
} }