diff --git a/pgxpool/pool.go b/pgxpool/pool.go index f1a79c7f..28b1e1a2 100644 --- a/pgxpool/pool.go +++ b/pgxpool/pool.go @@ -225,6 +225,10 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { go p.backgroundHealthCheck() if !config.LazyConnect { + if err := p.createIdleResources(ctx, int(p.minConns)); err != nil { + return nil, err + } + // Initially establish one connection res, err := p.p.Acquire(ctx) if err != nil { @@ -377,6 +381,29 @@ func (p *Pool) checkMinConns() { } } +func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error { + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + errs := make(chan error) + + for i := 0; i < targetResources; i++ { + go func() { + err := p.p.CreateResource(ctx) + errs <- err + }() + } + + for i := 0; i < targetResources; i++ { + if err := <-errs; err != nil { + cancel() + return err + } + } + + return nil +} + // Acquire returns a connection (*Conn) from the Pool func (p *Pool) Acquire(ctx context.Context) (*Conn, error) { for { diff --git a/pgxpool/pool_test.go b/pgxpool/pool_test.go index f6621df6..cdf59c24 100644 --- a/pgxpool/pool_test.go +++ b/pgxpool/pool_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "sync/atomic" "testing" "time" @@ -800,3 +801,104 @@ func TestIdempotentPoolClose(t *testing.T) { // Close the already closed pool. require.NotPanics(t, func() { pool.Close() }) } + +func TestConnectCreatesMinPool(t *testing.T) { + t.Parallel() + + config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + + config.MinConns = int32(12) + config.MaxConns = int32(15) + config.LazyConnect = false + + acquireAttempts := int64(0) + connectAttempts := int64(0) + + config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + atomic.AddInt64(&acquireAttempts, 1) + return true + } + config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error { + atomic.AddInt64(&connectAttempts, 1) + return nil + } + + pool, err := pgxpool.ConnectConfig(context.Background(), config) + require.NoError(t, err) + defer pool.Close() + + stat := pool.Stat() + require.Equal(t, int32(12), stat.IdleConns()) + require.Equal(t, int64(1), stat.AcquireCount()) + require.Equal(t, int32(12), stat.TotalConns()) + require.Equal(t, int64(0), acquireAttempts) + require.Equal(t, int64(12), connectAttempts) +} +func TestConnectSkipMinPoolWithLazy(t *testing.T) { + t.Parallel() + + config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + + config.MinConns = int32(12) + config.MaxConns = int32(15) + config.LazyConnect = true + + acquireAttempts := int64(0) + connectAttempts := int64(0) + + config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + atomic.AddInt64(&acquireAttempts, 1) + return true + } + config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error { + atomic.AddInt64(&connectAttempts, 1) + return nil + } + + pool, err := pgxpool.ConnectConfig(context.Background(), config) + require.NoError(t, err) + defer pool.Close() + + stat := pool.Stat() + require.Equal(t, int32(0), stat.IdleConns()) + require.Equal(t, int64(0), stat.AcquireCount()) + require.Equal(t, int32(0), stat.TotalConns()) + require.Equal(t, int64(0), acquireAttempts) + require.Equal(t, int64(0), connectAttempts) +} + +func TestConnectMinPoolZero(t *testing.T) { + t.Parallel() + + config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + + config.MinConns = int32(0) + config.MaxConns = int32(15) + config.LazyConnect = false + + acquireAttempts := int64(0) + connectAttempts := int64(0) + + config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + atomic.AddInt64(&acquireAttempts, 1) + return true + } + config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error { + atomic.AddInt64(&connectAttempts, 1) + return nil + } + + pool, err := pgxpool.ConnectConfig(context.Background(), config) + require.NoError(t, err) + defer pool.Close() + + stat := pool.Stat() + require.Equal(t, int32(1), stat.IdleConns()) + require.Equal(t, int64(1), stat.AcquireCount()) + require.Equal(t, int32(1), stat.TotalConns()) + require.Equal(t, int64(0), acquireAttempts) + require.Equal(t, int64(1), connectAttempts) +}