diff --git a/README.md b/README.md index 278eda8..9a4bb7f 100644 --- a/README.md +++ b/README.md @@ -12,3 +12,5 @@ Puddle is a generic resource pool library for Go. * Reset pool * Shrink pool * Stress test +* Stat - supercede Size, include available resources, checked out resources, get count, slow get count, slow get wait time, total create count, total background error count +* Refactor createCalls and closeCalls in tests diff --git a/pool.go b/pool.go index f14f645..5247a27 100644 --- a/pool.go +++ b/pool.go @@ -3,7 +3,9 @@ package puddle import ( "context" "errors" + "math" "sync" + "time" ) const ( @@ -27,31 +29,34 @@ type CloseFunc func(res interface{}) (err error) type BackgroundErrorHandler func(err error) type resourceWrapper struct { - resource interface{} - status byte + resource interface{} + creationTime time.Time + status byte } // Pool is a thread-safe resource pool. type Pool struct { cond *sync.Cond - allResources map[interface{}]*resourceWrapper - availableResources []*resourceWrapper - minSize int - maxSize int - closed bool + allResources map[interface{}]*resourceWrapper + availableResources []*resourceWrapper + minSize int + maxSize int + maxResourceDuration time.Duration + closed bool - create CreateFunc + createRes CreateFunc closeRes CloseFunc backgroundErrorHandler BackgroundErrorHandler } -func NewPool(create CreateFunc, closeRes CloseFunc) *Pool { +func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool { return &Pool{ cond: sync.NewCond(new(sync.Mutex)), allResources: make(map[interface{}]*resourceWrapper), maxSize: maxInt, - create: create, + maxResourceDuration: math.MaxInt64, + createRes: createRes, closeRes: closeRes, backgroundErrorHandler: func(error) {}, } @@ -101,10 +106,7 @@ func (p *Pool) SetMinSize(n int) { p.cond.L.Lock() p.minSize = n - for len(p.allResources) < p.minSize { - createResChan, createErrChan := p.startCreate() - p.backgroundFinishCreate(createResChan, createErrChan) - } + p.ensureMinResources() p.cond.L.Unlock() } @@ -127,6 +129,24 @@ func (p *Pool) SetMaxSize(n int) { p.cond.L.Unlock() } +// MaxResourceDuration returns the current maximum resource duration of the pool. +func (p *Pool) MaxResourceDuration() time.Duration { + p.cond.L.Lock() + n := p.maxResourceDuration + p.cond.L.Unlock() + return n +} + +// SetMaxResourceDuration sets the maximum maximum resource duration of the pool. It panics if n < 1. +func (p *Pool) SetMaxResourceDuration(d time.Duration) { + if d < 0 { + panic("pool MaxResourceDuration cannot be < 0") + } + p.cond.L.Lock() + p.maxResourceDuration = d + p.cond.L.Unlock() +} + // SetBackgroundErrorHandler assigns a handler for errors that have no other // place to be reported. For example, Get is called when no resources are // available. Get begins creating a new resource (in a goroutine). Before the @@ -244,10 +264,11 @@ func (p *Pool) startCreate() (resChan chan interface{}, errChan chan error) { var localVal int placeholder := &localVal - p.allResources[placeholder] = &resourceWrapper{resource: placeholder, status: resourceStatusCreating} + startTime := time.Now() + p.allResources[placeholder] = &resourceWrapper{resource: placeholder, creationTime: startTime, status: resourceStatusCreating} go func() { - res, err := p.create() + res, err := p.createRes() p.cond.L.Lock() delete(p.allResources, placeholder) if err != nil { @@ -256,7 +277,7 @@ func (p *Pool) startCreate() (resChan chan interface{}, errChan chan error) { return } - rw := &resourceWrapper{resource: res, status: resourceStatusBorrowed} + rw := &resourceWrapper{resource: res, creationTime: startTime, status: resourceStatusBorrowed} p.allResources[res] = rw p.cond.L.Unlock() resChan <- res @@ -278,6 +299,15 @@ func (p *Pool) backgroundFinishCreate(resChan chan interface{}, errChan chan err }() } +func (p *Pool) backgroundClose(res interface{}) { + go func() { + err := p.closeRes(res) + if err != nil { + p.backgroundErrorHandler(err) + } + }() +} + // Return returns res to the the pool. If res is not part of the pool Return // will panic. func (p *Pool) Return(res interface{}) { @@ -290,12 +320,18 @@ func (p *Pool) Return(res interface{}) { } if p.closed { - err := p.closeRes(rw.resource) - if err != nil { - p.backgroundErrorHandler(err) - } delete(p.allResources, rw.resource) p.cond.L.Unlock() + p.backgroundClose(rw.resource) + return + } + + now := time.Now() + if now.Sub(rw.creationTime) > p.maxResourceDuration { + delete(p.allResources, rw.resource) + p.ensureMinResources() + p.cond.L.Unlock() + p.backgroundClose(rw.resource) return } @@ -329,7 +365,12 @@ func (p *Pool) Remove(res interface{}) { } }() - // Maintain min pool size (unless pool is already closed) + p.ensureMinResources() +} + +// ensureMinResources creates new resources if necessary to get pool up to min size. +// If pool is closed does nothing. p.cond.L must already be locked. +func (p *Pool) ensureMinResources() { if !p.closed { for len(p.allResources) < p.minSize { createResChan, createErrChan := p.startCreate() diff --git a/pool_test.go b/pool_test.go index 96aeea4..f23c3bf 100644 --- a/pool_test.go +++ b/pool_test.go @@ -162,6 +162,30 @@ func TestPoolReturnPanicsIfResourceNotPartOfPool(t *testing.T) { assert.Panics(t, func() { pool.Return(42) }) } +func TestPoolReturnClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T) { + var createCalls Counter + createFunc := func() (interface{}, error) { + return createCalls.Next(), nil + } + var closeCalls Counter + closeFunc := func(interface{}) error { + closeCalls.Next() + return nil + } + + pool := puddle.NewPool(createFunc, closeFunc) + + res, err := pool.Get(context.Background()) + require.NoError(t, err) + + assert.Equal(t, 1, pool.Size()) + pool.SetMaxResourceDuration(time.Nanosecond) + time.Sleep(time.Nanosecond) + + pool.Return(res) + assert.Equal(t, 0, pool.Size()) +} + func TestPoolCloseClosesAllAvailableResources(t *testing.T) { var createCalls Counter createFunc := func() (interface{}, error) { @@ -193,6 +217,17 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) { } func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) { + closeCallsChan := make(chan int, 4) + + waitForRead := func(ch chan int) bool { + select { + case <-ch: + return true + case <-time.NewTimer(time.Second).C: + return false + } + } + var createCalls Counter createFunc := func() (interface{}, error) { return createCalls.Next(), nil @@ -200,7 +235,8 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) { var closeCalls Counter closeFunc := func(interface{}) error { - closeCalls.Next() + n := closeCalls.Next() + closeCallsChan <- n return nil } @@ -220,6 +256,11 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) { p.Return(res) } + waitForRead(closeCallsChan) + waitForRead(closeCallsChan) + waitForRead(closeCallsChan) + waitForRead(closeCallsChan) + assert.Equal(t, len(resources), closeCalls.Value()) } @@ -483,8 +524,8 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosedErrorIsReported(t *testing.T select { case err = <-asyncErrChan: assert.Equal(t, errCloseFailed, err) - default: - t.Fatal("error not reported") + case <-time.NewTimer(time.Second).C: + t.Fatal("timed out waiting for async error") } }