From c88c1101698756d3d42ecc05e2d2896ed9b61deb Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 9 Jan 2017 11:32:02 -0800 Subject: [PATCH 1/9] ReplicationConn refactor --- helper_test.go | 16 ++++++++ msg_reader.go | 2 +- replication.go | 95 ++++++++++++++++++++++++++++++--------------- replication_test.go | 9 ++--- value_reader.go | 2 +- values.go | 8 ++-- 6 files changed, 89 insertions(+), 43 deletions(-) diff --git a/helper_test.go b/helper_test.go index ed5a9644..eff731e8 100644 --- a/helper_test.go +++ b/helper_test.go @@ -13,6 +13,15 @@ func mustConnect(t testing.TB, config pgx.ConnConfig) *pgx.Conn { return conn } +func mustReplicationConnect(t testing.TB, config pgx.ConnConfig) *pgx.ReplicationConn { + conn, err := pgx.ReplicationConnect(config) + if err != nil { + t.Fatalf("Unable to establish connection: %v", err) + } + return conn +} + + func closeConn(t testing.TB, conn *pgx.Conn) { err := conn.Close() if err != nil { @@ -20,6 +29,13 @@ func closeConn(t testing.TB, conn *pgx.Conn) { } } +func closeReplicationConn(t testing.TB, conn *pgx.ReplicationConn) { + err := conn.Close() + if err != nil { + t.Fatalf("conn.Close unexpectedly failed: %v", err) + } +} + func mustExec(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (commandTag pgx.CommandTag) { var err error if commandTag, err = conn.Exec(sql, arguments...); err != nil { diff --git a/msg_reader.go b/msg_reader.go index 59617b73..21db5d26 100644 --- a/msg_reader.go +++ b/msg_reader.go @@ -21,7 +21,7 @@ func (r *msgReader) Err() error { return r.err } -// fatal tells r that a Fatal error has occurred +// fatal tells rc that a Fatal error has occurred func (r *msgReader) fatal(err error) { if r.shouldLog(LogLevelTrace) { r.log(LogLevelTrace, "msgReader.fatal", "error", err, "msgBytesRemaining", r.msgBytesRemaining) diff --git a/replication.go b/replication.go index 7d4c56e2..f73b8528 100644 --- a/replication.go +++ b/replication.go @@ -151,11 +151,28 @@ func NewStandbyStatus(walPositions ...uint64) (status *StandbyStatus, err error) return } +func ReplicationConnect(config ConnConfig) (r *ReplicationConn, err error) { + if config.RuntimeParams == nil { + config.RuntimeParams = make(map[string]string) + } + config.RuntimeParams["replication"] = "database" + + c,err := Connect(config) + if err != nil { + return + } + return &ReplicationConn{c: c}, nil +} + +type ReplicationConn struct { + c *Conn +} + // Send standby status to the server, which both acts as a keepalive // message to the server, as well as carries the WAL position of the // client, which then updates the server's replication slot position. -func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { - writeBuf := newWriteBuf(c, copyData) +func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) { + writeBuf := newWriteBuf(rc.c, copyData) writeBuf.WriteByte(standbyStatusUpdate) writeBuf.WriteInt64(int64(k.WalWritePosition)) writeBuf.WriteInt64(int64(k.WalFlushPosition)) @@ -165,9 +182,9 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { writeBuf.closeMsg() - _, err = c.conn.Write(writeBuf.buf) + _, err = rc.c.conn.Write(writeBuf.buf) if err != nil { - c.die(err) + rc.c.die(err) } return @@ -175,37 +192,41 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { // Send the message to formally stop the replication stream. This // is done before calling Close() during a clean shutdown. -func (c *Conn) StopReplication() (err error) { - writeBuf := newWriteBuf(c, copyDone) +func (rc *ReplicationConn) StopReplication() (err error) { + writeBuf := newWriteBuf(rc.c, copyDone) writeBuf.closeMsg() - _, err = c.conn.Write(writeBuf.buf) + _, err = rc.c.conn.Write(writeBuf.buf) if err != nil { - c.die(err) + rc.c.die(err) } return } +func (rc *ReplicationConn) Close() error { + return rc.c.Close() +} -func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { + +func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) { var t byte var reader *msgReader - t, reader, err = c.rxMsg() + t, reader, err = rc.c.rxMsg() if err != nil { return } switch t { case noticeResponse: - pgError := c.rxErrorResponse(reader) - if c.shouldLog(LogLevelInfo) { - c.log(LogLevelInfo, pgError.Error()) + pgError := rc.c.rxErrorResponse(reader) + if rc.c.shouldLog(LogLevelInfo) { + rc.c.log(LogLevelInfo, pgError.Error()) } case errorResponse: - err = c.rxErrorResponse(reader) - if c.shouldLog(LogLevelError) { - c.log(LogLevelError, err.Error()) + err = rc.c.rxErrorResponse(reader) + if rc.c.shouldLog(LogLevelError) { + rc.c.log(LogLevelError, err.Error()) } return case copyBothResponse: @@ -235,13 +256,13 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow} return &ReplicationMessage{ServerHeartbeat: h}, nil default: - if c.shouldLog(LogLevelError) { - c.log(LogLevelError,"Unexpected data playload message type %v", t) + if rc.c.shouldLog(LogLevelError) { + rc.c.log(LogLevelError,"Unexpected data playload message type %v", t) } } default: - if c.shouldLog(LogLevelError) { - c.log(LogLevelError,"Unexpected replication message type %v", t) + if rc.c.shouldLog(LogLevelError) { + rc.c.log(LogLevelError,"Unexpected replication message type %v", t) } } return @@ -256,7 +277,7 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { // // This returns pgx.ErrNotificationTimeout when there is no replication message by the specified // duration. -func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { +func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { var zeroTime time.Time deadline := time.Now().Add(timeout) @@ -269,27 +290,27 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM // deadline and peek into the reader. If a timeout error occurs there // we don't break the pgx connection. If the Peek returns that data // is available then we turn off the read deadline before the rxMsg. - err = c.conn.SetReadDeadline(deadline) + err = rc.c.conn.SetReadDeadline(deadline) if err != nil { return nil, err } // Wait until there is a byte available before continuing onto the normal msg reading path - _, err = c.reader.Peek(1) + _, err = rc.c.reader.Peek(1) if err != nil { - c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline + rc.c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline if err, ok := err.(*net.OpError); ok && err.Timeout() { return nil, ErrNotificationTimeout } return nil, err } - err = c.conn.SetReadDeadline(zeroTime) + err = rc.c.conn.SetReadDeadline(zeroTime) if err != nil { return nil, err } - return c.readReplicationMessage() + return rc.readReplicationMessage() } // Start a replication connection, sending WAL data to the given replication @@ -303,7 +324,7 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM // // This function assumes that slotName has already been created. In order to omit the timeline argument // pass a -1 for the timeline to get the server default behavior. -func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) { +func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) { var queryString string if timeline >= 0 { queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline) @@ -315,7 +336,7 @@ func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64 queryString += fmt.Sprintf(" %s", arg) } - if err = c.sendQuery(queryString); err != nil { + if err = rc.c.sendQuery(queryString); err != nil { return } @@ -324,12 +345,24 @@ func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64 // started. This call will either return nil, nil or if it returns an error // that indicates the start replication command failed var r *ReplicationMessage - r, err = c.WaitForReplicationMessage(initialReplicationResponseTimeout) + r, err = rc.WaitForReplicationMessage(initialReplicationResponseTimeout) if err != nil && r != nil { - if c.shouldLog(LogLevelError) { - c.log(LogLevelError, "Unxpected replication message %v", r) + if rc.c.shouldLog(LogLevelError) { + rc.c.log(LogLevelError, "Unxpected replication message %v", r) } } return } + +// Create the replication slot, using the given name and output plugin. +func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) { + _, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) + return +} + +// Drop the replication slot for the given name +func (rc *ReplicationConn) DropReplicationSlot(slotName, outputPlugin string) (err error) { + _, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName)) + return +} diff --git a/replication_test.go b/replication_test.go index 866fe45e..20572edd 100644 --- a/replication_test.go +++ b/replication_test.go @@ -48,13 +48,10 @@ func TestSimpleReplicationConnection(t *testing.T) { conn := mustConnect(t, *replicationConnConfig) defer closeConn(t, conn) - replicationConnConfig.RuntimeParams = make(map[string]string) - replicationConnConfig.RuntimeParams["replication"] = "database" + replicationConn := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn) - replicationConn := mustConnect(t, *replicationConnConfig) - defer closeConn(t, replicationConn) - - _, err = replicationConn.Exec("CREATE_REPLICATION_SLOT pgx_test LOGICAL test_decoding") + err = replicationConn.CreateReplicationSlot("pgx_test","test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } diff --git a/value_reader.go b/value_reader.go index a4897543..fb036dd6 100644 --- a/value_reader.go +++ b/value_reader.go @@ -17,7 +17,7 @@ func (r *ValueReader) Err() error { return r.err } -// Fatal tells r that a Fatal error has occurred +// Fatal tells rc that a Fatal error has occurred func (r *ValueReader) Fatal(err error) { r.err = err } diff --git a/values.go b/values.go index b4466b82..bd912020 100644 --- a/values.go +++ b/values.go @@ -129,9 +129,9 @@ func (e SerializationError) Error() string { // Scanner is an interface used to decode values from the PostgreSQL server. type Scanner interface { - // Scan MUST check r.Type().DataType (to check by OID) or - // r.Type().DataTypeName (to check by name) to ensure that it is scanning an - // expected column type. It also MUST check r.Type().FormatCode before + // Scan MUST check rc.Type().DataType (to check by OID) or + // rc.Type().DataTypeName (to check by name) to ensure that it is scanning an + // expected column type. It also MUST check rc.Type().FormatCode before // decoding. It should not assume that it was called on a data type or format // that it understands. Scan(r *ValueReader) error @@ -3167,7 +3167,7 @@ func parseQuotedAclItem(reader *strings.Reader) (AclItem, error) { } } -// Returns the next rune from r, unless it is a backslash; +// Returns the next rune from rc, unless it is a backslash; // in that case, it returns the rune after the backslash. The second // return value tells us whether or not the rune was // preceeded by a backslash (escaped). From af01afca001022722c6239be8b076c202422627a Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 9 Jan 2017 14:19:08 -0800 Subject: [PATCH 2/9] Add the drop replication slot functionality --- replication.go | 2 +- replication_test.go | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/replication.go b/replication.go index f73b8528..6d78a7a1 100644 --- a/replication.go +++ b/replication.go @@ -362,7 +362,7 @@ func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) } // Drop the replication slot for the given name -func (rc *ReplicationConn) DropReplicationSlot(slotName, outputPlugin string) (err error) { +func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) { _, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName)) return } diff --git a/replication_test.go b/replication_test.go index 20572edd..42d133c4 100644 --- a/replication_test.go +++ b/replication_test.go @@ -173,6 +173,10 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Unexpected write position %d", status.WalWritePosition) } + err = replicationConn.DropReplicationSlot("pgx_test") + if err != nil { + t.Fatalf("Failed to drop replication slot %v", err) + } err = replicationConn.Close() if err != nil { t.Fatalf("Replication connection close failed: %v", err) @@ -183,10 +187,4 @@ func TestSimpleReplicationConnection(t *testing.T) { if integerRestartLsn != maxWal { t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } - - _, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") - if err != nil { - t.Fatalf("Failed to drop replication slot: %v", err) - } - } From b2f416c07d8ed08f4dc21602a9c57c6eb602a03d Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 9 Jan 2017 14:27:34 -0800 Subject: [PATCH 3/9] Drop replication slot has to run on a live connection, so we'll use the function form for the test. --- replication_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/replication_test.go b/replication_test.go index 42d133c4..20572edd 100644 --- a/replication_test.go +++ b/replication_test.go @@ -173,10 +173,6 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Unexpected write position %d", status.WalWritePosition) } - err = replicationConn.DropReplicationSlot("pgx_test") - if err != nil { - t.Fatalf("Failed to drop replication slot %v", err) - } err = replicationConn.Close() if err != nil { t.Fatalf("Replication connection close failed: %v", err) @@ -187,4 +183,10 @@ func TestSimpleReplicationConnection(t *testing.T) { if integerRestartLsn != maxWal { t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } + + _, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } + } From 41d9c0f3381c66e534ff8c9ca16f5c637d93326d Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 26 Jan 2017 18:24:21 -0800 Subject: [PATCH 4/9] Add tests for replication slot drop, and go fmt --- replication.go | 21 ++++++++++++++------- replication_test.go | 36 ++++++++++++++++++++++++------------ 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/replication.go b/replication.go index 6d78a7a1..8923d533 100644 --- a/replication.go +++ b/replication.go @@ -8,10 +8,10 @@ import ( ) const ( - copyBothResponse = 'W' - walData = 'w' - senderKeepalive = 'k' - standbyStatusUpdate = 'r' + copyBothResponse = 'W' + walData = 'w' + senderKeepalive = 'k' + standbyStatusUpdate = 'r' initialReplicationResponseTimeout = 5 * time.Second ) @@ -157,7 +157,7 @@ func ReplicationConnect(config ConnConfig) (r *ReplicationConn, err error) { } config.RuntimeParams["replication"] = "database" - c,err := Connect(config) + c, err := Connect(config) if err != nil { return } @@ -208,6 +208,13 @@ func (rc *ReplicationConn) Close() error { return rc.c.Close() } +func (rc *ReplicationConn) IsAlive() bool { + return rc.c.IsAlive() +} + +func (rc *ReplicationConn) CauseOfDeath() error { + return rc.c.CauseOfDeath() +} func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) { var t byte @@ -257,12 +264,12 @@ func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err return &ReplicationMessage{ServerHeartbeat: h}, nil default: if rc.c.shouldLog(LogLevelError) { - rc.c.log(LogLevelError,"Unexpected data playload message type %v", t) + rc.c.log(LogLevelError, "Unexpected data playload message type %v", t) } } default: if rc.c.shouldLog(LogLevelError) { - rc.c.log(LogLevelError,"Unexpected replication message type %v", t) + rc.c.log(LogLevelError, "Unexpected replication message type %v", t) } } return diff --git a/replication_test.go b/replication_test.go index 20572edd..ee187ec2 100644 --- a/replication_test.go +++ b/replication_test.go @@ -1,13 +1,13 @@ package pgx_test import ( + "fmt" "github.com/jackc/pgx" + "reflect" "strconv" "strings" "testing" "time" - "reflect" - "fmt" ) // This function uses a postgresql 9.6 specific column @@ -51,7 +51,7 @@ func TestSimpleReplicationConnection(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - err = replicationConn.CreateReplicationSlot("pgx_test","test_decoding") + err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } @@ -152,14 +152,23 @@ func TestSimpleReplicationConnection(t *testing.T) { replicationConn.SendStandbyStatus(status) replicationConn.StopReplication() + if replicationConn.IsAlive() == false { + t.Errorf("Connection died: %v", replicationConn.CauseOfDeath()) + } + + err = replicationConn.Close() + if err != nil { + t.Fatalf("Replication connection close failed: %v", err) + } + // Let's push the boundary conditions of the standby status and ensure it errors correctly - status, err = pgx.NewStandbyStatus(0,1,2,3,4) + status, err = pgx.NewStandbyStatus(0, 1, 2, 3, 4) if err == nil { - t.Errorf("Expected error from new standby status, got %v",status) + t.Errorf("Expected error from new standby status, got %v", status) } // And if you provide 3 args, ensure the right fields are set - status, err = pgx.NewStandbyStatus(1,2,3) + status, err = pgx.NewStandbyStatus(1, 2, 3) if err != nil { t.Errorf("Failed to create test status: %v", err) } @@ -173,20 +182,23 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Unexpected write position %d", status.WalWritePosition) } - err = replicationConn.Close() - if err != nil { - t.Fatalf("Replication connection close failed: %v", err) - } - restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") integerRestartLsn, _ := pgx.ParseLSN(restartLsn) if integerRestartLsn != maxWal { t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } - _, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn2) + + err = replicationConn2.DropReplicationSlot("pgx_test") if err != nil { t.Fatalf("Failed to drop replication slot: %v", err) } + droppedLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") + if droppedLsn != "" { + t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn) + } + } From 1424fb2b4284fdf5dc42b93036f4b51ed66b7083 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 30 Jan 2017 11:35:48 -0800 Subject: [PATCH 5/9] Add IdentifySystem and TimelineHistory functions, and tighten up the testing --- replication.go | 82 ++++++++++++++++++---- replication_test.go | 167 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 214 insertions(+), 35 deletions(-) diff --git a/replication.go b/replication.go index 8923d533..7b28d6b6 100644 --- a/replication.go +++ b/replication.go @@ -190,20 +190,6 @@ func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) { return } -// Send the message to formally stop the replication stream. This -// is done before calling Close() during a clean shutdown. -func (rc *ReplicationConn) StopReplication() (err error) { - writeBuf := newWriteBuf(rc.c, copyDone) - - writeBuf.closeMsg() - - _, err = rc.c.conn.Write(writeBuf.buf) - if err != nil { - rc.c.die(err) - } - return -} - func (rc *ReplicationConn) Close() error { return rc.c.Close() } @@ -320,6 +306,74 @@ func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r * return rc.readReplicationMessage() } +func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) { + rc.c.lastActivityTime = time.Now() + + rows := rc.c.getRows(sql, nil) + + if err := rc.c.lock(); err != nil { + rows.abort(err) + return rows, err + } + rows.unlockConn = true + + err := rc.c.sendSimpleQuery(sql) + if err != nil { + rows.abort(err) + } + + var t byte + var r *msgReader + t, r, err = rc.c.rxMsg() + if err != nil { + return nil, err + } + + switch t { + case rowDescription: + rows.fields = rc.c.rxRowDescription(r) + // We don't have c.PgTypes here because we're a replication + // connection. This means the field descriptions will have + // only Oids. Not much we can do about this. + default: + if e := rc.c.processContextFreeMsg(t, r); e != nil { + rows.abort(e) + return rows, e + } + } + + return rows, rows.err +} + +// Execute the "IDENTIFY_SYSTEM" command as documented here: +// https://www.postgresql.org/docs/9.5/static/protocol-replication.html +// +// This will return (if successful) a result set that has a single row +// that contains the systemid, current timeline, xlogpos and database +// name. +// +// NOTE: Because this is a replication mode connection, we don't have +// type names, so the field descriptions in the result will have only +// Oids and no DataTypeName values +func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) { + return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM") +} + +// Execute the "TIMELINE_HISTORY" command as documented here: +// https://www.postgresql.org/docs/9.5/static/protocol-replication.html +// +// This will return (if successful) a result set that has a single row +// that contains the filename of the history file and the content +// of the history file. If called for timeline 1, typically this will +// generate an error that the timeline history file does not exist. +// +// NOTE: Because this is a replication mode connection, we don't have +// type names, so the field descriptions in the result will have only +// Oids and no DataTypeName values +func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) { + return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline)) +} + // Start a replication connection, sending WAL data to the given replication // receiver. This function wraps a START_REPLICATION command as documented // here: diff --git a/replication_test.go b/replication_test.go index ee187ec2..73874a1f 100644 --- a/replication_test.go +++ b/replication_test.go @@ -150,7 +150,6 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Failed to create standby status %v", err) } replicationConn.SendStandbyStatus(status) - replicationConn.StopReplication() if replicationConn.IsAlive() == false { t.Errorf("Connection died: %v", replicationConn.CauseOfDeath()) @@ -161,25 +160,8 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Fatalf("Replication connection close failed: %v", err) } - // Let's push the boundary conditions of the standby status and ensure it errors correctly - status, err = pgx.NewStandbyStatus(0, 1, 2, 3, 4) - if err == nil { - t.Errorf("Expected error from new standby status, got %v", status) - } - - // And if you provide 3 args, ensure the right fields are set - status, err = pgx.NewStandbyStatus(1, 2, 3) - if err != nil { - t.Errorf("Failed to create test status: %v", err) - } - if status.WalFlushPosition != 1 { - t.Errorf("Unexpected flush position %d", status.WalFlushPosition) - } - if status.WalApplyPosition != 2 { - t.Errorf("Unexpected apply position %d", status.WalApplyPosition) - } - if status.WalWritePosition != 3 { - t.Errorf("Unexpected write position %d", status.WalWritePosition) + if replicationConn.IsAlive() == true { + t.Errorf("Connection still alive: %v", replicationConn.CauseOfDeath()) } restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") @@ -200,5 +182,148 @@ func TestSimpleReplicationConnection(t *testing.T) { if droppedLsn != "" { t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn) } - } + +func TestReplicationConn_DropReplicationSlot(t *testing.T) { + replicationConn := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn) + + err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + if err != nil { + t.Logf("replication slot create failed: %v", err) + } + err = replicationConn.DropReplicationSlot("pgx_slot_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } + + // We re-create to ensure the drop worked. + err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + if err != nil { + t.Logf("replication slot create failed: %v", err) + } + + // And finally we drop to ensure we don't leave dirty state + err = replicationConn.DropReplicationSlot("pgx_slot_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } +} + +func TestIdentifySystem(t *testing.T) { + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn2) + + r, err := replicationConn2.IdentifySystem() + if err != nil { + t.Error(err) + } + defer r.Close() + for _, fd := range r.FieldDescriptions() { + t.Logf("Field: %s of type %v", fd.Name, fd.DataType) + } + + var rowCount int + for r.Next() { + rowCount++ + values, err := r.Values() + if err != nil { + t.Error(err) + } + t.Logf("Row values: %v", values) + } + if r.Err() != nil { + t.Error(r.Err()) + } + + if rowCount == 0 { + t.Errorf("Failed to find any rows: %d", rowCount) + } +} + +func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int { + r, err := rc.IdentifySystem() + if err != nil { + t.Error(err) + } + defer r.Close() + for r.Next() { + values, e := r.Values() + if e != nil { + t.Error(e) + } + timeline, e := strconv.Atoi(values[1].(string)) + if e != nil { + t.Error(e) + } + return timeline + } + t.Fatal("Failed to read timeline") + return -1 +} + + +func TestGetTimelineHistory(t *testing.T) { + replicationConn := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn) + + timeline := getCurrentTimeline(t, replicationConn) + + r, err := replicationConn.TimelineHistory(timeline) + if err != nil { + t.Errorf("%#v", err) + } + defer r.Close() + + for _, fd := range r.FieldDescriptions() { + t.Logf("Field: %s of type %v", fd.Name, fd.DataType) + } + + var rowCount int + for r.Next() { + rowCount++ + values, err := r.Values() + if err != nil { + t.Error(err) + } + t.Logf("Row values: %v", values) + } + if r.Err() != nil { + if strings.Contains(r.Err().Error(), "No such file or directory") { + // This is normal, this means the timeline we're on has no + // history, which is the common case in a test db that + // has only one timeline + return + } + t.Error(r.Err()) + } + + // If we have a timeline history (see above) there should have been + // rows emitted + if rowCount == 0 { + t.Errorf("Failed to find any rows: %d", rowCount) + } +} + +func TestStandbyStatusParsing(t *testing.T) { + // Let's push the boundary conditions of the standby status and ensure it errors correctly + status, err := pgx.NewStandbyStatus(0, 1, 2, 3, 4) + if err == nil { + t.Errorf("Expected error from new standby status, got %v", status) + } + + // And if you provide 3 args, ensure the right fields are set + status, err = pgx.NewStandbyStatus(1, 2, 3) + if err != nil { + t.Errorf("Failed to create test status: %v", err) + } + if status.WalFlushPosition != 1 { + t.Errorf("Unexpected flush position %d", status.WalFlushPosition) + } + if status.WalApplyPosition != 2 { + t.Errorf("Unexpected apply position %d", status.WalApplyPosition) + } + if status.WalWritePosition != 3 { + t.Errorf("Unexpected write position %d", status.WalWritePosition) + } +} \ No newline at end of file From 86fef0e5d74d2bb3472d8a2443ea46fb9f25d519 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 30 Jan 2017 11:37:07 -0800 Subject: [PATCH 6/9] go fmt --- replication_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/replication_test.go b/replication_test.go index 73874a1f..96e223df 100644 --- a/replication_test.go +++ b/replication_test.go @@ -262,7 +262,6 @@ func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int { return -1 } - func TestGetTimelineHistory(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) @@ -326,4 +325,4 @@ func TestStandbyStatusParsing(t *testing.T) { if status.WalWritePosition != 3 { t.Errorf("Unexpected write position %d", status.WalWritePosition) } -} \ No newline at end of file +} From 76ac06083e7d32fab964ba782a54ce140cd241c6 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 30 Jan 2017 13:55:18 -0800 Subject: [PATCH 7/9] Dont test when you dont have a config --- replication_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/replication_test.go b/replication_test.go index 96e223df..26399d0c 100644 --- a/replication_test.go +++ b/replication_test.go @@ -185,6 +185,10 @@ func TestSimpleReplicationConnection(t *testing.T) { } func TestReplicationConn_DropReplicationSlot(t *testing.T) { + if replicationConnConfig == nil { + t.Skip("Skipping due to undefined replicationConnConfig") + } + replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) @@ -211,6 +215,10 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) { } func TestIdentifySystem(t *testing.T) { + if replicationConnConfig == nil { + t.Skip("Skipping due to undefined replicationConnConfig") + } + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn2) @@ -263,6 +271,10 @@ func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int { } func TestGetTimelineHistory(t *testing.T) { + if replicationConnConfig == nil { + t.Skip("Skipping due to undefined replicationConnConfig") + } + replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) From be5a9a0aff30cb9fd8870d1292ec9d1cc9a5d21f Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 30 Jan 2017 14:12:12 -0800 Subject: [PATCH 8/9] Clean shutdown after the flush lsn check --- replication_test.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/replication_test.go b/replication_test.go index 26399d0c..4f810c78 100644 --- a/replication_test.go +++ b/replication_test.go @@ -151,25 +151,14 @@ func TestSimpleReplicationConnection(t *testing.T) { } replicationConn.SendStandbyStatus(status) - if replicationConn.IsAlive() == false { - t.Errorf("Connection died: %v", replicationConn.CauseOfDeath()) - } - - err = replicationConn.Close() - if err != nil { - t.Fatalf("Replication connection close failed: %v", err) - } - - if replicationConn.IsAlive() == true { - t.Errorf("Connection still alive: %v", replicationConn.CauseOfDeath()) - } - restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") integerRestartLsn, _ := pgx.ParseLSN(restartLsn) if integerRestartLsn != maxWal { t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } + closeReplicationConn(t, replicationConn) + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn2) From 27b90681e860cb2cc7de8c777a51850f3587f32e Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Thu, 2 Feb 2017 19:30:26 -0600 Subject: [PATCH 9/9] Fix find-and-replace errors --- value_reader.go | 2 +- values.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/value_reader.go b/value_reader.go index fb036dd6..a4897543 100644 --- a/value_reader.go +++ b/value_reader.go @@ -17,7 +17,7 @@ func (r *ValueReader) Err() error { return r.err } -// Fatal tells rc that a Fatal error has occurred +// Fatal tells r that a Fatal error has occurred func (r *ValueReader) Fatal(err error) { r.err = err } diff --git a/values.go b/values.go index bd912020..b4466b82 100644 --- a/values.go +++ b/values.go @@ -129,9 +129,9 @@ func (e SerializationError) Error() string { // Scanner is an interface used to decode values from the PostgreSQL server. type Scanner interface { - // Scan MUST check rc.Type().DataType (to check by OID) or - // rc.Type().DataTypeName (to check by name) to ensure that it is scanning an - // expected column type. It also MUST check rc.Type().FormatCode before + // Scan MUST check r.Type().DataType (to check by OID) or + // r.Type().DataTypeName (to check by name) to ensure that it is scanning an + // expected column type. It also MUST check r.Type().FormatCode before // decoding. It should not assume that it was called on a data type or format // that it understands. Scan(r *ValueReader) error @@ -3167,7 +3167,7 @@ func parseQuotedAclItem(reader *strings.Reader) (AclItem, error) { } } -// Returns the next rune from rc, unless it is a backslash; +// Returns the next rune from r, unless it is a backslash; // in that case, it returns the rune after the backslash. The second // return value tells us whether or not the rune was // preceeded by a backslash (escaped).