Add pool configuration
MaxConns is only knob at moment
This commit is contained in:
+37
-3
@@ -2,6 +2,8 @@ package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgconn"
|
||||
@@ -9,24 +11,36 @@ import (
|
||||
"github.com/jackc/puddle"
|
||||
)
|
||||
|
||||
const defaultMaxConns = 5
|
||||
|
||||
type Pool struct {
|
||||
p *puddle.Pool
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
MaxConns int32
|
||||
ConnConfig *pgx.ConnConfig
|
||||
}
|
||||
|
||||
// Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial
|
||||
// connection.
|
||||
func Connect(ctx context.Context, connString string) (*Pool, error) {
|
||||
config, err := ParseConfig(connString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := &Pool{}
|
||||
|
||||
maxConnections := 5 // TODO - unhard-code
|
||||
p.p = puddle.NewPool(
|
||||
func(ctx context.Context) (interface{}, error) { return pgx.Connect(ctx, connString) },
|
||||
func(ctx context.Context) (interface{}, error) { return pgx.ConnectConfig(ctx, config.ConnConfig) },
|
||||
func(value interface{}) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
value.(*pgx.Conn).Close(ctx)
|
||||
cancel()
|
||||
},
|
||||
maxConnections)
|
||||
config.MaxConns,
|
||||
)
|
||||
|
||||
// Initially establish one connection
|
||||
res, err := p.p.Acquire(ctx)
|
||||
@@ -39,6 +53,26 @@ func Connect(ctx context.Context, connString string) (*Pool, error) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func ParseConfig(connString string) (*Config, error) {
|
||||
connConfig, err := pgx.ParseConfig(connString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config := &Config{ConnConfig: connConfig, MaxConns: defaultMaxConns}
|
||||
|
||||
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok {
|
||||
delete(connConfig.Config.RuntimeParams, "pool_max_conns")
|
||||
n, err := strconv.ParseInt(s, 10, 32)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid pool_max_conns: %v", err)
|
||||
}
|
||||
config.MaxConns = int32(n)
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned
|
||||
// to pool and closed.
|
||||
func (p *Pool) Close() {
|
||||
|
||||
+15
-8
@@ -17,6 +17,13 @@ func TestConnect(t *testing.T) {
|
||||
pool.Close()
|
||||
}
|
||||
|
||||
func TestParseConfigExtractsPoolArguments(t *testing.T) {
|
||||
config, err := pool.ParseConfig("pool_max_conns=42")
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 42, config.MaxConns)
|
||||
assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_max_conns")
|
||||
}
|
||||
|
||||
func TestConnectCancel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
@@ -57,16 +64,16 @@ func TestPoolQuery(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
stats := pool.Stat()
|
||||
assert.Equal(t, 1, stats.AcquiredConns())
|
||||
assert.Equal(t, 1, stats.TotalConns())
|
||||
assert.EqualValues(t, 1, stats.AcquiredConns())
|
||||
assert.EqualValues(t, 1, stats.TotalConns())
|
||||
|
||||
rows.Close()
|
||||
assert.NoError(t, rows.Err())
|
||||
waitForReleaseToComplete()
|
||||
|
||||
stats = pool.Stat()
|
||||
assert.Equal(t, 0, stats.AcquiredConns())
|
||||
assert.Equal(t, 1, stats.TotalConns())
|
||||
assert.EqualValues(t, 0, stats.AcquiredConns())
|
||||
assert.EqualValues(t, 1, stats.TotalConns())
|
||||
|
||||
}
|
||||
|
||||
@@ -79,8 +86,8 @@ func TestPoolQueryRow(t *testing.T) {
|
||||
waitForReleaseToComplete()
|
||||
|
||||
stats := pool.Stat()
|
||||
assert.Equal(t, 0, stats.AcquiredConns())
|
||||
assert.Equal(t, 1, stats.TotalConns())
|
||||
assert.EqualValues(t, 0, stats.AcquiredConns())
|
||||
assert.EqualValues(t, 1, stats.TotalConns())
|
||||
}
|
||||
|
||||
func TestConnReleaseRollsBackFailedTransaction(t *testing.T) {
|
||||
@@ -162,12 +169,12 @@ func TestConnReleaseDestroysClosedConn(t *testing.T) {
|
||||
|
||||
c.Conn().Close(ctx)
|
||||
|
||||
assert.Equal(t, 1, pool.Stat().TotalConns())
|
||||
assert.EqualValues(t, 1, pool.Stat().TotalConns())
|
||||
|
||||
c.Release()
|
||||
waitForReleaseToComplete()
|
||||
|
||||
assert.Equal(t, 0, pool.Stat().TotalConns())
|
||||
assert.EqualValues(t, 0, pool.Stat().TotalConns())
|
||||
}
|
||||
|
||||
func TestConnPoolQueryConcurrentLoad(t *testing.T) {
|
||||
|
||||
+5
-5
@@ -18,7 +18,7 @@ func (s *Stat) AcquireDuration() time.Duration {
|
||||
return s.s.AcquireDuration()
|
||||
}
|
||||
|
||||
func (s *Stat) AcquiredConns() int {
|
||||
func (s *Stat) AcquiredConns() int32 {
|
||||
return s.s.AcquiredResources()
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ func (s *Stat) CanceledAcquireCount() int64 {
|
||||
return s.s.CanceledAcquireCount()
|
||||
}
|
||||
|
||||
func (s *Stat) ConstructingConns() int {
|
||||
func (s *Stat) ConstructingConns() int32 {
|
||||
return s.s.ConstructingResources()
|
||||
}
|
||||
|
||||
@@ -34,14 +34,14 @@ func (s *Stat) EmptyAcquireCount() int64 {
|
||||
return s.s.EmptyAcquireCount()
|
||||
}
|
||||
|
||||
func (s *Stat) IdleConns() int {
|
||||
func (s *Stat) IdleConns() int32 {
|
||||
return s.s.IdleResources()
|
||||
}
|
||||
|
||||
func (s *Stat) MaxConns() int {
|
||||
func (s *Stat) MaxConns() int32 {
|
||||
return s.s.MaxResources()
|
||||
}
|
||||
|
||||
func (s *Stat) TotalConns() int {
|
||||
func (s *Stat) TotalConns() int32 {
|
||||
return s.s.TotalResources()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user