2
0

Releasing or destroying Resource is called by resource

This makes it impossible to return a resource to a pool that it did
not come from.
This commit is contained in:
Jack Christensen
2018-12-25 19:17:57 -06:00
parent 09852e05d7
commit 3afe49e88b
2 changed files with 67 additions and 115 deletions
+41 -33
View File
@@ -23,19 +23,32 @@ var ErrClosedPool = errors.New("cannot get from closed pool")
type CreateFunc func(ctx context.Context) (res interface{}, err error) type CreateFunc func(ctx context.Context) (res interface{}, err error)
type CloseFunc func(res interface{}) type CloseFunc func(res interface{})
type resourceWrapper struct { type Resource struct {
resource interface{} value interface{}
pool *Pool
creationTime time.Time creationTime time.Time
checkoutCount uint64 checkoutCount uint64
status byte status byte
} }
func (res *Resource) Value() interface{} {
return res.value
}
func (res *Resource) Release() {
res.pool.releaseBorrowedResource(res.value)
}
func (res *Resource) Destroy() {
res.pool.destroyBorrowedResource(res.value)
}
// Pool is a thread-safe resource pool. // Pool is a thread-safe resource pool.
type Pool struct { type Pool struct {
cond *sync.Cond cond *sync.Cond
allResources map[interface{}]*resourceWrapper allResources map[interface{}]*Resource
availableResources []*resourceWrapper availableResources []*Resource
minSize int minSize int
maxSize int maxSize int
maxResourceDuration time.Duration maxResourceDuration time.Duration
@@ -49,7 +62,7 @@ type Pool struct {
func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool { func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool {
return &Pool{ return &Pool{
cond: sync.NewCond(new(sync.Mutex)), cond: sync.NewCond(new(sync.Mutex)),
allResources: make(map[interface{}]*resourceWrapper), allResources: make(map[interface{}]*Resource),
maxSize: maxInt, maxSize: maxInt,
maxResourceDuration: math.MaxInt64, maxResourceDuration: math.MaxInt64,
maxResourceCheckouts: math.MaxUint64, maxResourceCheckouts: math.MaxUint64,
@@ -65,8 +78,8 @@ func (p *Pool) Close() {
p.closed = true p.closed = true
for _, rw := range p.availableResources { for _, rw := range p.availableResources {
p.closeRes(rw.resource) p.closeRes(rw.value)
delete(p.allResources, rw.resource) delete(p.allResources, rw.value)
} }
p.availableResources = nil p.availableResources = nil
p.cond.L.Unlock() p.cond.L.Unlock()
@@ -160,7 +173,7 @@ func (p *Pool) SetMaxResourceCheckouts(n uint64) {
// is not at maximum capacity it will create a new resource. If the pool is at // is not at maximum capacity it will create a new resource. If the pool is at
// maximum capacity it will block until a resource is available. ctx can be used // maximum capacity it will block until a resource is available. ctx can be used
// to cancel the Get. // to cancel the Get.
func (p *Pool) Get(ctx context.Context) (interface{}, error) { func (p *Pool) Get(ctx context.Context) (*Resource, error) {
if doneChan := ctx.Done(); doneChan != nil { if doneChan := ctx.Done(); doneChan != nil {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -178,9 +191,9 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) {
// If a resource is available now // If a resource is available now
if len(p.availableResources) > 0 { if len(p.availableResources) > 0 {
res := p.lockedAvailableGet() rw := p.lockedAvailableGet()
p.cond.L.Unlock() p.cond.L.Unlock()
return res, nil return rw, nil
} }
// If there is room to create a resource do so // If there is room to create a resource do so
@@ -188,7 +201,7 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) {
var localVal int var localVal int
placeholder := &localVal placeholder := &localVal
startTime := time.Now() startTime := time.Now()
p.allResources[placeholder] = &resourceWrapper{resource: placeholder, creationTime: startTime, status: resourceStatusCreating} p.allResources[placeholder] = &Resource{value: placeholder, creationTime: startTime, status: resourceStatusCreating}
p.cond.L.Unlock() p.cond.L.Unlock()
res, err := p.createRes(ctx) res, err := p.createRes(ctx)
@@ -199,15 +212,15 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) {
return nil, err return nil, err
} }
rw := &resourceWrapper{resource: res, creationTime: startTime, status: resourceStatusBorrowed, checkoutCount: 1} rw := &Resource{pool: p, value: res, creationTime: startTime, status: resourceStatusBorrowed, checkoutCount: 1}
p.allResources[res] = rw p.allResources[res] = rw
p.cond.L.Unlock() p.cond.L.Unlock()
return res, nil return rw, nil
} }
p.cond.L.Unlock() p.cond.L.Unlock()
// Wait for a resource to be returned to the pool. // Wait for a resource to be returned to the pool.
waitResChan := make(chan interface{}) waitResChan := make(chan *Resource)
abortWaitResChan := make(chan struct{}) abortWaitResChan := make(chan struct{})
go func() { go func() {
p.cond.L.Lock() p.cond.L.Lock()
@@ -218,13 +231,13 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) {
p.cond.L.Unlock() p.cond.L.Unlock()
return return
} }
res := p.lockedAvailableGet() rw := p.lockedAvailableGet()
p.cond.L.Unlock() p.cond.L.Unlock()
select { select {
case <-abortWaitResChan: case <-abortWaitResChan:
p.Return(res) p.releaseBorrowedResource(rw.value)
case waitResChan <- res: case waitResChan <- rw:
} }
}() }()
@@ -232,14 +245,14 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) {
case <-ctx.Done(): case <-ctx.Done():
close(abortWaitResChan) close(abortWaitResChan)
return nil, ctx.Err() return nil, ctx.Err()
case res := <-waitResChan: case rw := <-waitResChan:
return res, nil return rw, nil
} }
} }
// lockedAvailableGet gets the top resource from p.availableResources. p.cond.L // lockedAvailableGet gets the top resource from p.availableResources. p.cond.L
// must already be locked. len(p.availableResources) must be > 0. // must already be locked. len(p.availableResources) must be > 0.
func (p *Pool) lockedAvailableGet() interface{} { func (p *Pool) lockedAvailableGet() *Resource {
rw := p.availableResources[len(p.availableResources)-1] rw := p.availableResources[len(p.availableResources)-1]
p.availableResources = p.availableResources[:len(p.availableResources)-1] p.availableResources = p.availableResources[:len(p.availableResources)-1]
if rw.status != resourceStatusAvailable { if rw.status != resourceStatusAvailable {
@@ -247,19 +260,14 @@ func (p *Pool) lockedAvailableGet() interface{} {
} }
rw.status = resourceStatusBorrowed rw.status = resourceStatusBorrowed
rw.checkoutCount += 1 rw.checkoutCount += 1
return rw.resource return rw
} }
// Return returns res to the the pool. If res is not part of the pool Return // releaseBorrowedResource returns res to the the pool.
// will panic. func (p *Pool) releaseBorrowedResource(res interface{}) {
func (p *Pool) Return(res interface{}) {
p.cond.L.Lock() p.cond.L.Lock()
rw, present := p.allResources[res] rw := p.allResources[res]
if !present {
p.cond.L.Unlock()
panic("Return called on resource that does not belong to pool")
}
closeResource := true closeResource := true
@@ -272,9 +280,9 @@ func (p *Pool) Return(res interface{}) {
} }
if closeResource { if closeResource {
delete(p.allResources, rw.resource) delete(p.allResources, rw.value)
p.cond.L.Unlock() p.cond.L.Unlock()
go p.closeRes(rw.resource) go p.closeRes(rw.value)
return return
} }
@@ -287,7 +295,7 @@ func (p *Pool) Return(res interface{}) {
// Remove removes res from the pool and closes it. If res is not part of the // Remove removes res from the pool and closes it. If res is not part of the
// pool Remove will panic. // pool Remove will panic.
func (p *Pool) Remove(res interface{}) { func (p *Pool) destroyBorrowedResource(res interface{}) {
p.cond.L.Lock() p.cond.L.Lock()
defer p.cond.L.Unlock() defer p.cond.L.Unlock()
@@ -296,7 +304,7 @@ func (p *Pool) Remove(res interface{}) {
panic("Remove called on resource that does not belong to pool") panic("Remove called on resource that does not belong to pool")
} }
delete(p.allResources, rw.resource) delete(p.allResources, rw.value)
// close the resource in the background // close the resource in the background
go func() { go func() {
+26 -82
View File
@@ -83,12 +83,12 @@ func waitForRead(ch chan int) bool {
func TestPoolGetCreatesResourceWhenNoneAvailable(t *testing.T) { func TestPoolGetCreatesResourceWhenNoneAvailable(t *testing.T) {
createFunc, _ := createCreateResourceFunc() createFunc, _ := createCreateResourceFunc()
pool := puddle.NewPool(createFunc, stubCloseRes) pool := puddle.NewPool(createFunc, stubCloseRes)
defer pool.Close()
res, err := pool.Get(context.Background()) res, err := pool.Get(context.Background())
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, res) assert.Equal(t, 1, res.Value())
res.Release()
pool.Return(res)
} }
func TestPoolGetDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) { func TestPoolGetDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
@@ -104,8 +104,8 @@ func TestPoolGetDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
for j := 0; j < 100; j++ { for j := 0; j < 100; j++ {
res, err := pool.Get(context.Background()) res, err := pool.Get(context.Background())
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, res) assert.Equal(t, 1, res.Value())
pool.Return(res) res.Release()
} }
wg.Done() wg.Done()
}() }()
@@ -135,15 +135,15 @@ func TestPoolGetReusesResources(t *testing.T) {
res, err := pool.Get(context.Background()) res, err := pool.Get(context.Background())
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, res) assert.Equal(t, 1, res.Value())
pool.Return(res) res.Release()
res, err = pool.Get(context.Background()) res, err = pool.Get(context.Background())
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, res) assert.Equal(t, 1, res.Value())
pool.Return(res) res.Release()
assert.Equal(t, 1, createCounter.Value()) assert.Equal(t, 1, createCounter.Value())
} }
@@ -182,14 +182,7 @@ func TestPoolGetContextCanceledDuringCreate(t *testing.T) {
assert.Nil(t, res) assert.Nil(t, res)
} }
func TestPoolReturnPanicsIfResourceNotPartOfPool(t *testing.T) { func TestResourceReleaseClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T) {
createFunc, _ := createCreateResourceFunc()
pool := puddle.NewPool(createFunc, stubCloseRes)
assert.Panics(t, func() { pool.Return(42) })
}
func TestPoolReturnClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T) {
createFunc, _ := createCreateResourceFunc() createFunc, _ := createCreateResourceFunc()
closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan()
@@ -202,14 +195,14 @@ func TestPoolReturnClosesAndRemovesResourceIfOlderThanMaxDuration(t *testing.T)
pool.SetMaxResourceDuration(time.Nanosecond) pool.SetMaxResourceDuration(time.Nanosecond)
time.Sleep(2 * time.Nanosecond) time.Sleep(2 * time.Nanosecond)
pool.Return(res) res.Release()
waitForRead(closeCallsChan) waitForRead(closeCallsChan)
assert.Equal(t, 0, pool.Size()) assert.Equal(t, 0, pool.Size())
assert.Equal(t, 1, closeCalls.Value()) assert.Equal(t, 1, closeCalls.Value())
} }
func TestPoolReturnClosesAndRemovesResourceWhenResourceCheckoutCountIsMaxResourceCheckouts(t *testing.T) { func TestResourceReleaseClosesAndRemovesResourceWhenResourceCheckoutCountIsMaxResourceCheckouts(t *testing.T) {
createFunc, _ := createCreateResourceFunc() createFunc, _ := createCreateResourceFunc()
closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan()
@@ -219,7 +212,7 @@ func TestPoolReturnClosesAndRemovesResourceWhenResourceCheckoutCountIsMaxResourc
res, err := pool.Get(context.Background()) res, err := pool.Get(context.Background())
require.NoError(t, err) require.NoError(t, err)
pool.Return(res) res.Release()
waitForRead(closeCallsChan) waitForRead(closeCallsChan)
@@ -237,7 +230,7 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
p := puddle.NewPool(createFunc, closeFunc) p := puddle.NewPool(createFunc, closeFunc)
resources := make([]interface{}, 4) resources := make([]*puddle.Resource, 4)
for i := range resources { for i := range resources {
var err error var err error
resources[i], err = p.Get(context.Background()) resources[i], err = p.Get(context.Background())
@@ -245,7 +238,7 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
} }
for _, res := range resources { for _, res := range resources {
p.Return(res) res.Release()
} }
p.Close() p.Close()
@@ -253,13 +246,13 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
assert.Equal(t, len(resources), closeCalls.Value()) assert.Equal(t, len(resources), closeCalls.Value())
} }
func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) { func TestPoolReleaseClosesResourcePoolIsAlreadyClosed(t *testing.T) {
createFunc, _ := createCreateResourceFunc() createFunc, _ := createCreateResourceFunc()
closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan()
p := puddle.NewPool(createFunc, closeFunc) p := puddle.NewPool(createFunc, closeFunc)
resources := make([]interface{}, 4) resources := make([]*puddle.Resource, 4)
for i := range resources { for i := range resources {
var err error var err error
resources[i], err = p.Get(context.Background()) resources[i], err = p.Get(context.Background())
@@ -270,7 +263,7 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) {
assert.Equal(t, 0, closeCalls.Value()) assert.Equal(t, 0, closeCalls.Value())
for _, res := range resources { for _, res := range resources {
p.Return(res) res.Release()
} }
waitForRead(closeCallsChan) waitForRead(closeCallsChan)
@@ -281,68 +274,19 @@ func TestPoolReturnClosesResourcePoolIsAlreadyClosed(t *testing.T) {
assert.Equal(t, len(resources), closeCalls.Value()) assert.Equal(t, len(resources), closeCalls.Value())
} }
func TestPoolRemovePanicsIfResourceNotPartOfPool(t *testing.T) { func TestResourceDestroyRemovesResourceFromPool(t *testing.T) {
createFunc, _ := createCreateResourceFunc()
pool := puddle.NewPool(createFunc, stubCloseRes)
assert.Panics(t, func() { pool.Remove(42) })
}
func TestPoolRemoveRemovesResourceFromPool(t *testing.T) {
createFunc, _ := createCreateResourceFunc() createFunc, _ := createCreateResourceFunc()
pool := puddle.NewPool(createFunc, stubCloseRes) pool := puddle.NewPool(createFunc, stubCloseRes)
res, err := pool.Get(context.Background()) res, err := pool.Get(context.Background())
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, res) assert.Equal(t, 1, res.Value())
assert.Equal(t, 1, pool.Size()) assert.Equal(t, 1, pool.Size())
pool.Remove(res) res.Destroy()
assert.Equal(t, 0, pool.Size()) assert.Equal(t, 0, pool.Size())
} }
func TestPoolRemoveRemovesResourceFromPoolAndDoesNotStartNewCreationToMaintainMinSizeWhenPoolIsClosed(t *testing.T) {
createFunc, createCounter, createCallsChan := createCreateResourceFuncWithNotifierChan()
closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan()
pool := puddle.NewPool(createFunc, closeFunc)
// Ensure there are 2 resources available in pool
{
r1, err := pool.Get(context.Background())
require.Nil(t, err)
r2, err := pool.Get(context.Background())
require.Nil(t, err)
pool.Return(r1)
pool.Return(r2)
}
assert.Equal(t, 2, pool.Size())
pool.SetMinSize(2)
assert.Equal(t, 2, pool.Size())
{
r1, err := pool.Get(context.Background())
require.Nil(t, err)
r2, err := pool.Get(context.Background())
require.Nil(t, err)
pool.Close()
pool.Remove(r1)
pool.Remove(r2)
}
require.True(t, waitForRead(createCallsChan))
require.True(t, waitForRead(createCallsChan))
require.True(t, waitForRead(closeCallsChan))
require.True(t, waitForRead(closeCallsChan))
assert.Equal(t, 0, pool.Size())
assert.Equal(t, 2, createCounter.Value())
assert.Equal(t, 2, closeCalls.Value())
}
func TestPoolGetReturnsErrorWhenPoolIsClosed(t *testing.T) { func TestPoolGetReturnsErrorWhenPoolIsClosed(t *testing.T) {
createFunc, _ := createCreateResourceFunc() createFunc, _ := createCreateResourceFunc()
pool := puddle.NewPool(createFunc, stubCloseRes) pool := puddle.NewPool(createFunc, stubCloseRes)
@@ -353,7 +297,7 @@ func TestPoolGetReturnsErrorWhenPoolIsClosed(t *testing.T) {
assert.Nil(t, res) assert.Nil(t, res)
} }
func BenchmarkPoolGetAndReturn(b *testing.B) { func BenchmarkPoolGetAndRelease(b *testing.B) {
benchmarks := []struct { benchmarks := []struct {
poolSize int poolSize int
concurrentClientCount int concurrentClientCount int
@@ -397,13 +341,13 @@ func BenchmarkPoolGetAndReturn(b *testing.B) {
pool := puddle.NewPool(createFunc, stubCloseRes) pool := puddle.NewPool(createFunc, stubCloseRes)
pool.SetMaxSize(bm.poolSize) pool.SetMaxSize(bm.poolSize)
borrowAndReturn := func() { borrowAndRelease := func() {
res, err := pool.Get(context.Background()) res, err := pool.Get(context.Background())
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
time.Sleep(bm.loanDuration) time.Sleep(bm.loanDuration)
pool.Return(res) res.Release()
} }
b.Run(name, func(b *testing.B) { b.Run(name, func(b *testing.B) {
@@ -418,13 +362,13 @@ func BenchmarkPoolGetAndReturn(b *testing.B) {
default: default:
} }
borrowAndReturn() borrowAndRelease()
} }
}() }()
} }
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
borrowAndReturn() borrowAndRelease()
} }
}) })
} }