c3e41872a8
This is in preparation for a Begin / Tx interface that will similate nested transactions with savepoints. In addition, this passes the TxOptions struct by value and thereby removes an allocation.
381 lines
9.5 KiB
Go
381 lines
9.5 KiB
Go
package pgxpool
|
|
|
|
import (
|
|
"context"
|
|
"runtime"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/jackc/pgconn"
|
|
"github.com/jackc/pgx/v4"
|
|
"github.com/jackc/puddle"
|
|
errors "golang.org/x/xerrors"
|
|
)
|
|
|
|
var defaultMaxConns = int32(4)
|
|
var defaultMaxConnLifetime = time.Hour
|
|
var defaultHealthCheckPeriod = time.Minute
|
|
|
|
type connResource struct {
|
|
conn *pgx.Conn
|
|
conns []Conn
|
|
poolRows []poolRow
|
|
poolRowss []poolRows
|
|
}
|
|
|
|
func (cr *connResource) getConn(p *Pool, res *puddle.Resource) *Conn {
|
|
if len(cr.conns) == 0 {
|
|
cr.conns = make([]Conn, 128)
|
|
}
|
|
|
|
c := &cr.conns[len(cr.conns)-1]
|
|
cr.conns = cr.conns[0 : len(cr.conns)-1]
|
|
|
|
c.res = res
|
|
c.p = p
|
|
|
|
return c
|
|
}
|
|
|
|
func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow {
|
|
if len(cr.poolRows) == 0 {
|
|
cr.poolRows = make([]poolRow, 128)
|
|
}
|
|
|
|
pr := &cr.poolRows[len(cr.poolRows)-1]
|
|
cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1]
|
|
|
|
pr.c = c
|
|
pr.r = r
|
|
|
|
return pr
|
|
}
|
|
|
|
func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
|
|
if len(cr.poolRowss) == 0 {
|
|
cr.poolRowss = make([]poolRows, 128)
|
|
}
|
|
|
|
pr := &cr.poolRowss[len(cr.poolRowss)-1]
|
|
cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1]
|
|
|
|
pr.c = c
|
|
pr.r = r
|
|
|
|
return pr
|
|
}
|
|
|
|
type Pool struct {
|
|
p *puddle.Pool
|
|
afterConnect func(context.Context, *pgx.Conn) error
|
|
beforeAcquire func(*pgx.Conn) bool
|
|
afterRelease func(*pgx.Conn) bool
|
|
maxConnLifetime time.Duration
|
|
healthCheckPeriod time.Duration
|
|
closeChan chan struct{}
|
|
}
|
|
|
|
// Config is the configuration struct for creating a pool. It is highly recommended to modify a Config returned by
|
|
// ParseConfig rather than to construct a Config from scratch.
|
|
type Config struct {
|
|
ConnConfig *pgx.ConnConfig
|
|
|
|
// AfterConnect is called after a connection is established, but before it is added to the pool.
|
|
AfterConnect func(context.Context, *pgx.Conn) error
|
|
|
|
// BeforeAcquire is called before before a connection is acquired from the pool. It must return true to allow the
|
|
// acquision or false to indicate that the connection should be destroyed and a different connection should be
|
|
// acquired.
|
|
BeforeAcquire func(*pgx.Conn) bool
|
|
|
|
// AfterRelease is called after a connection is released, but before it is returned to the pool. It must return true to
|
|
// return the connection to the pool or false to destroy the connection.
|
|
AfterRelease func(*pgx.Conn) bool
|
|
|
|
// MaxConnLifetime is the duration after which a connection will be automatically closed.
|
|
MaxConnLifetime time.Duration
|
|
|
|
// MaxConns is the maximum size of the pool.
|
|
MaxConns int32
|
|
|
|
// HealthCheckPeriod is the duration between checks of the health of idle connections.
|
|
HealthCheckPeriod time.Duration
|
|
}
|
|
|
|
// Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
|
|
// connection. See ParseConfig for information on connString format.
|
|
func Connect(ctx context.Context, connString string) (*Pool, error) {
|
|
config, err := ParseConfig(connString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return ConnectConfig(ctx, config)
|
|
}
|
|
|
|
// ConnectConfig creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
|
|
// connection.
|
|
func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
|
|
p := &Pool{
|
|
afterConnect: config.AfterConnect,
|
|
beforeAcquire: config.BeforeAcquire,
|
|
afterRelease: config.AfterRelease,
|
|
maxConnLifetime: config.MaxConnLifetime,
|
|
healthCheckPeriod: config.HealthCheckPeriod,
|
|
closeChan: make(chan struct{}),
|
|
}
|
|
|
|
p.p = puddle.NewPool(
|
|
func(ctx context.Context) (interface{}, error) {
|
|
conn, err := pgx.ConnectConfig(ctx, config.ConnConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if p.afterConnect != nil {
|
|
err = p.afterConnect(ctx, conn)
|
|
if err != nil {
|
|
conn.Close(ctx)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
cr := &connResource{
|
|
conn: conn,
|
|
conns: make([]Conn, 64),
|
|
poolRows: make([]poolRow, 64),
|
|
poolRowss: make([]poolRows, 64),
|
|
}
|
|
|
|
return cr, nil
|
|
},
|
|
func(value interface{}) {
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
value.(*connResource).conn.Close(ctx)
|
|
cancel()
|
|
}()
|
|
},
|
|
config.MaxConns,
|
|
)
|
|
|
|
go p.backgroundHealthCheck()
|
|
|
|
// Initially establish one connection
|
|
res, err := p.p.Acquire(ctx)
|
|
if err != nil {
|
|
p.p.Close()
|
|
return nil, err
|
|
}
|
|
res.Release()
|
|
|
|
return p, nil
|
|
}
|
|
|
|
// ParseConfig builds a Config from connString. It parses connString with the same behavior as pgx.ParseConfig with the
|
|
// addition of the following variables:
|
|
//
|
|
// pool_max_conns: integer greater than 0
|
|
// pool_max_conn_lifetime: duration string
|
|
// pool_health_check_period: duration string
|
|
//
|
|
// See Config for definitions of these arguments.
|
|
//
|
|
// # Example DSN
|
|
// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10
|
|
//
|
|
// # Example URL
|
|
// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10
|
|
func ParseConfig(connString string) (*Config, error) {
|
|
connConfig, err := pgx.ParseConfig(connString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config := &Config{ConnConfig: connConfig}
|
|
|
|
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok {
|
|
delete(connConfig.Config.RuntimeParams, "pool_max_conns")
|
|
n, err := strconv.ParseInt(s, 10, 32)
|
|
if err != nil {
|
|
return nil, errors.Errorf("cannot parse pool_max_conns: %w", err)
|
|
}
|
|
if n < 1 {
|
|
return nil, errors.Errorf("pool_max_conns too small: %d", n)
|
|
}
|
|
config.MaxConns = int32(n)
|
|
} else {
|
|
config.MaxConns = defaultMaxConns
|
|
if numCPU := int32(runtime.NumCPU()); numCPU > config.MaxConns {
|
|
config.MaxConns = numCPU
|
|
}
|
|
}
|
|
|
|
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
|
|
delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
|
|
d, err := time.ParseDuration(s)
|
|
if err != nil {
|
|
return nil, errors.Errorf("invalid pool_max_conn_lifetime: %w", err)
|
|
}
|
|
config.MaxConnLifetime = d
|
|
} else {
|
|
config.MaxConnLifetime = defaultMaxConnLifetime
|
|
}
|
|
|
|
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_health_check_period"]; ok {
|
|
delete(connConfig.Config.RuntimeParams, "pool_health_check_period")
|
|
d, err := time.ParseDuration(s)
|
|
if err != nil {
|
|
return nil, errors.Errorf("invalid pool_health_check_period: %w", err)
|
|
}
|
|
config.HealthCheckPeriod = d
|
|
} else {
|
|
config.HealthCheckPeriod = defaultHealthCheckPeriod
|
|
}
|
|
|
|
return config, nil
|
|
}
|
|
|
|
// Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned
|
|
// to pool and closed.
|
|
func (p *Pool) Close() {
|
|
close(p.closeChan)
|
|
p.p.Close()
|
|
}
|
|
|
|
func (p *Pool) backgroundHealthCheck() {
|
|
ticker := time.NewTicker(p.healthCheckPeriod)
|
|
|
|
for {
|
|
select {
|
|
case <-p.closeChan:
|
|
ticker.Stop()
|
|
return
|
|
case <-ticker.C:
|
|
p.checkIdleConnsHealth()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pool) checkIdleConnsHealth() {
|
|
resources := p.p.AcquireAllIdle()
|
|
|
|
now := time.Now()
|
|
for _, res := range resources {
|
|
if now.Sub(res.CreationTime()) > p.maxConnLifetime {
|
|
res.Destroy()
|
|
} else {
|
|
res.Release()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
|
for {
|
|
res, err := p.p.Acquire(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cr := res.Value().(*connResource)
|
|
if p.beforeAcquire == nil || p.beforeAcquire(cr.conn) {
|
|
return cr.getConn(p, res), nil
|
|
}
|
|
|
|
res.Destroy()
|
|
}
|
|
}
|
|
|
|
// AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
|
|
// keep-alive functionality. It does not update pool statistics.
|
|
func (p *Pool) AcquireAllIdle() []*Conn {
|
|
resources := p.p.AcquireAllIdle()
|
|
conns := make([]*Conn, 0, len(resources))
|
|
for _, res := range resources {
|
|
cr := res.Value().(*connResource)
|
|
if p.beforeAcquire == nil || p.beforeAcquire(cr.conn) {
|
|
conns = append(conns, cr.getConn(p, res))
|
|
} else {
|
|
res.Destroy()
|
|
}
|
|
}
|
|
|
|
return conns
|
|
}
|
|
|
|
func (p *Pool) Stat() *Stat {
|
|
return &Stat{s: p.p.Stat()}
|
|
}
|
|
|
|
func (p *Pool) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
|
|
c, err := p.Acquire(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer c.Release()
|
|
|
|
return c.Exec(ctx, sql, arguments...)
|
|
}
|
|
|
|
func (p *Pool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
|
|
c, err := p.Acquire(ctx)
|
|
if err != nil {
|
|
return errRows{err: err}, err
|
|
}
|
|
|
|
rows, err := c.Query(ctx, sql, args...)
|
|
if err != nil {
|
|
c.Release()
|
|
return errRows{err: err}, err
|
|
}
|
|
|
|
return c.getPoolRows(rows), nil
|
|
}
|
|
|
|
func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
|
|
c, err := p.Acquire(ctx)
|
|
if err != nil {
|
|
return errRow{err: err}
|
|
}
|
|
|
|
row := c.QueryRow(ctx, sql, args...)
|
|
return c.getPoolRow(row)
|
|
}
|
|
|
|
func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
|
c, err := p.Acquire(ctx)
|
|
if err != nil {
|
|
return errBatchResults{err: err}
|
|
}
|
|
|
|
br := c.SendBatch(ctx, b)
|
|
return &poolBatchResults{br: br, c: c}
|
|
}
|
|
|
|
func (p *Pool) Begin(ctx context.Context) (*Tx, error) {
|
|
return p.BeginEx(ctx, pgx.TxOptions{})
|
|
}
|
|
func (p *Pool) BeginEx(ctx context.Context, txOptions pgx.TxOptions) (*Tx, error) {
|
|
c, err := p.Acquire(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
t, err := c.BeginEx(ctx, txOptions)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Tx{t: t, c: c}, err
|
|
}
|
|
|
|
func (p *Pool) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
|
|
c, err := p.Acquire(ctx)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer c.Release()
|
|
|
|
return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
|
|
}
|