2
0
Files
pgx/conn_pool.go
T
Jack Christensen 20d22a67e3 Prevent deadlock on ConnPool.Acquire with Reset
If no connections are available, and the pool is reset, there will
be no connections at all in the pool. So the pool needs to be able
to create a connection after waiting for a connection to be released.
2016-03-04 16:19:48 -06:00

304 lines
7.6 KiB
Go

package pgx
import (
"errors"
"sync"
)
// ConnPoolConfig holds the settings NewConnPool uses to build a ConnPool.
type ConnPoolConfig struct {
// Embedded connection settings; NewConnPool copies them into the pool and
// they are handed to Connect for every connection the pool opens.
ConnConfig
MaxConnections int // max simultaneous connections to use; 0 selects the default of 5, otherwise must be at least 1
AfterConnect func(*Conn) error // function to call on every new connection
}
// ConnPool is a pool of *Conn. Its mutable state is guarded by the mutex held
// in cond.L.
type ConnPool struct {
allConnections []*Conn // every connection the pool currently tracks, checked out or not
availableConnections []*Conn // tracked connections that are not checked out
cond *sync.Cond // cond.L guards the pool state; acquire and Close wait on it, Release notifies it
config ConnConfig // config used when establishing connection
maxConnections int // upper bound on len(allConnections)
resetCount int // incremented by Reset; stamped onto a connection when it is acquired
afterConnect func(*Conn) error // optional hook run on each newly created connection
logger Logger // may be nil, in which case logLevel is LogLevelNone
logLevel int // compared against LogLevelWarn before the pool logs
closed bool // set by Close; acquire fails once this is true
}
// ConnPoolStat is the snapshot of pool counters returned by ConnPool.Stat.
type ConnPoolStat struct {
MaxConnections int // max simultaneous connections to use
CurrentConnections int // current live connections
AvailableConnections int // unused live connections
}
// NewConnPool creates a new ConnPool. config.ConnConfig is passed through to
// Connect directly.
//
// A MaxConnections of 0 selects the default of 5; a negative value is
// rejected. When a Logger is supplied, a LogLevel of 0 defaults to
// LogLevelDebug; without a Logger, logging is disabled. One connection is
// established eagerly, so a connection failure surfaces here. On any error the
// returned pool is nil.
func NewConnPool(config ConnPoolConfig) (p *ConnPool, err error) {
	p = new(ConnPool)
	p.config = config.ConnConfig
	p.maxConnections = config.MaxConnections
	if p.maxConnections == 0 {
		p.maxConnections = 5
	}
	if p.maxConnections < 1 {
		return nil, errors.New("MaxConnections must be at least 1")
	}

	p.afterConnect = config.AfterConnect

	if config.LogLevel != 0 {
		p.logLevel = config.LogLevel
	} else {
		// Preserve pre-LogLevel behavior by defaulting to LogLevelDebug
		p.logLevel = LogLevelDebug
	}
	p.logger = config.Logger
	if p.logger == nil {
		// Nothing to log to, so make every level check fail.
		p.logLevel = LogLevelNone
	}

	p.allConnections = make([]*Conn, 0, p.maxConnections)
	p.availableConnections = make([]*Conn, 0, p.maxConnections)
	p.cond = sync.NewCond(new(sync.Mutex))

	// Initially establish one connection
	c, err := p.createConnection()
	if err != nil {
		// Do not hand back a half-initialised pool together with an error.
		return nil, err
	}
	p.allConnections = append(p.allConnections, c)
	p.availableConnections = append(p.availableConnections, c)

	return p, nil
}
// Acquire checks a connection out of the pool. The caller has exclusive use of
// it until the connection is handed back with Release.
func (p *ConnPool) Acquire() (*Conn, error) {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	return p.acquire()
}
// acquire performs the acquisition. The caller must already hold p.cond.L.
func (p *ConnPool) acquire() (*Conn, error) {
	for {
		if p.closed {
			return nil, errors.New("cannot acquire from closed pool")
		}

		// An idle connection exists: hand out the most recently returned one.
		if idle := len(p.availableConnections); idle > 0 {
			conn := p.availableConnections[idle-1]
			conn.poolResetCount = p.resetCount
			p.availableConnections = p.availableConnections[:idle-1]
			return conn, nil
		}

		// Nothing idle, but the pool is below its limit: open a new connection.
		if len(p.allConnections) < p.maxConnections {
			conn, err := p.createConnection()
			if err != nil {
				return nil, err
			}
			conn.poolResetCount = p.resetCount
			p.allConnections = append(p.allConnections, conn)
			return conn, nil
		}

		// Every connection is checked out and the limit has been reached.
		if p.logLevel >= LogLevelWarn {
			p.logger.Warn("All connections in pool are busy - waiting...")
		}

		// Sleep until a connection is idle OR there is room to create one,
		// then start over from the top so every check is repeated.
		for len(p.availableConnections) == 0 && len(p.allConnections) == p.maxConnections {
			p.cond.Wait()
		}
	}
}
// Release gives up use of a connection.
//
// Before the connection goes back to the pool, a rollback is issued if its
// TxStatus is not 'I', all LISTEN channels are dropped, and pending
// notifications are discarded. Then, under the pool lock:
//   - a connection acquired before the most recent Reset is closed;
//   - a live connection is returned to the idle list;
//   - a dead connection is removed from the pool.
//
// All waiters are woken afterwards. Acquire and Close wait on the same
// condition variable, so waking only one waiter could wake an Acquire that
// immediately fails on a closed pool while Close keeps waiting forever.
func (p *ConnPool) Release(conn *Conn) {
	if conn.TxStatus != 'I' {
		// Best effort; the result is not inspected here.
		// NOTE(review): presumably a failed rollback leaves the connection
		// dead so that the IsAlive check below drops it — confirm.
		conn.Exec("rollback")
	}

	if len(conn.channels) > 0 {
		if err := conn.Unlisten("*"); err != nil {
			conn.die(err)
		}
		conn.channels = make(map[string]struct{})
	}
	conn.notifications = nil

	p.cond.L.Lock()
	switch {
	case conn.poolResetCount != p.resetCount:
		// The pool was reset while this connection was checked out; Reset has
		// already dropped it from the pool's slices, so just close it.
		conn.Close()
	case conn.IsAlive():
		p.availableConnections = append(p.availableConnections, conn)
	default:
		// Dead connection: swap-remove it from allConnections.
		ac := p.allConnections
		for i, c := range ac {
			if c == conn {
				last := len(ac) - 1
				ac[i] = ac[last]
				ac[last] = nil // do not keep the removed slot's pointer alive
				p.allConnections = ac[:last]
				break
			}
		}
	}
	p.cond.L.Unlock()

	// Every waiter re-checks its own condition in a loop, so waking all of
	// them is safe.
	p.cond.Broadcast()
}
// Close shuts the pool down. Once called, no further connections can be
// acquired; Close then blocks until every checked-out connection has been
// released and finally closes all underlying connections.
func (p *ConnPool) Close() {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	p.closed = true

	// All connections are back once the idle list is as long as the full list.
	for len(p.availableConnections) != len(p.allConnections) {
		p.cond.Wait()
	}

	for i := range p.allConnections {
		_ = p.allConnections[i].Close()
	}
}
// Reset closes all open connections, but leaves the pool open. It is intended
// for use when an error is detected that would disrupt all connections (such as
// a network interruption or a server state change).
//
// It is safe to reset a pool while connections are checked out. Those
// connections will be closed when they are returned to the pool.
//
// Idle connections are closed before Reset returns. Goroutines blocked waiting
// for a connection are woken, because the emptied pool has room for new
// connections again.
func (p *ConnPool) Reset() {
	p.cond.L.Lock()
	p.resetCount++
	// Keep a handle on the idle connections so they can be closed; fresh
	// slices are allocated so the pool never shares a backing array with idle.
	idle := p.availableConnections
	p.allConnections = make([]*Conn, 0, p.maxConnections)
	p.availableConnections = make([]*Conn, 0, p.maxConnections)
	p.cond.L.Unlock()

	// Waiters re-check their condition in a loop, so waking all is safe.
	p.cond.Broadcast()

	// The pool no longer references these connections, so they are closed
	// outside the lock to avoid stalling Acquire/Release on network I/O.
	for _, c := range idle {
		_ = c.Close()
	}
}
// Stat reports the pool's configured limit together with its current total and
// idle connection counts, read under the pool lock.
func (p *ConnPool) Stat() (s ConnPoolStat) {
	p.cond.L.Lock()
	s = ConnPoolStat{
		MaxConnections:       p.maxConnections,
		CurrentConnections:   len(p.allConnections),
		AvailableConnections: len(p.availableConnections),
	}
	p.cond.L.Unlock()

	return s
}
// createConnection opens a connection using the pool's config and, when an
// afterConnect hook is configured, runs the hook on it. The connection is not
// added to the pool's slices; that is left to the caller.
//
// If the hook fails, the freshly opened connection is closed so it is not
// leaked. On any error the returned connection is nil.
func (p *ConnPool) createConnection() (c *Conn, err error) {
	c, err = Connect(p.config)
	if err != nil {
		return nil, err
	}

	if p.afterConnect == nil {
		return c, nil
	}

	if err = p.afterConnect(c); err != nil {
		// Callers discard the connection on error, so it must be closed here.
		// The hook's error is the one worth reporting, not Close's.
		_ = c.Close()
		return nil, err
	}

	return c, nil
}
// Exec runs sql on a connection borrowed from the pool. The connection is
// released again before Exec returns.
func (p *ConnPool) Exec(sql string, arguments ...interface{}) (commandTag CommandTag, err error) {
	conn, err := p.Acquire()
	if err != nil {
		return commandTag, err
	}
	defer p.Release(conn)

	commandTag, err = conn.Exec(sql, arguments...)
	return commandTag, err
}
// Query runs sql on a connection borrowed from the pool. On success the
// connection stays checked out and is released automatically when the
// returned *Rows is closed.
func (p *ConnPool) Query(sql string, args ...interface{}) (*Rows, error) {
	conn, acquireErr := p.Acquire()
	if acquireErr != nil {
		// Callers may defer error checking to the *Rows, so return an
		// already-closed one that carries the error.
		failed := &Rows{closed: true, err: acquireErr}
		return failed, acquireErr
	}

	rows, queryErr := conn.Query(sql, args...)
	if queryErr != nil {
		// No AfterClose hook has been installed yet, so release here.
		p.Release(conn)
		return rows, queryErr
	}

	rows.AfterClose(p.rowsAfterClose)
	return rows, nil
}
// QueryRow behaves like Query but returns the result as a *Row. The borrowed
// connection is released automatically after Scan is called on that *Row.
func (p *ConnPool) QueryRow(sql string, args ...interface{}) *Row {
	// The error is dropped here; when acquiring a connection fails, Query also
	// stores it inside the *Rows it returns.
	rows, _ := p.Query(sql, args...)
	row := (*Row)(rows)
	return row
}
// Begin borrows a connection and starts a transaction on it by calling
// BeginIso with an empty isolation mode. The connection is released
// automatically when the transaction is closed.
func (p *ConnPool) Begin() (*Tx, error) {
	const noIso = ""
	return p.BeginIso(noIso)
}
// BeginIso borrows a connection and starts a transaction on it using isolation
// mode iso. The connection is released automatically when the transaction is
// closed.
//
// If starting the transaction fails because the connection has died, the
// attempt is repeated on another connection.
func (p *ConnPool) BeginIso(iso string) (*Tx, error) {
	for {
		conn, err := p.Acquire()
		if err != nil {
			return nil, err
		}

		tx, err := conn.BeginIso(iso)
		if err == nil {
			tx.AfterClose(p.txAfterClose)
			return tx, nil
		}

		// Record liveness before handing the connection back to the pool.
		wasAlive := conn.IsAlive()
		p.Release(conn)

		// A live connection means retrying elsewhere would not help, so the
		// error is returned. A dead one means loop and try again.
		if wasAlive {
			return nil, err
		}
	}
}
// txAfterClose is the AfterClose hook BeginIso installs on transactions; it
// returns the transaction's connection to the pool.
func (p *ConnPool) txAfterClose(tx *Tx) {
	conn := tx.Conn()
	p.Release(conn)
}
// rowsAfterClose is the AfterClose hook Query installs on result sets; it
// returns the rows' connection to the pool.
func (p *ConnPool) rowsAfterClose(rows *Rows) {
	conn := rows.Conn()
	p.Release(conn)
}