From 7bbb1c7307a19afe92b4b0eae8d940253798567c Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Sun, 4 Dec 2016 21:35:22 -0800 Subject: [PATCH 01/16] Add basic logical replication protocol support --- README.md | 8 ++ conn.go | 6 + replication.go | 272 ++++++++++++++++++++++++++++++++++++++++++++ replication_test.go | 164 ++++++++++++++++++++++++++ 4 files changed, 450 insertions(+) create mode 100644 replication.go create mode 100644 replication_test.go diff --git a/README.md b/README.md index ccbd1dc1..51a01339 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Pgx supports many additional features beyond what is available through database/ * Large object support * Null mapping to Null* struct or pointer to pointer. * Supports database/sql.Scanner and database/sql/driver.Valuer interfaces for custom types +* Logical replication connections, including receiving WAL and sending standby status updates ## Performance @@ -68,6 +69,7 @@ Then run the following SQL: create user pgx_md5 password 'secret'; create user " tricky, ' } "" \ test user " password 'secret'; create database pgx_test; + create user pgx_replication with replication password 'secret'; Connect to database pgx_test and run: @@ -100,6 +102,12 @@ If you are developing on Windows with TCP connections: host pgx_test pgx_pw 127.0.0.1/32 password host pgx_test pgx_md5 127.0.0.1/32 md5 +For replication testing, add the following to your postgresql.conf: + + wal_level='logical' + max_wal_senders=5 + max_replication_slots=5 + ## Version Policy pgx follows semantic versioning for the documented public API on stable releases. Branch ```v2``` is the latest stable release. ```master``` can contain new features or behavior that will change or be removed before being merged to the stable ```v2``` branch (in practice, this occurs very rarely). diff --git a/conn.go b/conn.go index 14d89897..602ecbff 100644 --- a/conn.go +++ b/conn.go @@ -305,6 +305,12 @@ func (c *Conn) connect(config ConnConfig, network, address string, tlsConfig *tl c.log(LogLevelInfo, "Connection established") } + // Replication connections can't execute the queries to + // populate the c.PgTypes and c.pgsqlAfInet + if _, ok := msg.options["replication"]; ok { + return nil + } + if c.PgTypes == nil { err = c.loadPgTypes() if err != nil { diff --git a/replication.go b/replication.go new file mode 100644 index 00000000..a107274b --- /dev/null +++ b/replication.go @@ -0,0 +1,272 @@ +package pgx + +import ( + "errors" + "fmt" + "net" + "time" +) + +const ( + copyBothResponse = 'W' + walData = 'w' + senderKeepalive = 'k' + standbyStatusUpdate = 'r' +) + +var epochNano int64 + +func init() { + epochNano = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano() +} + +// Format the given 64bit LSN value into the XXX/XXX format, +// which is the format reported by postgres. +func FormatLsn(lsn int64) string { + return fmt.Sprintf("%X/%X", lsn>>32, int32(lsn)) +} + +// Parse the given XXX/XXX format LSN as reported by postgres, +// into a 64 bit integer as used internally by the wire procotols +func ParseLsn(lsn string) (outputLsn int64, err error) { + var upperHalf int64 + var lowerHalf int64 + var nparsed int + nparsed, err = fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf) + if err != nil { + return + } + + if nparsed != 2 { + err = errors.New(fmt.Sprintf("Failed to parsed LSN: %s", lsn)) + return + } + + outputLsn = (upperHalf << 32) + lowerHalf + return +} + +// The WAL message contains WAL payload entry data +type WalMessage struct { + // The WAL start position of this data. This + // is the WAL position we need to track. + WalStart int64 + // The server wal end and server time are + // documented to track the end position and current + // time of the server, both of which appear to be + // unimplemented in pg 9.5. + ServerWalEnd int64 + ServerTime int64 + // The WAL data is the raw unparsed binary WAL entry. + // The contents of this are determined by the output + // logical encoding plugin. + WalData []byte +} + +func (w *WalMessage) Time() time.Time { + return time.Unix(0, (w.ServerTime*1000)+epochNano) +} + +func (w *WalMessage) ByteLag() int64 { + return (w.ServerWalEnd - w.WalStart) +} + +func (w *WalMessage) String() string { + return fmt.Sprintf("Wal: %s Time: %s Lag: %d", FormatLsn(w.WalStart), w.Time(), w.ByteLag()) +} + +// The server heartbeat is sent periodically from the server, +// including server status, and a reply request field +type ServerHeartbeat struct { + // The current max wal position on the server, + // used for lag tracking + ServerWalEnd int64 + // The server time, in microseconds since jan 1 2000 + ServerTime int64 + // If 1, the server is requesting a standby status message + // to be sent immediately. + ReplyRequested byte +} + +func (s *ServerHeartbeat) Time() time.Time { + return time.Unix(0, (s.ServerTime*1000)+epochNano) +} + +func (s *ServerHeartbeat) String() string { + return fmt.Sprintf("WalEnd: %s ReplyRequested: %d T: %s", FormatLsn(s.ServerWalEnd), s.ReplyRequested, s.Time()) +} + +// The replication message wraps all possible messages from the +// server received during replication. At most one of the wal message +// or server heartbeat will be non-nil +type ReplicationMessage struct { + WalMessage *WalMessage + ServerHeartbeat *ServerHeartbeat +} + +// The standby status is the client side heartbeat sent to the postgresql +// server to track the client wal positions. For practical purposes, +// all wal positions are typically set to the same value. +type StandbyStatus struct { + // The WAL position that's been locally written + WalWritePosition int64 + // The WAL position that's been locally flushed + WalFlushPosition int64 + // The WAL position that's been locally applied + WalApplyPosition int64 + // The client time in microseconds since jan 1 2000 + ClientTime int64 + // If 1, requests the server to immediately send a + // server heartbeat + ReplyRequested byte +} + +// Create a standby status struct, which sets all the WAL positions +// to the given wal position, and the client time to the current time. +func NewStandbyStatus(walPosition int64) (status *StandbyStatus) { + status = new(StandbyStatus) + status.WalFlushPosition = walPosition + status.WalApplyPosition = walPosition + status.WalWritePosition = walPosition + status.ClientTime = (time.Now().UnixNano() - epochNano) / 1000 + return +} + +// Send standby status to the server, which both acts as a keepalive +// message to the server, as well as carries the WAL position of the +// client, which then updates the server's replication slot position. +func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { + writeBuf := newWriteBuf(c, copyData) + writeBuf.WriteByte(standbyStatusUpdate) + writeBuf.WriteInt64(k.WalWritePosition) + writeBuf.WriteInt64(k.WalFlushPosition) + writeBuf.WriteInt64(k.WalApplyPosition) + writeBuf.WriteInt64(k.ClientTime) + writeBuf.WriteByte(k.ReplyRequested) + + writeBuf.closeMsg() + + _, err = c.conn.Write(writeBuf.buf) + if err != nil { + fmt.Printf("Error sending standby status %v\n", err) + c.die(err) + } + fmt.Printf("Write complete, wal position is %s\n", FormatLsn(k.WalApplyPosition)) + + return +} + +func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { + var t byte + var reader *msgReader + t, reader, err = c.rxMsg() + if err != nil { + return + } + + switch t { + case noticeResponse: + pgError := c.rxErrorResponse(reader) + if c.shouldLog(LogLevelInfo) { + c.log(LogLevelInfo, pgError.Error()) + } + case errorResponse: + err = c.rxErrorResponse(reader) + if c.shouldLog(LogLevelError) { + c.log(LogLevelError, err.Error()) + } + return + case copyBothResponse: + // This is the tail end of the replication process start, + // and can be safely ignored + return + case copyData: + var msgType byte + msgType = reader.readByte() + switch msgType { + case walData: + walStart := reader.readInt64() + serverWalEnd := reader.readInt64() + serverTime := reader.readInt64() + walData := reader.readBytes(reader.msgBytesRemaining) + walMessage := WalMessage{WalStart: walStart, + ServerWalEnd: serverWalEnd, + ServerTime: serverTime, + WalData: walData, + } + + return &ReplicationMessage{WalMessage: &walMessage}, nil + case senderKeepalive: + serverWalEnd := reader.readInt64() + serverTime := reader.readInt64() + replyNow := reader.readByte() + h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow} + return &ReplicationMessage{ServerHeartbeat: h}, nil + } + } + return +} + +// Wait for a single replication message up to timeout time. +// +// Properly using this requires some knowledge of the postgres replication mechanisms, +// as the client can receive both WAL data (the ultimate payload) and server heartbeat +// updates. The caller also must send standby status updates in order to keep the connection +// alive and working. +// +// There is also a condition (during startup) which can cause both the replication message +// to return as nil as well as the error, which is a normal part of the replication protocol +// startup. It's important the client correctly handle (ignore) this scenario. +func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { + var zeroTime time.Time + + deadline := time.Now().Add(timeout) + + // Use SetReadDeadline to implement the timeout. SetReadDeadline will + // cause operations to fail with a *net.OpError that has a Timeout() + // of true. Because the normal pgx rxMsg path considers any error to + // have potentially corrupted the state of the connection, it dies + // on any errors. So to avoid timeout errors in rxMsg we set the + // deadline and peek into the reader. If a timeout error occurs there + // we don't break the pgx connection. If the Peek returns that data + // is available then we turn off the read deadline before the rxMsg. + err = c.conn.SetReadDeadline(deadline) + if err != nil { + return nil, err + } + + // Wait until there is a byte available before continuing onto the normal msg reading path + _, err = c.reader.Peek(1) + if err != nil { + c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline + if err, ok := err.(*net.OpError); ok && err.Timeout() { + return nil, ErrNotificationTimeout + } + return nil, err + } + + err = c.conn.SetReadDeadline(zeroTime) + if err != nil { + return nil, err + } + + return c.readReplicationMessage() +} + +// Start a replication connection, sending WAL data to the given replication +// receiver. The sql string here should be a "START_REPLICATION" command, as +// per the postgresql docs here: +// https://www.postgresql.org/docs/9.5/static/protocol-replication.html +// +// A typical query would look like: +// START_REPLICATION SLOT t LOGICAL test_decoder 0/0 +// +// Once started, the client needs to invoke WaitForReplicationMessage() in order +// to fetch the WAL and standby status. Also, it is the responsibility of the caller +// to periodically send StandbyStatus messages to update the replication slot position. +func (c *Conn) StartReplication(sql string, arguments ...interface{}) (err error) { + if err = c.sendQuery(sql, arguments...); err != nil { + return + } + return +} diff --git a/replication_test.go b/replication_test.go new file mode 100644 index 00000000..4c76deaf --- /dev/null +++ b/replication_test.go @@ -0,0 +1,164 @@ +package pgx_test + +import ( + "github.com/jackc/pgx" + "strconv" + "strings" + "testing" + "time" +) + +// This function uses a postgresql 9.6 specific column +func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string { + // Fetch the restart LSN of the slot, to establish a starting point + rows, err := conn.Query("select confirmed_flush_lsn from pg_replication_slots where slot_name='pgx_test'") + if err != nil { + t.Fatalf("conn.Query failed: %v", err) + } + defer rows.Close() + + var restartLsn string + for rows.Next() { + rows.Scan(&restartLsn) + } + return restartLsn +} + +// This battleship test (at least somewhat by necessity) does +// several things all at once in a single run. It: +// - Establishes a replication connection & slot +// - Does a series of operations to create some known WAL entries +// - Replicates the entries down, and checks that the rows it +// created come down in order +// - Sends a standby status message to update the server with the +// wal position of the slot +// - Checks the wal position of the slot on the server to make sure +// the update succeeded +func TestSimpleReplicationConnection(t *testing.T) { + t.Parallel() + + var err error + var replicationUserConfig pgx.ConnConfig + var replicationConnConfig pgx.ConnConfig + + replicationUserConfig = *defaultConnConfig + replicationUserConfig.User = "pgx_replication" + conn := mustConnect(t, replicationUserConfig) + defer closeConn(t, conn) + + replicationConnConfig = *defaultConnConfig + replicationConnConfig.User = "pgx_replication" + replicationConnConfig.RuntimeParams = make(map[string]string) + replicationConnConfig.RuntimeParams["replication"] = "database" + + replicationConn := mustConnect(t, replicationConnConfig) + defer closeConn(t, replicationConn) + + _, err = replicationConn.Exec("CREATE_REPLICATION_SLOT pgx_test LOGICAL test_decoding") + if err != nil { + t.Logf("replication slot create failed: %v", err) + } + + // Do a simple change so we can get some wal data + _, err = conn.Exec("create table if not exists replication_test (a integer)") + if err != nil { + t.Fatalf("Failed to create table: %v", err) + } + + err = replicationConn.StartReplication("START_REPLICATION SLOT pgx_test LOGICAL 0/0") + if err != nil { + t.Fatalf("Failed to start replication: %v", err) + } + + var i int32 + var insertedTimes []int64 + for i < 5 { + var ct pgx.CommandTag + currentTime := time.Now().Unix() + insertedTimes = append(insertedTimes, currentTime) + ct, err = conn.Exec("insert into replication_test(a) values($1)", currentTime) + if err != nil { + t.Fatalf("Insert failed: %v", err) + } + t.Logf("Inserted %d rows", ct.RowsAffected()) + i++ + } + + i = 0 + var foundTimes []int64 + var foundCount int + var maxWal int64 + for { + var message *pgx.ReplicationMessage + + message, err = replicationConn.WaitForReplicationMessage(time.Duration(1 * time.Second)) + if message != nil { + if message.WalMessage != nil { + // The waldata payload with the test_decoding plugin looks like: + // public.replication_test: INSERT: a[integer]:2 + // What we wanna do here is check that once we find one of our inserted times, + // that they occur in the wal stream in the order we executed them. + walString := string(message.WalMessage.WalData) + if strings.Contains(walString, "public.replication_test: INSERT") { + stringParts := strings.Split(walString, ":") + offset, err := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64) + if err != nil { + t.Fatalf("Failed to parse walString %s", walString) + } + if foundCount > 0 || offset == insertedTimes[0] { + foundTimes = append(foundTimes, offset) + foundCount++ + } + } + if message.WalMessage.WalStart > maxWal { + maxWal = message.WalMessage.WalStart + } + + } + if message.ServerHeartbeat != nil { + t.Logf("Got heartbeat: %s", message.ServerHeartbeat) + } + } else { + t.Log("Timed out waiting for wal message") + i++ + } + if i > 3 { + t.Log("Actual timeout") + break + } + } + + if foundCount != len(insertedTimes) { + t.Fatalf("Failed to find all inserted time values in WAL stream (found %d expected %d)", foundCount, len(insertedTimes)) + } + + for i := range insertedTimes { + if foundTimes[i] != insertedTimes[i] { + t.Fatalf("Found %d expected %d", foundTimes[i], insertedTimes[i]) + } + } + + t.Logf("Found %d times, as expected", len(foundTimes)) + + // Before closing our connection, let's send a standby status to update our wal + // position, which should then be reflected if we fetch out our current wal position + // for the slot + replicationConn.SendStandbyStatus(pgx.NewStandbyStatus(maxWal)) + + err = replicationConn.Close() + if err != nil { + t.Fatalf("Replication connection close failed: %v", err) + } + + restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") + integerRestartLsn, _ := pgx.ParseLsn(restartLsn) + if integerRestartLsn != maxWal { + t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLsn(maxWal), restartLsn) + } + + _, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } + +} From edbd30ea6ad35cb4df53f845cd403d4fb706bc79 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Tue, 6 Dec 2016 15:44:37 -0800 Subject: [PATCH 02/16] Add replication stop mechanism --- replication.go | 20 ++++++++++++++++++-- replication_test.go | 7 +++++++ stdlib/sql_test.go | 4 ++-- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/replication.go b/replication.go index a107274b..66860787 100644 --- a/replication.go +++ b/replication.go @@ -148,14 +148,27 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { _, err = c.conn.Write(writeBuf.buf) if err != nil { - fmt.Printf("Error sending standby status %v\n", err) c.die(err) } - fmt.Printf("Write complete, wal position is %s\n", FormatLsn(k.WalApplyPosition)) return } +// Send the message to formally stop the replication stream. This +// is done before calling Close() during a clean shutdown. +func (c *Conn) StopReplication() (err error) { + writeBuf := newWriteBuf(c, copyDone) + + writeBuf.closeMsg() + + _, err = c.conn.Write(writeBuf.buf) + if err != nil { + c.die(err) + } + return +} + + func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { var t byte var reader *msgReader @@ -217,6 +230,9 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { // There is also a condition (during startup) which can cause both the replication message // to return as nil as well as the error, which is a normal part of the replication protocol // startup. It's important the client correctly handle (ignore) this scenario. +// +// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified +// duration. func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { var zeroTime time.Time diff --git a/replication_test.go b/replication_test.go index 4c76deaf..60119b14 100644 --- a/replication_test.go +++ b/replication_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" "time" + "reflect" ) // This function uses a postgresql 9.6 specific column @@ -92,6 +93,11 @@ func TestSimpleReplicationConnection(t *testing.T) { var message *pgx.ReplicationMessage message, err = replicationConn.WaitForReplicationMessage(time.Duration(1 * time.Second)) + if err != nil { + if err != pgx.ErrNotificationTimeout { + t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err)) + } + } if message != nil { if message.WalMessage != nil { // The waldata payload with the test_decoding plugin looks like: @@ -144,6 +150,7 @@ func TestSimpleReplicationConnection(t *testing.T) { // position, which should then be reflected if we fetch out our current wal position // for the slot replicationConn.SendStandbyStatus(pgx.NewStandbyStatus(maxWal)) + replicationConn.StopReplication() err = replicationConn.Close() if err != nil { diff --git a/stdlib/sql_test.go b/stdlib/sql_test.go index 602a9171..5a5f7049 100644 --- a/stdlib/sql_test.go +++ b/stdlib/sql_test.go @@ -30,7 +30,7 @@ func ensureConnValid(t *testing.T, db *sql.DB) { rows, err := db.Query("select generate_series(1,$1)", 10) if err != nil { - t.Fatalf("db.Query failed: ", err) + t.Fatalf("db.Query failed: %v", err) } defer rows.Close() @@ -42,7 +42,7 @@ func ensureConnValid(t *testing.T, db *sql.DB) { } if rows.Err() != nil { - t.Fatalf("db.Query failed: ", err) + t.Fatalf("db.Query failed: %v", err) } if rowCount != 10 { From 14497e4c6540b415633f1da7e465327a246625e5 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Wed, 7 Dec 2016 21:19:58 -0800 Subject: [PATCH 03/16] Capitalization --- replication.go | 10 +++++----- replication_test.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/replication.go b/replication.go index 66860787..5586209c 100644 --- a/replication.go +++ b/replication.go @@ -22,13 +22,13 @@ func init() { // Format the given 64bit LSN value into the XXX/XXX format, // which is the format reported by postgres. -func FormatLsn(lsn int64) string { - return fmt.Sprintf("%X/%X", lsn>>32, int32(lsn)) +func FormatLSN(lsn int64) string { + return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn)) } // Parse the given XXX/XXX format LSN as reported by postgres, // into a 64 bit integer as used internally by the wire procotols -func ParseLsn(lsn string) (outputLsn int64, err error) { +func ParseLSN(lsn string) (outputLsn int64, err error) { var upperHalf int64 var lowerHalf int64 var nparsed int @@ -72,7 +72,7 @@ func (w *WalMessage) ByteLag() int64 { } func (w *WalMessage) String() string { - return fmt.Sprintf("Wal: %s Time: %s Lag: %d", FormatLsn(w.WalStart), w.Time(), w.ByteLag()) + return fmt.Sprintf("Wal: %s Time: %s Lag: %d", FormatLSN(w.WalStart), w.Time(), w.ByteLag()) } // The server heartbeat is sent periodically from the server, @@ -93,7 +93,7 @@ func (s *ServerHeartbeat) Time() time.Time { } func (s *ServerHeartbeat) String() string { - return fmt.Sprintf("WalEnd: %s ReplyRequested: %d T: %s", FormatLsn(s.ServerWalEnd), s.ReplyRequested, s.Time()) + return fmt.Sprintf("WalEnd: %s ReplyRequested: %d T: %s", FormatLSN(s.ServerWalEnd), s.ReplyRequested, s.Time()) } // The replication message wraps all possible messages from the diff --git a/replication_test.go b/replication_test.go index 60119b14..08affdf1 100644 --- a/replication_test.go +++ b/replication_test.go @@ -158,9 +158,9 @@ func TestSimpleReplicationConnection(t *testing.T) { } restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") - integerRestartLsn, _ := pgx.ParseLsn(restartLsn) + integerRestartLsn, _ := pgx.ParseLSN(restartLsn) if integerRestartLsn != maxWal { - t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLsn(maxWal), restartLsn) + t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } _, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") From 2b096a7d08e148190097a6bf145ada8f4687684f Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 8 Dec 2016 15:26:44 -0800 Subject: [PATCH 04/16] It should all be unsigned. --- replication.go | 52 ++++++++++++++++++++++----------------------- replication_test.go | 2 +- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/replication.go b/replication.go index 5586209c..16755f0e 100644 --- a/replication.go +++ b/replication.go @@ -22,15 +22,15 @@ func init() { // Format the given 64bit LSN value into the XXX/XXX format, // which is the format reported by postgres. -func FormatLSN(lsn int64) string { +func FormatLSN(lsn uint64) string { return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn)) } // Parse the given XXX/XXX format LSN as reported by postgres, // into a 64 bit integer as used internally by the wire procotols -func ParseLSN(lsn string) (outputLsn int64, err error) { - var upperHalf int64 - var lowerHalf int64 +func ParseLSN(lsn string) (outputLsn uint64, err error) { + var upperHalf uint64 + var lowerHalf uint64 var nparsed int nparsed, err = fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf) if err != nil { @@ -50,13 +50,13 @@ func ParseLSN(lsn string) (outputLsn int64, err error) { type WalMessage struct { // The WAL start position of this data. This // is the WAL position we need to track. - WalStart int64 + WalStart uint64 // The server wal end and server time are // documented to track the end position and current // time of the server, both of which appear to be // unimplemented in pg 9.5. - ServerWalEnd int64 - ServerTime int64 + ServerWalEnd uint64 + ServerTime uint64 // The WAL data is the raw unparsed binary WAL entry. // The contents of this are determined by the output // logical encoding plugin. @@ -64,10 +64,10 @@ type WalMessage struct { } func (w *WalMessage) Time() time.Time { - return time.Unix(0, (w.ServerTime*1000)+epochNano) + return time.Unix(0, (int64(w.ServerTime)*1000)+epochNano) } -func (w *WalMessage) ByteLag() int64 { +func (w *WalMessage) ByteLag() uint64 { return (w.ServerWalEnd - w.WalStart) } @@ -80,16 +80,16 @@ func (w *WalMessage) String() string { type ServerHeartbeat struct { // The current max wal position on the server, // used for lag tracking - ServerWalEnd int64 + ServerWalEnd uint64 // The server time, in microseconds since jan 1 2000 - ServerTime int64 + ServerTime uint64 // If 1, the server is requesting a standby status message // to be sent immediately. ReplyRequested byte } func (s *ServerHeartbeat) Time() time.Time { - return time.Unix(0, (s.ServerTime*1000)+epochNano) + return time.Unix(0, (int64(s.ServerTime)*1000)+epochNano) } func (s *ServerHeartbeat) String() string { @@ -109,13 +109,13 @@ type ReplicationMessage struct { // all wal positions are typically set to the same value. type StandbyStatus struct { // The WAL position that's been locally written - WalWritePosition int64 + WalWritePosition uint64 // The WAL position that's been locally flushed - WalFlushPosition int64 + WalFlushPosition uint64 // The WAL position that's been locally applied - WalApplyPosition int64 + WalApplyPosition uint64 // The client time in microseconds since jan 1 2000 - ClientTime int64 + ClientTime uint64 // If 1, requests the server to immediately send a // server heartbeat ReplyRequested byte @@ -123,12 +123,12 @@ type StandbyStatus struct { // Create a standby status struct, which sets all the WAL positions // to the given wal position, and the client time to the current time. -func NewStandbyStatus(walPosition int64) (status *StandbyStatus) { +func NewStandbyStatus(walPosition uint64) (status *StandbyStatus) { status = new(StandbyStatus) status.WalFlushPosition = walPosition status.WalApplyPosition = walPosition status.WalWritePosition = walPosition - status.ClientTime = (time.Now().UnixNano() - epochNano) / 1000 + status.ClientTime = uint64((time.Now().UnixNano() - epochNano) / 1000) return } @@ -138,10 +138,10 @@ func NewStandbyStatus(walPosition int64) (status *StandbyStatus) { func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { writeBuf := newWriteBuf(c, copyData) writeBuf.WriteByte(standbyStatusUpdate) - writeBuf.WriteInt64(k.WalWritePosition) - writeBuf.WriteInt64(k.WalFlushPosition) - writeBuf.WriteInt64(k.WalApplyPosition) - writeBuf.WriteInt64(k.ClientTime) + writeBuf.WriteInt64(int64(k.WalWritePosition)) + writeBuf.WriteInt64(int64(k.WalFlushPosition)) + writeBuf.WriteInt64(int64(k.WalApplyPosition)) + writeBuf.WriteInt64(int64(k.ClientTime)) writeBuf.WriteByte(k.ReplyRequested) writeBuf.closeMsg() @@ -202,9 +202,9 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { serverWalEnd := reader.readInt64() serverTime := reader.readInt64() walData := reader.readBytes(reader.msgBytesRemaining) - walMessage := WalMessage{WalStart: walStart, - ServerWalEnd: serverWalEnd, - ServerTime: serverTime, + walMessage := WalMessage{WalStart: uint64(walStart), + ServerWalEnd: uint64(serverWalEnd), + ServerTime: uint64(serverTime), WalData: walData, } @@ -213,7 +213,7 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { serverWalEnd := reader.readInt64() serverTime := reader.readInt64() replyNow := reader.readByte() - h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow} + h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow} return &ReplicationMessage{ServerHeartbeat: h}, nil } } diff --git a/replication_test.go b/replication_test.go index 08affdf1..1d77982a 100644 --- a/replication_test.go +++ b/replication_test.go @@ -88,7 +88,7 @@ func TestSimpleReplicationConnection(t *testing.T) { i = 0 var foundTimes []int64 var foundCount int - var maxWal int64 + var maxWal uint64 for { var message *pgx.ReplicationMessage From dad2c383af26e4df584955e6060f44833de24d6a Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Tue, 3 Jan 2017 11:49:13 -0800 Subject: [PATCH 05/16] Start replication now wraps the sql and returns errors properly --- replication.go | 51 +++++++++++++++++++++++++++++++++++---------- replication_test.go | 5 +++-- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/replication.go b/replication.go index 16755f0e..fdbdce92 100644 --- a/replication.go +++ b/replication.go @@ -12,6 +12,7 @@ const ( walData = 'w' senderKeepalive = 'k' standbyStatusUpdate = 'r' + initialReplicationResponseTimeout = 5 * time.Second ) var epochNano int64 @@ -215,6 +216,14 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { replyNow := reader.readByte() h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow} return &ReplicationMessage{ServerHeartbeat: h}, nil + default: + if c.shouldLog(LogLevelError) { + c.log(LogLevelError,"Unexpected data playload message type %v", t) + } + } + default: + if c.shouldLog(LogLevelError) { + c.log(LogLevelError,"Unexpected replication message type %v", t) } } return @@ -227,10 +236,6 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { // updates. The caller also must send standby status updates in order to keep the connection // alive and working. // -// There is also a condition (during startup) which can cause both the replication message -// to return as nil as well as the error, which is a normal part of the replication protocol -// startup. It's important the client correctly handle (ignore) this scenario. -// // This returns pgx.ErrNotificationTimeout when there is no replication message by the specified // duration. func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { @@ -270,19 +275,43 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM } // Start a replication connection, sending WAL data to the given replication -// receiver. The sql string here should be a "START_REPLICATION" command, as -// per the postgresql docs here: +// receiver. This function wraps a START_REPLICATION command as documented +// here: // https://www.postgresql.org/docs/9.5/static/protocol-replication.html // -// A typical query would look like: -// START_REPLICATION SLOT t LOGICAL test_decoder 0/0 -// // Once started, the client needs to invoke WaitForReplicationMessage() in order // to fetch the WAL and standby status. Also, it is the responsibility of the caller // to periodically send StandbyStatus messages to update the replication slot position. -func (c *Conn) StartReplication(sql string, arguments ...interface{}) (err error) { - if err = c.sendQuery(sql, arguments...); err != nil { +// +// This function assumes that slotName has already been created. In order to omit the timeline argument +// pass a -1 for the timeline to get the server default behavior. +func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) { + var queryString string + if timeline >= 0 { + queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline) + } else { + queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s", slotName, FormatLSN(startLsn)) + } + + for _, arg := range pluginArguments { + queryString += fmt.Sprintf(" %s", arg) + } + + if err = c.sendQuery(queryString); err != nil { return } + + // The first replication message that comes back here will be (in a success case) + // a empty CopyBoth that is (apparently) sent as the confirmation that the replication has + // started. This call will either return nil, nil or if it returns an error + // that indicates the start replication command failed + var r *ReplicationMessage + r, err = c.WaitForReplicationMessage(initialReplicationResponseTimeout) + if err != nil && r != nil { + if c.shouldLog(LogLevelError) { + c.log(LogLevelError, "Unxpected replication message %v", r) + } + } + return } diff --git a/replication_test.go b/replication_test.go index 1d77982a..411b449e 100644 --- a/replication_test.go +++ b/replication_test.go @@ -7,12 +7,13 @@ import ( "testing" "time" "reflect" + "fmt" ) // This function uses a postgresql 9.6 specific column func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string { // Fetch the restart LSN of the slot, to establish a starting point - rows, err := conn.Query("select confirmed_flush_lsn from pg_replication_slots where slot_name='pgx_test'") + rows, err := conn.Query(fmt.Sprintf("select confirmed_flush_lsn from pg_replication_slots where slot_name='%s'", slot)) if err != nil { t.Fatalf("conn.Query failed: %v", err) } @@ -66,7 +67,7 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Fatalf("Failed to create table: %v", err) } - err = replicationConn.StartReplication("START_REPLICATION SLOT pgx_test LOGICAL 0/0") + err = replicationConn.StartReplication("pgx_test", 0, -1) if err != nil { t.Fatalf("Failed to start replication: %v", err) } From cf225c83656f5bf74a0a4364f23ec1ed27b68298 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Tue, 3 Jan 2017 13:56:34 -0800 Subject: [PATCH 06/16] Add the ability to set all the fields in the constructor --- replication.go | 28 +++++++++++++++++++++++----- replication_test.go | 27 ++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/replication.go b/replication.go index fdbdce92..7d4c56e2 100644 --- a/replication.go +++ b/replication.go @@ -124,11 +124,29 @@ type StandbyStatus struct { // Create a standby status struct, which sets all the WAL positions // to the given wal position, and the client time to the current time. -func NewStandbyStatus(walPosition uint64) (status *StandbyStatus) { - status = new(StandbyStatus) - status.WalFlushPosition = walPosition - status.WalApplyPosition = walPosition - status.WalWritePosition = walPosition +// The wal positions are, in order: +// WalFlushPosition +// WalApplyPosition +// WalWritePosition +// +// If only one position is provided, it will be used as the value for all 3 +// status fields. Note you must provide either 1 wal position, or all 3 +// in order to initialize the standby status. +func NewStandbyStatus(walPositions ...uint64) (status *StandbyStatus, err error) { + if len(walPositions) == 1 { + status = new(StandbyStatus) + status.WalFlushPosition = walPositions[0] + status.WalApplyPosition = walPositions[0] + status.WalWritePosition = walPositions[0] + } else if len(walPositions) == 3 { + status = new(StandbyStatus) + status.WalFlushPosition = walPositions[0] + status.WalApplyPosition = walPositions[1] + status.WalWritePosition = walPositions[2] + } else { + err = errors.New(fmt.Sprintf("Invalid number of wal positions provided, need 1 or 3, got %d", len(walPositions))) + return + } status.ClientTime = uint64((time.Now().UnixNano() - epochNano) / 1000) return } diff --git a/replication_test.go b/replication_test.go index 411b449e..b86da951 100644 --- a/replication_test.go +++ b/replication_test.go @@ -150,9 +150,34 @@ func TestSimpleReplicationConnection(t *testing.T) { // Before closing our connection, let's send a standby status to update our wal // position, which should then be reflected if we fetch out our current wal position // for the slot - replicationConn.SendStandbyStatus(pgx.NewStandbyStatus(maxWal)) + status, err := pgx.NewStandbyStatus(maxWal) + if err != nil { + t.Errorf("Failed to create standby status %v", err) + } + replicationConn.SendStandbyStatus(status) replicationConn.StopReplication() + // Let's push the boundary conditions of the standby status and ensure it errors correctly + status, err = pgx.NewStandbyStatus(0,1,2,3,4) + if err == nil { + t.Errorf("Expected error from new standby status, got %v",status) + } + + // And if you provide 3 args, ensure the right fields are set + status, err = pgx.NewStandbyStatus(1,2,3) + if err != nil { + t.Errorf("Failed to create test status: %v", err) + } + if status.WalFlushPosition != 1 { + t.Errorf("Unexpected flush position %d", status.WalFlushPosition) + } + if status.WalApplyPosition != 2 { + t.Errorf("Unexpected apply position %d", status.WalApplyPosition) + } + if status.WalWritePosition != 3 { + t.Errorf("Unexpected write position %d", status.WalWritePosition) + } + err = replicationConn.Close() if err != nil { t.Fatalf("Replication connection close failed: %v", err) From 8f5875b7b204f9148befdfb822b7fc400dcd69fd Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 09:49:27 -0800 Subject: [PATCH 07/16] Try to fix travis --- .travis.yml | 6 ++++++ README.md | 2 ++ 2 files changed, 8 insertions(+) diff --git a/.travis.yml b/.travis.yml index b120b33a..9f58e426 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,11 @@ before_install: - echo "host all pgx_md5 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - echo "host all pgx_pw 127.0.0.1/32 password" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - echo "hostssl all pgx_ssl 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf + - echo "host replication pgx_ssl 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf + - sudo chmod 777 /etc/postgresql/$PGVERSION/main/postgresql.conf + - echo "wal_level='logical'" >> /etc/postgresql/$PGVERSION/main/postgresql.conf + - echo "max_wal_senders=5" >> /etc/postgresql/$PGVERSION/main/postgresql.conf + - echo "max_replication_slots=5" >> /etc/postgresql/$PGVERSION/main/postgresql.conf - sudo /etc/init.d/postgresql restart env: @@ -38,6 +43,7 @@ before_script: - psql -U postgres -c "create user pgx_ssl SUPERUSER PASSWORD 'secret'" - psql -U postgres -c "create user pgx_md5 SUPERUSER PASSWORD 'secret'" - psql -U postgres -c "create user pgx_pw SUPERUSER PASSWORD 'secret'" + - psql -U postgres -c "create user pgx_replication with replication password 'secret'" - psql -U postgres -c "create user \" tricky, ' } \"\" \\ test user \" superuser password 'secret'" install: diff --git a/README.md b/README.md index 51a01339..6f8b5015 100644 --- a/README.md +++ b/README.md @@ -95,12 +95,14 @@ If you are developing on Unix with domain socket connections: local pgx_test pgx_none trust local pgx_test pgx_pw password local pgx_test pgx_md5 md5 + local replication pgx_replication 127.0.0.1/32 md5 If you are developing on Windows with TCP connections: host pgx_test pgx_none 127.0.0.1/32 trust host pgx_test pgx_pw 127.0.0.1/32 password host pgx_test pgx_md5 127.0.0.1/32 md5 + host replication pgx_replication 127.0.0.1/32 md5 For replication testing, add the following to your postgresql.conf: From 71a61cd2c523468f24460fc6228a6aee2df90105 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 09:55:38 -0800 Subject: [PATCH 08/16] Dont break old postgres --- .travis.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9f58e426..61ec1d47 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,11 +19,11 @@ before_install: - echo "host all pgx_md5 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - echo "host all pgx_pw 127.0.0.1/32 password" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - echo "hostssl all pgx_ssl 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - - echo "host replication pgx_ssl 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf + - echo "host replication pgx_replication 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - sudo chmod 777 /etc/postgresql/$PGVERSION/main/postgresql.conf - - echo "wal_level='logical'" >> /etc/postgresql/$PGVERSION/main/postgresql.conf - - echo "max_wal_senders=5" >> /etc/postgresql/$PGVERSION/main/postgresql.conf - - echo "max_replication_slots=5" >> /etc/postgresql/$PGVERSION/main/postgresql.conf + - [[ $PGVERSION > 9.3 ]] && echo "wal_level='logical'" >> /etc/postgresql/$PGVERSION/main/postgresql.conf + - [[ $PGVERSION > 9.3 ]] && echo "max_wal_senders=5" >> /etc/postgresql/$PGVERSION/main/postgresql.conf + - [[ $PGVERSION > 9.3 ]] && echo "max_replication_slots=5" >> /etc/postgresql/$PGVERSION/main/postgresql.conf - sudo /etc/init.d/postgresql restart env: From 1b8606a458beab4b7b95209117fe1ad02241dd32 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 10:00:51 -0800 Subject: [PATCH 09/16] Valid YAML helps. --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 61ec1d47..95cd9d8e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,9 +21,9 @@ before_install: - echo "hostssl all pgx_ssl 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - echo "host replication pgx_replication 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - sudo chmod 777 /etc/postgresql/$PGVERSION/main/postgresql.conf - - [[ $PGVERSION > 9.3 ]] && echo "wal_level='logical'" >> /etc/postgresql/$PGVERSION/main/postgresql.conf - - [[ $PGVERSION > 9.3 ]] && echo "max_wal_senders=5" >> /etc/postgresql/$PGVERSION/main/postgresql.conf - - [[ $PGVERSION > 9.3 ]] && echo "max_replication_slots=5" >> /etc/postgresql/$PGVERSION/main/postgresql.conf + - "[[ $PGVERSION > 9.3 ]] && echo \"wal_level='logical'\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" + - "[[ $PGVERSION > 9.3 ]] && echo \"max_wal_senders=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" + - "[[ $PGVERSION > 9.3 ]] && echo \"max_replication_slots=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" - sudo /etc/init.d/postgresql restart env: From c4de74fea2462098e6c86f2156ad71f86f6fc527 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 10:04:58 -0800 Subject: [PATCH 10/16] One more try for travis --- .travis.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 95cd9d8e..02bf3c50 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,10 +20,11 @@ before_install: - echo "host all pgx_pw 127.0.0.1/32 password" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - echo "hostssl all pgx_ssl 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - echo "host replication pgx_replication 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf + - echo "host pgx_test pgx_replication 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - sudo chmod 777 /etc/postgresql/$PGVERSION/main/postgresql.conf - - "[[ $PGVERSION > 9.3 ]] && echo \"wal_level='logical'\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" - - "[[ $PGVERSION > 9.3 ]] && echo \"max_wal_senders=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" - - "[[ $PGVERSION > 9.3 ]] && echo \"max_replication_slots=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" + - "[[ $PGVERSION < 9.4 ]] || echo \"wal_level='logical'\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" + - "[[ $PGVERSION < 9.4 ]] || echo \"max_wal_senders=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" + - "[[ $PGVERSION < 9.4 ]] || echo \"max_replication_slots=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" - sudo /etc/init.d/postgresql restart env: From 883e604d0e26d6e255ff7e7d3365e98be4cf3e28 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 10:14:05 -0800 Subject: [PATCH 11/16] I forgot the tests are 9.6+ --- .travis.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 02bf3c50..32b35bbd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,13 +22,14 @@ before_install: - echo "host replication pgx_replication 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - echo "host pgx_test pgx_replication 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf - sudo chmod 777 /etc/postgresql/$PGVERSION/main/postgresql.conf - - "[[ $PGVERSION < 9.4 ]] || echo \"wal_level='logical'\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" - - "[[ $PGVERSION < 9.4 ]] || echo \"max_wal_senders=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" - - "[[ $PGVERSION < 9.4 ]] || echo \"max_replication_slots=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" + - "[[ $PGVERSION < 9.6 ]] || echo \"wal_level='logical'\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" + - "[[ $PGVERSION < 9.6 ]] || echo \"max_wal_senders=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" + - "[[ $PGVERSION < 9.6 ]] || echo \"max_replication_slots=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf" - sudo /etc/init.d/postgresql restart env: matrix: + - PGVERSION=9.6 - PGVERSION=9.5 - PGVERSION=9.4 - PGVERSION=9.3 From 69852595d89da821cef980f06fa58a39d7db0b04 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 10:25:48 -0800 Subject: [PATCH 12/16] Properly make the replication tests skippable on 9.5 and below --- conn_config_test.go.example | 1 + conn_config_test.go.travis | 10 ++++++++++ replication_test.go | 13 +++++++------ 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/conn_config_test.go.example b/conn_config_test.go.example index 0b80d490..6deedd90 100644 --- a/conn_config_test.go.example +++ b/conn_config_test.go.example @@ -14,6 +14,7 @@ var plainPasswordConnConfig *pgx.ConnConfig = nil var invalidUserConnConfig *pgx.ConnConfig = nil var tlsConnConfig *pgx.ConnConfig = nil var customDialerConnConfig *pgx.ConnConfig = nil +var replicationConfig *pgx.ConnConfig = nil // var tcpConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"} // var unixSocketConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "/private/tmp", User: "pgx_none", Database: "pgx_test"} diff --git a/conn_config_test.go.travis b/conn_config_test.go.travis index 2b2691de..f8fb4260 100644 --- a/conn_config_test.go.travis +++ b/conn_config_test.go.travis @@ -3,6 +3,8 @@ package pgx_test import ( "crypto/tls" "github.com/jackc/pgx" + "os" + "strconv" ) var defaultConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"} @@ -13,3 +15,11 @@ var plainPasswordConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_pw", var invalidUserConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "invalid", Database: "pgx_test"} var tlsConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_ssl", Password: "secret", Database: "pgx_test", TLSConfig: &tls.Config{InsecureSkipVerify: true}} var customDialerConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"} +var pgVersion = os.getenv("PGVERSION") +var replicationConfig *pgx.ConnConfig = nil +if len(pgVersion) > 0 { + version, err := strconv.ParseFloat(pgVersion) + if err == nil && version >= 9.6 { + replicationConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_replication", Password: "secret", Database: "pgx_test"} + } +} diff --git a/replication_test.go b/replication_test.go index b86da951..789bb27d 100644 --- a/replication_test.go +++ b/replication_test.go @@ -41,19 +41,20 @@ func TestSimpleReplicationConnection(t *testing.T) { var err error var replicationUserConfig pgx.ConnConfig - var replicationConnConfig pgx.ConnConfig - replicationUserConfig = *defaultConnConfig - replicationUserConfig.User = "pgx_replication" + // /.s.PGSQL.5432 + if replicationConnConfig == nil { + t.Skip("Skipping due to undefined replicationConnConfig") + } + + replicationUserConfig = *replicationConnConfig conn := mustConnect(t, replicationUserConfig) defer closeConn(t, conn) - replicationConnConfig = *defaultConnConfig - replicationConnConfig.User = "pgx_replication" replicationConnConfig.RuntimeParams = make(map[string]string) replicationConnConfig.RuntimeParams["replication"] = "database" - replicationConn := mustConnect(t, replicationConnConfig) + replicationConn := mustConnect(t, *replicationConnConfig) defer closeConn(t, replicationConn) _, err = replicationConn.Exec("CREATE_REPLICATION_SLOT pgx_test LOGICAL test_decoding") From 24f06aed9ac4de6111fb1caa26612298e51e4db8 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 10:57:49 -0800 Subject: [PATCH 13/16] Fix the syntax --- conn_config_test.go.travis | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/conn_config_test.go.travis b/conn_config_test.go.travis index f8fb4260..564d704c 100644 --- a/conn_config_test.go.travis +++ b/conn_config_test.go.travis @@ -15,8 +15,10 @@ var plainPasswordConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_pw", var invalidUserConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "invalid", Database: "pgx_test"} var tlsConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_ssl", Password: "secret", Database: "pgx_test", TLSConfig: &tls.Config{InsecureSkipVerify: true}} var customDialerConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"} -var pgVersion = os.getenv("PGVERSION") +var pgVersion string var replicationConfig *pgx.ConnConfig = nil + +pgVersion = os.getenv("PGVERSION") if len(pgVersion) > 0 { version, err := strconv.ParseFloat(pgVersion) if err == nil && version >= 9.6 { From d9a80caaad0dbdf274cc0fc96729adaa2d02e1f9 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 12:50:42 -0800 Subject: [PATCH 14/16] The naming really matters --- conn_config_test.go.example | 2 +- conn_config_test.go.travis | 7 +++---- replication_test.go | 5 +---- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/conn_config_test.go.example b/conn_config_test.go.example index 6deedd90..cac798b7 100644 --- a/conn_config_test.go.example +++ b/conn_config_test.go.example @@ -14,7 +14,7 @@ var plainPasswordConnConfig *pgx.ConnConfig = nil var invalidUserConnConfig *pgx.ConnConfig = nil var tlsConnConfig *pgx.ConnConfig = nil var customDialerConnConfig *pgx.ConnConfig = nil -var replicationConfig *pgx.ConnConfig = nil +var replicationConnConfig *pgx.ConnConfig = nil // var tcpConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"} // var unixSocketConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "/private/tmp", User: "pgx_none", Database: "pgx_test"} diff --git a/conn_config_test.go.travis b/conn_config_test.go.travis index 564d704c..5ad7525d 100644 --- a/conn_config_test.go.travis +++ b/conn_config_test.go.travis @@ -15,13 +15,12 @@ var plainPasswordConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_pw", var invalidUserConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "invalid", Database: "pgx_test"} var tlsConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_ssl", Password: "secret", Database: "pgx_test", TLSConfig: &tls.Config{InsecureSkipVerify: true}} var customDialerConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"} -var pgVersion string -var replicationConfig *pgx.ConnConfig = nil +var replicationConnConfig *pgx.ConnConfig = nil -pgVersion = os.getenv("PGVERSION") +pgVersion := os.getenv("PGVERSION") if len(pgVersion) > 0 { version, err := strconv.ParseFloat(pgVersion) if err == nil && version >= 9.6 { - replicationConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_replication", Password: "secret", Database: "pgx_test"} + replicationConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_replication", Password: "secret", Database: "pgx_test"} } } diff --git a/replication_test.go b/replication_test.go index 789bb27d..866fe45e 100644 --- a/replication_test.go +++ b/replication_test.go @@ -40,15 +40,12 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Parallel() var err error - var replicationUserConfig pgx.ConnConfig - // /.s.PGSQL.5432 if replicationConnConfig == nil { t.Skip("Skipping due to undefined replicationConnConfig") } - replicationUserConfig = *replicationConnConfig - conn := mustConnect(t, replicationUserConfig) + conn := mustConnect(t, *replicationConnConfig) defer closeConn(t, conn) replicationConnConfig.RuntimeParams = make(map[string]string) From 5584040249a7fa192a2dd43d04705807446ed78b Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 5 Jan 2017 12:55:34 -0800 Subject: [PATCH 15/16] Properly make it a func init() --- conn_config_test.go.travis | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/conn_config_test.go.travis b/conn_config_test.go.travis index 5ad7525d..75714bf3 100644 --- a/conn_config_test.go.travis +++ b/conn_config_test.go.travis @@ -17,10 +17,14 @@ var tlsConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_ssl", Password var customDialerConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"} var replicationConnConfig *pgx.ConnConfig = nil -pgVersion := os.getenv("PGVERSION") -if len(pgVersion) > 0 { - version, err := strconv.ParseFloat(pgVersion) - if err == nil && version >= 9.6 { - replicationConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_replication", Password: "secret", Database: "pgx_test"} - } +func init() { + version := os.Getenv("PGVERSION") + + if len(version) > 0 { + v, err := strconv.ParseFloat(version,64) + if err == nil && v >= 9.6 { + replicationConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_replication", Password: "secret", Database: "pgx_test"} + } + } } + From 0ee01e0c4af8d51a07056b6f5c9a7b876621aa8c Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Fri, 6 Jan 2017 15:04:03 -0600 Subject: [PATCH 16/16] Tweak replication test setup --- README.md | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 6f8b5015..f5e54139 100644 --- a/README.md +++ b/README.md @@ -95,18 +95,26 @@ If you are developing on Unix with domain socket connections: local pgx_test pgx_none trust local pgx_test pgx_pw password local pgx_test pgx_md5 md5 - local replication pgx_replication 127.0.0.1/32 md5 If you are developing on Windows with TCP connections: host pgx_test pgx_none 127.0.0.1/32 trust host pgx_test pgx_pw 127.0.0.1/32 password host pgx_test pgx_md5 127.0.0.1/32 md5 - host replication pgx_replication 127.0.0.1/32 md5 -For replication testing, add the following to your postgresql.conf: +### Replication Test Environment - wal_level='logical' +Add a replication user: + + create user pgx_replication with replication password 'secret'; + +Add a replication line to your pg_hba.conf: + + host replication pgx_replication 127.0.0.1/32 md5 + +Change the following settings in your postgresql.conf: + + wal_level=logical max_wal_senders=5 max_replication_slots=5