2
0

Use semaphore rather than conditional variable

This commit is contained in:
Jan Dubsky
2022-09-29 22:45:38 +02:00
committed by Jack Christensen
parent 9df21ce1a1
commit 021588b93e
3 changed files with 174 additions and 160 deletions
+1
View File
@@ -8,5 +8,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.4.0 // indirect github.com/stretchr/objx v0.4.0 // indirect
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )
+2
View File
@@ -12,6 +12,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ffipvEDX2pl7E1TdqLqIc=
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+110 -99
View File
@@ -6,6 +6,8 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"golang.org/x/sync/semaphore"
) )
const ( const (
@@ -113,8 +115,9 @@ func (res *Resource[T]) IdleDuration() time.Duration {
// Pool is a concurrency-safe resource pool. // Pool is a concurrency-safe resource pool.
type Pool[T any] struct { type Pool[T any] struct {
cond *sync.Cond mux sync.Mutex
destructWG *sync.WaitGroup acquireSem *semaphore.Weighted
destructWG sync.WaitGroup
allResources []*Resource[T] allResources []*Resource[T]
idleResources []*Resource[T] idleResources []*Resource[T]
@@ -150,8 +153,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background()) baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background())
return &Pool[T]{ return &Pool[T]{
cond: sync.NewCond(new(sync.Mutex)), acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
destructWG: &sync.WaitGroup{},
maxSize: config.MaxSize, maxSize: config.MaxSize,
constructor: config.Constructor, constructor: config.Constructor,
destructor: config.Destructor, destructor: config.Destructor,
@@ -163,9 +165,12 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
// Close destroys all resources in the pool and rejects future Acquire calls. // Close destroys all resources in the pool and rejects future Acquire calls.
// Blocks until all resources are returned to pool and destroyed. // Blocks until all resources are returned to pool and destroyed.
func (p *Pool[T]) Close() { func (p *Pool[T]) Close() {
p.cond.L.Lock() defer p.destructWG.Wait()
p.mux.Lock()
defer p.mux.Unlock()
if p.closed { if p.closed {
p.cond.L.Unlock()
return return
} }
p.closed = true p.closed = true
@@ -176,12 +181,6 @@ func (p *Pool[T]) Close() {
go p.destructResourceValue(res.value) go p.destructResourceValue(res.value)
} }
p.idleResources = nil p.idleResources = nil
p.cond.L.Unlock()
// Wake up all go routines waiting for a resource to be returned so they can terminate.
p.cond.Broadcast()
p.destructWG.Wait()
} }
// Stat is a snapshot of Pool statistics. // Stat is a snapshot of Pool statistics.
@@ -250,7 +249,9 @@ func (s *Stat) CanceledAcquireCount() int64 {
// Stat returns the current pool statistics. // Stat returns the current pool statistics.
func (p *Pool[T]) Stat() *Stat { func (p *Pool[T]) Stat() *Stat {
p.cond.L.Lock() p.mux.Lock()
defer p.mux.Unlock()
s := &Stat{ s := &Stat{
maxResources: p.maxSize, maxResources: p.maxSize,
acquireCount: p.acquireCount, acquireCount: p.acquireCount,
@@ -270,7 +271,6 @@ func (p *Pool[T]) Stat() *Stat {
} }
} }
p.cond.L.Unlock()
return s return s
} }
@@ -318,7 +318,7 @@ func (p *Pool[T]) createNewResource() *Resource[T] {
// ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids // ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids
// the problem of it being impossible to create resources when the time to create a resource is greater than any one // the problem of it being impossible to create resources when the time to create a resource is greater than any one
// caller of Acquire is willing to wait. // caller of Acquire is willing to wait.
func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) { func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
p.canceledAcquireCount.Add(1) p.canceledAcquireCount.Add(1)
@@ -333,34 +333,46 @@ func (p *Pool[T]) Acquire(ctx context.Context) (*Resource[T], error) {
// validity. This function exists separatly only for benchmarking purposes. // validity. This function exists separatly only for benchmarking purposes.
func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
startNano := nanotime() startNano := nanotime()
p.cond.L.Lock()
emptyAcquire := false var waitedForLock bool
if !p.acquireSem.TryAcquire(1) {
waitedForLock = true
err := p.acquireSem.Acquire(ctx, 1)
if err != nil {
p.canceledAcquireCount.Add(1)
return nil, err
}
}
for { p.mux.Lock()
if p.closed { if p.closed {
p.cond.L.Unlock() p.mux.Unlock()
p.acquireSem.Release(1)
return nil, ErrClosedPool return nil, ErrClosedPool
} }
// If a resource is available now // If a resource is available in the pool.
if res := p.tryAcquireIdleResource(); res != nil { if res := p.tryAcquireIdleResource(); res != nil {
res.status = resourceStatusAcquired res.status = resourceStatusAcquired
if emptyAcquire { if waitedForLock {
p.emptyAcquireCount += 1 p.emptyAcquireCount += 1
} }
p.acquireCount += 1 p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano) p.acquireDuration += time.Duration(nanotime() - startNano)
p.cond.L.Unlock() p.mux.Unlock()
return res, nil return res, nil
} }
emptyAcquire = true if len(p.allResources) >= int(p.maxSize) {
p.mux.Unlock()
p.acquireSem.Release(1)
panic("bug: semaphore allowed more acquires than pool allows")
}
// If there is room to create a resource do so // The resource is not available, but there is enough space to create
if len(p.allResources) < int(p.maxSize) { // one.
res := p.createNewResource() res := p.createNewResource()
p.cond.L.Unlock() p.mux.Unlock()
// Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling // Create the resource in a goroutine to immediately return from Acquire if ctx is canceled without also canceling
// the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259 // the constructor. See: https://github.com/jackc/pgx/issues/1287 and https://github.com/jackc/pgx/issues/1259
@@ -369,18 +381,20 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx) constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx)
value, err := p.constructResourceValue(constructorCtx) value, err := p.constructResourceValue(constructorCtx)
if err != nil { if err != nil {
p.cond.L.Lock() p.mux.Lock()
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
p.destructWG.Done() p.destructWG.Done()
p.cond.L.Unlock()
p.cond.Signal() p.mux.Unlock()
// We won't take the resource so we allow someone else
// to run Acquire.
p.acquireSem.Release(1)
select { select {
case constructErrCh <- err: case constructErrCh <- err:
case <-ctx.Done(): case <-ctx.Done():
// The caller is cancelled, so // The caller is cancelled, so no-one awaits the
// no-one awaits the error. This // error. This branch avoid goroutine leak.
// branch avoid goroutine leak.
} }
return return
} }
@@ -390,13 +404,11 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
select { select {
case constructErrCh <- nil: case constructErrCh <- nil:
p.cond.L.Lock() p.mux.Lock()
p.emptyAcquireCount += 1 p.emptyAcquireCount += 1
p.acquireCount += 1 p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano) p.acquireDuration += time.Duration(nanotime() - startNano)
p.cond.L.Unlock() p.mux.Unlock()
// No need to call Signal as this new resource was immediately acquired and did not change availability for
// any waiting Acquire calls.
case <-ctx.Done(): case <-ctx.Done():
p.releaseAcquiredResource(res, res.lastUsedNano) p.releaseAcquiredResource(res, res.lastUsedNano)
} }
@@ -410,48 +422,23 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
// we don't call signal here because we didn't change the resource pools
// at all so waking anything else up won't help
return res, nil return res, nil
} }
}
if ctx.Done() == nil {
p.cond.Wait()
} else {
// Convert p.cond.Wait into a channel
waitChan := make(chan struct{}, 1)
go func() {
p.cond.Wait()
waitChan <- struct{}{}
}()
select {
case <-ctx.Done():
// Allow goroutine waiting for signal to exit. Re-signal since we couldn't
// do anything with it. Another goroutine might be waiting.
go func() {
<-waitChan
p.cond.L.Unlock()
p.cond.Signal()
}()
p.canceledAcquireCount.Add(1)
return nil, ctx.Err()
case <-waitChan:
}
}
}
} }
// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no // TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only // resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
// used to cancel the background creation. // used to cancel the background creation.
func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
p.cond.L.Lock() if !p.acquireSem.TryAcquire(1) {
defer p.cond.L.Unlock() return nil, ErrNotAvailable
}
p.mux.Lock()
defer p.mux.Unlock()
if p.closed { if p.closed {
p.acquireSem.Release(1)
return nil, ErrClosedPool return nil, ErrClosedPool
} }
@@ -462,14 +449,19 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
return res, nil return res, nil
} }
if len(p.allResources) < int(p.maxSize) { if len(p.allResources) >= int(p.maxSize) {
res := p.createNewResource() panic("bug: semaphore allowed more acquires than pool allows")
}
res := p.createNewResource()
go func() { go func() {
value, err := p.constructResourceValue(ctx) value, err := p.constructResourceValue(ctx)
defer p.cond.Signal() // We have to create the resource and only then release the
p.cond.L.Lock() // semaphore - For the time being there is no resource that
defer p.cond.L.Unlock() // someone could acquire.
defer p.acquireSem.Release(1)
p.mux.Lock()
defer p.mux.Unlock()
if err != nil { if err != nil {
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
@@ -481,7 +473,6 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
res.status = resourceStatusIdle res.status = resourceStatusIdle
p.idleResources = append(p.idleResources, res) p.idleResources = append(p.idleResources, res)
}() }()
}
return nil, ErrNotAvailable return nil, ErrNotAvailable
} }
@@ -490,19 +481,41 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
// use is for health check and keep-alive functionality. It does not update pool // use is for health check and keep-alive functionality. It does not update pool
// statistics. // statistics.
func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
p.cond.L.Lock() var cnt int
if p.closed { for p.acquireSem.TryAcquire(1) {
p.cond.L.Unlock() cnt++
}
if cnt == 0 {
return nil return nil
} }
for _, res := range p.idleResources { resources := make([]*Resource[T], 0, cnt)
p.mux.Lock()
defer p.mux.Unlock()
if p.closed {
p.acquireSem.Release(int64(cnt))
return nil
}
// Some resources from the maxSize limit do not have to exist (i.e. be
// idle).
if diff := cnt - len(p.idleResources); diff > 0 {
p.acquireSem.Release(int64(diff))
cnt = len(p.idleResources)
}
// Resources above cnt might be reserved by the semaphore.
for i := len(p.idleResources) - cnt; i < len(p.idleResources); i++ {
res := p.idleResources[i]
p.idleResources[i] = nil // Avoid memory leak.
res.status = resourceStatusAcquired res.status = resourceStatusAcquired
resources = append(resources, res)
} }
resources := p.idleResources // Swap out current slice p.idleResources = p.idleResources[:len(p.idleResources)-cnt]
p.idleResources = nil
p.cond.L.Unlock()
return resources return resources
} }
@@ -510,13 +523,13 @@ func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
// It goes straight in the IdlePool. It does not check against maxSize. // It goes straight in the IdlePool. It does not check against maxSize.
// It can be useful to maintain warm resources under little load. // It can be useful to maintain warm resources under little load.
func (p *Pool[T]) CreateResource(ctx context.Context) error { func (p *Pool[T]) CreateResource(ctx context.Context) error {
p.cond.L.Lock() p.mux.Lock()
if p.closed { if p.closed {
p.cond.L.Unlock() p.mux.Unlock()
return ErrClosedPool return ErrClosedPool
} }
p.destructWG.Add(1) p.destructWG.Add(1)
p.cond.L.Unlock() p.mux.Unlock()
value, err := p.constructResourceValue(ctx) value, err := p.constructResourceValue(ctx)
if err != nil { if err != nil {
@@ -533,8 +546,8 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
poolResetCount: p.resetCount, poolResetCount: p.resetCount,
} }
p.cond.L.Lock() p.mux.Lock()
defer p.cond.L.Unlock() defer p.mux.Unlock()
// If closed while constructing resource then destroy it and return an error // If closed while constructing resource then destroy it and return an error
if p.closed { if p.closed {
@@ -553,8 +566,8 @@ func (p *Pool[T]) CreateResource(ctx context.Context) error {
// It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned // It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned
// to the pool. // to the pool.
func (p *Pool[T]) Reset() { func (p *Pool[T]) Reset() {
p.cond.L.Lock() p.mux.Lock()
defer p.cond.L.Unlock() defer p.mux.Unlock()
p.resetCount++ p.resetCount++
@@ -568,7 +581,9 @@ func (p *Pool[T]) Reset() {
// releaseAcquiredResource returns res to the the pool. // releaseAcquiredResource returns res to the the pool.
func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) { func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
p.cond.L.Lock() defer p.acquireSem.Release(1)
p.mux.Lock()
defer p.mux.Unlock()
if p.closed || res.poolResetCount != p.resetCount { if p.closed || res.poolResetCount != p.resetCount {
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
@@ -578,30 +593,26 @@ func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64)
res.status = resourceStatusIdle res.status = resourceStatusIdle
p.idleResources = append(p.idleResources, res) p.idleResources = append(p.idleResources, res)
} }
p.cond.L.Unlock()
p.cond.Signal()
} }
// Remove removes res from the pool and closes it. If res is not part of the // Remove removes res from the pool and closes it. If res is not part of the
// pool Remove will panic. // pool Remove will panic.
func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) { func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
p.destructResourceValue(res.value) p.destructResourceValue(res.value)
p.cond.L.Lock() defer p.acquireSem.Release(1)
p.mux.Lock()
defer p.mux.Unlock()
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
p.cond.L.Unlock()
p.cond.Signal()
} }
func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) { func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
p.cond.L.Lock() defer p.acquireSem.Release(1)
p.mux.Lock()
defer p.mux.Unlock()
p.allResources = removeResource(p.allResources, res) p.allResources = removeResource(p.allResources, res)
res.status = resourceStatusHijacked res.status = resourceStatusHijacked
p.destructWG.Done() // not responsible for destructing hijacked resources p.destructWG.Done() // not responsible for destructing hijacked resources
p.cond.L.Unlock()
p.cond.Signal()
} }
func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] { func removeResource[T any](slice []*Resource[T], res *Resource[T]) []*Resource[T] {