2
0

Separate duplicate code to functions

This commit is contained in:
Jan Dubsky
2022-09-29 20:37:19 +02:00
committed by Jack Christensen
parent 6214680aa8
commit 9df21ce1a1
3 changed files with 71 additions and 32 deletions
+24
View File
@@ -0,0 +1,24 @@
package puddle
import (
"context"
"time"
)
// valueCancelCtx combines two contexts into one. One context is used for values and the other is used for cancellation.
type valueCancelCtx struct {
valueCtx context.Context
cancelCtx context.Context
}
func (ctx *valueCancelCtx) Deadline() (time.Time, bool) { return ctx.cancelCtx.Deadline() }
func (ctx *valueCancelCtx) Done() <-chan struct{} { return ctx.cancelCtx.Done() }
func (ctx *valueCancelCtx) Err() error { return ctx.cancelCtx.Err() }
func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Value(key) }
func newValueCancelCtx(valueCtx, cancelContext context.Context) context.Context {
return &valueCancelCtx{
valueCtx: valueCtx,
cancelCtx: cancelContext,
}
}
+41 -26
View File
@@ -131,7 +131,7 @@ type Pool[T any] struct {
resetCount int resetCount int
baseAcquireCtx context.Context baseAcquireCtx context.Context
cancelBaseAcquireCtx func() cancelBaseAcquireCtx context.CancelFunc
closed bool closed bool
} }
@@ -274,16 +274,41 @@ func (p *Pool[T]) Stat() *Stat {
return s return s
} }
// valueCancelCtx combines two contexts into one. One context is used for values and the other is used for cancellation. // tryAcquireIdleResource checks if there is any idle resource. If there is
type valueCancelCtx struct { // some, this method removes it from idle list and returns it. If the idle pool
valueCtx context.Context // is empty, this method returns nil and doesn't modify the idleResources slice.
cancelCtx context.Context //
// WARNING: Caller of this method must hold the pool mutex!
func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
if len(p.idleResources) == 0 {
return nil
}
res := p.idleResources[len(p.idleResources)-1]
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
p.idleResources = p.idleResources[:len(p.idleResources)-1]
return res
} }
func (ctx *valueCancelCtx) Deadline() (time.Time, bool) { return ctx.cancelCtx.Deadline() } // createNewResource creates a new resource and inserts it into list of pool
func (ctx *valueCancelCtx) Done() <-chan struct{} { return ctx.cancelCtx.Done() } // resources.
func (ctx *valueCancelCtx) Err() error { return ctx.cancelCtx.Err() } //
func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Value(key) } // WARNING: Caller of this method must hold the pool mutex!
func (p *Pool[T]) createNewResource() *Resource[T] {
res := &Resource[T]{
pool: p,
creationTime: time.Now(),
lastUsedNano: nanotime(),
poolResetCount: p.resetCount,
status: resourceStatusConstructing,
}
p.allResources = append(p.allResources, res)
p.destructWG.Add(1)
return res
}
// Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will // Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will
// create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be // create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be
@@ -319,10 +344,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
} }
// If a resource is available now // If a resource is available now
if len(p.idleResources) > 0 { if res := p.tryAcquireIdleResource(); res != nil {
res := p.idleResources[len(p.idleResources)-1]
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
p.idleResources = p.idleResources[:len(p.idleResources)-1]
res.status = resourceStatusAcquired res.status = resourceStatusAcquired
if emptyAcquire { if emptyAcquire {
p.emptyAcquireCount += 1 p.emptyAcquireCount += 1
@@ -337,19 +359,17 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
// If there is room to create a resource do so // If there is room to create a resource do so
if len(p.allResources) < int(p.maxSize) { if len(p.allResources) < int(p.maxSize) {
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), poolResetCount: p.resetCount, status: resourceStatusConstructing} res := p.createNewResource()
p.allResources = append(p.allResources, res)
p.destructWG.Add(1)
p.cond.L.Unlock() p.cond.L.Unlock()
// Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling // Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling
// the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259 // the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259
constructErrCh := make(chan error) constructErrCh := make(chan error)
go func() { go func() {
constructorCtx := &valueCancelCtx{valueCtx: ctx, cancelCtx: p.baseAcquireCtx} constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx)
value, err := p.constructResourceValue(constructorCtx) value, err := p.constructResourceValue(constructorCtx)
p.cond.L.Lock()
if err != nil { if err != nil {
p.cond.L.Lock()
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
p.destructWG.Done() p.destructWG.Done()
p.cond.L.Unlock() p.cond.L.Unlock()
@@ -370,6 +390,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
select { select {
case constructErrCh <- nil: case constructErrCh <- nil:
p.cond.L.Lock()
p.emptyAcquireCount += 1 p.emptyAcquireCount += 1
p.acquireCount += 1 p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano) p.acquireDuration += time.Duration(nanotime() - startNano)
@@ -377,7 +398,6 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
// No need to call Signal as this new resource was immediately acquired and did not change availability for // No need to call Signal as this new resource was immediately acquired and did not change availability for
// any waiting Acquire calls. // any waiting Acquire calls.
case <-ctx.Done(): case <-ctx.Done():
p.cond.L.Unlock()
p.releaseAcquiredResource(res, res.lastUsedNano) p.releaseAcquiredResource(res, res.lastUsedNano)
} }
}() }()
@@ -436,19 +456,14 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
} }
// If a resource is available now // If a resource is available now
if len(p.idleResources) > 0 { if res := p.tryAcquireIdleResource(); res != nil {
res := p.idleResources[len(p.idleResources)-1]
p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak
p.idleResources = p.idleResources[:len(p.idleResources)-1]
p.acquireCount += 1 p.acquireCount += 1
res.status = resourceStatusAcquired res.status = resourceStatusAcquired
return res, nil return res, nil
} }
if len(p.allResources) < int(p.maxSize) { if len(p.allResources) < int(p.maxSize) {
res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), poolResetCount: p.resetCount, status: resourceStatusConstructing} res := p.createNewResource()
p.allResources = append(p.allResources, res)
p.destructWG.Add(1)
go func() { go func() {
value, err := p.constructResourceValue(ctx) value, err := p.constructResourceValue(ctx)
+6 -6
View File
@@ -27,18 +27,18 @@ type Counter struct {
// Next increments the counter and returns the value // Next increments the counter and returns the value
func (c *Counter) Next() int { func (c *Counter) Next() int {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock()
c.n += 1 c.n += 1
n := c.n return c.n
c.mutex.Unlock()
return n
} }
// Value returns the counter // Value returns the counter
func (c *Counter) Value() int { func (c *Counter) Value() int {
c.mutex.Lock() c.mutex.Lock()
n := c.n defer c.mutex.Unlock()
c.mutex.Unlock()
return n return c.n
} }
func createConstructor() (puddle.Constructor[int], *Counter) { func createConstructor() (puddle.Constructor[int], *Counter) {