diff --git a/pool.go b/pool.go index ed1d377..5606694 100644 --- a/pool.go +++ b/pool.go @@ -140,67 +140,64 @@ func (p *Pool) Acquire(ctx context.Context) (*Resource, error) { p.cond.L.Lock() - if p.closed { - p.cond.L.Unlock() - return nil, ErrClosedPool - } - - // If a resource is available now - if len(p.availableResources) > 0 { - rw := p.lockedAvailableAcquire() - p.cond.L.Unlock() - return rw, nil - } - - // If there is room to create a resource do so - if len(p.allResources) < p.maxSize { - res := &Resource{pool: p, status: resourceStatusCreating} - p.allResources = append(p.allResources, res) - p.cond.L.Unlock() - - value, err := p.createRes(ctx) - p.cond.L.Lock() - if err != nil { - p.allResources = removeResource(p.allResources, res) - p.cond.L.Unlock() - return nil, err - } - - res.value = value - res.status = resourceStatusBorrowed - p.cond.L.Unlock() - return res, nil - } - p.cond.L.Unlock() - - // Wait for a resource to be returned to the pool. - waitResChan := make(chan *Resource) - abortWaitResChan := make(chan struct{}) - go func() { - p.cond.L.Lock() - for len(p.availableResources) == 0 { - p.cond.Wait() - } + for { if p.closed { p.cond.L.Unlock() - return + return nil, ErrClosedPool } - rw := p.lockedAvailableAcquire() - p.cond.L.Unlock() + + // If a resource is available now + if len(p.availableResources) > 0 { + rw := p.lockedAvailableAcquire() + p.cond.L.Unlock() + return rw, nil + } + + // If there is room to create a resource do so + if len(p.allResources) < p.maxSize { + res := &Resource{pool: p, status: resourceStatusCreating} + p.allResources = append(p.allResources, res) + p.cond.L.Unlock() + + value, err := p.createRes(ctx) + p.cond.L.Lock() + if err != nil { + p.allResources = removeResource(p.allResources, res) + p.cond.L.Unlock() + return nil, err + } + + res.value = value + res.status = resourceStatusBorrowed + p.cond.L.Unlock() + return res, nil + } + + // if ctx.Done() == nil { + // p.cond.Wait() + // } else { + + // Convert p.cond.Wait into a channel + waitChan := make(chan struct{}, 1) + go func() { + p.cond.Wait() + waitChan <- struct{}{} + }() select { - case <-abortWaitResChan: - p.releaseBorrowedResource(rw) - case waitResChan <- rw: - } - }() + case <-ctx.Done(): + // Allow goroutine waiting for signal to exit. Re-signal since we couldn't + // do anything with it. Another goroutine might be waiting. + go func() { + <-waitChan + p.cond.Signal() + p.cond.L.Unlock() + }() - select { - case <-ctx.Done(): - close(abortWaitResChan) - return nil, ctx.Err() - case rw := <-waitResChan: - return rw, nil + return nil, ctx.Err() + case <-waitChan: + } + // } } } @@ -240,6 +237,7 @@ func (p *Pool) destroyBorrowedResource(res *Resource) { p.cond.L.Lock() p.allResources = removeResource(p.allResources, res) p.cond.L.Unlock() + p.cond.Signal() // close the resource in the background go p.closeRes(res.value) diff --git a/pool_test.go b/pool_test.go index 22c0aca..bfdb8cc 100644 --- a/pool_test.go +++ b/pool_test.go @@ -264,41 +264,29 @@ func BenchmarkPoolAcquireAndRelease(b *testing.B) { poolSize int clientCount int }{ - {8, 1}, - {8, 4}, + {8, 2}, {8, 8}, - {8, 16}, {8, 32}, - {8, 64}, {8, 128}, - {8, 256}, {8, 512}, - {8, 1024}, {8, 2048}, + {8, 8192}, - {64, 1}, - {64, 4}, + {64, 2}, {64, 8}, - {64, 16}, {64, 32}, - {64, 64}, {64, 128}, - {64, 256}, {64, 512}, - {64, 1024}, {64, 2048}, + {64, 8192}, - {512, 1}, - {512, 4}, + {512, 2}, {512, 8}, - {512, 16}, {512, 32}, - {512, 64}, {512, 128}, - {512, 256}, {512, 512}, - {512, 1024}, {512, 2048}, + {512, 8192}, } for _, bm := range benchmarks {