Bulk allocate pool Conns
This commit is contained in:
@@ -0,0 +1,69 @@
|
|||||||
|
package pool_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v4"
|
||||||
|
"github.com/jackc/pgx/v4/pool"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkMinimalPreparedSelectBaseline(b *testing.B) {
|
||||||
|
config, err := pool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
|
||||||
|
_, err := c.Prepare(ctx, "ps1", "select $1::int8")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := pool.ConnectConfig(context.Background(), config)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
conn, err := db.Acquire(context.Background())
|
||||||
|
require.NoError(b, err)
|
||||||
|
defer conn.Release()
|
||||||
|
|
||||||
|
var n int64
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
err = conn.QueryRow(context.Background(), "ps1", i).Scan(&n)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n != int64(i) {
|
||||||
|
b.Fatalf("expected %d, got %d", i, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkMinimalPreparedSelect(b *testing.B) {
|
||||||
|
config, err := pool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
config.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
|
||||||
|
_, err := c.Prepare(ctx, "ps1", "select $1::int8")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := pool.ConnectConfig(context.Background(), config)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
var n int64
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
err = db.QueryRow(context.Background(), "ps1", i).Scan(&n)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n != int64(i) {
|
||||||
|
b.Fatalf("expected %d, got %d", i, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
+24
-2
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgconn"
|
"github.com/jackc/pgconn"
|
||||||
@@ -24,6 +25,9 @@ type Pool struct {
|
|||||||
maxConnLifetime time.Duration
|
maxConnLifetime time.Duration
|
||||||
healthCheckPeriod time.Duration
|
healthCheckPeriod time.Duration
|
||||||
closeChan chan struct{}
|
closeChan chan struct{}
|
||||||
|
|
||||||
|
preallocatedConnsMux sync.Mutex
|
||||||
|
preallocatedConns []Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config is the configuration struct for creating a pool. It is highly recommended to modify a Config returned by
|
// Config is the configuration struct for creating a pool. It is highly recommended to modify a Config returned by
|
||||||
@@ -212,6 +216,24 @@ func (p *Pool) checkIdleConnsHealth() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Pool) getConn(res *puddle.Resource) *Conn {
|
||||||
|
p.preallocatedConnsMux.Lock()
|
||||||
|
|
||||||
|
if len(p.preallocatedConns) == 0 {
|
||||||
|
p.preallocatedConns = make([]Conn, 128)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &p.preallocatedConns[len(p.preallocatedConns)-1]
|
||||||
|
p.preallocatedConns = p.preallocatedConns[0 : len(p.preallocatedConns)-1]
|
||||||
|
|
||||||
|
p.preallocatedConnsMux.Unlock()
|
||||||
|
|
||||||
|
c.res = res
|
||||||
|
c.p = p
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
||||||
for {
|
for {
|
||||||
res, err := p.p.Acquire(ctx)
|
res, err := p.p.Acquire(ctx)
|
||||||
@@ -220,7 +242,7 @@ func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) {
|
if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) {
|
||||||
return &Conn{res: res, p: p}, nil
|
return p.getConn(res), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
res.Destroy()
|
res.Destroy()
|
||||||
@@ -234,7 +256,7 @@ func (p *Pool) AcquireAllIdle() []*Conn {
|
|||||||
conns := make([]*Conn, 0, len(resources))
|
conns := make([]*Conn, 0, len(resources))
|
||||||
for _, res := range resources {
|
for _, res := range resources {
|
||||||
if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) {
|
if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) {
|
||||||
conns = append(conns, &Conn{res: res, p: p})
|
conns = append(conns, p.getConn(res))
|
||||||
} else {
|
} else {
|
||||||
res.Destroy()
|
res.Destroy()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user