2
0

Implements MinConns, the minimum size of the connection pool.

The health check will increase the number of connections to this amount if it had dropped below.
This commit is contained in:
Patrick Ellul
2020-02-04 08:17:53 +11:00
parent 77c1076d39
commit a01827732f
4 changed files with 51 additions and 2 deletions
+26
View File
@@ -13,6 +13,7 @@ import (
)
var defaultMaxConns = int32(4)
var defaultMinConns = int32(0)
var defaultMaxConnLifetime = time.Hour
var defaultMaxConnIdleTime = time.Minute * 30
var defaultHealthCheckPeriod = time.Minute
@@ -71,6 +72,7 @@ type Pool struct {
afterConnect func(context.Context, *pgx.Conn) error
beforeAcquire func(context.Context, *pgx.Conn) bool
afterRelease func(*pgx.Conn) bool
minConns int32
maxConnLifetime time.Duration
maxConnIdleTime time.Duration
healthCheckPeriod time.Duration
@@ -103,6 +105,10 @@ type Config struct {
// MaxConns is the maximum size of the pool.
MaxConns int32
// MinConns is the minimum size of the pool. The health check will increase the number of connections to this
// amount if it had dropped below.
MinConns int32
// HealthCheckPeriod is the duration between checks of the health of idle connections.
HealthCheckPeriod time.Duration
@@ -133,6 +139,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
afterConnect: config.AfterConnect,
beforeAcquire: config.BeforeAcquire,
afterRelease: config.AfterRelease,
minConns: config.MinConns,
maxConnLifetime: config.MaxConnLifetime,
maxConnIdleTime: config.MaxConnIdleTime,
healthCheckPeriod: config.HealthCheckPeriod,
@@ -190,6 +197,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
// addition of the following variables:
//
// pool_max_conns: integer greater than 0
// pool_min_conns: integer 0 or greater
// pool_max_conn_lifetime: duration string
// pool_max_conn_idle_time: duration string
// pool_health_check_period: duration string
@@ -229,6 +237,17 @@ func ParseConfig(connString string) (*Config, error) {
}
}
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_conns"]; ok {
delete(connConfig.Config.RuntimeParams, "pool_min_conns")
n, err := strconv.ParseInt(s, 10, 32)
if err != nil {
return nil, errors.Errorf("cannot parse pool_min_conns: %w", err)
}
config.MinConns = int32(n)
} else {
config.MinConns = defaultMinConns
}
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
d, err := time.ParseDuration(s)
@@ -282,6 +301,7 @@ func (p *Pool) backgroundHealthCheck() {
return
case <-ticker.C:
p.checkIdleConnsHealth()
p.checkMinConns()
}
}
}
@@ -301,6 +321,12 @@ func (p *Pool) checkIdleConnsHealth() {
}
}
func (p *Pool) checkMinConns() {
for i := p.minConns - p.Stat().TotalConns(); i > 0; i-- {
go p.p.CreateResource(context.Background())
}
}
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
for {
res, err := p.p.Acquire(ctx)
+22 -1
View File
@@ -23,10 +23,12 @@ func TestConnect(t *testing.T) {
func TestParseConfigExtractsPoolArguments(t *testing.T) {
t.Parallel()
config, err := pgxpool.ParseConfig("pool_max_conns=42")
config, err := pgxpool.ParseConfig("pool_max_conns=42 pool_min_conns=1")
assert.NoError(t, err)
assert.EqualValues(t, 42, config.MaxConns)
assert.EqualValues(t, 1, config.MinConns)
assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_max_conns")
assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_min_conns")
}
func TestConnectCancel(t *testing.T) {
@@ -276,6 +278,25 @@ func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) {
assert.EqualValues(t, 0, stats.TotalConns())
}
func TestPoolBackgroundChecksMinConns(t *testing.T) {
t.Parallel()
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err)
config.HealthCheckPeriod = 100 * time.Millisecond
config.MinConns = 2
db, err := pgxpool.ConnectConfig(context.Background(), config)
require.NoError(t, err)
defer db.Close()
time.Sleep(config.HealthCheckPeriod + 100*time.Millisecond)
stats := db.Stat()
assert.EqualValues(t, 2, stats.TotalConns())
}
func TestPoolExec(t *testing.T) {
t.Parallel()