diff --git a/pool.go b/pool.go index 28d2f50..d61c156 100644 --- a/pool.go +++ b/pool.go @@ -23,19 +23,32 @@ var ErrClosedPool = errors.New("cannot get from closed pool") type CreateFunc func(ctx context.Context) (res interface{}, err error) type CloseFunc func(res interface{}) -type resourceWrapper struct { - resource interface{} +type Resource struct { + value interface{} + pool *Pool creationTime time.Time checkoutCount uint64 status byte } +func (res *Resource) Value() interface{} { + return res.value +} + +func (res *Resource) Release() { + res.pool.releaseBorrowedResource(res.value) +} + +func (res *Resource) Destroy() { + res.pool.destroyBorrowedResource(res.value) +} + // Pool is a thread-safe resource pool. type Pool struct { cond *sync.Cond - allResources map[interface{}]*resourceWrapper - availableResources []*resourceWrapper + allResources map[interface{}]*Resource + availableResources []*Resource minSize int maxSize int maxResourceDuration time.Duration @@ -49,7 +62,7 @@ type Pool struct { func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool { return &Pool{ cond: sync.NewCond(new(sync.Mutex)), - allResources: make(map[interface{}]*resourceWrapper), + allResources: make(map[interface{}]*Resource), maxSize: maxInt, maxResourceDuration: math.MaxInt64, maxResourceCheckouts: math.MaxUint64, @@ -65,8 +78,8 @@ func (p *Pool) Close() { p.closed = true for _, rw := range p.availableResources { - p.closeRes(rw.resource) - delete(p.allResources, rw.resource) + p.closeRes(rw.value) + delete(p.allResources, rw.value) } p.availableResources = nil p.cond.L.Unlock() @@ -160,7 +173,7 @@ func (p *Pool) SetMaxResourceCheckouts(n uint64) { // is not at maximum capacity it will create a new resource. If the pool is at // maximum capacity it will block until a resource is available. ctx can be used // to cancel the Get. -func (p *Pool) Get(ctx context.Context) (interface{}, error) { +func (p *Pool) Get(ctx context.Context) (*Resource, error) { if doneChan := ctx.Done(); doneChan != nil { select { case <-ctx.Done(): @@ -178,9 +191,9 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) { // If a resource is available now if len(p.availableResources) > 0 { - res := p.lockedAvailableGet() + rw := p.lockedAvailableGet() p.cond.L.Unlock() - return res, nil + return rw, nil } // If there is room to create a resource do so @@ -188,7 +201,7 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) { var localVal int placeholder := &localVal startTime := time.Now() - p.allResources[placeholder] = &resourceWrapper{resource: placeholder, creationTime: startTime, status: resourceStatusCreating} + p.allResources[placeholder] = &Resource{value: placeholder, creationTime: startTime, status: resourceStatusCreating} p.cond.L.Unlock() res, err := p.createRes(ctx) @@ -199,15 +212,15 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) { return nil, err } - rw := &resourceWrapper{resource: res, creationTime: startTime, status: resourceStatusBorrowed, checkoutCount: 1} + rw := &Resource{pool: p, value: res, creationTime: startTime, status: resourceStatusBorrowed, checkoutCount: 1} p.allResources[res] = rw p.cond.L.Unlock() - return res, nil + return rw, nil } p.cond.L.Unlock() // Wait for a resource to be returned to the pool. - waitResChan := make(chan interface{}) + waitResChan := make(chan *Resource) abortWaitResChan := make(chan struct{}) go func() { p.cond.L.Lock() @@ -218,13 +231,13 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) { p.cond.L.Unlock() return } - res := p.lockedAvailableGet() + rw := p.lockedAvailableGet() p.cond.L.Unlock() select { case <-abortWaitResChan: - p.Return(res) - case waitResChan <- res: + p.releaseBorrowedResource(rw.value) + case waitResChan <- rw: } }() @@ -232,14 +245,14 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) { case <-ctx.Done(): close(abortWaitResChan) return nil, ctx.Err() - case res := <-waitResChan: - return res, nil + case rw := <-waitResChan: + return rw, nil } } // lockedAvailableGet gets the top resource from p.availableResources. p.cond.L // must already be locked. len(p.availableResources) must be > 0. -func (p *Pool) lockedAvailableGet() interface{} { +func (p *Pool) lockedAvailableGet() *Resource { rw := p.availableResources[len(p.availableResources)-1] p.availableResources = p.availableResources[:len(p.availableResources)-1] if rw.status != resourceStatusAvailable { @@ -247,19 +260,14 @@ func (p *Pool) lockedAvailableGet() interface{} { } rw.status = resourceStatusBorrowed rw.checkoutCount += 1 - return rw.resource + return rw } -// Return returns res to the the pool. If res is not part of the pool Return -// will panic. -func (p *Pool) Return(res interface{}) { +// releaseBorrowedResource returns res to the the pool. +func (p *Pool) releaseBorrowedResource(res interface{}) { p.cond.L.Lock() - rw, present := p.allResources[res] - if !present { - p.cond.L.Unlock() - panic("Return called on resource that does not belong to pool") - } + rw := p.allResources[res] closeResource := true @@ -272,9 +280,9 @@ func (p *Pool) Return(res interface{}) { } if closeResource { - delete(p.allResources, rw.resource) + delete(p.allResources, rw.value) p.cond.L.Unlock() - go p.closeRes(rw.resource) + go p.closeRes(rw.value) return } @@ -287,7 +295,7 @@ func (p *Pool) Return(res interface{}) { // Remove removes res from the pool and closes it. If res is not part of the // pool Remove will panic. -func (p *Pool) Remove(res interface{}) { +func (p *Pool) destroyBorrowedResource(res interface{}) { p.cond.L.Lock() defer p.cond.L.Unlock() @@ -296,7 +304,7 @@ func (p *Pool) Remove(res interface{}) { panic("Remove called on resource that does not belong to pool") } - delete(p.allResources, rw.resource) + delete(p.allResources, rw.value) // close the resource in the background go func() { diff --git a/pool_test.go b/pool_test.go index 3bfa0f5..f530217 100644 --- a/pool_test.go +++ b/pool_test.go @@ -83,12 +83,12 @@ func waitForRead(ch chan int) bool { func TestPoolGetCreatesResourceWhenNoneAvailable(t *testing.T) { createFunc, _ := createCreateResourceFunc() pool := puddle.NewPool(createFunc, stubCloseRes) + defer pool.Close() res, err := pool.Get(context.Background()) require.NoError(t, err) - assert.Equal(t, 1, res) - - pool.Return(res) + assert.Equal(t, 1, res.Value()) + res.Release() } func TestPoolGetDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) { @@ -104,8 +104,8 @@ func TestPoolGetDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) { for j := 0; j < 100; j++ { res, err := pool.Get(context.Background()) assert.NoError(t, err) - assert.Equal(t, 1, res) - pool.Return(res) + assert.Equal(t, 1, res.Value()) + res.Release() } wg.Done() }() @@ -135,15 +135,15 @@ func TestPoolGetReusesResources(t *testing.T) { res, err := pool.Get(context.Background()) require.NoError(t, err) - assert.Equal(t, 1, res) + assert.Equal(t, 1, res.Value()) - pool.Return(res) + res.Release() res, err = pool.Get(context.Background()) require.NoError(t, err) - assert.Equal(t, 1, res) + assert.Equal(t, 1, res.Value()) - pool.Return(res) + res.Release() assert.Equal(t, 1, createCounter.Value()) } @@ -182,14 +182,7 @@ func TestPoolGetContextCanceledDuringCreate(t *testing.T) { assert.Nil(t, res) } -func TestPoolReturnPanicsIfResourceNotPartOfPool(t *testing.T) { - createFunc, _ := createCreateResourceFunc() - pool := puddle.NewPool(createFunc, stubCloseRes) - - assert.Panics(t, func() { pool.Return(42) }) -} - -func TestPoolReturnClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T) { +func TestResourceReleaseClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T) { createFunc, _ := createCreateResourceFunc() closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() @@ -202,14 +195,14 @@ func TestPoolReturnClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T) pool.SetMaxResourceDuration(time.Nanosecond) time.Sleep(2 * time.Nanosecond) - pool.Return(res) + res.Release() waitForRead(closeCallsChan) assert.Equal(t, 0, pool.Size()) assert.Equal(t, 1, closeCalls.Value()) } -func TestPoolReturnClosesAndRemovesResourceWhenResourceCheckoutCountIsMaxResourceCheckouts(t *testing.T) { +func TestResourceReleaseClosesAndRemovesResourceWhenResourceCheckoutCountIsMaxResourceCheckouts(t *testing.T) { createFunc, _ := createCreateResourceFunc() closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() @@ -219,7 +212,7 @@ func TestPoolReturnClosesAndRemovesResourceWhenResourceCheckoutCountIsMaxResourc res, err := pool.Get(context.Background()) require.NoError(t, err) - pool.Return(res) + res.Release() waitForRead(closeCallsChan) @@ -237,7 +230,7 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) { p := puddle.NewPool(createFunc, closeFunc) - resources := make([]interface{}, 4) + resources := make([]*puddle.Resource, 4) for i := range resources { var err error resources[i], err = p.Get(context.Background()) @@ -245,7 +238,7 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) { } for _, res := range resources { - p.Return(res) + res.Release() } p.Close() @@ -253,13 +246,13 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) { assert.Equal(t, len(resources), closeCalls.Value()) } -func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) { +func TestPoolReleaseClosesResourcePoolIsAlreadyClosed(t *testing.T) { createFunc, _ := createCreateResourceFunc() closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() p := puddle.NewPool(createFunc, closeFunc) - resources := make([]interface{}, 4) + resources := make([]*puddle.Resource, 4) for i := range resources { var err error resources[i], err = p.Get(context.Background()) @@ -270,7 +263,7 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) { assert.Equal(t, 0, closeCalls.Value()) for _, res := range resources { - p.Return(res) + res.Release() } waitForRead(closeCallsChan) @@ -281,68 +274,19 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) { assert.Equal(t, len(resources), closeCalls.Value()) } -func TestPoolRemovePanicsIfResourceNotPartOfPool(t *testing.T) { - createFunc, _ := createCreateResourceFunc() - pool := puddle.NewPool(createFunc, stubCloseRes) - - assert.Panics(t, func() { pool.Remove(42) }) -} - -func TestPoolRemoveRemovesResourceFromPool(t *testing.T) { +func TestResourceDestroyRemovesResourceFromPool(t *testing.T) { createFunc, _ := createCreateResourceFunc() pool := puddle.NewPool(createFunc, stubCloseRes) res, err := pool.Get(context.Background()) require.NoError(t, err) - assert.Equal(t, 1, res) + assert.Equal(t, 1, res.Value()) assert.Equal(t, 1, pool.Size()) - pool.Remove(res) + res.Destroy() assert.Equal(t, 0, pool.Size()) } -func TestPoolRemoveRemovesResourceFromPoolAndDoesNotStartNewCreationToMaintainMinSizeWhenPoolIsClosed(t *testing.T) { - createFunc, createCounter, createCallsChan := createCreateResourceFuncWithNotifierChan() - closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() - - pool := puddle.NewPool(createFunc, closeFunc) - - // Ensure there are 2 resources available in pool - { - r1, err := pool.Get(context.Background()) - require.Nil(t, err) - r2, err := pool.Get(context.Background()) - require.Nil(t, err) - pool.Return(r1) - pool.Return(r2) - } - - assert.Equal(t, 2, pool.Size()) - pool.SetMinSize(2) - assert.Equal(t, 2, pool.Size()) - - { - r1, err := pool.Get(context.Background()) - require.Nil(t, err) - r2, err := pool.Get(context.Background()) - require.Nil(t, err) - - pool.Close() - - pool.Remove(r1) - pool.Remove(r2) - } - - require.True(t, waitForRead(createCallsChan)) - require.True(t, waitForRead(createCallsChan)) - require.True(t, waitForRead(closeCallsChan)) - require.True(t, waitForRead(closeCallsChan)) - - assert.Equal(t, 0, pool.Size()) - assert.Equal(t, 2, createCounter.Value()) - assert.Equal(t, 2, closeCalls.Value()) -} - func TestPoolGetReturnsErrorWhenPoolIsClosed(t *testing.T) { createFunc, _ := createCreateResourceFunc() pool := puddle.NewPool(createFunc, stubCloseRes) @@ -353,7 +297,7 @@ func TestPoolGetReturnsErrorWhenPoolIsClosed(t *testing.T) { assert.Nil(t, res) } -func BenchmarkPoolGetAndReturn(b *testing.B) { +func BenchmarkPoolGetAndRelease(b *testing.B) { benchmarks := []struct { poolSize int concurrentClientCount int @@ -397,13 +341,13 @@ func BenchmarkPoolGetAndReturn(b *testing.B) { pool := puddle.NewPool(createFunc, stubCloseRes) pool.SetMaxSize(bm.poolSize) - borrowAndReturn := func() { + borrowAndRelease := func() { res, err := pool.Get(context.Background()) if err != nil { b.Fatal(err) } time.Sleep(bm.loanDuration) - pool.Return(res) + res.Release() } b.Run(name, func(b *testing.B) { @@ -418,13 +362,13 @@ func BenchmarkPoolGetAndReturn(b *testing.B) { default: } - borrowAndReturn() + borrowAndRelease() } }() } for i := 0; i < b.N; i++ { - borrowAndReturn() + borrowAndRelease() } }) }