From 48ea620c93c3819e98633204e1bdcb96804c28a1 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 27 Apr 2019 08:31:23 -0500 Subject: [PATCH] Add Pool.BeforeAcquire and ConnectConfig --- pool/pool.go | 48 ++++++++++++++++++++++++++++++++++++----------- pool/pool_test.go | 41 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 11 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index afbaf98a..d90f8e26 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -14,12 +14,19 @@ import ( const defaultMaxConns = 5 type Pool struct { - p *puddle.Pool + p *puddle.Pool + beforeAcquire func(*pgx.Conn) bool } type Config struct { - MaxConns int32 ConnConfig *pgx.ConnConfig + + // BeforeAcquire is called before before a connection is acquired from the pool. It must return true to allow the + // acquision or false to indicate that the connection should be destroyed and a different connection should be + // acquired. + BeforeAcquire func(*pgx.Conn) bool + + MaxConns int32 } // Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial @@ -30,7 +37,15 @@ func Connect(ctx context.Context, connString string) (*Pool, error) { return nil, err } - p := &Pool{} + return ConnectConfig(ctx, config) +} + +// ConnectConfig creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial +// connection. +func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { + p := &Pool{ + beforeAcquire: config.BeforeAcquire, + } p.p = puddle.NewPool( func(ctx context.Context) (interface{}, error) { return pgx.ConnectConfig(ctx, config.ConnConfig) }, @@ -80,22 +95,33 @@ func (p *Pool) Close() { } func (p *Pool) Acquire(ctx context.Context) (*Conn, error) { - res, err := p.p.Acquire(ctx) - if err != nil { - return nil, err - } + for { + res, err := p.p.Acquire(ctx) + if err != nil { + return nil, err + } - return &Conn{res: res}, nil + if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) { + return &Conn{res: res}, nil + } + + res.Destroy() + } } // AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and // keep-alive functionality. It does not update pool statistics. func (p *Pool) AcquireAllIdle() []*Conn { resources := p.p.AcquireAllIdle() - conns := make([]*Conn, len(resources)) - for i := range conns { - conns[i] = &Conn{res: resources[i]} + conns := make([]*Conn, 0, len(resources)) + for _, res := range resources { + if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) { + conns = append(conns, &Conn{res: res}) + } else { + res.Destroy() + } } + return conns } diff --git a/pool/pool_test.go b/pool/pool_test.go index b026b2ee..45464c18 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -51,6 +51,47 @@ func TestPoolAcquireAndConnRelease(t *testing.T) { c.Release() } +func TestPoolBeforeAcquire(t *testing.T) { + t.Parallel() + + config, err := pool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + + acquireAttempts := 0 + + config.BeforeAcquire = func(c *pgx.Conn) bool { + acquireAttempts += 1 + return acquireAttempts%2 == 0 + } + + db, err := pool.ConnectConfig(context.Background(), config) + require.NoError(t, err) + defer db.Close() + + conns := make([]*pool.Conn, 4) + for i := range conns { + conns[i], err = db.Acquire(context.Background()) + assert.NoError(t, err) + } + + for _, c := range conns { + c.Release() + } + waitForReleaseToComplete() + + assert.EqualValues(t, 8, acquireAttempts) + + conns = db.AcquireAllIdle() + assert.Len(t, conns, 2) + + for _, c := range conns { + c.Release() + } + waitForReleaseToComplete() + + assert.EqualValues(t, 12, acquireAttempts) +} + func TestPoolAcquireAllIdle(t *testing.T) { t.Parallel()