2
0

Add background error handling

This commit is contained in:
Jack Christensen
2018-12-22 22:53:58 -06:00
parent 45336ddb3f
commit ad59a9263d
3 changed files with 187 additions and 33 deletions
-1
View File
@@ -4,7 +4,6 @@ Puddle is a generic resource pool library for Go.
## TODO
* Error reporting for async errors
* Min pool size
* Max resource lifetime
* Max resource uses
+90 -32
View File
@@ -21,6 +21,11 @@ var ErrClosedPool = errors.New("cannot get from closed pool")
type CreateFunc func() (res interface{}, err error)
type CloseFunc func(res interface{}) (err error)
// BackgroundErrorHandler is the type of function that handles background
// errors. It may be called while the pool is locked. Therefore it must not call
// any pool methods and should not perform any lengthy operations.
type BackgroundErrorHandler func(err error)
type resourceWrapper struct {
resource interface{}
status byte
@@ -35,17 +40,19 @@ type Pool struct {
maxSize int
closed bool
create CreateFunc
closeRes CloseFunc
create CreateFunc
closeRes CloseFunc
backgroundErrorHandler BackgroundErrorHandler
}
func NewPool(create CreateFunc, closeRes CloseFunc) *Pool {
return &Pool{
cond: sync.NewCond(new(sync.Mutex)),
allResources: make(map[interface{}]*resourceWrapper),
maxSize: maxInt,
create: create,
closeRes: closeRes,
cond: sync.NewCond(new(sync.Mutex)),
allResources: make(map[interface{}]*resourceWrapper),
maxSize: maxInt,
create: create,
closeRes: closeRes,
backgroundErrorHandler: func(error) {},
}
}
@@ -56,11 +63,17 @@ func (p *Pool) Close() {
p.closed = true
for _, rw := range p.availableResources {
p.closeRes(rw.resource)
// TODO - something with error
err := p.closeRes(rw.resource)
if err != nil {
p.backgroundErrorHandler(err)
}
delete(p.allResources, rw.resource)
}
p.availableResources = nil
p.cond.L.Unlock()
// Wake up all go routines waiting for a resource to be returned so they can terminate.
p.cond.Broadcast()
}
// Size returns the current size of the pool.
@@ -89,6 +102,17 @@ func (p *Pool) SetMaxSize(n int) {
p.cond.L.Unlock()
}
// SetBackgroundErrorHandler assigns a handler for errors that have no other
// place to be reported. For example, Get is called when no resources are
// available. Get begins creating a new resource (in a goroutine). Before the
// new resource is completed, the context passed to Get is canceled. Then the
// new resource creation fails. f will be called with that error.
func (p *Pool) SetBackgroundErrorHandler(f BackgroundErrorHandler) {
p.cond.L.Lock()
p.backgroundErrorHandler = f
p.cond.L.Unlock()
}
// Get gets a resource from the pool. If no resources are available and the pool
// is not at maximum capacity it will create a new resource. If the pool is at
// maximum capacity it will block until a resource is available. ctx can be used
@@ -117,43 +141,63 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) {
}
// If there is room to create a resource start the process asynchronously
var errChan chan error
var createResChan chan interface{}
var createErrChan chan error
if len(p.allResources) < p.maxSize {
errChan = p.startCreate()
createResChan, createErrChan = p.startCreate()
}
p.cond.L.Unlock()
// Whether or not we started creating a resource all we can do now is wait.
resChan := make(chan interface{})
abortChan := make(chan struct{})
// Wait for a resource to be returned to the pool.
waitResChan := make(chan interface{})
abortWaitResChan := make(chan struct{})
go func() {
p.cond.L.Lock()
for len(p.availableResources) == 0 {
p.cond.Wait()
}
if p.closed {
p.cond.L.Unlock()
return
}
res := p.lockedAvailableGet()
p.cond.L.Unlock()
select {
case <-abortChan:
case <-abortWaitResChan:
p.Return(res)
case resChan <- res:
case waitResChan <- res:
}
}()
select {
case <-ctx.Done():
close(abortChan)
close(abortWaitResChan)
p.backgroundFinishCreate(createResChan, createErrChan)
return nil, ctx.Err()
case err := <-errChan:
close(abortChan)
case err := <-createErrChan:
close(abortWaitResChan)
return nil, err
case res := <-resChan:
case res := <-createResChan:
close(abortWaitResChan)
return res, nil
case res := <-waitResChan:
p.backgroundFinishCreate(createResChan, createErrChan)
return res, nil
}
}
// func (p *Pool) backgroundReportError(errChan chan error) {
// go func() {
// err := <-errChan
// if err != nil {
// p.cond.L.Lock()
// p.backgroundErrorHandler(err)
// p.cond.L.Unlock()
// }
// }()
// }
// lockedAvailableGet gets the top resource from p.availableResources. p.cond.L
// must already be locked. len(p.availableResources) must be > 0.
func (p *Pool) lockedAvailableGet() interface{} {
@@ -167,11 +211,11 @@ func (p *Pool) lockedAvailableGet() interface{} {
}
// startCreate starts creating a new resource. p.cond.L must already be
// locked. The returned error channel will receive any error returned by create.
func (p *Pool) startCreate() chan error {
// Use a buffered errChan to receive the error so the goroutine doesn't leak if
// the error channel is never read.
errChan := make(chan error, 1)
// locked. The newly created resource will be sent on resChan (already checked
// out) or an error will be sent on errChan.
func (p *Pool) startCreate() (resChan chan interface{}, errChan chan error) {
resChan = make(chan interface{})
errChan = make(chan error)
var localVal int
placeholder := &localVal
@@ -187,14 +231,26 @@ func (p *Pool) startCreate() chan error {
return
}
rw := &resourceWrapper{resource: res, status: resourceStatusAvailable}
rw := &resourceWrapper{resource: res, status: resourceStatusBorrowed}
p.allResources[res] = rw
p.availableResources = append(p.availableResources, rw)
p.cond.L.Unlock()
p.cond.Signal()
resChan <- res
}()
return errChan
return resChan, errChan
}
func (p *Pool) backgroundFinishCreate(resChan chan interface{}, errChan chan error) {
go func() {
select {
case res := <-resChan:
p.Return(res)
case err := <-errChan:
p.cond.L.Lock()
p.backgroundErrorHandler(err)
p.cond.L.Unlock()
}
}()
}
// Return returns res to the the pool. If res is not part of the pool Return
@@ -209,8 +265,10 @@ func (p *Pool) Return(res interface{}) {
}
if p.closed {
p.closeRes(rw.resource)
// TODO - something with error
err := p.closeRes(rw.resource)
if err != nil {
p.backgroundErrorHandler(err)
}
delete(p.allResources, rw.resource)
p.cond.L.Unlock()
return
+97
View File
@@ -226,6 +226,103 @@ func TestPoolGetReturnsErrorWhenPoolIsClosed(t *testing.T) {
assert.Nil(t, res)
}
func TestPoolGetLateFailedCreateErrorIsReported(t *testing.T) {
errCreateStartedChan := make(chan struct{})
createWaitChan := make(chan struct{})
errCreateFailed := errors.New("create failed")
var createCalls Counter
createFunc := func() (interface{}, error) {
n := createCalls.Next()
if n == 1 {
return n, nil
}
close(errCreateStartedChan)
<-createWaitChan
return nil, errCreateFailed
}
pool := puddle.NewPool(createFunc, stubCloseRes)
asyncErrChan := make(chan error)
pool.SetBackgroundErrorHandler(func(err error) { asyncErrChan <- err })
res1, err := pool.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, res1)
go func() {
<-errCreateStartedChan
pool.Return(res1)
}()
res, err := pool.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, res)
close(createWaitChan)
select {
case err = <-asyncErrChan:
assert.Equal(t, errCreateFailed, err)
case <-time.NewTimer(time.Second).C:
t.Fatal("timed out waiting for async error")
}
}
func TestPoolCloseResourceCloseErrorIsReported(t *testing.T) {
var createCalls Counter
createFunc := func() (interface{}, error) {
return createCalls.Next(), nil
}
errCloseFailed := errors.New("close failed")
closeFunc := func(res interface{}) error { return errCloseFailed }
pool := puddle.NewPool(createFunc, closeFunc)
asyncErrChan := make(chan error, 1)
pool.SetBackgroundErrorHandler(func(err error) { asyncErrChan <- err })
// Get and return a resource to put something in the pool
res, err := pool.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, res)
pool.Return(res)
pool.Close()
select {
case err = <-asyncErrChan:
assert.Equal(t, errCloseFailed, err)
default:
t.Fatal("error not reported")
}
}
func TestPoolReturnClosesResourcePoolIsAlreadyClosedErrorIsReported(t *testing.T) {
var createCalls Counter
createFunc := func() (interface{}, error) {
return createCalls.Next(), nil
}
errCloseFailed := errors.New("close failed")
closeFunc := func(res interface{}) error { return errCloseFailed }
pool := puddle.NewPool(createFunc, closeFunc)
asyncErrChan := make(chan error, 1)
pool.SetBackgroundErrorHandler(func(err error) { asyncErrChan <- err })
// Get and return a resource to put something in the pool
res, err := pool.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, res)
pool.Close()
pool.Return(res)
select {
case err = <-asyncErrChan:
assert.Equal(t, errCloseFailed, err)
default:
t.Fatal("error not reported")
}
}
func BenchmarkPoolGetAndReturnNoContention(b *testing.B) {
var createCalls Counter
createFunc := func() (interface{}, error) {