From ad59a9263d043ed1523c6a71e533f4d1ae505b6a Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 22 Dec 2018 22:53:58 -0600 Subject: [PATCH] Add background error handling --- README.md | 1 - pool.go | 122 +++++++++++++++++++++++++++++++++++++-------------- pool_test.go | 97 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 38c3272..1decc0f 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,6 @@ Puddle is a generic resource pool library for Go. ## TODO -* Error reporting for async errors * Min pool size * Max resource lifetime * Max resource uses diff --git a/pool.go b/pool.go index 8295db2..6762123 100644 --- a/pool.go +++ b/pool.go @@ -21,6 +21,11 @@ var ErrClosedPool = errors.New("cannot get from closed pool") type CreateFunc func() (res interface{}, err error) type CloseFunc func(res interface{}) (err error) +// BackgroundErrorHandler is the type of function that handles background +// errors. It may be called while the pool is locked. Therefore it must not call +// any pool methods and should not perform any lengthy operations. +type BackgroundErrorHandler func(err error) + type resourceWrapper struct { resource interface{} status byte @@ -35,17 +40,19 @@ type Pool struct { maxSize int closed bool - create CreateFunc - closeRes CloseFunc + create CreateFunc + closeRes CloseFunc + backgroundErrorHandler BackgroundErrorHandler } func NewPool(create CreateFunc, closeRes CloseFunc) *Pool { return &Pool{ - cond: sync.NewCond(new(sync.Mutex)), - allResources: make(map[interface{}]*resourceWrapper), - maxSize: maxInt, - create: create, - closeRes: closeRes, + cond: sync.NewCond(new(sync.Mutex)), + allResources: make(map[interface{}]*resourceWrapper), + maxSize: maxInt, + create: create, + closeRes: closeRes, + backgroundErrorHandler: func(error) {}, } } @@ -56,11 +63,17 @@ func (p *Pool) Close() { p.closed = true for _, rw := range p.availableResources { - p.closeRes(rw.resource) - // TODO - something with error + err := p.closeRes(rw.resource) + if err != nil { + p.backgroundErrorHandler(err) + } delete(p.allResources, rw.resource) } + p.availableResources = nil p.cond.L.Unlock() + + // Wake up all go routines waiting for a resource to be returned so they can terminate. + p.cond.Broadcast() } // Size returns the current size of the pool. @@ -89,6 +102,17 @@ func (p *Pool) SetMaxSize(n int) { p.cond.L.Unlock() } +// SetBackgroundErrorHandler assigns a handler for errors that have no other +// place to be reported. For example, Get is called when no resources are +// available. Get begins creating a new resource (in a goroutine). Before the +// new resource is completed, the context passed to Get is canceled. Then the +// new resource creation fails. f will be called with that error. +func (p *Pool) SetBackgroundErrorHandler(f BackgroundErrorHandler) { + p.cond.L.Lock() + p.backgroundErrorHandler = f + p.cond.L.Unlock() +} + // Get gets a resource from the pool. If no resources are available and the pool // is not at maximum capacity it will create a new resource. If the pool is at // maximum capacity it will block until a resource is available. ctx can be used @@ -117,43 +141,63 @@ func (p *Pool) Get(ctx context.Context) (interface{}, error) { } // If there is room to create a resource start the process asynchronously - var errChan chan error + var createResChan chan interface{} + var createErrChan chan error if len(p.allResources) < p.maxSize { - errChan = p.startCreate() + createResChan, createErrChan = p.startCreate() } p.cond.L.Unlock() - // Whether or not we started creating a resource all we can do now is wait. - resChan := make(chan interface{}) - abortChan := make(chan struct{}) - + // Wait for a resource to be returned to the pool. + waitResChan := make(chan interface{}) + abortWaitResChan := make(chan struct{}) go func() { p.cond.L.Lock() for len(p.availableResources) == 0 { p.cond.Wait() } + if p.closed { + p.cond.L.Unlock() + return + } res := p.lockedAvailableGet() p.cond.L.Unlock() select { - case <-abortChan: + case <-abortWaitResChan: p.Return(res) - case resChan <- res: + case waitResChan <- res: } }() select { case <-ctx.Done(): - close(abortChan) + close(abortWaitResChan) + p.backgroundFinishCreate(createResChan, createErrChan) return nil, ctx.Err() - case err := <-errChan: - close(abortChan) + case err := <-createErrChan: + close(abortWaitResChan) return nil, err - case res := <-resChan: + case res := <-createResChan: + close(abortWaitResChan) + return res, nil + case res := <-waitResChan: + p.backgroundFinishCreate(createResChan, createErrChan) return res, nil } } +// func (p *Pool) backgroundReportError(errChan chan error) { +// go func() { +// err := <-errChan +// if err != nil { +// p.cond.L.Lock() +// p.backgroundErrorHandler(err) +// p.cond.L.Unlock() +// } +// }() +// } + // lockedAvailableGet gets the top resource from p.availableResources. p.cond.L // must already be locked. len(p.availableResources) must be > 0. func (p *Pool) lockedAvailableGet() interface{} { @@ -167,11 +211,11 @@ func (p *Pool) lockedAvailableGet() interface{} { } // startCreate starts creating a new resource. p.cond.L must already be -// locked. The returned error channel will receive any error returned by create. -func (p *Pool) startCreate() chan error { - // Use a buffered errChan to receive the error so the goroutine doesn't leak if - // the error channel is never read. - errChan := make(chan error, 1) +// locked. The newly created resource will be sent on resChan (already checked +// out) or an error will be sent on errChan. +func (p *Pool) startCreate() (resChan chan interface{}, errChan chan error) { + resChan = make(chan interface{}) + errChan = make(chan error) var localVal int placeholder := &localVal @@ -187,14 +231,26 @@ func (p *Pool) startCreate() chan error { return } - rw := &resourceWrapper{resource: res, status: resourceStatusAvailable} + rw := &resourceWrapper{resource: res, status: resourceStatusBorrowed} p.allResources[res] = rw - p.availableResources = append(p.availableResources, rw) p.cond.L.Unlock() - p.cond.Signal() + resChan <- res }() - return errChan + return resChan, errChan +} + +func (p *Pool) backgroundFinishCreate(resChan chan interface{}, errChan chan error) { + go func() { + select { + case res := <-resChan: + p.Return(res) + case err := <-errChan: + p.cond.L.Lock() + p.backgroundErrorHandler(err) + p.cond.L.Unlock() + } + }() } // Return returns res to the the pool. If res is not part of the pool Return @@ -209,8 +265,10 @@ func (p *Pool) Return(res interface{}) { } if p.closed { - p.closeRes(rw.resource) - // TODO - something with error + err := p.closeRes(rw.resource) + if err != nil { + p.backgroundErrorHandler(err) + } delete(p.allResources, rw.resource) p.cond.L.Unlock() return diff --git a/pool_test.go b/pool_test.go index 3912850..2fa1e6c 100644 --- a/pool_test.go +++ b/pool_test.go @@ -226,6 +226,103 @@ func TestPoolGetReturnsErrorWhenPoolIsClosed(t *testing.T) { assert.Nil(t, res) } +func TestPoolGetLateFailedCreateErrorIsReported(t *testing.T) { + errCreateStartedChan := make(chan struct{}) + createWaitChan := make(chan struct{}) + errCreateFailed := errors.New("create failed") + var createCalls Counter + createFunc := func() (interface{}, error) { + n := createCalls.Next() + if n == 1 { + return n, nil + } + close(errCreateStartedChan) + <-createWaitChan + return nil, errCreateFailed + } + pool := puddle.NewPool(createFunc, stubCloseRes) + + asyncErrChan := make(chan error) + pool.SetBackgroundErrorHandler(func(err error) { asyncErrChan <- err }) + + res1, err := pool.Get(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, res1) + + go func() { + <-errCreateStartedChan + pool.Return(res1) + }() + + res, err := pool.Get(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, res) + close(createWaitChan) + + select { + case err = <-asyncErrChan: + assert.Equal(t, errCreateFailed, err) + case <-time.NewTimer(time.Second).C: + t.Fatal("timed out waiting for async error") + } +} + +func TestPoolCloseResourceCloseErrorIsReported(t *testing.T) { + var createCalls Counter + createFunc := func() (interface{}, error) { + return createCalls.Next(), nil + } + errCloseFailed := errors.New("close failed") + closeFunc := func(res interface{}) error { return errCloseFailed } + pool := puddle.NewPool(createFunc, closeFunc) + asyncErrChan := make(chan error, 1) + pool.SetBackgroundErrorHandler(func(err error) { asyncErrChan <- err }) + + // Get and return a resource to put something in the pool + res, err := pool.Get(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, res) + pool.Return(res) + + pool.Close() + + select { + case err = <-asyncErrChan: + assert.Equal(t, errCloseFailed, err) + default: + t.Fatal("error not reported") + } +} + +func TestPoolReturnClosesResourcePoolIsAlreadyClosedErrorIsReported(t *testing.T) { + var createCalls Counter + createFunc := func() (interface{}, error) { + return createCalls.Next(), nil + } + + errCloseFailed := errors.New("close failed") + closeFunc := func(res interface{}) error { return errCloseFailed } + pool := puddle.NewPool(createFunc, closeFunc) + + asyncErrChan := make(chan error, 1) + pool.SetBackgroundErrorHandler(func(err error) { asyncErrChan <- err }) + + // Get and return a resource to put something in the pool + res, err := pool.Get(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, res) + + pool.Close() + + pool.Return(res) + select { + case err = <-asyncErrChan: + assert.Equal(t, errCloseFailed, err) + default: + t.Fatal("error not reported") + } +} + func BenchmarkPoolGetAndReturnNoContention(b *testing.B) { var createCalls Counter createFunc := func() (interface{}, error) {