2
0

Pool.Close blocks until all resources released and destructed

Renamed resource constructor and destructor callbacks.
This commit is contained in:
Jack Christensen
2018-12-26 02:59:36 -06:00
parent f718a625cf
commit ab3795c24c
2 changed files with 52 additions and 37 deletions
+39 -23
View File
@@ -18,8 +18,8 @@ const maxInt = int(maxUint >> 1)
// ErrClosedPool occurs on an attempt to get a connection from a closed pool. // ErrClosedPool occurs on an attempt to get a connection from a closed pool.
var ErrClosedPool = errors.New("cannot get from closed pool") var ErrClosedPool = errors.New("cannot get from closed pool")
type CreateFunc func(ctx context.Context) (res interface{}, err error) type Constructor func(ctx context.Context) (res interface{}, err error)
type CloseFunc func(res interface{}) type Destructor func(res interface{})
type Resource struct { type Resource struct {
value interface{} value interface{}
@@ -41,7 +41,8 @@ func (res *Resource) Destroy() {
// Pool is a thread-safe resource pool. // Pool is a thread-safe resource pool.
type Pool struct { type Pool struct {
cond *sync.Cond cond *sync.Cond
destructWG *sync.WaitGroup
allResources []*Resource allResources []*Resource
availableResources []*Resource availableResources []*Resource
@@ -50,34 +51,37 @@ type Pool struct {
maxSize int maxSize int
closed bool closed bool
createRes CreateFunc constructor Constructor
closeRes CloseFunc destructor Destructor
} }
func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool { func NewPool(constructor Constructor, destructor Destructor) *Pool {
return &Pool{ return &Pool{
cond: sync.NewCond(new(sync.Mutex)), cond: sync.NewCond(new(sync.Mutex)),
maxSize: maxInt, destructWG: &sync.WaitGroup{},
createRes: createRes, maxSize: maxInt,
closeRes: closeRes, constructor: constructor,
destructor: destructor,
} }
} }
// Close closes all resources in the pool and rejects future Acquire calls. // Close closes all resources in the pool and rejects future Acquire calls.
// Unavailable resources will be closes when they are returned to the pool. // Blocks until all resources are returned to pool and closed.
func (p *Pool) Close() { func (p *Pool) Close() {
p.cond.L.Lock() p.cond.L.Lock()
p.closed = true p.closed = true
for _, res := range p.availableResources { for _, res := range p.availableResources {
p.closeRes(res.value)
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
go p.destructResourceValue(res.value)
} }
p.availableResources = nil p.availableResources = nil
p.cond.L.Unlock() p.cond.L.Unlock()
// Wake up all go routines waiting for a resource to be returned so they can terminate. // Wake up all go routines waiting for a resource to be returned so they can terminate.
p.cond.Broadcast() p.cond.Broadcast()
p.destructWG.Wait()
} }
// Size returns the current size of the pool. // Size returns the current size of the pool.
@@ -159,7 +163,7 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) {
p.allResources = append(p.allResources, res) p.allResources = append(p.allResources, res)
p.cond.L.Unlock() p.cond.L.Unlock()
value, err := p.createRes(ctx) value, err := p.constructResourceValue(ctx)
p.cond.L.Lock() p.cond.L.Lock()
if err != nil { if err != nil {
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
@@ -217,16 +221,14 @@ func (p *Pool) lockedAvailableAcquire() *Resource {
func (p *Pool) releaseBorrowedResource(res *Resource) { func (p *Pool) releaseBorrowedResource(res *Resource) {
p.cond.L.Lock() p.cond.L.Lock()
if p.closed { if !p.closed {
res.status = resourceStatusAvailable
p.availableResources = append(p.availableResources, res)
} else {
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
p.cond.L.Unlock() go p.destructResourceValue(res.value)
go p.closeRes(res.value)
return
} }
res.status = resourceStatusAvailable
p.availableResources = append(p.availableResources, res)
p.cond.L.Unlock() p.cond.L.Unlock()
p.cond.Signal() p.cond.Signal()
} }
@@ -235,12 +237,12 @@ func (p *Pool) releaseBorrowedResource(res *Resource) {
// pool Remove will panic. // pool Remove will panic.
func (p *Pool) destroyBorrowedResource(res *Resource) { func (p *Pool) destroyBorrowedResource(res *Resource) {
p.cond.L.Lock() p.cond.L.Lock()
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
go p.destructResourceValue(res.value)
p.cond.L.Unlock() p.cond.L.Unlock()
p.cond.Signal() p.cond.Signal()
// close the resource in the background
go p.closeRes(res.value)
} }
func removeResource(slice []*Resource, res *Resource) []*Resource { func removeResource(slice []*Resource, res *Resource) []*Resource {
@@ -253,3 +255,17 @@ func removeResource(slice []*Resource, res *Resource) []*Resource {
return slice return slice
} }
func (p *Pool) constructResourceValue(ctx context.Context) (interface{}, error) {
value, err := p.constructor(ctx)
if err != nil {
return nil, err
}
p.destructWG.Add(1)
return value, nil
}
func (p *Pool) destructResourceValue(value interface{}) {
p.destructor(value)
p.destructWG.Done()
}
+13 -14
View File
@@ -35,7 +35,7 @@ func (c *Counter) Value() int {
return n return n
} }
func createCreateResourceFunc() (puddle.CreateFunc, *Counter) { func createCreateResourceFunc() (puddle.Constructor, *Counter) {
var c Counter var c Counter
f := func(ctx context.Context) (interface{}, error) { f := func(ctx context.Context) (interface{}, error) {
return c.Next(), nil return c.Next(), nil
@@ -43,7 +43,7 @@ func createCreateResourceFunc() (puddle.CreateFunc, *Counter) {
return f, &c return f, &c
} }
func createCreateResourceFuncWithNotifierChan() (puddle.CreateFunc, *Counter, chan int) { func createCreateResourceFuncWithNotifierChan() (puddle.Constructor, *Counter, chan int) {
ch := make(chan int) ch := make(chan int)
var c Counter var c Counter
f := func(ctx context.Context) (interface{}, error) { f := func(ctx context.Context) (interface{}, error) {
@@ -57,7 +57,7 @@ func createCreateResourceFuncWithNotifierChan() (puddle.CreateFunc, *Counter, ch
return f, &c, ch return f, &c, ch
} }
func createCloseResourceFuncWithNotifierChan() (puddle.CloseFunc, *Counter, chan int) { func createCloseResourceFuncWithNotifierChan() (puddle.Destructor, *Counter, chan int) {
ch := make(chan int) ch := make(chan int)
var c Counter var c Counter
f := func(interface{}) { f := func(interface{}) {
@@ -208,9 +208,12 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) {
assert.Equal(t, len(resources), closeCalls.Value()) assert.Equal(t, len(resources), closeCalls.Value())
} }
func TestPoolReleaseClosesResourcePoolIsAlreadyClosed(t *testing.T) { func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
createFunc, _ := createCreateResourceFunc() createFunc, _ := createCreateResourceFunc()
closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() var closeCalls Counter
closeFunc := func(interface{}) {
closeCalls.Next()
}
p := puddle.NewPool(createFunc, closeFunc) p := puddle.NewPool(createFunc, closeFunc)
@@ -221,18 +224,14 @@ func TestPoolReleaseClosesResourcePoolIsAlreadyClosed(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
} }
p.Close()
assert.Equal(t, 0, closeCalls.Value())
for _, res := range resources { for _, res := range resources {
res.Release() go func() {
time.Sleep(100 * time.Millisecond)
res.Release()
}()
} }
waitForRead(closeCallsChan) p.Close()
waitForRead(closeCallsChan)
waitForRead(closeCallsChan)
waitForRead(closeCallsChan)
assert.Equal(t, len(resources), closeCalls.Value()) assert.Equal(t, len(resources), closeCalls.Value())
} }