diff --git a/pool.go b/pool.go index 38a61a8..acbcc33 100644 --- a/pool.go +++ b/pool.go @@ -30,11 +30,12 @@ type Destructor[T any] func(res T) // Resource is the resource handle returned by acquiring from the pool. type Resource[T any] struct { - value T - pool *Pool[T] - creationTime time.Time - lastUsedNano int64 - status byte + value T + pool *Pool[T] + creationTime time.Time + lastUsedNano int64 + poolResetCount int + status byte } // Value returns the resource value. @@ -126,6 +127,8 @@ type Pool[T any] struct { emptyAcquireCount int64 canceledAcquireCount int64 + resetCount int + closed bool } @@ -303,7 +306,7 @@ func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { // If there is room to create a resource do so if len(p.allResources) < int(p.maxSize) { - res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing} + res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), poolResetCount: p.resetCount, status: resourceStatusConstructing} p.allResources = append(p.allResources, res) p.destructWG.Add(1) p.cond.L.Unlock() @@ -388,7 +391,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { } if len(p.allResources) < int(p.maxSize) { - res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing} + res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), poolResetCount: p.resetCount, status: resourceStatusConstructing} p.allResources = append(p.allResources, res) p.destructWG.Add(1) @@ -450,11 +453,12 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error { } res := &Resource[T]{ - pool: p, - creationTime: time.Now(), - status: resourceStatusIdle, - value: value, - lastUsedNano: nanotime(), + pool: p, + creationTime: time.Now(), + status: resourceStatusIdle, + value: value, + lastUsedNano: nanotime(), + poolResetCount: p.resetCount, } p.destructWG.Add(1) @@ -472,17 +476,36 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error { return nil } +// Reset destroys all resources, but leaves the pool open. It is intended for use when an error is detected that would +// disrupt all resources (such as a network interruption or a server state change). +// +// It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned +// to the pool. +func (p *Pool[T]) Reset() { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + p.resetCount++ + + for i := range p.idleResources { + p.allResources = removeResource(p.allResources, p.idleResources[i]) + go p.destructResourceValue(p.idleResources[i].value) + p.idleResources[i] = nil + } + p.idleResources = p.idleResources[0:0] +} + // releaseAcquiredResource returns res to the the pool. func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) { p.cond.L.Lock() - if !p.closed { + if p.closed || res.poolResetCount != p.resetCount { + p.allResources = removeResource(p.allResources, res) + go p.destructResourceValue(res.value) + } else { res.lastUsedNano = lastUsedNano res.status = resourceStatusIdle p.idleResources = append(p.idleResources, res) - } else { - p.allResources = removeResource(p.allResources, res) - go p.destructResourceValue(res.value) } p.cond.L.Unlock() diff --git a/pool_test.go b/pool_test.go index 4a93ad2..5a0f645 100644 --- a/pool_test.go +++ b/pool_test.go @@ -417,6 +417,82 @@ func TestPoolCloseIsSafeToCallMultipleTimes(t *testing.T) { p.Close() } +func TestPoolResetDestroysAllIdleResources(t *testing.T) { + constructor, _ := createConstructor() + + var destructorCalls Counter + destructor := func(int) { + destructorCalls.Next() + } + + p := puddle.NewPool(constructor, destructor, 10) + + resources := make([]*puddle.Resource[int], 4) + for i := range resources { + var err error + resources[i], err = p.Acquire(context.Background()) + require.Nil(t, err) + } + + for _, res := range resources { + res.Release() + } + + require.EqualValues(t, 4, p.Stat().TotalResources()) + p.Reset() + require.EqualValues(t, 0, p.Stat().TotalResources()) + + // Destructors are called in the background. No way to know when they are all finished. + for i := 0; i < 100; i++ { + if destructorCalls.Value() == len(resources) { + break + } + time.Sleep(100 * time.Millisecond) + } + require.Equal(t, len(resources), destructorCalls.Value()) + + p.Close() +} + +func TestPoolResetDestroysCheckedOutResourcesOnReturn(t *testing.T) { + constructor, _ := createConstructor() + + var destructorCalls Counter + destructor := func(int) { + destructorCalls.Next() + } + + p := puddle.NewPool(constructor, destructor, 10) + + resources := make([]*puddle.Resource[int], 4) + for i := range resources { + var err error + resources[i], err = p.Acquire(context.Background()) + require.Nil(t, err) + } + + require.EqualValues(t, 4, p.Stat().TotalResources()) + p.Reset() + require.EqualValues(t, 4, p.Stat().TotalResources()) + + for _, res := range resources { + res.Release() + } + + require.EqualValues(t, 0, p.Stat().TotalResources()) + + // Destructors are called in the background. No way to know when they are all finished. + for i := 0; i < 100; i++ { + if destructorCalls.Value() == len(resources) { + break + } + time.Sleep(100 * time.Millisecond) + } + require.Equal(t, len(resources), destructorCalls.Value()) + + p.Close() +} + func TestPoolStatResources(t *testing.T) { startWaitChan := make(chan struct{}) waitingChan := make(chan struct{})