diff --git a/go.mod b/go.mod index 002c40d..e64539b 100644 --- a/go.mod +++ b/go.mod @@ -8,5 +8,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.4.0 // indirect + golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f1ea297..813eea1 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ffipvEDX2pl7E1TdqLqIc= +golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pool.go b/pool.go index 84a1fc7..c60fc5b 100644 --- a/pool.go +++ b/pool.go @@ -6,6 +6,8 @@ import ( "sync" "sync/atomic" "time" + + "golang.org/x/sync/semaphore" ) const ( @@ -113,8 +115,9 @@ func (res *Resource[T]) IdleDuration() time.Duration { // Pool is a concurrency-safe resource pool. type Pool[T any] struct { - cond *sync.Cond - destructWG *sync.WaitGroup + mux sync.Mutex + acquireSem *semaphore.Weighted + destructWG sync.WaitGroup allResources []*Resource[T] idleResources []*Resource[T] @@ -150,8 +153,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) { baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background()) return &Pool[T]{ - cond: sync.NewCond(new(sync.Mutex)), - destructWG: &sync.WaitGroup{}, + acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), maxSize: config.MaxSize, constructor: config.Constructor, destructor: config.Destructor, @@ -163,9 +165,12 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) { // Close destroys all resources in the pool and rejects future Acquire calls. // Blocks until all resources are returned to pool and destroyed. func (p *Pool[T]) Close() { - p.cond.L.Lock() + defer p.destructWG.Wait() + + p.mux.Lock() + defer p.mux.Unlock() + if p.closed { - p.cond.L.Unlock() return } p.closed = true @@ -176,12 +181,6 @@ func (p *Pool[T]) Close() { go p.destructResourceValue(res.value) } p.idleResources = nil - p.cond.L.Unlock() - - // Wake up all go routines waiting for a resource to be returned so they can terminate. - p.cond.Broadcast() - - p.destructWG.Wait() } // Stat is a snapshot of Pool statistics. @@ -250,7 +249,9 @@ func (s *Stat) CanceledAcquireCount() int64 { // Stat returns the current pool statistics. func (p *Pool[T]) Stat() *Stat { - p.cond.L.Lock() + p.mux.Lock() + defer p.mux.Unlock() + s := &Stat{ maxResources: p.maxSize, acquireCount: p.acquireCount, @@ -270,7 +271,6 @@ func (p *Pool[T]) Stat() *Stat { } } - p.cond.L.Unlock() return s } @@ -318,7 +318,7 @@ func (p *Pool[T]) createNewResource() *Resource[T] { // ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids // the problem of it being impossible to create resources when the time to create a resource is greater than any one // caller of Acquire is willing to wait. -func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { +func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) { select { case <-ctx.Done(): p.canceledAcquireCount.Add(1) @@ -333,114 +333,96 @@ func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { // validity. This function exists separatly only for benchmarking purposes. func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { startNano := nanotime() - p.cond.L.Lock() - emptyAcquire := false + var waitedForLock bool + if !p.acquireSem.TryAcquire(1) { + waitedForLock = true + err := p.acquireSem.Acquire(ctx, 1) + if err != nil { + p.canceledAcquireCount.Add(1) + return nil, err + } + } - for { - if p.closed { - p.cond.L.Unlock() - return nil, ErrClosedPool + p.mux.Lock() + if p.closed { + p.mux.Unlock() + p.acquireSem.Release(1) + return nil, ErrClosedPool + } + + // If a resource is available in the pool. + if res := p.tryAcquireIdleResource(); res != nil { + res.status = resourceStatusAcquired + if waitedForLock { + p.emptyAcquireCount += 1 + } + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + p.mux.Unlock() + return res, nil + } + + if len(p.allResources) >= int(p.maxSize) { + p.mux.Unlock() + p.acquireSem.Release(1) + panic("bug: semaphore allowed more acquires than pool allows") + } + + // The resource is not available, but there is enough space to create + // one. + res := p.createNewResource() + p.mux.Unlock() + + // Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling + // the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259 + constructErrCh := make(chan error) + go func() { + constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx) + value, err := p.constructResourceValue(constructorCtx) + if err != nil { + p.mux.Lock() + p.allResources = removeResource(p.allResources, res) + p.destructWG.Done() + + p.mux.Unlock() + // We won't take the resource so we allow someone else + // to run Acquire. + p.acquireSem.Release(1) + + select { + case constructErrCh <- err: + case <-ctx.Done(): + // The caller is cancelled, so no-one awaits the + // error. This branch avoid goroutine leak. + } + return } - // If a resource is available now - if res := p.tryAcquireIdleResource(); res != nil { - res.status = resourceStatusAcquired - if emptyAcquire { - p.emptyAcquireCount += 1 - } + res.value = value + res.status = resourceStatusAcquired + + select { + case constructErrCh <- nil: + p.mux.Lock() + p.emptyAcquireCount += 1 p.acquireCount += 1 p.acquireDuration += time.Duration(nanotime() - startNano) - p.cond.L.Unlock() - return res, nil + p.mux.Unlock() + case <-ctx.Done(): + p.releaseAcquiredResource(res, res.lastUsedNano) } + }() - emptyAcquire = true - - // If there is room to create a resource do so - if len(p.allResources) < int(p.maxSize) { - res := p.createNewResource() - p.cond.L.Unlock() - - // Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling - // the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259 - constructErrCh := make(chan error) - go func() { - constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx) - value, err := p.constructResourceValue(constructorCtx) - if err != nil { - p.cond.L.Lock() - p.allResources = removeResource(p.allResources, res) - p.destructWG.Done() - p.cond.L.Unlock() - p.cond.Signal() - - select { - case constructErrCh <- err: - case <-ctx.Done(): - // The caller is cancelled, so - // no-one awaits the error. This - // branch avoid goroutine leak. - } - return - } - - res.value = value - res.status = resourceStatusAcquired - - select { - case constructErrCh <- nil: - p.cond.L.Lock() - p.emptyAcquireCount += 1 - p.acquireCount += 1 - p.acquireDuration += time.Duration(nanotime() - startNano) - p.cond.L.Unlock() - // No need to call Signal as this new resource was immediately acquired and did not change availability for - // any waiting Acquire calls. - case <-ctx.Done(): - p.releaseAcquiredResource(res, res.lastUsedNano) - } - }() - - select { - case <-ctx.Done(): - p.canceledAcquireCount.Add(1) - return nil, ctx.Err() - case err := <-constructErrCh: - if err != nil { - return nil, err - } - // we don't call signal here because we didn't change the resource pools - // at all so waking anything else up won't help - return res, nil - } - } - - if ctx.Done() == nil { - p.cond.Wait() - } else { - // Convert p.cond.Wait into a channel - waitChan := make(chan struct{}, 1) - go func() { - p.cond.Wait() - waitChan <- struct{}{} - }() - - select { - case <-ctx.Done(): - // Allow goroutine waiting for signal to exit. Re-signal since we couldn't - // do anything with it. Another goroutine might be waiting. - go func() { - <-waitChan - p.cond.L.Unlock() - p.cond.Signal() - }() - - p.canceledAcquireCount.Add(1) - return nil, ctx.Err() - case <-waitChan: - } + select { + case <-ctx.Done(): + p.canceledAcquireCount.Add(1) + return nil, ctx.Err() + case err := <-constructErrCh: + if err != nil { + return nil, err } + return res, nil } } @@ -448,10 +430,15 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { // resources are available but the pool has room to grow, a resource will be created in the background. ctx is only // used to cancel the background creation. func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { - p.cond.L.Lock() - defer p.cond.L.Unlock() + if !p.acquireSem.TryAcquire(1) { + return nil, ErrNotAvailable + } + + p.mux.Lock() + defer p.mux.Unlock() if p.closed { + p.acquireSem.Release(1) return nil, ErrClosedPool } @@ -462,27 +449,31 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { return res, nil } - if len(p.allResources) < int(p.maxSize) { - res := p.createNewResource() - - go func() { - value, err := p.constructResourceValue(ctx) - defer p.cond.Signal() - p.cond.L.Lock() - defer p.cond.L.Unlock() - - if err != nil { - p.allResources = removeResource(p.allResources, res) - p.destructWG.Done() - return - } - - res.value = value - res.status = resourceStatusIdle - p.idleResources = append(p.idleResources, res) - }() + if len(p.allResources) >= int(p.maxSize) { + panic("bug: semaphore allowed more acquires than pool allows") } + res := p.createNewResource() + go func() { + value, err := p.constructResourceValue(ctx) + // We have to create the resource and only then release the + // semaphore - For the time being there is no resource that + // someone could acquire. + defer p.acquireSem.Release(1) + p.mux.Lock() + defer p.mux.Unlock() + + if err != nil { + p.allResources = removeResource(p.allResources, res) + p.destructWG.Done() + return + } + + res.value = value + res.status = resourceStatusIdle + p.idleResources = append(p.idleResources, res) + }() + return nil, ErrNotAvailable } @@ -490,19 +481,41 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { // use is for health check and keep-alive functionality. It does not update pool // statistics. func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { - p.cond.L.Lock() - if p.closed { - p.cond.L.Unlock() + var cnt int + for p.acquireSem.TryAcquire(1) { + cnt++ + } + if cnt == 0 { return nil } - for _, res := range p.idleResources { + resources := make([]*Resource[T], 0, cnt) + + p.mux.Lock() + defer p.mux.Unlock() + + if p.closed { + p.acquireSem.Release(int64(cnt)) + return nil + } + + // Some resources from the maxSize limit do not have to exist (i.e. be + // idle). + if diff := cnt - len(p.idleResources); diff > 0 { + p.acquireSem.Release(int64(diff)) + cnt = len(p.idleResources) + } + + // Resources above cnt might be reserved by the semaphore. + for i := len(p.idleResources) - cnt; i < len(p.idleResources); i++ { + res := p.idleResources[i] + p.idleResources[i] = nil // Avoid memory leak. + res.status = resourceStatusAcquired + resources = append(resources, res) } - resources := p.idleResources // Swap out current slice - p.idleResources = nil + p.idleResources = p.idleResources[:len(p.idleResources)-cnt] - p.cond.L.Unlock() return resources } @@ -510,13 +523,13 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { // It goes straight in the IdlePool. It does not check against maxSize. // It can be useful to maintain warm resources under little load. func (p *Pool[T]) CreateResource(ctx context.Context) error { - p.cond.L.Lock() + p.mux.Lock() if p.closed { - p.cond.L.Unlock() + p.mux.Unlock() return ErrClosedPool } p.destructWG.Add(1) - p.cond.L.Unlock() + p.mux.Unlock() value, err := p.constructResourceValue(ctx) if err != nil { @@ -533,8 +546,8 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error { poolResetCount: p.resetCount, } - p.cond.L.Lock() - defer p.cond.L.Unlock() + p.mux.Lock() + defer p.mux.Unlock() // If closed while constructing resource then destroy it and return an error if p.closed { @@ -553,8 +566,8 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error { // It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned // to the pool. func (p *Pool[T]) Reset() { - p.cond.L.Lock() - defer p.cond.L.Unlock() + p.mux.Lock() + defer p.mux.Unlock() p.resetCount++ @@ -568,7 +581,9 @@ func (p *Pool[T]) Reset() { // releaseAcquiredResource returns res to the the pool. func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) { - p.cond.L.Lock() + defer p.acquireSem.Release(1) + p.mux.Lock() + defer p.mux.Unlock() if p.closed || res.poolResetCount != p.resetCount { p.allResources = removeResource(p.allResources, res) @@ -578,30 +593,26 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) res.status = resourceStatusIdle p.idleResources = append(p.idleResources, res) } - - p.cond.L.Unlock() - p.cond.Signal() } // Remove removes res from the pool and closes it. If res is not part of the // pool Remove will panic. func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) { p.destructResourceValue(res.value) - p.cond.L.Lock() + defer p.acquireSem.Release(1) + p.mux.Lock() + defer p.mux.Unlock() p.allResources = removeResource(p.allResources, res) - p.cond.L.Unlock() - p.cond.Signal() } func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) { - p.cond.L.Lock() + defer p.acquireSem.Release(1) + p.mux.Lock() + defer p.mux.Unlock() p.allResources = removeResource(p.allResources, res) res.status = resourceStatusHijacked p.destructWG.Done() // not responsible for destructing hijacked resources - - p.cond.L.Unlock() - p.cond.Signal() } func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] {