2
0

add prefer-standby target_session_attrs

This commit is contained in:
sergey.bashilov
2022-06-17 14:51:56 +03:00
committed by Jack Christensen
parent 7ddbd74d5e
commit 25935a39b6
4 changed files with 90 additions and 30 deletions
+21 -1
View File
@@ -50,6 +50,8 @@ type Config struct {
// fallback config is tried. This allows implementing high availability behavior such as libpq does with target_session_attrs. // fallback config is tried. This allows implementing high availability behavior such as libpq does with target_session_attrs.
ValidateConnect ValidateConnectFunc ValidateConnect ValidateConnectFunc
HasPreferStandbyTargetSessionAttr bool
// AfterConnect is called after ValidateConnect. It can be used to set up the connection (e.g. Set session variables // AfterConnect is called after ValidateConnect. It can be used to set up the connection (e.g. Set session variables
// or prepare statements). If this returns an error the connection attempt fails. // or prepare statements). If this returns an error the connection attempt fails.
AfterConnect AfterConnectFunc AfterConnect AfterConnectFunc
@@ -367,7 +369,10 @@ func ParseConfig(connString string) (*Config, error) {
config.ValidateConnect = ValidateConnectTargetSessionAttrsPrimary config.ValidateConnect = ValidateConnectTargetSessionAttrsPrimary
case "standby": case "standby":
config.ValidateConnect = ValidateConnectTargetSessionAttrsStandby config.ValidateConnect = ValidateConnectTargetSessionAttrsStandby
case "any", "prefer-standby": case "prefer-standby":
config.ValidateConnect = ValidateConnectTargetSessionAttrsPrefferStandby
config.HasPreferStandbyTargetSessionAttr = true
case "any":
// do nothing // do nothing
default: default:
return nil, &parseConfigError{connString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", tsa)} return nil, &parseConfigError{connString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", tsa)}
@@ -810,3 +815,18 @@ func ValidateConnectTargetSessionAttrsPrimary(ctx context.Context, pgConn *PgCon
return nil return nil
} }
// ValidateConnectTargetSessionAttrsPrimary is an ValidateConnectFunc that implements libpq compatible
// target_session_attrs=prefer-standby.
func ValidateConnectTargetSessionAttrsPrefferStandby(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read()
if result.Err != nil {
return result.Err
}
if string(result.Rows[0][0]) != "t" {
return &preferStanbyNotFoundError{err: errors.New("server is not in hot standby mode")}
}
return nil
}
+24 -21
View File
@@ -584,13 +584,13 @@ func TestParseConfig(t *testing.T) {
name: "target_session_attrs primary", name: "target_session_attrs primary",
connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=primary", connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=primary",
config: &pgconn.Config{ config: &pgconn.Config{
User: "jack", User: "jack",
Password: "secret", Password: "secret",
Host: "localhost", Host: "localhost",
Port: 5432, Port: 5432,
Database: "mydb", Database: "mydb",
TLSConfig: nil, TLSConfig: nil,
RuntimeParams: map[string]string{}, RuntimeParams: map[string]string{},
ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsPrimary, ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsPrimary,
}, },
}, },
@@ -598,13 +598,13 @@ func TestParseConfig(t *testing.T) {
name: "target_session_attrs standby", name: "target_session_attrs standby",
connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=standby", connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=standby",
config: &pgconn.Config{ config: &pgconn.Config{
User: "jack", User: "jack",
Password: "secret", Password: "secret",
Host: "localhost", Host: "localhost",
Port: 5432, Port: 5432,
Database: "mydb", Database: "mydb",
TLSConfig: nil, TLSConfig: nil,
RuntimeParams: map[string]string{}, RuntimeParams: map[string]string{},
ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsStandby, ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsStandby,
}, },
}, },
@@ -612,13 +612,15 @@ func TestParseConfig(t *testing.T) {
name: "target_session_attrs prefer-standby", name: "target_session_attrs prefer-standby",
connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=prefer-standby", connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=prefer-standby",
config: &pgconn.Config{ config: &pgconn.Config{
User: "jack", User: "jack",
Password: "secret", Password: "secret",
Host: "localhost", Host: "localhost",
Port: 5432, Port: 5432,
Database: "mydb", Database: "mydb",
TLSConfig: nil, TLSConfig: nil,
RuntimeParams: map[string]string{}, RuntimeParams: map[string]string{},
ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsPrefferStandby,
HasPreferStandbyTargetSessionAttr: true,
}, },
}, },
{ {
@@ -783,6 +785,7 @@ func assertConfigsEqual(t *testing.T, expected, actual *pgconn.Config, testName
// Can't test function equality, so just test that they are set or not. // Can't test function equality, so just test that they are set or not.
assert.Equalf(t, expected.ValidateConnect == nil, actual.ValidateConnect == nil, "%s - ValidateConnect", testName) assert.Equalf(t, expected.ValidateConnect == nil, actual.ValidateConnect == nil, "%s - ValidateConnect", testName)
assert.Equalf(t, expected.AfterConnect == nil, actual.AfterConnect == nil, "%s - AfterConnect", testName) assert.Equalf(t, expected.AfterConnect == nil, actual.AfterConnect == nil, "%s - AfterConnect", testName)
assert.Equalf(t, expected.HasPreferStandbyTargetSessionAttr, actual.HasPreferStandbyTargetSessionAttr, "%s - HasPreferStandbyTargetSessionAttr", testName)
if assert.Equalf(t, expected.TLSConfig == nil, actual.TLSConfig == nil, "%s - TLSConfig", testName) { if assert.Equalf(t, expected.TLSConfig == nil, actual.TLSConfig == nil, "%s - TLSConfig", testName) {
if expected.TLSConfig != nil { if expected.TLSConfig != nil {
+17
View File
@@ -219,3 +219,20 @@ func redactURL(u *url.URL) string {
} }
return u.String() return u.String()
} }
type preferStanbyNotFoundError struct {
err error
safeToRetry bool
}
func (e *preferStanbyNotFoundError) Error() string {
return fmt.Sprintf("standby server not found: %s", e.err.Error())
}
func (e *preferStanbyNotFoundError) SafeToRetry() bool {
return e.safeToRetry
}
func (e *preferStanbyNotFoundError) Unwrap() error {
return e.err
}
+28 -8
View File
@@ -148,25 +148,34 @@ func ConnectConfig(ctx context.Context, config *Config) (pgConn *PgConn, err err
return nil, &connectError{config: config, msg: "hostname resolving error", err: errors.New("ip addr wasn't found")} return nil, &connectError{config: config, msg: "hostname resolving error", err: errors.New("ip addr wasn't found")}
} }
foundBestServer := false
var fallbackConfig *FallbackConfig
for _, fc := range fallbackConfigs { for _, fc := range fallbackConfigs {
pgConn, err = connect(ctx, config, fc) pgConn, err = connect(ctx, config, fc)
if err == nil { if err == nil {
foundBestServer = true
break break
} else if pgerr, ok := err.(*PgError); ok { } else if pgerr, ok := err.(*PgError); ok {
err = &connectError{config: config, msg: "server error", err: pgerr} err = &connectError{config: config, msg: "server error", err: pgerr}
const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password if checkPgError(pgerr) {
const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist
const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege
if pgerr.Code == ERRCODE_INVALID_PASSWORD ||
pgerr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION ||
pgerr.Code == ERRCODE_INVALID_CATALOG_NAME ||
pgerr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE {
break break
} }
} else if cerr, ok := err.(*connectError); ok && config.HasPreferStandbyTargetSessionAttr {
if _, ok := cerr.err.(*preferStanbyNotFoundError); ok {
fallbackConfig = fc
}
} }
} }
if !foundBestServer && fallbackConfig != nil {
config.ValidateConnect = nil
pgConn, err = connect(ctx, config, fallbackConfig)
if pgerr, ok := err.(*PgError); ok {
err = &connectError{config: config, msg: "server error", err: pgerr}
}
config.ValidateConnect = ValidateConnectTargetSessionAttrsPrefferStandby
}
if err != nil { if err != nil {
return nil, err // no need to wrap in connectError because it will already be wrapped in all cases except PgError return nil, err // no need to wrap in connectError because it will already be wrapped in all cases except PgError
} }
@@ -182,6 +191,17 @@ func ConnectConfig(ctx context.Context, config *Config) (pgConn *PgConn, err err
return pgConn, nil return pgConn, nil
} }
func checkPgError(pgerr *PgError) bool {
const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password
const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist
const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege
return pgerr.Code == ERRCODE_INVALID_PASSWORD ||
pgerr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION ||
pgerr.Code == ERRCODE_INVALID_CATALOG_NAME ||
pgerr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE
}
func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*FallbackConfig) ([]*FallbackConfig, error) { func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*FallbackConfig) ([]*FallbackConfig, error) {
var configs []*FallbackConfig var configs []*FallbackConfig