Rename package pool to pgxpool
pool is too common a name to occupy.
This commit is contained in:
@@ -0,0 +1,52 @@
|
||||
package pgxpool
|
||||
|
||||
import (
|
||||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgx/v4"
|
||||
)
|
||||
|
||||
type errBatchResults struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (br errBatchResults) ExecResults() (pgconn.CommandTag, error) {
|
||||
return nil, br.err
|
||||
}
|
||||
|
||||
func (br errBatchResults) QueryResults() (pgx.Rows, error) {
|
||||
return errRows{err: br.err}, br.err
|
||||
}
|
||||
|
||||
func (br errBatchResults) QueryRowResults() pgx.Row {
|
||||
return errRow{err: br.err}
|
||||
}
|
||||
|
||||
func (br errBatchResults) Close() error {
|
||||
return br.err
|
||||
}
|
||||
|
||||
type poolBatchResults struct {
|
||||
br pgx.BatchResults
|
||||
c *Conn
|
||||
}
|
||||
|
||||
func (br *poolBatchResults) ExecResults() (pgconn.CommandTag, error) {
|
||||
return br.br.ExecResults()
|
||||
}
|
||||
|
||||
func (br *poolBatchResults) QueryResults() (pgx.Rows, error) {
|
||||
return br.br.QueryResults()
|
||||
}
|
||||
|
||||
func (br *poolBatchResults) QueryRowResults() pgx.Row {
|
||||
return br.br.QueryRowResults()
|
||||
}
|
||||
|
||||
func (br *poolBatchResults) Close() error {
|
||||
err := br.br.Close()
|
||||
if br.c != nil {
|
||||
br.c.Release()
|
||||
br.c = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
package pgxpool_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func BenchmarkAcquireAndRelease(b *testing.B) {
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(b, err)
|
||||
defer pool.Close()
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c, err := pool.Acquire(context.Background())
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c.Release()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMinimalPreparedSelectBaseline(b *testing.B) {
|
||||
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(b, err)
|
||||
|
||||
config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
|
||||
_, err := c.Prepare(ctx, "ps1", "select $1::int8")
|
||||
return err
|
||||
}
|
||||
|
||||
db, err := pgxpool.ConnectConfig(context.Background(), config)
|
||||
require.NoError(b, err)
|
||||
|
||||
conn, err := db.Acquire(context.Background())
|
||||
require.NoError(b, err)
|
||||
defer conn.Release()
|
||||
|
||||
var n int64
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err = conn.QueryRow(context.Background(), "ps1", i).Scan(&n)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
if n != int64(i) {
|
||||
b.Fatalf("expected %d, got %d", i, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMinimalPreparedSelect(b *testing.B) {
|
||||
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(b, err)
|
||||
|
||||
config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
|
||||
_, err := c.Prepare(ctx, "ps1", "select $1::int8")
|
||||
return err
|
||||
}
|
||||
|
||||
db, err := pgxpool.ConnectConfig(context.Background(), config)
|
||||
require.NoError(b, err)
|
||||
|
||||
var n int64
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err = db.QueryRow(context.Background(), "ps1", i).Scan(&n)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
if n != int64(i) {
|
||||
b.Fatalf("expected %d, got %d", i, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,127 @@
|
||||
package pgxpool_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Conn.Release is an asynchronous process that returns immediately. There is no signal when the actual work is
|
||||
// completed. To test something that relies on the actual work for Conn.Release being completed we must simply wait.
|
||||
// This function wraps the sleep so there is more meaning for the callers.
|
||||
func waitForReleaseToComplete() {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
|
||||
type execer interface {
|
||||
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
|
||||
}
|
||||
|
||||
func testExec(t *testing.T, db execer) {
|
||||
results, err := db.Exec(context.Background(), "set time zone 'America/Chicago'")
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, "SET", results)
|
||||
}
|
||||
|
||||
type queryer interface {
|
||||
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
|
||||
}
|
||||
|
||||
func testQuery(t *testing.T, db queryer) {
|
||||
var sum, rowCount int32
|
||||
|
||||
rows, err := db.Query(context.Background(), "select generate_series(1,$1)", 10)
|
||||
require.NoError(t, err)
|
||||
|
||||
for rows.Next() {
|
||||
var n int32
|
||||
rows.Scan(&n)
|
||||
sum += n
|
||||
rowCount++
|
||||
}
|
||||
|
||||
assert.NoError(t, rows.Err())
|
||||
assert.Equal(t, int32(10), rowCount)
|
||||
assert.Equal(t, int32(55), sum)
|
||||
}
|
||||
|
||||
type queryRower interface {
|
||||
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
|
||||
}
|
||||
|
||||
func testQueryRow(t *testing.T, db queryRower) {
|
||||
var what, who string
|
||||
err := db.QueryRow(context.Background(), "select 'hello', $1::text", "world").Scan(&what, &who)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "hello", what)
|
||||
assert.Equal(t, "world", who)
|
||||
}
|
||||
|
||||
type sendBatcher interface {
|
||||
SendBatch(context.Context, *pgx.Batch) pgx.BatchResults
|
||||
}
|
||||
|
||||
func testSendBatch(t *testing.T, db sendBatcher) {
|
||||
batch := &pgx.Batch{}
|
||||
batch.Queue("select 1", nil, nil, nil)
|
||||
batch.Queue("select 2", nil, nil, nil)
|
||||
|
||||
br := db.SendBatch(context.Background(), batch)
|
||||
|
||||
var err error
|
||||
var n int32
|
||||
err = br.QueryRowResults().Scan(&n)
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, n)
|
||||
|
||||
err = br.QueryRowResults().Scan(&n)
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 2, n)
|
||||
|
||||
err = br.Close()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
type copyFromer interface {
|
||||
CopyFrom(context.Context, pgx.Identifier, []string, pgx.CopyFromSource) (int64, error)
|
||||
}
|
||||
|
||||
func testCopyFrom(t *testing.T, db interface {
|
||||
execer
|
||||
queryer
|
||||
copyFromer
|
||||
}) {
|
||||
_, err := db.Exec(context.Background(), `create temporary table foo(a int2, b int4, c int8, d varchar, e text, f date, g timestamptz)`)
|
||||
require.NoError(t, err)
|
||||
|
||||
tzedTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
|
||||
|
||||
inputRows := [][]interface{}{
|
||||
{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), tzedTime},
|
||||
{nil, nil, nil, nil, nil, nil, nil},
|
||||
}
|
||||
|
||||
copyCount, err := db.CopyFrom(context.Background(), pgx.Identifier{"foo"}, []string{"a", "b", "c", "d", "e", "f", "g"}, pgx.CopyFromRows(inputRows))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, len(inputRows), copyCount)
|
||||
|
||||
rows, err := db.Query(context.Background(), "select * from foo")
|
||||
assert.NoError(t, err)
|
||||
|
||||
var outputRows [][]interface{}
|
||||
for rows.Next() {
|
||||
row, err := rows.Values()
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
||||
}
|
||||
outputRows = append(outputRows, row)
|
||||
}
|
||||
|
||||
assert.NoError(t, rows.Err())
|
||||
assert.Equal(t, inputRows, outputRows)
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package pgxpool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/jackc/puddle"
|
||||
)
|
||||
|
||||
// Conn is an acquired *pgx.Conn from a Pool.
|
||||
type Conn struct {
|
||||
res *puddle.Resource
|
||||
p *Pool
|
||||
}
|
||||
|
||||
// Release returns c to the pool it was acquired from. Once Release has been called, other methods must not be called.
|
||||
// However, it is safe to call Release multiple times. Subsequent calls after the first will be ignored.
|
||||
func (c *Conn) Release() {
|
||||
if c.res == nil {
|
||||
return
|
||||
}
|
||||
|
||||
conn := c.Conn()
|
||||
res := c.res
|
||||
c.res = nil
|
||||
|
||||
now := time.Now()
|
||||
if !conn.IsAlive() || conn.PgConn().TxStatus != 'I' || (now.Sub(res.CreationTime()) > c.p.maxConnLifetime) {
|
||||
res.Destroy()
|
||||
return
|
||||
}
|
||||
|
||||
if c.p.afterRelease == nil {
|
||||
res.Release()
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
if c.p.afterRelease(conn) {
|
||||
res.Release()
|
||||
} else {
|
||||
res.Destroy()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
|
||||
return c.Conn().Exec(ctx, sql, arguments...)
|
||||
}
|
||||
|
||||
func (c *Conn) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
|
||||
return c.Conn().Query(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (c *Conn) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
|
||||
return c.Conn().QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (c *Conn) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
||||
return c.Conn().SendBatch(ctx, b)
|
||||
}
|
||||
|
||||
func (c *Conn) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
|
||||
return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
|
||||
}
|
||||
|
||||
func (c *Conn) Begin(ctx context.Context, txOptions *pgx.TxOptions) (*pgx.Tx, error) {
|
||||
return c.Conn().Begin(ctx, txOptions)
|
||||
}
|
||||
|
||||
func (c *Conn) Conn() *pgx.Conn {
|
||||
return c.connResource().conn
|
||||
}
|
||||
|
||||
func (c *Conn) connResource() *connResource {
|
||||
return c.res.Value().(*connResource)
|
||||
}
|
||||
|
||||
func (c *Conn) getPoolRow(r pgx.Row) *poolRow {
|
||||
return c.connResource().getPoolRow(c, r)
|
||||
}
|
||||
|
||||
func (c *Conn) getPoolRows(r pgx.Rows) *poolRows {
|
||||
return c.connResource().getPoolRows(c, r)
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package pgxpool_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestConnExec(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer c.Release()
|
||||
|
||||
testExec(t, c)
|
||||
}
|
||||
|
||||
func TestConnQuery(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer c.Release()
|
||||
|
||||
testQuery(t, c)
|
||||
}
|
||||
|
||||
func TestConnQueryRow(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer c.Release()
|
||||
|
||||
testQueryRow(t, c)
|
||||
}
|
||||
|
||||
func TestConnSendBatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer c.Release()
|
||||
|
||||
testSendBatch(t, c)
|
||||
}
|
||||
|
||||
func TestConnCopyFrom(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer c.Release()
|
||||
|
||||
testCopyFrom(t, c)
|
||||
}
|
||||
+377
@@ -0,0 +1,377 @@
|
||||
package pgxpool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/jackc/puddle"
|
||||
errors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var defaultMaxConns = int32(4)
|
||||
var defaultMaxConnLifetime = time.Hour
|
||||
var defaultHealthCheckPeriod = time.Minute
|
||||
|
||||
type connResource struct {
|
||||
conn *pgx.Conn
|
||||
conns []Conn
|
||||
poolRows []poolRow
|
||||
poolRowss []poolRows
|
||||
}
|
||||
|
||||
func (cr *connResource) getConn(p *Pool, res *puddle.Resource) *Conn {
|
||||
if len(cr.conns) == 0 {
|
||||
cr.conns = make([]Conn, 128)
|
||||
}
|
||||
|
||||
c := &cr.conns[len(cr.conns)-1]
|
||||
cr.conns = cr.conns[0 : len(cr.conns)-1]
|
||||
|
||||
c.res = res
|
||||
c.p = p
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow {
|
||||
if len(cr.poolRows) == 0 {
|
||||
cr.poolRows = make([]poolRow, 128)
|
||||
}
|
||||
|
||||
pr := &cr.poolRows[len(cr.poolRows)-1]
|
||||
cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1]
|
||||
|
||||
pr.c = c
|
||||
pr.r = r
|
||||
|
||||
return pr
|
||||
}
|
||||
|
||||
func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
|
||||
if len(cr.poolRowss) == 0 {
|
||||
cr.poolRowss = make([]poolRows, 128)
|
||||
}
|
||||
|
||||
pr := &cr.poolRowss[len(cr.poolRowss)-1]
|
||||
cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1]
|
||||
|
||||
pr.c = c
|
||||
pr.r = r
|
||||
|
||||
return pr
|
||||
}
|
||||
|
||||
type Pool struct {
|
||||
p *puddle.Pool
|
||||
afterConnect func(context.Context, *pgx.Conn) error
|
||||
beforeAcquire func(*pgx.Conn) bool
|
||||
afterRelease func(*pgx.Conn) bool
|
||||
maxConnLifetime time.Duration
|
||||
healthCheckPeriod time.Duration
|
||||
closeChan chan struct{}
|
||||
}
|
||||
|
||||
// Config is the configuration struct for creating a pool. It is highly recommended to modify a Config returned by
|
||||
// ParseConfig rather than to construct a Config from scratch.
|
||||
type Config struct {
|
||||
ConnConfig *pgx.ConnConfig
|
||||
|
||||
// AfterConnect is called after a connection is established, but before it is added to the pool.
|
||||
AfterConnect func(context.Context, *pgx.Conn) error
|
||||
|
||||
// BeforeAcquire is called before before a connection is acquired from the pool. It must return true to allow the
|
||||
// acquision or false to indicate that the connection should be destroyed and a different connection should be
|
||||
// acquired.
|
||||
BeforeAcquire func(*pgx.Conn) bool
|
||||
|
||||
// AfterRelease is called after a connection is released, but before it is returned to the pool. It must return true to
|
||||
// return the connection to the pool or false to destroy the connection.
|
||||
AfterRelease func(*pgx.Conn) bool
|
||||
|
||||
// MaxConnLifetime is the duration after which a connection will be automatically closed.
|
||||
MaxConnLifetime time.Duration
|
||||
|
||||
// MaxConns is the maximum size of the pool.
|
||||
MaxConns int32
|
||||
|
||||
// HealthCheckPeriod is the duration between checks of the health of idle connections.
|
||||
HealthCheckPeriod time.Duration
|
||||
}
|
||||
|
||||
// Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
|
||||
// connection. See ParseConfig for information on connString format.
|
||||
func Connect(ctx context.Context, connString string) (*Pool, error) {
|
||||
config, err := ParseConfig(connString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ConnectConfig(ctx, config)
|
||||
}
|
||||
|
||||
// ConnectConfig creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
|
||||
// connection.
|
||||
func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
|
||||
p := &Pool{
|
||||
afterConnect: config.AfterConnect,
|
||||
beforeAcquire: config.BeforeAcquire,
|
||||
afterRelease: config.AfterRelease,
|
||||
maxConnLifetime: config.MaxConnLifetime,
|
||||
healthCheckPeriod: config.HealthCheckPeriod,
|
||||
closeChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
p.p = puddle.NewPool(
|
||||
func(ctx context.Context) (interface{}, error) {
|
||||
conn, err := pgx.ConnectConfig(ctx, config.ConnConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if p.afterConnect != nil {
|
||||
err = p.afterConnect(ctx, conn)
|
||||
if err != nil {
|
||||
conn.Close(ctx)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
cr := &connResource{
|
||||
conn: conn,
|
||||
conns: make([]Conn, 64),
|
||||
poolRows: make([]poolRow, 64),
|
||||
poolRowss: make([]poolRows, 64),
|
||||
}
|
||||
|
||||
return cr, nil
|
||||
},
|
||||
func(value interface{}) {
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
value.(*connResource).conn.Close(ctx)
|
||||
cancel()
|
||||
}()
|
||||
},
|
||||
config.MaxConns,
|
||||
)
|
||||
|
||||
go p.backgroundHealthCheck()
|
||||
|
||||
// Initially establish one connection
|
||||
res, err := p.p.Acquire(ctx)
|
||||
if err != nil {
|
||||
p.p.Close()
|
||||
return nil, err
|
||||
}
|
||||
res.Release()
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// ParseConfig builds a Config from connString. It parses connString with the same behavior as pgx.ParseConfig with the
|
||||
// addition of the following variables:
|
||||
//
|
||||
// pool_max_conns: integer greater than 0
|
||||
// pool_max_conn_lifetime: duration string
|
||||
// pool_health_check_period: duration string
|
||||
//
|
||||
// See Config for definitions of these arguments.
|
||||
//
|
||||
// # Example DSN
|
||||
// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10
|
||||
//
|
||||
// # Example URL
|
||||
// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10
|
||||
func ParseConfig(connString string) (*Config, error) {
|
||||
connConfig, err := pgx.ParseConfig(connString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config := &Config{ConnConfig: connConfig}
|
||||
|
||||
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok {
|
||||
delete(connConfig.Config.RuntimeParams, "pool_max_conns")
|
||||
n, err := strconv.ParseInt(s, 10, 32)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("cannot parse pool_max_conns: %w", err)
|
||||
}
|
||||
if n < 1 {
|
||||
return nil, errors.Errorf("pool_max_conns too small: %d", n)
|
||||
}
|
||||
config.MaxConns = int32(n)
|
||||
} else {
|
||||
config.MaxConns = defaultMaxConns
|
||||
if numCPU := int32(runtime.NumCPU()); numCPU > config.MaxConns {
|
||||
config.MaxConns = numCPU
|
||||
}
|
||||
}
|
||||
|
||||
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
|
||||
delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
|
||||
d, err := time.ParseDuration(s)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("invalid pool_max_conn_lifetime: %w", err)
|
||||
}
|
||||
config.MaxConnLifetime = d
|
||||
} else {
|
||||
config.MaxConnLifetime = defaultMaxConnLifetime
|
||||
}
|
||||
|
||||
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_health_check_period"]; ok {
|
||||
delete(connConfig.Config.RuntimeParams, "pool_health_check_period")
|
||||
d, err := time.ParseDuration(s)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("invalid pool_health_check_period: %w", err)
|
||||
}
|
||||
config.HealthCheckPeriod = d
|
||||
} else {
|
||||
config.HealthCheckPeriod = defaultHealthCheckPeriod
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned
|
||||
// to pool and closed.
|
||||
func (p *Pool) Close() {
|
||||
close(p.closeChan)
|
||||
p.p.Close()
|
||||
}
|
||||
|
||||
func (p *Pool) backgroundHealthCheck() {
|
||||
ticker := time.NewTicker(p.healthCheckPeriod)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.closeChan:
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
p.checkIdleConnsHealth()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) checkIdleConnsHealth() {
|
||||
resources := p.p.AcquireAllIdle()
|
||||
|
||||
now := time.Now()
|
||||
for _, res := range resources {
|
||||
if now.Sub(res.CreationTime()) > p.maxConnLifetime {
|
||||
res.Destroy()
|
||||
} else {
|
||||
res.Release()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
||||
for {
|
||||
res, err := p.p.Acquire(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cr := res.Value().(*connResource)
|
||||
if p.beforeAcquire == nil || p.beforeAcquire(cr.conn) {
|
||||
return cr.getConn(p, res), nil
|
||||
}
|
||||
|
||||
res.Destroy()
|
||||
}
|
||||
}
|
||||
|
||||
// AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
|
||||
// keep-alive functionality. It does not update pool statistics.
|
||||
func (p *Pool) AcquireAllIdle() []*Conn {
|
||||
resources := p.p.AcquireAllIdle()
|
||||
conns := make([]*Conn, 0, len(resources))
|
||||
for _, res := range resources {
|
||||
cr := res.Value().(*connResource)
|
||||
if p.beforeAcquire == nil || p.beforeAcquire(cr.conn) {
|
||||
conns = append(conns, cr.getConn(p, res))
|
||||
} else {
|
||||
res.Destroy()
|
||||
}
|
||||
}
|
||||
|
||||
return conns
|
||||
}
|
||||
|
||||
func (p *Pool) Stat() *Stat {
|
||||
return &Stat{s: p.p.Stat()}
|
||||
}
|
||||
|
||||
func (p *Pool) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
|
||||
c, err := p.Acquire(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.Release()
|
||||
|
||||
return c.Exec(ctx, sql, arguments...)
|
||||
}
|
||||
|
||||
func (p *Pool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
|
||||
c, err := p.Acquire(ctx)
|
||||
if err != nil {
|
||||
return errRows{err: err}, err
|
||||
}
|
||||
|
||||
rows, err := c.Query(ctx, sql, args...)
|
||||
if err != nil {
|
||||
c.Release()
|
||||
return errRows{err: err}, err
|
||||
}
|
||||
|
||||
return c.getPoolRows(rows), nil
|
||||
}
|
||||
|
||||
func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
|
||||
c, err := p.Acquire(ctx)
|
||||
if err != nil {
|
||||
return errRow{err: err}
|
||||
}
|
||||
|
||||
row := c.QueryRow(ctx, sql, args...)
|
||||
return c.getPoolRow(row)
|
||||
}
|
||||
|
||||
func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
||||
c, err := p.Acquire(ctx)
|
||||
if err != nil {
|
||||
return errBatchResults{err: err}
|
||||
}
|
||||
|
||||
br := c.SendBatch(ctx, b)
|
||||
return &poolBatchResults{br: br, c: c}
|
||||
}
|
||||
|
||||
func (p *Pool) Begin(ctx context.Context, txOptions *pgx.TxOptions) (*Tx, error) {
|
||||
c, err := p.Acquire(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t, err := c.Begin(ctx, txOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Tx{t: t, c: c}, err
|
||||
}
|
||||
|
||||
func (p *Pool) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
|
||||
c, err := p.Acquire(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer c.Release()
|
||||
|
||||
return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
|
||||
}
|
||||
@@ -0,0 +1,455 @@
|
||||
package pgxpool_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestConnect(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
pool.Close()
|
||||
}
|
||||
|
||||
func TestParseConfigExtractsPoolArguments(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config, err := pgxpool.ParseConfig("pool_max_conns=42")
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 42, config.MaxConns)
|
||||
assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_max_conns")
|
||||
}
|
||||
|
||||
func TestConnectCancel(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
||||
assert.Nil(t, pool)
|
||||
assert.Equal(t, context.Canceled, err)
|
||||
}
|
||||
|
||||
func TestPoolAcquireAndConnRelease(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
c.Release()
|
||||
}
|
||||
|
||||
func TestPoolAfterConnect(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
|
||||
config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
|
||||
_, err := c.Prepare(ctx, "ps1", "select 1")
|
||||
return err
|
||||
}
|
||||
|
||||
db, err := pgxpool.ConnectConfig(context.Background(), config)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
var n int32
|
||||
err = db.QueryRow(context.Background(), "ps1").Scan(&n)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 1, n)
|
||||
}
|
||||
|
||||
func TestPoolBeforeAcquire(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
|
||||
acquireAttempts := 0
|
||||
|
||||
config.BeforeAcquire = func(c *pgx.Conn) bool {
|
||||
acquireAttempts += 1
|
||||
return acquireAttempts%2 == 0
|
||||
}
|
||||
|
||||
db, err := pgxpool.ConnectConfig(context.Background(), config)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
conns := make([]*pgxpool.Conn, 4)
|
||||
for i := range conns {
|
||||
conns[i], err = db.Acquire(context.Background())
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, c := range conns {
|
||||
c.Release()
|
||||
}
|
||||
waitForReleaseToComplete()
|
||||
|
||||
assert.EqualValues(t, 8, acquireAttempts)
|
||||
|
||||
conns = db.AcquireAllIdle()
|
||||
assert.Len(t, conns, 2)
|
||||
|
||||
for _, c := range conns {
|
||||
c.Release()
|
||||
}
|
||||
waitForReleaseToComplete()
|
||||
|
||||
assert.EqualValues(t, 12, acquireAttempts)
|
||||
}
|
||||
|
||||
func TestPoolAfterRelease(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
|
||||
afterReleaseCount := 0
|
||||
|
||||
config.AfterRelease = func(c *pgx.Conn) bool {
|
||||
afterReleaseCount += 1
|
||||
return afterReleaseCount%2 == 1
|
||||
}
|
||||
|
||||
db, err := pgxpool.ConnectConfig(context.Background(), config)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
connPIDs := map[uint32]struct{}{}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
conn, err := db.Acquire(context.Background())
|
||||
assert.NoError(t, err)
|
||||
connPIDs[conn.Conn().PgConn().PID()] = struct{}{}
|
||||
conn.Release()
|
||||
waitForReleaseToComplete()
|
||||
}
|
||||
|
||||
assert.EqualValues(t, 5, len(connPIDs))
|
||||
}
|
||||
|
||||
func TestPoolAcquireAllIdle(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
conns := db.AcquireAllIdle()
|
||||
assert.Len(t, conns, 1)
|
||||
|
||||
for _, c := range conns {
|
||||
c.Release()
|
||||
}
|
||||
waitForReleaseToComplete()
|
||||
|
||||
conns = make([]*pgxpool.Conn, 3)
|
||||
for i := range conns {
|
||||
conns[i], err = db.Acquire(context.Background())
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, c := range conns {
|
||||
if c != nil {
|
||||
c.Release()
|
||||
}
|
||||
}
|
||||
waitForReleaseToComplete()
|
||||
|
||||
conns = db.AcquireAllIdle()
|
||||
assert.Len(t, conns, 3)
|
||||
|
||||
for _, c := range conns {
|
||||
c.Release()
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnReleaseChecksMaxConnLifetime(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
|
||||
config.MaxConnLifetime = 250 * time.Millisecond
|
||||
|
||||
db, err := pgxpool.ConnectConfig(context.Background(), config)
|
||||
defer db.Close()
|
||||
|
||||
c, err := db.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(config.MaxConnLifetime)
|
||||
|
||||
c.Release()
|
||||
waitForReleaseToComplete()
|
||||
|
||||
stats := db.Stat()
|
||||
assert.EqualValues(t, 0, stats.TotalConns())
|
||||
}
|
||||
|
||||
func TestPoolBackgroundChecksMaxConnLifetime(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
|
||||
config.MaxConnLifetime = 100 * time.Millisecond
|
||||
config.HealthCheckPeriod = 100 * time.Millisecond
|
||||
|
||||
db, err := pgxpool.ConnectConfig(context.Background(), config)
|
||||
defer db.Close()
|
||||
|
||||
c, err := db.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
c.Release()
|
||||
time.Sleep(config.MaxConnLifetime + 50*time.Millisecond)
|
||||
|
||||
stats := db.Stat()
|
||||
assert.EqualValues(t, 0, stats.TotalConns())
|
||||
}
|
||||
|
||||
func TestPoolExec(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
testExec(t, pool)
|
||||
}
|
||||
|
||||
func TestPoolQuery(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
// Test common usage
|
||||
testQuery(t, pool)
|
||||
waitForReleaseToComplete()
|
||||
|
||||
// Test expected pool behavior
|
||||
rows, err := pool.Query(context.Background(), "select generate_series(1,$1)", 10)
|
||||
require.NoError(t, err)
|
||||
|
||||
stats := pool.Stat()
|
||||
assert.EqualValues(t, 1, stats.AcquiredConns())
|
||||
assert.EqualValues(t, 1, stats.TotalConns())
|
||||
|
||||
rows.Close()
|
||||
assert.NoError(t, rows.Err())
|
||||
waitForReleaseToComplete()
|
||||
|
||||
stats = pool.Stat()
|
||||
assert.EqualValues(t, 0, stats.AcquiredConns())
|
||||
assert.EqualValues(t, 1, stats.TotalConns())
|
||||
|
||||
}
|
||||
|
||||
func TestPoolQueryRow(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
testQueryRow(t, pool)
|
||||
waitForReleaseToComplete()
|
||||
|
||||
stats := pool.Stat()
|
||||
assert.EqualValues(t, 0, stats.AcquiredConns())
|
||||
assert.EqualValues(t, 1, stats.TotalConns())
|
||||
}
|
||||
|
||||
func TestPoolSendBatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
testSendBatch(t, pool)
|
||||
waitForReleaseToComplete()
|
||||
|
||||
stats := pool.Stat()
|
||||
assert.EqualValues(t, 0, stats.AcquiredConns())
|
||||
assert.EqualValues(t, 1, stats.TotalConns())
|
||||
}
|
||||
|
||||
func TestPoolCopyFrom(t *testing.T) {
|
||||
// Not able to use testCopyFrom because it relies on temporary tables and the pool may run subsequent calls under
|
||||
// different connections.
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
_, err = pool.Exec(ctx, `drop table if exists poolcopyfromtest`)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = pool.Exec(ctx, `create table poolcopyfromtest(a int2, b int4, c int8, d varchar, e text, f date, g timestamptz)`)
|
||||
require.NoError(t, err)
|
||||
defer pool.Exec(ctx, `drop table poolcopyfromtest`)
|
||||
|
||||
tzedTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
|
||||
|
||||
inputRows := [][]interface{}{
|
||||
{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), tzedTime},
|
||||
{nil, nil, nil, nil, nil, nil, nil},
|
||||
}
|
||||
|
||||
copyCount, err := pool.CopyFrom(ctx, pgx.Identifier{"poolcopyfromtest"}, []string{"a", "b", "c", "d", "e", "f", "g"}, pgx.CopyFromRows(inputRows))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, len(inputRows), copyCount)
|
||||
|
||||
rows, err := pool.Query(ctx, "select * from poolcopyfromtest")
|
||||
assert.NoError(t, err)
|
||||
|
||||
var outputRows [][]interface{}
|
||||
for rows.Next() {
|
||||
row, err := rows.Values()
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
||||
}
|
||||
outputRows = append(outputRows, row)
|
||||
}
|
||||
|
||||
assert.NoError(t, rows.Err())
|
||||
assert.Equal(t, inputRows, outputRows)
|
||||
}
|
||||
|
||||
func TestConnReleaseClosesConnInFailedTransaction(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
pid := c.Conn().PgConn().PID()
|
||||
|
||||
assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus)
|
||||
|
||||
_, err = c.Exec(ctx, "begin")
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus)
|
||||
|
||||
_, err = c.Exec(ctx, "selct")
|
||||
assert.Error(t, err)
|
||||
|
||||
assert.Equal(t, byte('E'), c.Conn().PgConn().TxStatus)
|
||||
|
||||
c.Release()
|
||||
waitForReleaseToComplete()
|
||||
|
||||
c, err = pool.Acquire(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.NotEqual(t, pid, c.Conn().PgConn().PID())
|
||||
assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus)
|
||||
|
||||
c.Release()
|
||||
}
|
||||
|
||||
func TestConnReleaseClosesConnInTransaction(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
pid := c.Conn().PgConn().PID()
|
||||
|
||||
assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus)
|
||||
|
||||
_, err = c.Exec(ctx, "begin")
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, byte('T'), c.Conn().PgConn().TxStatus)
|
||||
|
||||
c.Release()
|
||||
waitForReleaseToComplete()
|
||||
|
||||
c, err = pool.Acquire(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.NotEqual(t, pid, c.Conn().PgConn().PID())
|
||||
assert.Equal(t, byte('I'), c.Conn().PgConn().TxStatus)
|
||||
|
||||
c.Release()
|
||||
}
|
||||
|
||||
func TestConnReleaseDestroysClosedConn(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
c, err := pool.Acquire(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
c.Conn().Close(ctx)
|
||||
|
||||
assert.EqualValues(t, 1, pool.Stat().TotalConns())
|
||||
|
||||
c.Release()
|
||||
waitForReleaseToComplete()
|
||||
|
||||
assert.EqualValues(t, 0, pool.Stat().TotalConns())
|
||||
}
|
||||
|
||||
func TestConnPoolQueryConcurrentLoad(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
n := 100
|
||||
done := make(chan bool)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
go func() {
|
||||
defer func() { done <- true }()
|
||||
testQuery(t, pool)
|
||||
testQueryRow(t, pool)
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
<-done
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package pgxpool
|
||||
|
||||
import (
|
||||
"github.com/jackc/pgproto3/v2"
|
||||
"github.com/jackc/pgx/v4"
|
||||
)
|
||||
|
||||
type errRows struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (errRows) Close() {}
|
||||
func (e errRows) Err() error { return e.err }
|
||||
func (errRows) FieldDescriptions() []pgproto3.FieldDescription { return nil }
|
||||
func (errRows) Next() bool { return false }
|
||||
func (e errRows) Scan(dest ...interface{}) error { return e.err }
|
||||
func (e errRows) Values() ([]interface{}, error) { return nil, e.err }
|
||||
|
||||
type errRow struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (e errRow) Scan(dest ...interface{}) error { return e.err }
|
||||
|
||||
type poolRows struct {
|
||||
r pgx.Rows
|
||||
c *Conn
|
||||
err error
|
||||
}
|
||||
|
||||
func (rows *poolRows) Close() {
|
||||
rows.r.Close()
|
||||
if rows.c != nil {
|
||||
rows.c.Release()
|
||||
rows.c = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (rows *poolRows) Err() error {
|
||||
if rows.err != nil {
|
||||
return rows.err
|
||||
}
|
||||
return rows.r.Err()
|
||||
}
|
||||
|
||||
func (rows *poolRows) FieldDescriptions() []pgproto3.FieldDescription {
|
||||
return rows.r.FieldDescriptions()
|
||||
}
|
||||
|
||||
func (rows *poolRows) Next() bool {
|
||||
if rows.err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
n := rows.r.Next()
|
||||
if !n {
|
||||
rows.Close()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (rows *poolRows) Scan(dest ...interface{}) error {
|
||||
err := rows.r.Scan(dest...)
|
||||
if err != nil {
|
||||
rows.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (rows *poolRows) Values() ([]interface{}, error) {
|
||||
values, err := rows.r.Values()
|
||||
if err != nil {
|
||||
rows.Close()
|
||||
}
|
||||
return values, err
|
||||
}
|
||||
|
||||
type poolRow struct {
|
||||
r pgx.Row
|
||||
c *Conn
|
||||
err error
|
||||
}
|
||||
|
||||
func (row *poolRow) Scan(dest ...interface{}) error {
|
||||
if row.err != nil {
|
||||
return row.err
|
||||
}
|
||||
|
||||
err := row.r.Scan(dest...)
|
||||
if row.c != nil {
|
||||
row.c.Release()
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package pgxpool
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/jackc/puddle"
|
||||
)
|
||||
|
||||
type Stat struct {
|
||||
s *puddle.Stat
|
||||
}
|
||||
|
||||
func (s *Stat) AcquireCount() int64 {
|
||||
return s.s.AcquireCount()
|
||||
}
|
||||
|
||||
func (s *Stat) AcquireDuration() time.Duration {
|
||||
return s.s.AcquireDuration()
|
||||
}
|
||||
|
||||
func (s *Stat) AcquiredConns() int32 {
|
||||
return s.s.AcquiredResources()
|
||||
}
|
||||
|
||||
func (s *Stat) CanceledAcquireCount() int64 {
|
||||
return s.s.CanceledAcquireCount()
|
||||
}
|
||||
|
||||
func (s *Stat) ConstructingConns() int32 {
|
||||
return s.s.ConstructingResources()
|
||||
}
|
||||
|
||||
func (s *Stat) EmptyAcquireCount() int64 {
|
||||
return s.s.EmptyAcquireCount()
|
||||
}
|
||||
|
||||
func (s *Stat) IdleConns() int32 {
|
||||
return s.s.IdleResources()
|
||||
}
|
||||
|
||||
func (s *Stat) MaxConns() int32 {
|
||||
return s.s.MaxResources()
|
||||
}
|
||||
|
||||
func (s *Stat) TotalConns() int32 {
|
||||
return s.s.TotalResources()
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package pgxpool
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgx/v4"
|
||||
)
|
||||
|
||||
type Tx struct {
|
||||
t *pgx.Tx
|
||||
c *Conn
|
||||
}
|
||||
|
||||
func (tx *Tx) Commit(ctx context.Context) error {
|
||||
err := tx.t.Commit(ctx)
|
||||
if tx.c != nil {
|
||||
tx.c.Release()
|
||||
tx.c = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (tx *Tx) Rollback(ctx context.Context) error {
|
||||
err := tx.t.Rollback(ctx)
|
||||
if tx.c != nil {
|
||||
tx.c.Release()
|
||||
tx.c = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (tx *Tx) Err() error {
|
||||
return tx.t.Err()
|
||||
}
|
||||
|
||||
func (tx *Tx) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
|
||||
return tx.c.Exec(ctx, sql, arguments...)
|
||||
}
|
||||
|
||||
func (tx *Tx) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
|
||||
return tx.c.Query(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (tx *Tx) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
|
||||
return tx.c.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (tx *Tx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
||||
return tx.c.SendBatch(ctx, b)
|
||||
}
|
||||
|
||||
func (tx *Tx) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
|
||||
return tx.c.CopyFrom(ctx, tableName, columnNames, rowSrc)
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package pgxpool_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTxExec(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
tx, err := pool.Begin(context.Background(), nil)
|
||||
require.NoError(t, err)
|
||||
defer tx.Rollback(context.Background())
|
||||
|
||||
testExec(t, tx)
|
||||
}
|
||||
|
||||
func TestTxQuery(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
tx, err := pool.Begin(context.Background(), nil)
|
||||
require.NoError(t, err)
|
||||
defer tx.Rollback(context.Background())
|
||||
|
||||
testQuery(t, tx)
|
||||
}
|
||||
|
||||
func TestTxQueryRow(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
tx, err := pool.Begin(context.Background(), nil)
|
||||
require.NoError(t, err)
|
||||
defer tx.Rollback(context.Background())
|
||||
|
||||
testQueryRow(t, tx)
|
||||
}
|
||||
|
||||
func TestTxSendBatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
tx, err := pool.Begin(context.Background(), nil)
|
||||
require.NoError(t, err)
|
||||
defer tx.Rollback(context.Background())
|
||||
|
||||
testSendBatch(t, tx)
|
||||
}
|
||||
|
||||
func TestTxCopyFrom(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
|
||||
tx, err := pool.Begin(context.Background(), nil)
|
||||
require.NoError(t, err)
|
||||
defer tx.Rollback(context.Background())
|
||||
|
||||
testCopyFrom(t, tx)
|
||||
}
|
||||
Reference in New Issue
Block a user