Rename ConnectionPool to ConnPool
This commit is contained in:
+2
-2
@@ -634,9 +634,9 @@ func BenchmarkTimestampTzBinary(b *testing.B) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkConnectionPool(b *testing.B) {
|
func BenchmarkConnPool(b *testing.B) {
|
||||||
options := pgx.ConnectionPoolOptions{MaxConnections: 5}
|
options := pgx.ConnectionPoolOptions{MaxConnections: 5}
|
||||||
pool, err := pgx.NewConnectionPool(*defaultConnConfig, options)
|
pool, err := pgx.NewConnPool(*defaultConnConfig, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("Unable to create connection pool: %v", err)
|
b.Fatalf("Unable to create connection pool: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ type ConnConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Conn is a PostgreSQL connection handle. It is not safe for concurrent usage.
|
// Conn is a PostgreSQL connection handle. It is not safe for concurrent usage.
|
||||||
// Use ConnectionPool to manage access to multiple database connections from multiple
|
// Use ConnPool to manage access to multiple database connections from multiple
|
||||||
// goroutines.
|
// goroutines.
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
conn net.Conn // the underlying TCP or unix domain socket connection
|
conn net.Conn // the underlying TCP or unix domain socket connection
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ type ConnectionPoolOptions struct {
|
|||||||
Logger Logger
|
Logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConnectionPool struct {
|
type ConnPool struct {
|
||||||
allConnections []*Conn
|
allConnections []*Conn
|
||||||
availableConnections []*Conn
|
availableConnections []*Conn
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
@@ -27,10 +27,10 @@ type ConnectionPoolStat struct {
|
|||||||
AvailableConnections int // unused live connections
|
AvailableConnections int // unused live connections
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConnectionPool creates a new ConnectionPool. config are passed through to
|
// NewConnPool creates a new ConnPool. config are passed through to
|
||||||
// Connect directly.
|
// Connect directly.
|
||||||
func NewConnectionPool(config ConnConfig, options ConnectionPoolOptions) (p *ConnectionPool, err error) {
|
func NewConnPool(config ConnConfig, options ConnectionPoolOptions) (p *ConnPool, err error) {
|
||||||
p = new(ConnectionPool)
|
p = new(ConnPool)
|
||||||
p.config = config
|
p.config = config
|
||||||
p.maxConnections = options.MaxConnections
|
p.maxConnections = options.MaxConnections
|
||||||
p.afterConnect = options.AfterConnect
|
p.afterConnect = options.AfterConnect
|
||||||
@@ -57,7 +57,7 @@ func NewConnectionPool(config ConnConfig, options ConnectionPoolOptions) (p *Con
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Acquire takes exclusive use of a connection until it is released.
|
// Acquire takes exclusive use of a connection until it is released.
|
||||||
func (p *ConnectionPool) Acquire() (c *Conn, err error) {
|
func (p *ConnPool) Acquire() (c *Conn, err error) {
|
||||||
p.cond.L.Lock()
|
p.cond.L.Lock()
|
||||||
defer p.cond.L.Unlock()
|
defer p.cond.L.Unlock()
|
||||||
|
|
||||||
@@ -93,7 +93,7 @@ func (p *ConnectionPool) Acquire() (c *Conn, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Release gives up use of a connection.
|
// Release gives up use of a connection.
|
||||||
func (p *ConnectionPool) Release(conn *Conn) {
|
func (p *ConnPool) Release(conn *Conn) {
|
||||||
if conn.TxStatus != 'I' {
|
if conn.TxStatus != 'I' {
|
||||||
conn.Execute("rollback")
|
conn.Execute("rollback")
|
||||||
}
|
}
|
||||||
@@ -116,7 +116,7 @@ func (p *ConnectionPool) Release(conn *Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Close ends the use of a connection by closing all underlying connections.
|
// Close ends the use of a connection by closing all underlying connections.
|
||||||
func (p *ConnectionPool) Close() {
|
func (p *ConnPool) Close() {
|
||||||
for i := 0; i < p.maxConnections; i++ {
|
for i := 0; i < p.maxConnections; i++ {
|
||||||
if c, err := p.Acquire(); err != nil {
|
if c, err := p.Acquire(); err != nil {
|
||||||
_ = c.Close()
|
_ = c.Close()
|
||||||
@@ -124,7 +124,7 @@ func (p *ConnectionPool) Close() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnectionPool) Stat() (s ConnectionPoolStat) {
|
func (p *ConnPool) Stat() (s ConnectionPoolStat) {
|
||||||
p.cond.L.Lock()
|
p.cond.L.Lock()
|
||||||
defer p.cond.L.Unlock()
|
defer p.cond.L.Unlock()
|
||||||
|
|
||||||
@@ -134,15 +134,15 @@ func (p *ConnectionPool) Stat() (s ConnectionPoolStat) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnectionPool) MaxConnectionCount() int {
|
func (p *ConnPool) MaxConnectionCount() int {
|
||||||
return p.maxConnections
|
return p.maxConnections
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnectionPool) CurrentConnectionCount() int {
|
func (p *ConnPool) CurrentConnectionCount() int {
|
||||||
return p.maxConnections
|
return p.maxConnections
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnectionPool) createConnection() (c *Conn, err error) {
|
func (p *ConnPool) createConnection() (c *Conn, err error) {
|
||||||
c, err = Connect(p.config)
|
c, err = Connect(p.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@@ -157,7 +157,7 @@ func (p *ConnectionPool) createConnection() (c *Conn, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SelectFunc acquires a connection, delegates the call to that connection, and releases the connection
|
// SelectFunc acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnectionPool) SelectFunc(sql string, onDataRow func(*DataRowReader) error, arguments ...interface{}) (err error) {
|
func (p *ConnPool) SelectFunc(sql string, onDataRow func(*DataRowReader) error, arguments ...interface{}) (err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -168,7 +168,7 @@ func (p *ConnectionPool) SelectFunc(sql string, onDataRow func(*DataRowReader) e
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SelectRows acquires a connection, delegates the call to that connection, and releases the connection
|
// SelectRows acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnectionPool) SelectRows(sql string, arguments ...interface{}) (rows []map[string]interface{}, err error) {
|
func (p *ConnPool) SelectRows(sql string, arguments ...interface{}) (rows []map[string]interface{}, err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -179,7 +179,7 @@ func (p *ConnectionPool) SelectRows(sql string, arguments ...interface{}) (rows
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SelectRow acquires a connection, delegates the call to that connection, and releases the connection
|
// SelectRow acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnectionPool) SelectRow(sql string, arguments ...interface{}) (row map[string]interface{}, err error) {
|
func (p *ConnPool) SelectRow(sql string, arguments ...interface{}) (row map[string]interface{}, err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -190,7 +190,7 @@ func (p *ConnectionPool) SelectRow(sql string, arguments ...interface{}) (row ma
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SelectValue acquires a connection, delegates the call to that connection, and releases the connection
|
// SelectValue acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnectionPool) SelectValue(sql string, arguments ...interface{}) (v interface{}, err error) {
|
func (p *ConnPool) SelectValue(sql string, arguments ...interface{}) (v interface{}, err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -201,7 +201,7 @@ func (p *ConnectionPool) SelectValue(sql string, arguments ...interface{}) (v in
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SelectValueTo acquires a connection, delegates the call to that connection, and releases the connection
|
// SelectValueTo acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnectionPool) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) {
|
func (p *ConnPool) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -212,7 +212,7 @@ func (p *ConnectionPool) SelectValueTo(w io.Writer, sql string, arguments ...int
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SelectValues acquires a connection, delegates the call to that connection, and releases the connection
|
// SelectValues acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnectionPool) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) {
|
func (p *ConnPool) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -223,7 +223,7 @@ func (p *ConnectionPool) SelectValues(sql string, arguments ...interface{}) (val
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Execute acquires a connection, delegates the call to that connection, and releases the connection
|
// Execute acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnectionPool) Execute(sql string, arguments ...interface{}) (commandTag string, err error) {
|
func (p *ConnPool) Execute(sql string, arguments ...interface{}) (commandTag string, err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -236,7 +236,7 @@ func (p *ConnectionPool) Execute(sql string, arguments ...interface{}) (commandT
|
|||||||
// Transaction acquires a connection, delegates the call to that connection,
|
// Transaction acquires a connection, delegates the call to that connection,
|
||||||
// and releases the connection. The call signature differs slightly from the
|
// and releases the connection. The call signature differs slightly from the
|
||||||
// underlying Transaction in that the callback function accepts a *Conn
|
// underlying Transaction in that the callback function accepts a *Conn
|
||||||
func (p *ConnectionPool) Transaction(f func(conn *Conn) bool) (committed bool, err error) {
|
func (p *ConnPool) Transaction(f func(conn *Conn) bool) (committed bool, err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -251,7 +251,7 @@ func (p *ConnectionPool) Transaction(f func(conn *Conn) bool) (committed bool, e
|
|||||||
// TransactionIso acquires a connection, delegates the call to that connection,
|
// TransactionIso acquires a connection, delegates the call to that connection,
|
||||||
// and releases the connection. The call signature differs slightly from the
|
// and releases the connection. The call signature differs slightly from the
|
||||||
// underlying TransactionIso in that the callback function accepts a *Conn
|
// underlying TransactionIso in that the callback function accepts a *Conn
|
||||||
func (p *ConnectionPool) TransactionIso(isoLevel string, f func(conn *Conn) bool) (committed bool, err error) {
|
func (p *ConnPool) TransactionIso(isoLevel string, f func(conn *Conn) bool) (committed bool, err error) {
|
||||||
var c *Conn
|
var c *Conn
|
||||||
if c, err = p.Acquire(); err != nil {
|
if c, err = p.Acquire(); err != nil {
|
||||||
return
|
return
|
||||||
@@ -8,16 +8,16 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func createConnectionPool(t *testing.T, maxConnections int) *pgx.ConnectionPool {
|
func createConnPool(t *testing.T, maxConnections int) *pgx.ConnPool {
|
||||||
options := pgx.ConnectionPoolOptions{MaxConnections: maxConnections}
|
options := pgx.ConnectionPoolOptions{MaxConnections: maxConnections}
|
||||||
pool, err := pgx.NewConnectionPool(*defaultConnConfig, options)
|
pool, err := pgx.NewConnPool(*defaultConnConfig, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unable to create connection pool: %v", err)
|
t.Fatalf("Unable to create connection pool: %v", err)
|
||||||
}
|
}
|
||||||
return pool
|
return pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewConnectionPool(t *testing.T) {
|
func TestNewConnPool(t *testing.T) {
|
||||||
var numCallbacks int
|
var numCallbacks int
|
||||||
afterConnect := func(c *pgx.Conn) error {
|
afterConnect := func(c *pgx.Conn) error {
|
||||||
numCallbacks++
|
numCallbacks++
|
||||||
@@ -25,7 +25,7 @@ func TestNewConnectionPool(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
options := pgx.ConnectionPoolOptions{MaxConnections: 2, AfterConnect: afterConnect}
|
options := pgx.ConnectionPoolOptions{MaxConnections: 2, AfterConnect: afterConnect}
|
||||||
pool, err := pgx.NewConnectionPool(*defaultConnConfig, options)
|
pool, err := pgx.NewConnPool(*defaultConnConfig, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Unable to establish connection pool")
|
t.Fatal("Unable to establish connection pool")
|
||||||
}
|
}
|
||||||
@@ -44,7 +44,7 @@ func TestNewConnectionPool(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
options = pgx.ConnectionPoolOptions{MaxConnections: 2, AfterConnect: afterConnect}
|
options = pgx.ConnectionPoolOptions{MaxConnections: 2, AfterConnect: afterConnect}
|
||||||
pool, err = pgx.NewConnectionPool(*defaultConnConfig, options)
|
pool, err = pgx.NewConnPool(*defaultConnConfig, options)
|
||||||
if err != errAfterConnect {
|
if err != errAfterConnect {
|
||||||
t.Errorf("Expected errAfterConnect but received unexpected: %v", err)
|
t.Errorf("Expected errAfterConnect but received unexpected: %v", err)
|
||||||
}
|
}
|
||||||
@@ -54,7 +54,7 @@ func TestPoolAcquireAndReleaseCycle(t *testing.T) {
|
|||||||
maxConnections := 2
|
maxConnections := 2
|
||||||
incrementCount := int32(100)
|
incrementCount := int32(100)
|
||||||
completeSync := make(chan int)
|
completeSync := make(chan int)
|
||||||
pool := createConnectionPool(t, maxConnections)
|
pool := createConnPool(t, maxConnections)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
acquireAll := func() (connections []*pgx.Conn) {
|
acquireAll := func() (connections []*pgx.Conn) {
|
||||||
@@ -125,7 +125,7 @@ func TestPoolAcquireAndReleaseCycle(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolReleaseWithTransactions(t *testing.T) {
|
func TestPoolReleaseWithTransactions(t *testing.T) {
|
||||||
pool := createConnectionPool(t, 1)
|
pool := createConnPool(t, 1)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
conn, err := pool.Acquire()
|
conn, err := pool.Acquire()
|
||||||
@@ -164,7 +164,7 @@ func TestPoolReleaseWithTransactions(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolAcquireAndReleaseCycleAutoConnect(t *testing.T) {
|
func TestPoolAcquireAndReleaseCycleAutoConnect(t *testing.T) {
|
||||||
maxConnections := 3
|
maxConnections := 3
|
||||||
pool := createConnectionPool(t, maxConnections)
|
pool := createConnPool(t, maxConnections)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
doSomething := func() {
|
doSomething := func() {
|
||||||
@@ -203,7 +203,7 @@ func TestPoolAcquireAndReleaseCycleAutoConnect(t *testing.T) {
|
|||||||
|
|
||||||
func TestPoolReleaseDiscardsDeadConnections(t *testing.T) {
|
func TestPoolReleaseDiscardsDeadConnections(t *testing.T) {
|
||||||
maxConnections := 3
|
maxConnections := 3
|
||||||
pool := createConnectionPool(t, maxConnections)
|
pool := createConnPool(t, maxConnections)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
var c1, c2 *pgx.Conn
|
var c1, c2 *pgx.Conn
|
||||||
@@ -262,7 +262,7 @@ func TestPoolReleaseDiscardsDeadConnections(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolTransaction(t *testing.T) {
|
func TestPoolTransaction(t *testing.T) {
|
||||||
pool := createConnectionPool(t, 1)
|
pool := createConnPool(t, 1)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
committed, err := pool.Transaction(func(conn *pgx.Conn) bool {
|
committed, err := pool.Transaction(func(conn *pgx.Conn) bool {
|
||||||
@@ -315,7 +315,7 @@ func TestPoolTransaction(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolTransactionIso(t *testing.T) {
|
func TestPoolTransactionIso(t *testing.T) {
|
||||||
pool := createConnectionPool(t, 1)
|
pool := createConnPool(t, 1)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
committed, err := pool.TransactionIso("serializable", func(conn *pgx.Conn) bool {
|
committed, err := pool.TransactionIso("serializable", func(conn *pgx.Conn) bool {
|
||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
var pool *pgx.ConnectionPool
|
var pool *pgx.ConnPool
|
||||||
|
|
||||||
// afterConnect creates the prepared statements that this application uses
|
// afterConnect creates the prepared statements that this application uses
|
||||||
func afterConnect(conn *pgx.Conn) (err error) {
|
func afterConnect(conn *pgx.Conn) (err error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user