Merge branch 'master' into v5-dev
This commit is contained in:
+135
-40
@@ -3,9 +3,11 @@ package pgxpool
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
@@ -70,16 +72,24 @@ func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
|
||||
|
||||
// Pool allows for connection reuse.
|
||||
type Pool struct {
|
||||
p *puddle.Pool[*connResource]
|
||||
config *Config
|
||||
beforeConnect func(context.Context, *pgx.ConnConfig) error
|
||||
afterConnect func(context.Context, *pgx.Conn) error
|
||||
beforeAcquire func(context.Context, *pgx.Conn) bool
|
||||
afterRelease func(*pgx.Conn) bool
|
||||
minConns int32
|
||||
maxConnLifetime time.Duration
|
||||
maxConnIdleTime time.Duration
|
||||
healthCheckPeriod time.Duration
|
||||
p *puddle.Pool[*connResource]
|
||||
config *Config
|
||||
beforeConnect func(context.Context, *pgx.ConnConfig) error
|
||||
afterConnect func(context.Context, *pgx.Conn) error
|
||||
beforeAcquire func(context.Context, *pgx.Conn) bool
|
||||
afterRelease func(*pgx.Conn) bool
|
||||
minConns int32
|
||||
maxConns int32
|
||||
maxConnLifetime time.Duration
|
||||
maxConnLifetimeJitter time.Duration
|
||||
maxConnIdleTime time.Duration
|
||||
healthCheckPeriod time.Duration
|
||||
|
||||
healthCheckChan chan struct{}
|
||||
|
||||
newConnsCount int64
|
||||
lifetimeDestroyCount int64
|
||||
idleDestroyCount int64
|
||||
|
||||
closeOnce sync.Once
|
||||
closeChan chan struct{}
|
||||
@@ -109,14 +119,19 @@ type Config struct {
|
||||
// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
|
||||
MaxConnLifetime time.Duration
|
||||
|
||||
// MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
|
||||
// This helps prevent all connections from being closed at the exact same time, starving the pool.
|
||||
MaxConnLifetimeJitter time.Duration
|
||||
|
||||
// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
|
||||
MaxConnIdleTime time.Duration
|
||||
|
||||
// MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
|
||||
MaxConns int32
|
||||
|
||||
// MinConns is the minimum size of the pool. The health check will increase the number of connections to this
|
||||
// amount if it had dropped below.
|
||||
// MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
|
||||
// number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
|
||||
// to create new connections.
|
||||
MinConns int32
|
||||
|
||||
// HealthCheckPeriod is the duration between checks of the health of idle connections.
|
||||
@@ -157,16 +172,19 @@ func NewConfig(ctx context.Context, config *Config) (*Pool, error) {
|
||||
}
|
||||
|
||||
p := &Pool{
|
||||
config: config,
|
||||
beforeConnect: config.BeforeConnect,
|
||||
afterConnect: config.AfterConnect,
|
||||
beforeAcquire: config.BeforeAcquire,
|
||||
afterRelease: config.AfterRelease,
|
||||
minConns: config.MinConns,
|
||||
maxConnLifetime: config.MaxConnLifetime,
|
||||
maxConnIdleTime: config.MaxConnIdleTime,
|
||||
healthCheckPeriod: config.HealthCheckPeriod,
|
||||
closeChan: make(chan struct{}),
|
||||
config: config,
|
||||
beforeConnect: config.BeforeConnect,
|
||||
afterConnect: config.AfterConnect,
|
||||
beforeAcquire: config.BeforeAcquire,
|
||||
afterRelease: config.AfterRelease,
|
||||
minConns: config.MinConns,
|
||||
maxConns: config.MaxConns,
|
||||
maxConnLifetime: config.MaxConnLifetime,
|
||||
maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
|
||||
maxConnIdleTime: config.MaxConnIdleTime,
|
||||
healthCheckPeriod: config.HealthCheckPeriod,
|
||||
healthCheckChan: make(chan struct{}, 1),
|
||||
closeChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
p.p = puddle.NewPool(
|
||||
@@ -216,7 +234,7 @@ func NewConfig(ctx context.Context, config *Config) (*Pool, error) {
|
||||
)
|
||||
|
||||
go func() {
|
||||
p.checkMinConns() // reach min conns as soon as possible
|
||||
p.createIdleResources(ctx, int(p.minConns))
|
||||
p.backgroundHealthCheck()
|
||||
}()
|
||||
|
||||
@@ -231,6 +249,7 @@ func NewConfig(ctx context.Context, config *Config) (*Pool, error) {
|
||||
// pool_max_conn_lifetime: duration string
|
||||
// pool_max_conn_idle_time: duration string
|
||||
// pool_health_check_period: duration string
|
||||
// pool_max_conn_lifetime_jitter: duration string
|
||||
//
|
||||
// See Config for definitions of these arguments.
|
||||
//
|
||||
@@ -311,6 +330,15 @@ func ParseConfig(connString string) (*Config, error) {
|
||||
config.HealthCheckPeriod = defaultHealthCheckPeriod
|
||||
}
|
||||
|
||||
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
|
||||
delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
|
||||
d, err := time.ParseDuration(s)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
|
||||
}
|
||||
config.MaxConnLifetimeJitter = d
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
@@ -323,44 +351,105 @@ func (p *Pool) Close() {
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Pool) isExpired(res *puddle.Resource[*connResource]) bool {
|
||||
now := time.Now()
|
||||
// Small optimization to avoid rand. If it's over lifetime AND jitter, immediately
|
||||
// return true.
|
||||
if now.Sub(res.CreationTime()) > p.maxConnLifetime+p.maxConnLifetimeJitter {
|
||||
return true
|
||||
}
|
||||
if p.maxConnLifetimeJitter == 0 {
|
||||
return false
|
||||
}
|
||||
jitterSecs := rand.Float64() * p.maxConnLifetimeJitter.Seconds()
|
||||
return now.Sub(res.CreationTime()) > p.maxConnLifetime+(time.Duration(jitterSecs)*time.Second)
|
||||
}
|
||||
|
||||
func (p *Pool) triggerHealthCheck() {
|
||||
go func() {
|
||||
// Destroy is asynchronous so we give it time to actually remove itself from
|
||||
// the pool otherwise we might try to check the pool size too soon
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
select {
|
||||
case p.healthCheckChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Pool) backgroundHealthCheck() {
|
||||
ticker := time.NewTicker(p.healthCheckPeriod)
|
||||
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-p.closeChan:
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-p.healthCheckChan:
|
||||
p.checkHealth()
|
||||
case <-ticker.C:
|
||||
p.checkIdleConnsHealth()
|
||||
p.checkMinConns()
|
||||
p.checkHealth()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) checkIdleConnsHealth() {
|
||||
resources := p.p.AcquireAllIdle()
|
||||
func (p *Pool) checkHealth() {
|
||||
for {
|
||||
// If checkMinConns failed we don't destroy any connections since we couldn't
|
||||
// even get to minConns
|
||||
if err := p.checkMinConns(); err != nil {
|
||||
// Should we log this error somewhere?
|
||||
break
|
||||
}
|
||||
if !p.checkConnsHealth() {
|
||||
// Since we didn't destroy any connections we can stop looping
|
||||
break
|
||||
}
|
||||
// Technically Destroy is asynchronous but 500ms should be enough for it to
|
||||
// remove it from the underlying pool
|
||||
select {
|
||||
case <-p.closeChan:
|
||||
return
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
// checkConnsHealth will check all idle connections, destroy a connection if
|
||||
// it's idle or too old, and returns true if any were destroyed
|
||||
func (p *Pool) checkConnsHealth() bool {
|
||||
var destroyed bool
|
||||
totalConns := p.Stat().TotalConns()
|
||||
resources := p.p.AcquireAllIdle()
|
||||
for _, res := range resources {
|
||||
if now.Sub(res.CreationTime()) > p.maxConnLifetime {
|
||||
// We're okay going under minConns if the lifetime is up
|
||||
if p.isExpired(res) && totalConns >= p.minConns {
|
||||
atomic.AddInt64(&p.lifetimeDestroyCount, 1)
|
||||
res.Destroy()
|
||||
} else if res.IdleDuration() > p.maxConnIdleTime {
|
||||
destroyed = true
|
||||
// Since Destroy is async we manually decrement totalConns.
|
||||
totalConns--
|
||||
} else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns {
|
||||
atomic.AddInt64(&p.idleDestroyCount, 1)
|
||||
res.Destroy()
|
||||
destroyed = true
|
||||
// Since Destroy is async we manually decrement totalConns.
|
||||
totalConns--
|
||||
} else {
|
||||
res.ReleaseUnused()
|
||||
}
|
||||
}
|
||||
return destroyed
|
||||
}
|
||||
|
||||
func (p *Pool) checkMinConns() {
|
||||
for i := p.minConns - p.Stat().TotalConns(); i > 0; i-- {
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
p.p.CreateResource(ctx)
|
||||
}()
|
||||
func (p *Pool) checkMinConns() error {
|
||||
// TotalConns can include ones that are being destroyed but we should have
|
||||
// sleep(500ms) around all of the destroys to help prevent that from throwing
|
||||
// off this check
|
||||
toCreate := p.minConns - p.Stat().TotalConns()
|
||||
if toCreate > 0 {
|
||||
return p.createIdleResources(context.Background(), int(toCreate))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
|
||||
@@ -371,6 +460,7 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in
|
||||
|
||||
for i := 0; i < targetResources; i++ {
|
||||
go func() {
|
||||
atomic.AddInt64(&p.newConnsCount, 1)
|
||||
err := p.p.CreateResource(ctx)
|
||||
errs <- err
|
||||
}()
|
||||
@@ -449,7 +539,12 @@ func (p *Pool) Config() *Config { return p.config.Copy() }
|
||||
|
||||
// Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
|
||||
func (p *Pool) Stat() *Stat {
|
||||
return &Stat{s: p.p.Stat()}
|
||||
return &Stat{
|
||||
s: p.p.Stat(),
|
||||
newConnsCount: atomic.LoadInt64(&p.newConnsCount),
|
||||
lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount),
|
||||
idleDestroyCount: atomic.LoadInt64(&p.idleDestroyCount),
|
||||
}
|
||||
}
|
||||
|
||||
// Exec acquires a connection from the Pool and executes the given SQL.
|
||||
|
||||
Reference in New Issue
Block a user