diff --git a/pool.go b/pool.go index 5606694..bae1e15 100644 --- a/pool.go +++ b/pool.go @@ -18,8 +18,8 @@ const maxInt = int(maxUint >> 1) // ErrClosedPool occurs on an attempt to get a connection from a closed pool. var ErrClosedPool = errors.New("cannot get from closed pool") -type CreateFunc func(ctx context.Context) (res interface{}, err error) -type CloseFunc func(res interface{}) +type Constructor func(ctx context.Context) (res interface{}, err error) +type Destructor func(res interface{}) type Resource struct { value interface{} @@ -41,7 +41,8 @@ func (res *Resource) Destroy() { // Pool is a thread-safe resource pool. type Pool struct { - cond *sync.Cond + cond *sync.Cond + destructWG *sync.WaitGroup allResources []*Resource availableResources []*Resource @@ -50,34 +51,37 @@ type Pool struct { maxSize int closed bool - createRes CreateFunc - closeRes CloseFunc + constructor Constructor + destructor Destructor } -func NewPool(createRes CreateFunc, closeRes CloseFunc) *Pool { +func NewPool(constructor Constructor, destructor Destructor) *Pool { return &Pool{ - cond: sync.NewCond(new(sync.Mutex)), - maxSize: maxInt, - createRes: createRes, - closeRes: closeRes, + cond: sync.NewCond(new(sync.Mutex)), + destructWG: &sync.WaitGroup{}, + maxSize: maxInt, + constructor: constructor, + destructor: destructor, } } // Close closes all resources in the pool and rejects future Acquire calls. -// Unavailable resources will be closes when they are returned to the pool. +// Blocks until all resources are returned to pool and closed. func (p *Pool) Close() { p.cond.L.Lock() p.closed = true for _, res := range p.availableResources { - p.closeRes(res.value) p.allResources = removeResource(p.allResources, res) + go p.destructResourceValue(res.value) } p.availableResources = nil p.cond.L.Unlock() // Wake up all go routines waiting for a resource to be returned so they can terminate. p.cond.Broadcast() + + p.destructWG.Wait() } // Size returns the current size of the pool. @@ -159,7 +163,7 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) { p.allResources = append(p.allResources, res) p.cond.L.Unlock() - value, err := p.createRes(ctx) + value, err := p.constructResourceValue(ctx) p.cond.L.Lock() if err != nil { p.allResources = removeResource(p.allResources, res) @@ -217,16 +221,14 @@ func (p *Pool) lockedAvailableAcquire() *Resource { func (p *Pool) releaseBorrowedResource(res *Resource) { p.cond.L.Lock() - if p.closed { + if !p.closed { + res.status = resourceStatusAvailable + p.availableResources = append(p.availableResources, res) + } else { p.allResources = removeResource(p.allResources, res) - p.cond.L.Unlock() - go p.closeRes(res.value) - return + go p.destructResourceValue(res.value) } - res.status = resourceStatusAvailable - p.availableResources = append(p.availableResources, res) - p.cond.L.Unlock() p.cond.Signal() } @@ -235,12 +237,12 @@ func (p *Pool) releaseBorrowedResource(res *Resource) { // pool Remove will panic. func (p *Pool) destroyBorrowedResource(res *Resource) { p.cond.L.Lock() + p.allResources = removeResource(p.allResources, res) + go p.destructResourceValue(res.value) + p.cond.L.Unlock() p.cond.Signal() - - // close the resource in the background - go p.closeRes(res.value) } func removeResource(slice []*Resource, res *Resource) []*Resource { @@ -253,3 +255,17 @@ func removeResource(slice []*Resource, res *Resource) []*Resource { return slice } + +func (p *Pool) constructResourceValue(ctx context.Context) (interface{}, error) { + value, err := p.constructor(ctx) + if err != nil { + return nil, err + } + p.destructWG.Add(1) + return value, nil +} + +func (p *Pool) destructResourceValue(value interface{}) { + p.destructor(value) + p.destructWG.Done() +} diff --git a/pool_test.go b/pool_test.go index bfdb8cc..7fe042e 100644 --- a/pool_test.go +++ b/pool_test.go @@ -35,7 +35,7 @@ func (c *Counter) Value() int { return n } -func createCreateResourceFunc() (puddle.CreateFunc, *Counter) { +func createCreateResourceFunc() (puddle.Constructor, *Counter) { var c Counter f := func(ctx context.Context) (interface{}, error) { return c.Next(), nil @@ -43,7 +43,7 @@ func createCreateResourceFunc() (puddle.CreateFunc, *Counter) { return f, &c } -func createCreateResourceFuncWithNotifierChan() (puddle.CreateFunc, *Counter, chan int) { +func createCreateResourceFuncWithNotifierChan() (puddle.Constructor, *Counter, chan int) { ch := make(chan int) var c Counter f := func(ctx context.Context) (interface{}, error) { @@ -57,7 +57,7 @@ func createCreateResourceFuncWithNotifierChan() (puddle.CreateFunc, *Counter, ch return f, &c, ch } -func createCloseResourceFuncWithNotifierChan() (puddle.CloseFunc, *Counter, chan int) { +func createCloseResourceFuncWithNotifierChan() (puddle.Destructor, *Counter, chan int) { ch := make(chan int) var c Counter f := func(interface{}) { @@ -208,9 +208,12 @@ func TestPoolCloseClosesAllAvailableResources(t *testing.T) { assert.Equal(t, len(resources), closeCalls.Value()) } -func TestPoolReleaseClosesResourcePoolIsAlreadyClosed(t *testing.T) { +func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) { createFunc, _ := createCreateResourceFunc() - closeFunc, closeCalls, closeCallsChan := createCloseResourceFuncWithNotifierChan() + var closeCalls Counter + closeFunc := func(interface{}) { + closeCalls.Next() + } p := puddle.NewPool(createFunc, closeFunc) @@ -221,18 +224,14 @@ func TestPoolReleaseClosesResourcePoolIsAlreadyClosed(t *testing.T) { require.Nil(t, err) } - p.Close() - assert.Equal(t, 0, closeCalls.Value()) - for _, res := range resources { - res.Release() + go func() { + time.Sleep(100 * time.Millisecond) + res.Release() + }() } - waitForRead(closeCallsChan) - waitForRead(closeCallsChan) - waitForRead(closeCallsChan) - waitForRead(closeCallsChan) - + p.Close() assert.Equal(t, len(resources), closeCalls.Value()) }