Remove stdlib.OpenFromConnPool
This commit is contained in:
@@ -13,41 +13,6 @@
|
|||||||
// if err != nil {
|
// if err != nil {
|
||||||
// return err
|
// return err
|
||||||
// }
|
// }
|
||||||
//
|
|
||||||
// Or a normal pgx connection pool can be established and the database/sql
|
|
||||||
// connection can be created through stdlib.OpenFromConnPool(). This allows
|
|
||||||
// more control over the connection process (such as TLS), more control
|
|
||||||
// over the connection pool, setting an AfterConnect hook, and using both
|
|
||||||
// database/sql and pgx interfaces as needed.
|
|
||||||
//
|
|
||||||
// connConfig := pgx.ConnConfig{
|
|
||||||
// Host: "localhost",
|
|
||||||
// User: "pgx_md5",
|
|
||||||
// Password: "secret",
|
|
||||||
// Database: "pgx_test",
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// config := pgx.ConnPoolConfig{ConnConfig: connConfig}
|
|
||||||
// pool, err := pgx.NewConnPool(config)
|
|
||||||
// if err != nil {
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// db, err := stdlib.OpenFromConnPool(pool)
|
|
||||||
// if err != nil {
|
|
||||||
// t.Fatalf("Unable to create connection pool: %v", err)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// If the database/sql connection is established through
|
|
||||||
// stdlib.OpenFromConnPool then access to a pgx *ConnPool can be regained
|
|
||||||
// through db.Driver(). This allows writing a fast path for pgx while
|
|
||||||
// preserving compatibility with other drivers and database
|
|
||||||
//
|
|
||||||
// if driver, ok := db.Driver().(*stdlib.Driver); ok && driver.Pool != nil {
|
|
||||||
// // fast path with pgx
|
|
||||||
// } else {
|
|
||||||
// // normal path for other drivers and databases
|
|
||||||
// }
|
|
||||||
package stdlib
|
package stdlib
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -55,7 +20,6 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"database/sql/driver"
|
"database/sql/driver"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -64,11 +28,6 @@ import (
|
|||||||
"github.com/jackc/pgx/pgtype"
|
"github.com/jackc/pgx/pgtype"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
openFromConnPoolCountMu sync.Mutex
|
|
||||||
openFromConnPoolCount int
|
|
||||||
)
|
|
||||||
|
|
||||||
// oids that map to intrinsic database/sql types. These will be allowed to be
|
// oids that map to intrinsic database/sql types. These will be allowed to be
|
||||||
// binary, anything else will be forced to text format
|
// binary, anything else will be forced to text format
|
||||||
var databaseSqlOids map[pgtype.Oid]bool
|
var databaseSqlOids map[pgtype.Oid]bool
|
||||||
@@ -98,23 +57,12 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Driver struct {
|
type Driver struct {
|
||||||
Pool *pgx.ConnPool
|
|
||||||
|
|
||||||
configMutex sync.Mutex
|
configMutex sync.Mutex
|
||||||
configCount int64
|
configCount int64
|
||||||
configs map[int64]*DriverConfig
|
configs map[int64]*DriverConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Driver) Open(name string) (driver.Conn, error) {
|
func (d *Driver) Open(name string) (driver.Conn, error) {
|
||||||
if d.Pool != nil {
|
|
||||||
conn, err := d.Pool.Acquire()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Conn{conn: conn, pool: d.Pool}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var connConfig pgx.ConnConfig
|
var connConfig pgx.ConnConfig
|
||||||
var afterConnect func(*pgx.Conn) error
|
var afterConnect func(*pgx.Conn) error
|
||||||
if len(name) >= 9 && name[0] == 0 {
|
if len(name) >= 9 && name[0] == 0 {
|
||||||
@@ -194,49 +142,8 @@ func UnregisterDriverConfig(c *DriverConfig) {
|
|||||||
pgxDriver.unregisterDriverConfig(c)
|
pgxDriver.unregisterDriverConfig(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenFromConnPool takes the existing *pgx.ConnPool pool and returns a *sql.DB
|
|
||||||
// with pool as the backend. This enables full control over the connection
|
|
||||||
// process and configuration while maintaining compatibility with the
|
|
||||||
// database/sql interface. In addition, by calling Driver() on the returned
|
|
||||||
// *sql.DB and typecasting to *stdlib.Driver a reference to the pgx.ConnPool can
|
|
||||||
// be reaquired later. This allows fast paths targeting pgx to be used while
|
|
||||||
// still maintaining compatibility with other databases and drivers.
|
|
||||||
//
|
|
||||||
// pool connection size must be at least 2.
|
|
||||||
func OpenFromConnPool(pool *pgx.ConnPool) (*sql.DB, error) {
|
|
||||||
d := &Driver{Pool: pool}
|
|
||||||
|
|
||||||
openFromConnPoolCountMu.Lock()
|
|
||||||
name := fmt.Sprintf("pgx-%d", openFromConnPoolCount)
|
|
||||||
openFromConnPoolCount++
|
|
||||||
openFromConnPoolCountMu.Unlock()
|
|
||||||
|
|
||||||
sql.Register(name, d)
|
|
||||||
db, err := sql.Open(name, "")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Presumably OpenFromConnPool is being used because the user wants to use
|
|
||||||
// database/sql most of the time, but fast path with pgx some of the time.
|
|
||||||
// Allow database/sql to use all the connections, but release 2 idle ones.
|
|
||||||
// Don't have database/sql immediately release all idle connections because
|
|
||||||
// that would mean that prepared statements would be lost (which kills
|
|
||||||
// performance if the prepared statements constantly have to be reprepared)
|
|
||||||
stat := pool.Stat()
|
|
||||||
|
|
||||||
if stat.MaxConnections <= 2 {
|
|
||||||
return nil, errors.New("pool connection size must be at least 3")
|
|
||||||
}
|
|
||||||
db.SetMaxIdleConns(stat.MaxConnections - 2)
|
|
||||||
db.SetMaxOpenConns(stat.MaxConnections)
|
|
||||||
|
|
||||||
return db, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
conn *pgx.Conn
|
conn *pgx.Conn
|
||||||
pool *pgx.ConnPool
|
|
||||||
psCount int64 // Counter used for creating unique prepared statement names
|
psCount int64 // Counter used for creating unique prepared statement names
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -259,11 +166,6 @@ func (c *Conn) Prepare(query string) (driver.Stmt, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Close() error {
|
func (c *Conn) Close() error {
|
||||||
if c.pool != nil {
|
|
||||||
c.pool.Release(c.conn)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+14
-101
@@ -3,7 +3,6 @@ package stdlib_test
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/jackc/pgx"
|
"github.com/jackc/pgx"
|
||||||
@@ -120,88 +119,6 @@ func TestNormalLifeCycle(t *testing.T) {
|
|||||||
ensureConnValid(t, db)
|
ensureConnValid(t, db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSqlOpenDoesNotHavePool(t *testing.T) {
|
|
||||||
db := openDB(t)
|
|
||||||
defer closeDB(t, db)
|
|
||||||
|
|
||||||
driver := db.Driver().(*stdlib.Driver)
|
|
||||||
if driver.Pool != nil {
|
|
||||||
t.Fatal("Did not expect driver opened through database/sql to have Pool, but it did")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOpenFromConnPool(t *testing.T) {
|
|
||||||
connConfig := pgx.ConnConfig{
|
|
||||||
Host: "127.0.0.1",
|
|
||||||
User: "pgx_md5",
|
|
||||||
Password: "secret",
|
|
||||||
Database: "pgx_test",
|
|
||||||
}
|
|
||||||
|
|
||||||
config := pgx.ConnPoolConfig{ConnConfig: connConfig}
|
|
||||||
pool, err := pgx.NewConnPool(config)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unable to create connection pool: %v", err)
|
|
||||||
}
|
|
||||||
defer pool.Close()
|
|
||||||
|
|
||||||
db, err := stdlib.OpenFromConnPool(pool)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unable to create connection pool: %v", err)
|
|
||||||
}
|
|
||||||
defer closeDB(t, db)
|
|
||||||
|
|
||||||
// Can get pgx.ConnPool from driver
|
|
||||||
driver := db.Driver().(*stdlib.Driver)
|
|
||||||
if driver.Pool == nil {
|
|
||||||
t.Fatal("Expected driver opened through OpenFromConnPool to have Pool, but it did not")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Normal sql/database still works
|
|
||||||
var n int64
|
|
||||||
err = db.QueryRow("select 1").Scan(&n)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("db.QueryRow unexpectedly failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOpenFromConnPoolRace(t *testing.T) {
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
connConfig := pgx.ConnConfig{
|
|
||||||
Host: "127.0.0.1",
|
|
||||||
User: "pgx_md5",
|
|
||||||
Password: "secret",
|
|
||||||
Database: "pgx_test",
|
|
||||||
}
|
|
||||||
|
|
||||||
config := pgx.ConnPoolConfig{ConnConfig: connConfig}
|
|
||||||
pool, err := pgx.NewConnPool(config)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unable to create connection pool: %v", err)
|
|
||||||
}
|
|
||||||
defer pool.Close()
|
|
||||||
|
|
||||||
wg.Add(10)
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
db, err := stdlib.OpenFromConnPool(pool)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unable to create connection pool: %v", err)
|
|
||||||
}
|
|
||||||
defer closeDB(t, db)
|
|
||||||
|
|
||||||
// Can get pgx.ConnPool from driver
|
|
||||||
driver := db.Driver().(*stdlib.Driver)
|
|
||||||
if driver.Pool == nil {
|
|
||||||
t.Fatal("Expected driver opened through OpenFromConnPool to have Pool, but it did not")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOpenWithDriverConfigAfterConnect(t *testing.T) {
|
func TestOpenWithDriverConfigAfterConnect(t *testing.T) {
|
||||||
driverConfig := stdlib.DriverConfig{
|
driverConfig := stdlib.DriverConfig{
|
||||||
AfterConnect: func(c *pgx.Conn) error {
|
AfterConnect: func(c *pgx.Conn) error {
|
||||||
@@ -217,6 +134,7 @@ func TestOpenWithDriverConfigAfterConnect(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("sql.Open failed: %v", err)
|
t.Fatalf("sql.Open failed: %v", err)
|
||||||
}
|
}
|
||||||
|
defer closeDB(t, db)
|
||||||
|
|
||||||
var n int64
|
var n int64
|
||||||
err = db.QueryRow("select nextval('pgx')").Scan(&n)
|
err = db.QueryRow("select nextval('pgx')").Scan(&n)
|
||||||
@@ -407,37 +325,32 @@ func (l *testLogger) Log(lvl pgx.LogLevel, msg string, data map[string]interface
|
|||||||
func TestConnQueryLog(t *testing.T) {
|
func TestConnQueryLog(t *testing.T) {
|
||||||
logger := &testLogger{}
|
logger := &testLogger{}
|
||||||
|
|
||||||
connConfig := pgx.ConnConfig{
|
driverConfig := stdlib.DriverConfig{
|
||||||
Host: "127.0.0.1",
|
ConnConfig: pgx.ConnConfig{
|
||||||
User: "pgx_md5",
|
Host: "127.0.0.1",
|
||||||
Password: "secret",
|
User: "pgx_md5",
|
||||||
Database: "pgx_test",
|
Password: "secret",
|
||||||
Logger: logger,
|
Database: "pgx_test",
|
||||||
|
Logger: logger,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
config := pgx.ConnPoolConfig{ConnConfig: connConfig}
|
stdlib.RegisterDriverConfig(&driverConfig)
|
||||||
pool, err := pgx.NewConnPool(config)
|
defer stdlib.UnregisterDriverConfig(&driverConfig)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unable to create connection pool: %v", err)
|
|
||||||
}
|
|
||||||
defer pool.Close()
|
|
||||||
|
|
||||||
db, err := stdlib.OpenFromConnPool(pool)
|
db, err := sql.Open("pgx", driverConfig.ConnectionString(""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unable to create connection pool: %v", err)
|
t.Fatalf("sql.Open failed: %v", err)
|
||||||
}
|
}
|
||||||
defer closeDB(t, db)
|
defer closeDB(t, db)
|
||||||
|
|
||||||
// clear logs from initial connection
|
|
||||||
logger.logs = []testLog{}
|
|
||||||
|
|
||||||
var n int64
|
var n int64
|
||||||
err = db.QueryRow("select 1").Scan(&n)
|
err = db.QueryRow("select 1").Scan(&n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("db.QueryRow unexpectedly failed: %v", err)
|
t.Fatalf("db.QueryRow unexpectedly failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
l := logger.logs[0]
|
l := logger.logs[len(logger.logs)-1]
|
||||||
if l.msg != "Query" {
|
if l.msg != "Query" {
|
||||||
t.Errorf("Expected to log Query, but got %v", l)
|
t.Errorf("Expected to log Query, but got %v", l)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user