Resolve some potential deadlocks
Tweak benchmarks
This commit is contained in:
@@ -140,67 +140,64 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) {
|
|||||||
|
|
||||||
p.cond.L.Lock()
|
p.cond.L.Lock()
|
||||||
|
|
||||||
if p.closed {
|
for {
|
||||||
p.cond.L.Unlock()
|
|
||||||
return nil, ErrClosedPool
|
|
||||||
}
|
|
||||||
|
|
||||||
// If a resource is available now
|
|
||||||
if len(p.availableResources) > 0 {
|
|
||||||
rw := p.lockedAvailableAcquire()
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return rw, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If there is room to create a resource do so
|
|
||||||
if len(p.allResources) < p.maxSize {
|
|
||||||
res := &Resource{pool: p, status: resourceStatusCreating}
|
|
||||||
p.allResources = append(p.allResources, res)
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
|
|
||||||
value, err := p.createRes(ctx)
|
|
||||||
p.cond.L.Lock()
|
|
||||||
if err != nil {
|
|
||||||
p.allResources = removeResource(p.allResources, res)
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res.value = value
|
|
||||||
res.status = resourceStatusBorrowed
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
p.cond.L.Unlock()
|
|
||||||
|
|
||||||
// Wait for a resource to be returned to the pool.
|
|
||||||
waitResChan := make(chan *Resource)
|
|
||||||
abortWaitResChan := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
p.cond.L.Lock()
|
|
||||||
for len(p.availableResources) == 0 {
|
|
||||||
p.cond.Wait()
|
|
||||||
}
|
|
||||||
if p.closed {
|
if p.closed {
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
return
|
return nil, ErrClosedPool
|
||||||
}
|
}
|
||||||
rw := p.lockedAvailableAcquire()
|
|
||||||
p.cond.L.Unlock()
|
// If a resource is available now
|
||||||
|
if len(p.availableResources) > 0 {
|
||||||
|
rw := p.lockedAvailableAcquire()
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return rw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there is room to create a resource do so
|
||||||
|
if len(p.allResources) < p.maxSize {
|
||||||
|
res := &Resource{pool: p, status: resourceStatusCreating}
|
||||||
|
p.allResources = append(p.allResources, res)
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
|
||||||
|
value, err := p.createRes(ctx)
|
||||||
|
p.cond.L.Lock()
|
||||||
|
if err != nil {
|
||||||
|
p.allResources = removeResource(p.allResources, res)
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res.value = value
|
||||||
|
res.status = resourceStatusBorrowed
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if ctx.Done() == nil {
|
||||||
|
// p.cond.Wait()
|
||||||
|
// } else {
|
||||||
|
|
||||||
|
// Convert p.cond.Wait into a channel
|
||||||
|
waitChan := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
p.cond.Wait()
|
||||||
|
waitChan <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-abortWaitResChan:
|
case <-ctx.Done():
|
||||||
p.releaseBorrowedResource(rw)
|
// Allow goroutine waiting for signal to exit. Re-signal since we couldn't
|
||||||
case waitResChan <- rw:
|
// do anything with it. Another goroutine might be waiting.
|
||||||
}
|
go func() {
|
||||||
}()
|
<-waitChan
|
||||||
|
p.cond.Signal()
|
||||||
|
p.cond.L.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
select {
|
return nil, ctx.Err()
|
||||||
case <-ctx.Done():
|
case <-waitChan:
|
||||||
close(abortWaitResChan)
|
}
|
||||||
return nil, ctx.Err()
|
// }
|
||||||
case rw := <-waitResChan:
|
|
||||||
return rw, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -240,6 +237,7 @@ func (p *Pool) destroyBorrowedResource(res *Resource) {
|
|||||||
p.cond.L.Lock()
|
p.cond.L.Lock()
|
||||||
p.allResources = removeResource(p.allResources, res)
|
p.allResources = removeResource(p.allResources, res)
|
||||||
p.cond.L.Unlock()
|
p.cond.L.Unlock()
|
||||||
|
p.cond.Signal()
|
||||||
|
|
||||||
// close the resource in the background
|
// close the resource in the background
|
||||||
go p.closeRes(res.value)
|
go p.closeRes(res.value)
|
||||||
|
|||||||
+6
-18
@@ -264,41 +264,29 @@ func BenchmarkPoolAcquireAndRelease(b *testing.B) {
|
|||||||
poolSize int
|
poolSize int
|
||||||
clientCount int
|
clientCount int
|
||||||
}{
|
}{
|
||||||
{8, 1},
|
{8, 2},
|
||||||
{8, 4},
|
|
||||||
{8, 8},
|
{8, 8},
|
||||||
{8, 16},
|
|
||||||
{8, 32},
|
{8, 32},
|
||||||
{8, 64},
|
|
||||||
{8, 128},
|
{8, 128},
|
||||||
{8, 256},
|
|
||||||
{8, 512},
|
{8, 512},
|
||||||
{8, 1024},
|
|
||||||
{8, 2048},
|
{8, 2048},
|
||||||
|
{8, 8192},
|
||||||
|
|
||||||
{64, 1},
|
{64, 2},
|
||||||
{64, 4},
|
|
||||||
{64, 8},
|
{64, 8},
|
||||||
{64, 16},
|
|
||||||
{64, 32},
|
{64, 32},
|
||||||
{64, 64},
|
|
||||||
{64, 128},
|
{64, 128},
|
||||||
{64, 256},
|
|
||||||
{64, 512},
|
{64, 512},
|
||||||
{64, 1024},
|
|
||||||
{64, 2048},
|
{64, 2048},
|
||||||
|
{64, 8192},
|
||||||
|
|
||||||
{512, 1},
|
{512, 2},
|
||||||
{512, 4},
|
|
||||||
{512, 8},
|
{512, 8},
|
||||||
{512, 16},
|
|
||||||
{512, 32},
|
{512, 32},
|
||||||
{512, 64},
|
|
||||||
{512, 128},
|
{512, 128},
|
||||||
{512, 256},
|
|
||||||
{512, 512},
|
{512, 512},
|
||||||
{512, 1024},
|
|
||||||
{512, 2048},
|
{512, 2048},
|
||||||
|
{512, 8192},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, bm := range benchmarks {
|
for _, bm := range benchmarks {
|
||||||
|
|||||||
Reference in New Issue
Block a user