Add Pool.BeforeAcquire and ConnectConfig
This commit is contained in:
+37
-11
@@ -14,12 +14,19 @@ import (
|
|||||||
const defaultMaxConns = 5
|
const defaultMaxConns = 5
|
||||||
|
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
p *puddle.Pool
|
p *puddle.Pool
|
||||||
|
beforeAcquire func(*pgx.Conn) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
MaxConns int32
|
|
||||||
ConnConfig *pgx.ConnConfig
|
ConnConfig *pgx.ConnConfig
|
||||||
|
|
||||||
|
// BeforeAcquire is called before before a connection is acquired from the pool. It must return true to allow the
|
||||||
|
// acquision or false to indicate that the connection should be destroyed and a different connection should be
|
||||||
|
// acquired.
|
||||||
|
BeforeAcquire func(*pgx.Conn) bool
|
||||||
|
|
||||||
|
MaxConns int32
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
|
// Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
|
||||||
@@ -30,7 +37,15 @@ func Connect(ctx context.Context, connString string) (*Pool, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &Pool{}
|
return ConnectConfig(ctx, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectConfig creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
|
||||||
|
// connection.
|
||||||
|
func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
|
||||||
|
p := &Pool{
|
||||||
|
beforeAcquire: config.BeforeAcquire,
|
||||||
|
}
|
||||||
|
|
||||||
p.p = puddle.NewPool(
|
p.p = puddle.NewPool(
|
||||||
func(ctx context.Context) (interface{}, error) { return pgx.ConnectConfig(ctx, config.ConnConfig) },
|
func(ctx context.Context) (interface{}, error) { return pgx.ConnectConfig(ctx, config.ConnConfig) },
|
||||||
@@ -80,22 +95,33 @@ func (p *Pool) Close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
||||||
res, err := p.p.Acquire(ctx)
|
for {
|
||||||
if err != nil {
|
res, err := p.p.Acquire(ctx)
|
||||||
return nil, err
|
if err != nil {
|
||||||
}
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &Conn{res: res}, nil
|
if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) {
|
||||||
|
return &Conn{res: res}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
res.Destroy()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
|
// AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
|
||||||
// keep-alive functionality. It does not update pool statistics.
|
// keep-alive functionality. It does not update pool statistics.
|
||||||
func (p *Pool) AcquireAllIdle() []*Conn {
|
func (p *Pool) AcquireAllIdle() []*Conn {
|
||||||
resources := p.p.AcquireAllIdle()
|
resources := p.p.AcquireAllIdle()
|
||||||
conns := make([]*Conn, len(resources))
|
conns := make([]*Conn, 0, len(resources))
|
||||||
for i := range conns {
|
for _, res := range resources {
|
||||||
conns[i] = &Conn{res: resources[i]}
|
if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) {
|
||||||
|
conns = append(conns, &Conn{res: res})
|
||||||
|
} else {
|
||||||
|
res.Destroy()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return conns
|
return conns
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -51,6 +51,47 @@ func TestPoolAcquireAndConnRelease(t *testing.T) {
|
|||||||
c.Release()
|
c.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPoolBeforeAcquire(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
config, err := pool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
acquireAttempts := 0
|
||||||
|
|
||||||
|
config.BeforeAcquire = func(c *pgx.Conn) bool {
|
||||||
|
acquireAttempts += 1
|
||||||
|
return acquireAttempts%2 == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := pool.ConnectConfig(context.Background(), config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
conns := make([]*pool.Conn, 4)
|
||||||
|
for i := range conns {
|
||||||
|
conns[i], err = db.Acquire(context.Background())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range conns {
|
||||||
|
c.Release()
|
||||||
|
}
|
||||||
|
waitForReleaseToComplete()
|
||||||
|
|
||||||
|
assert.EqualValues(t, 8, acquireAttempts)
|
||||||
|
|
||||||
|
conns = db.AcquireAllIdle()
|
||||||
|
assert.Len(t, conns, 2)
|
||||||
|
|
||||||
|
for _, c := range conns {
|
||||||
|
c.Release()
|
||||||
|
}
|
||||||
|
waitForReleaseToComplete()
|
||||||
|
|
||||||
|
assert.EqualValues(t, 12, acquireAttempts)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPoolAcquireAllIdle(t *testing.T) {
|
func TestPoolAcquireAllIdle(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user