From 9df21ce1a1074c535487347294422aeb8d1ab3e4 Mon Sep 17 00:00:00 2001 From: Jan Dubsky Date: Thu, 29 Sep 2022 20:37:19 +0200 Subject: [PATCH] Separate duplicate code to functions --- context.go | 24 +++++++++++++++++++ pool.go | 67 ++++++++++++++++++++++++++++++++-------------------- pool_test.go | 12 +++++----- 3 files changed, 71 insertions(+), 32 deletions(-) create mode 100644 context.go diff --git a/context.go b/context.go new file mode 100644 index 0000000..e19d2a6 --- /dev/null +++ b/context.go @@ -0,0 +1,24 @@ +package puddle + +import ( + "context" + "time" +) + +// valueCancelCtx combines two contexts into one. One context is used for values and the other is used for cancellation. +type valueCancelCtx struct { + valueCtx context.Context + cancelCtx context.Context +} + +func (ctx *valueCancelCtx) Deadline() (time.Time, bool) { return ctx.cancelCtx.Deadline() } +func (ctx *valueCancelCtx) Done() <-chan struct{} { return ctx.cancelCtx.Done() } +func (ctx *valueCancelCtx) Err() error { return ctx.cancelCtx.Err() } +func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Value(key) } + +func newValueCancelCtx(valueCtx, cancelContext context.Context) context.Context { + return &valueCancelCtx{ + valueCtx: valueCtx, + cancelCtx: cancelContext, + } +} diff --git a/pool.go b/pool.go index 50705f8..84a1fc7 100644 --- a/pool.go +++ b/pool.go @@ -131,7 +131,7 @@ type Pool[T any] struct { resetCount int baseAcquireCtx context.Context - cancelBaseAcquireCtx func() + cancelBaseAcquireCtx context.CancelFunc closed bool } @@ -274,16 +274,41 @@ func (p *Pool[T]) Stat() *Stat { return s } -// valueCancelCtx combines two contexts into one. One context is used for values and the other is used for cancellation. -type valueCancelCtx struct { - valueCtx context.Context - cancelCtx context.Context +// tryAcquireIdleResource checks if there is any idle resource. If there is +// some, this method removes it from idle list and returns it. If the idle pool +// is empty, this method returns nil and doesn't modify the idleResources slice. +// +// WARNING: Caller of this method must hold the pool mutex! +func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] { + if len(p.idleResources) == 0 { + return nil + } + + res := p.idleResources[len(p.idleResources)-1] + p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak + p.idleResources = p.idleResources[:len(p.idleResources)-1] + + return res } -func (ctx *valueCancelCtx) Deadline() (time.Time, bool) { return ctx.cancelCtx.Deadline() } -func (ctx *valueCancelCtx) Done() <-chan struct{} { return ctx.cancelCtx.Done() } -func (ctx *valueCancelCtx) Err() error { return ctx.cancelCtx.Err() } -func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Value(key) } +// createNewResource creates a new resource and inserts it into list of pool +// resources. +// +// WARNING: Caller of this method must hold the pool mutex! +func (p *Pool[T]) createNewResource() *Resource[T] { + res := &Resource[T]{ + pool: p, + creationTime: time.Now(), + lastUsedNano: nanotime(), + poolResetCount: p.resetCount, + status: resourceStatusConstructing, + } + + p.allResources = append(p.allResources, res) + p.destructWG.Add(1) + + return res +} // Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will // create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be @@ -319,10 +344,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { } // If a resource is available now - if len(p.idleResources) > 0 { - res := p.idleResources[len(p.idleResources)-1] - p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak - p.idleResources = p.idleResources[:len(p.idleResources)-1] + if res := p.tryAcquireIdleResource(); res != nil { res.status = resourceStatusAcquired if emptyAcquire { p.emptyAcquireCount += 1 @@ -337,19 +359,17 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { // If there is room to create a resource do so if len(p.allResources) < int(p.maxSize) { - res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), poolResetCount: p.resetCount, status: resourceStatusConstructing} - p.allResources = append(p.allResources, res) - p.destructWG.Add(1) + res := p.createNewResource() p.cond.L.Unlock() // Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling // the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259 constructErrCh := make(chan error) go func() { - constructorCtx := &valueCancelCtx{valueCtx: ctx, cancelCtx: p.baseAcquireCtx} + constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx) value, err := p.constructResourceValue(constructorCtx) - p.cond.L.Lock() if err != nil { + p.cond.L.Lock() p.allResources = removeResource(p.allResources, res) p.destructWG.Done() p.cond.L.Unlock() @@ -370,6 +390,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { select { case constructErrCh <- nil: + p.cond.L.Lock() p.emptyAcquireCount += 1 p.acquireCount += 1 p.acquireDuration += time.Duration(nanotime() - startNano) @@ -377,7 +398,6 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { // No need to call Signal as this new resource was immediately acquired and did not change availability for // any waiting Acquire calls. case <-ctx.Done(): - p.cond.L.Unlock() p.releaseAcquiredResource(res, res.lastUsedNano) } }() @@ -436,19 +456,14 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { } // If a resource is available now - if len(p.idleResources) > 0 { - res := p.idleResources[len(p.idleResources)-1] - p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak - p.idleResources = p.idleResources[:len(p.idleResources)-1] + if res := p.tryAcquireIdleResource(); res != nil { p.acquireCount += 1 res.status = resourceStatusAcquired return res, nil } if len(p.allResources) < int(p.maxSize) { - res := &Resource[T]{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), poolResetCount: p.resetCount, status: resourceStatusConstructing} - p.allResources = append(p.allResources, res) - p.destructWG.Add(1) + res := p.createNewResource() go func() { value, err := p.constructResourceValue(ctx) diff --git a/pool_test.go b/pool_test.go index 012bf09..40c987a 100644 --- a/pool_test.go +++ b/pool_test.go @@ -27,18 +27,18 @@ type Counter struct { // Next increments the counter and returns the value func (c *Counter) Next() int { c.mutex.Lock() + defer c.mutex.Unlock() + c.n += 1 - n := c.n - c.mutex.Unlock() - return n + return c.n } // Value returns the counter func (c *Counter) Value() int { c.mutex.Lock() - n := c.n - c.mutex.Unlock() - return n + defer c.mutex.Unlock() + + return c.n } func createConstructor() (puddle.Constructor[int], *Counter) {