Add Reset()
This commit is contained in:
@@ -30,11 +30,12 @@ type Destructor[T any] func(res T)
|
||||
|
||||
// Resource is the resource handle returned by acquiring from the pool.
|
||||
type Resource[T any] struct {
|
||||
value T
|
||||
pool *Pool[T]
|
||||
creationTime time.Time
|
||||
lastUsedNano int64
|
||||
status byte
|
||||
value T
|
||||
pool *Pool[T]
|
||||
creationTime time.Time
|
||||
lastUsedNano int64
|
||||
poolResetCount int
|
||||
status byte
|
||||
}
|
||||
|
||||
// Value returns the resource value.
|
||||
@@ -126,6 +127,8 @@ type Pool[T any] struct {
|
||||
emptyAcquireCount int64
|
||||
canceledAcquireCount int64
|
||||
|
||||
resetCount int
|
||||
|
||||
closed bool
|
||||
}
|
||||
|
||||
@@ -303,7 +306,7 @@ func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
|
||||
|
||||
// If there is room to create a resource do so
|
||||
if len(p.allResources) < int(p.maxSize) {
|
||||
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
|
||||
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), poolResetCount: p.resetCount, status: resourceStatusConstructing}
|
||||
p.allResources = append(p.allResources, res)
|
||||
p.destructWG.Add(1)
|
||||
p.cond.L.Unlock()
|
||||
@@ -388,7 +391,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
|
||||
}
|
||||
|
||||
if len(p.allResources) < int(p.maxSize) {
|
||||
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
|
||||
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), poolResetCount: p.resetCount, status: resourceStatusConstructing}
|
||||
p.allResources = append(p.allResources, res)
|
||||
p.destructWG.Add(1)
|
||||
|
||||
@@ -450,11 +453,12 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
|
||||
}
|
||||
|
||||
res := &Resource[T]{
|
||||
pool: p,
|
||||
creationTime: time.Now(),
|
||||
status: resourceStatusIdle,
|
||||
value: value,
|
||||
lastUsedNano: nanotime(),
|
||||
pool: p,
|
||||
creationTime: time.Now(),
|
||||
status: resourceStatusIdle,
|
||||
value: value,
|
||||
lastUsedNano: nanotime(),
|
||||
poolResetCount: p.resetCount,
|
||||
}
|
||||
p.destructWG.Add(1)
|
||||
|
||||
@@ -472,17 +476,36 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reset destroys all resources, but leaves the pool open. It is intended for use when an error is detected that would
|
||||
// disrupt all resources (such as a network interruption or a server state change).
|
||||
//
|
||||
// It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned
|
||||
// to the pool.
|
||||
func (p *Pool[T]) Reset() {
|
||||
p.cond.L.Lock()
|
||||
defer p.cond.L.Unlock()
|
||||
|
||||
p.resetCount++
|
||||
|
||||
for i := range p.idleResources {
|
||||
p.allResources = removeResource(p.allResources, p.idleResources[i])
|
||||
go p.destructResourceValue(p.idleResources[i].value)
|
||||
p.idleResources[i] = nil
|
||||
}
|
||||
p.idleResources = p.idleResources[0:0]
|
||||
}
|
||||
|
||||
// releaseAcquiredResource returns res to the the pool.
|
||||
func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
|
||||
p.cond.L.Lock()
|
||||
|
||||
if !p.closed {
|
||||
if p.closed || res.poolResetCount != p.resetCount {
|
||||
p.allResources = removeResource(p.allResources, res)
|
||||
go p.destructResourceValue(res.value)
|
||||
} else {
|
||||
res.lastUsedNano = lastUsedNano
|
||||
res.status = resourceStatusIdle
|
||||
p.idleResources = append(p.idleResources, res)
|
||||
} else {
|
||||
p.allResources = removeResource(p.allResources, res)
|
||||
go p.destructResourceValue(res.value)
|
||||
}
|
||||
|
||||
p.cond.L.Unlock()
|
||||
|
||||
@@ -417,6 +417,82 @@ func TestPoolCloseIsSafeToCallMultipleTimes(t *testing.T) {
|
||||
p.Close()
|
||||
}
|
||||
|
||||
func TestPoolResetDestroysAllIdleResources(t *testing.T) {
|
||||
constructor, _ := createConstructor()
|
||||
|
||||
var destructorCalls Counter
|
||||
destructor := func(int) {
|
||||
destructorCalls.Next()
|
||||
}
|
||||
|
||||
p := puddle.NewPool(constructor, destructor, 10)
|
||||
|
||||
resources := make([]*puddle.Resource[int], 4)
|
||||
for i := range resources {
|
||||
var err error
|
||||
resources[i], err = p.Acquire(context.Background())
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
for _, res := range resources {
|
||||
res.Release()
|
||||
}
|
||||
|
||||
require.EqualValues(t, 4, p.Stat().TotalResources())
|
||||
p.Reset()
|
||||
require.EqualValues(t, 0, p.Stat().TotalResources())
|
||||
|
||||
// Destructors are called in the background. No way to know when they are all finished.
|
||||
for i := 0; i < 100; i++ {
|
||||
if destructorCalls.Value() == len(resources) {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
require.Equal(t, len(resources), destructorCalls.Value())
|
||||
|
||||
p.Close()
|
||||
}
|
||||
|
||||
func TestPoolResetDestroysCheckedOutResourcesOnReturn(t *testing.T) {
|
||||
constructor, _ := createConstructor()
|
||||
|
||||
var destructorCalls Counter
|
||||
destructor := func(int) {
|
||||
destructorCalls.Next()
|
||||
}
|
||||
|
||||
p := puddle.NewPool(constructor, destructor, 10)
|
||||
|
||||
resources := make([]*puddle.Resource[int], 4)
|
||||
for i := range resources {
|
||||
var err error
|
||||
resources[i], err = p.Acquire(context.Background())
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
require.EqualValues(t, 4, p.Stat().TotalResources())
|
||||
p.Reset()
|
||||
require.EqualValues(t, 4, p.Stat().TotalResources())
|
||||
|
||||
for _, res := range resources {
|
||||
res.Release()
|
||||
}
|
||||
|
||||
require.EqualValues(t, 0, p.Stat().TotalResources())
|
||||
|
||||
// Destructors are called in the background. No way to know when they are all finished.
|
||||
for i := 0; i < 100; i++ {
|
||||
if destructorCalls.Value() == len(resources) {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
require.Equal(t, len(resources), destructorCalls.Value())
|
||||
|
||||
p.Close()
|
||||
}
|
||||
|
||||
func TestPoolStatResources(t *testing.T) {
|
||||
startWaitChan := make(chan struct{})
|
||||
waitingChan := make(chan struct{})
|
||||
|
||||
Reference in New Issue
Block a user