2
0

Merge branch 'seatme-feature/replication_conn'

This commit is contained in:
Jack Christensen
2017-02-02 19:30:37 -06:00
4 changed files with 317 additions and 73 deletions
+16
View File
@@ -13,6 +13,15 @@ func mustConnect(t testing.TB, config pgx.ConnConfig) *pgx.Conn {
return conn
}
func mustReplicationConnect(t testing.TB, config pgx.ConnConfig) *pgx.ReplicationConn {
conn, err := pgx.ReplicationConnect(config)
if err != nil {
t.Fatalf("Unable to establish connection: %v", err)
}
return conn
}
func closeConn(t testing.TB, conn *pgx.Conn) {
err := conn.Close()
if err != nil {
@@ -20,6 +29,13 @@ func closeConn(t testing.TB, conn *pgx.Conn) {
}
}
func closeReplicationConn(t testing.TB, conn *pgx.ReplicationConn) {
err := conn.Close()
if err != nil {
t.Fatalf("conn.Close unexpectedly failed: %v", err)
}
}
func mustExec(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (commandTag pgx.CommandTag) {
var err error
if commandTag, err = conn.Exec(sql, arguments...); err != nil {
+1 -1
View File
@@ -21,7 +21,7 @@ func (r *msgReader) Err() error {
return r.err
}
// fatal tells r that a Fatal error has occurred
// fatal tells rc that a Fatal error has occurred
func (r *msgReader) fatal(err error) {
if r.shouldLog(LogLevelTrace) {
r.log(LogLevelTrace, "msgReader.fatal", "error", err, "msgBytesRemaining", r.msgBytesRemaining)
+137 -43
View File
@@ -8,10 +8,10 @@ import (
)
const (
copyBothResponse = 'W'
walData = 'w'
senderKeepalive = 'k'
standbyStatusUpdate = 'r'
copyBothResponse = 'W'
walData = 'w'
senderKeepalive = 'k'
standbyStatusUpdate = 'r'
initialReplicationResponseTimeout = 5 * time.Second
)
@@ -151,11 +151,28 @@ func NewStandbyStatus(walPositions ...uint64) (status *StandbyStatus, err error)
return
}
func ReplicationConnect(config ConnConfig) (r *ReplicationConn, err error) {
if config.RuntimeParams == nil {
config.RuntimeParams = make(map[string]string)
}
config.RuntimeParams["replication"] = "database"
c, err := Connect(config)
if err != nil {
return
}
return &ReplicationConn{c: c}, nil
}
type ReplicationConn struct {
c *Conn
}
// Send standby status to the server, which both acts as a keepalive
// message to the server, as well as carries the WAL position of the
// client, which then updates the server's replication slot position.
func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) {
writeBuf := newWriteBuf(c, copyData)
func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) {
writeBuf := newWriteBuf(rc.c, copyData)
writeBuf.WriteByte(standbyStatusUpdate)
writeBuf.WriteInt64(int64(k.WalWritePosition))
writeBuf.WriteInt64(int64(k.WalFlushPosition))
@@ -165,47 +182,44 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) {
writeBuf.closeMsg()
_, err = c.conn.Write(writeBuf.buf)
_, err = rc.c.conn.Write(writeBuf.buf)
if err != nil {
c.die(err)
rc.c.die(err)
}
return
}
// Send the message to formally stop the replication stream. This
// is done before calling Close() during a clean shutdown.
func (c *Conn) StopReplication() (err error) {
writeBuf := newWriteBuf(c, copyDone)
writeBuf.closeMsg()
_, err = c.conn.Write(writeBuf.buf)
if err != nil {
c.die(err)
}
return
func (rc *ReplicationConn) Close() error {
return rc.c.Close()
}
func (rc *ReplicationConn) IsAlive() bool {
return rc.c.IsAlive()
}
func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
func (rc *ReplicationConn) CauseOfDeath() error {
return rc.c.CauseOfDeath()
}
func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) {
var t byte
var reader *msgReader
t, reader, err = c.rxMsg()
t, reader, err = rc.c.rxMsg()
if err != nil {
return
}
switch t {
case noticeResponse:
pgError := c.rxErrorResponse(reader)
if c.shouldLog(LogLevelInfo) {
c.log(LogLevelInfo, pgError.Error())
pgError := rc.c.rxErrorResponse(reader)
if rc.c.shouldLog(LogLevelInfo) {
rc.c.log(LogLevelInfo, pgError.Error())
}
case errorResponse:
err = c.rxErrorResponse(reader)
if c.shouldLog(LogLevelError) {
c.log(LogLevelError, err.Error())
err = rc.c.rxErrorResponse(reader)
if rc.c.shouldLog(LogLevelError) {
rc.c.log(LogLevelError, err.Error())
}
return
case copyBothResponse:
@@ -235,13 +249,13 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow}
return &ReplicationMessage{ServerHeartbeat: h}, nil
default:
if c.shouldLog(LogLevelError) {
c.log(LogLevelError,"Unexpected data playload message type %v", t)
if rc.c.shouldLog(LogLevelError) {
rc.c.log(LogLevelError, "Unexpected data playload message type %v", t)
}
}
default:
if c.shouldLog(LogLevelError) {
c.log(LogLevelError,"Unexpected replication message type %v", t)
if rc.c.shouldLog(LogLevelError) {
rc.c.log(LogLevelError, "Unexpected replication message type %v", t)
}
}
return
@@ -256,7 +270,7 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
//
// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified
// duration.
func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
var zeroTime time.Time
deadline := time.Now().Add(timeout)
@@ -269,27 +283,95 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM
// deadline and peek into the reader. If a timeout error occurs there
// we don't break the pgx connection. If the Peek returns that data
// is available then we turn off the read deadline before the rxMsg.
err = c.conn.SetReadDeadline(deadline)
err = rc.c.conn.SetReadDeadline(deadline)
if err != nil {
return nil, err
}
// Wait until there is a byte available before continuing onto the normal msg reading path
_, err = c.reader.Peek(1)
_, err = rc.c.reader.Peek(1)
if err != nil {
c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline
rc.c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline
if err, ok := err.(*net.OpError); ok && err.Timeout() {
return nil, ErrNotificationTimeout
}
return nil, err
}
err = c.conn.SetReadDeadline(zeroTime)
err = rc.c.conn.SetReadDeadline(zeroTime)
if err != nil {
return nil, err
}
return c.readReplicationMessage()
return rc.readReplicationMessage()
}
func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
rc.c.lastActivityTime = time.Now()
rows := rc.c.getRows(sql, nil)
if err := rc.c.lock(); err != nil {
rows.abort(err)
return rows, err
}
rows.unlockConn = true
err := rc.c.sendSimpleQuery(sql)
if err != nil {
rows.abort(err)
}
var t byte
var r *msgReader
t, r, err = rc.c.rxMsg()
if err != nil {
return nil, err
}
switch t {
case rowDescription:
rows.fields = rc.c.rxRowDescription(r)
// We don't have c.PgTypes here because we're a replication
// connection. This means the field descriptions will have
// only Oids. Not much we can do about this.
default:
if e := rc.c.processContextFreeMsg(t, r); e != nil {
rows.abort(e)
return rows, e
}
}
return rows, rows.err
}
// Execute the "IDENTIFY_SYSTEM" command as documented here:
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
//
// This will return (if successful) a result set that has a single row
// that contains the systemid, current timeline, xlogpos and database
// name.
//
// NOTE: Because this is a replication mode connection, we don't have
// type names, so the field descriptions in the result will have only
// Oids and no DataTypeName values
func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) {
return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM")
}
// Execute the "TIMELINE_HISTORY" command as documented here:
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
//
// This will return (if successful) a result set that has a single row
// that contains the filename of the history file and the content
// of the history file. If called for timeline 1, typically this will
// generate an error that the timeline history file does not exist.
//
// NOTE: Because this is a replication mode connection, we don't have
// type names, so the field descriptions in the result will have only
// Oids and no DataTypeName values
func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) {
return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline))
}
// Start a replication connection, sending WAL data to the given replication
@@ -303,7 +385,7 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM
//
// This function assumes that slotName has already been created. In order to omit the timeline argument
// pass a -1 for the timeline to get the server default behavior.
func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) {
func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) {
var queryString string
if timeline >= 0 {
queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline)
@@ -315,7 +397,7 @@ func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64
queryString += fmt.Sprintf(" %s", arg)
}
if err = c.sendQuery(queryString); err != nil {
if err = rc.c.sendQuery(queryString); err != nil {
return
}
@@ -324,12 +406,24 @@ func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64
// started. This call will either return nil, nil or if it returns an error
// that indicates the start replication command failed
var r *ReplicationMessage
r, err = c.WaitForReplicationMessage(initialReplicationResponseTimeout)
r, err = rc.WaitForReplicationMessage(initialReplicationResponseTimeout)
if err != nil && r != nil {
if c.shouldLog(LogLevelError) {
c.log(LogLevelError, "Unxpected replication message %v", r)
if rc.c.shouldLog(LogLevelError) {
rc.c.log(LogLevelError, "Unxpected replication message %v", r)
}
}
return
}
// Create the replication slot, using the given name and output plugin.
func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) {
_, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin))
return
}
// Drop the replication slot for the given name
func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) {
_, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName))
return
}
+163 -29
View File
@@ -1,13 +1,13 @@
package pgx_test
import (
"fmt"
"github.com/jackc/pgx"
"reflect"
"strconv"
"strings"
"testing"
"time"
"reflect"
"fmt"
)
// This function uses a postgresql 9.6 specific column
@@ -48,13 +48,10 @@ func TestSimpleReplicationConnection(t *testing.T) {
conn := mustConnect(t, *replicationConnConfig)
defer closeConn(t, conn)
replicationConnConfig.RuntimeParams = make(map[string]string)
replicationConnConfig.RuntimeParams["replication"] = "database"
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn)
replicationConn := mustConnect(t, *replicationConnConfig)
defer closeConn(t, replicationConn)
_, err = replicationConn.Exec("CREATE_REPLICATION_SLOT pgx_test LOGICAL test_decoding")
err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding")
if err != nil {
t.Logf("replication slot create failed: %v", err)
}
@@ -153,16 +150,170 @@ func TestSimpleReplicationConnection(t *testing.T) {
t.Errorf("Failed to create standby status %v", err)
}
replicationConn.SendStandbyStatus(status)
replicationConn.StopReplication()
restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
integerRestartLsn, _ := pgx.ParseLSN(restartLsn)
if integerRestartLsn != maxWal {
t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn)
}
closeReplicationConn(t, replicationConn)
replicationConn2 := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn2)
err = replicationConn2.DropReplicationSlot("pgx_test")
if err != nil {
t.Fatalf("Failed to drop replication slot: %v", err)
}
droppedLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
if droppedLsn != "" {
t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn)
}
}
func TestReplicationConn_DropReplicationSlot(t *testing.T) {
if replicationConnConfig == nil {
t.Skip("Skipping due to undefined replicationConnConfig")
}
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn)
err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
if err != nil {
t.Logf("replication slot create failed: %v", err)
}
err = replicationConn.DropReplicationSlot("pgx_slot_test")
if err != nil {
t.Fatalf("Failed to drop replication slot: %v", err)
}
// We re-create to ensure the drop worked.
err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
if err != nil {
t.Logf("replication slot create failed: %v", err)
}
// And finally we drop to ensure we don't leave dirty state
err = replicationConn.DropReplicationSlot("pgx_slot_test")
if err != nil {
t.Fatalf("Failed to drop replication slot: %v", err)
}
}
func TestIdentifySystem(t *testing.T) {
if replicationConnConfig == nil {
t.Skip("Skipping due to undefined replicationConnConfig")
}
replicationConn2 := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn2)
r, err := replicationConn2.IdentifySystem()
if err != nil {
t.Error(err)
}
defer r.Close()
for _, fd := range r.FieldDescriptions() {
t.Logf("Field: %s of type %v", fd.Name, fd.DataType)
}
var rowCount int
for r.Next() {
rowCount++
values, err := r.Values()
if err != nil {
t.Error(err)
}
t.Logf("Row values: %v", values)
}
if r.Err() != nil {
t.Error(r.Err())
}
if rowCount == 0 {
t.Errorf("Failed to find any rows: %d", rowCount)
}
}
func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int {
r, err := rc.IdentifySystem()
if err != nil {
t.Error(err)
}
defer r.Close()
for r.Next() {
values, e := r.Values()
if e != nil {
t.Error(e)
}
timeline, e := strconv.Atoi(values[1].(string))
if e != nil {
t.Error(e)
}
return timeline
}
t.Fatal("Failed to read timeline")
return -1
}
func TestGetTimelineHistory(t *testing.T) {
if replicationConnConfig == nil {
t.Skip("Skipping due to undefined replicationConnConfig")
}
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn)
timeline := getCurrentTimeline(t, replicationConn)
r, err := replicationConn.TimelineHistory(timeline)
if err != nil {
t.Errorf("%#v", err)
}
defer r.Close()
for _, fd := range r.FieldDescriptions() {
t.Logf("Field: %s of type %v", fd.Name, fd.DataType)
}
var rowCount int
for r.Next() {
rowCount++
values, err := r.Values()
if err != nil {
t.Error(err)
}
t.Logf("Row values: %v", values)
}
if r.Err() != nil {
if strings.Contains(r.Err().Error(), "No such file or directory") {
// This is normal, this means the timeline we're on has no
// history, which is the common case in a test db that
// has only one timeline
return
}
t.Error(r.Err())
}
// If we have a timeline history (see above) there should have been
// rows emitted
if rowCount == 0 {
t.Errorf("Failed to find any rows: %d", rowCount)
}
}
func TestStandbyStatusParsing(t *testing.T) {
// Let's push the boundary conditions of the standby status and ensure it errors correctly
status, err = pgx.NewStandbyStatus(0,1,2,3,4)
status, err := pgx.NewStandbyStatus(0, 1, 2, 3, 4)
if err == nil {
t.Errorf("Expected error from new standby status, got %v",status)
t.Errorf("Expected error from new standby status, got %v", status)
}
// And if you provide 3 args, ensure the right fields are set
status, err = pgx.NewStandbyStatus(1,2,3)
status, err = pgx.NewStandbyStatus(1, 2, 3)
if err != nil {
t.Errorf("Failed to create test status: %v", err)
}
@@ -175,21 +326,4 @@ func TestSimpleReplicationConnection(t *testing.T) {
if status.WalWritePosition != 3 {
t.Errorf("Unexpected write position %d", status.WalWritePosition)
}
err = replicationConn.Close()
if err != nil {
t.Fatalf("Replication connection close failed: %v", err)
}
restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
integerRestartLsn, _ := pgx.ParseLSN(restartLsn)
if integerRestartLsn != maxWal {
t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn)
}
_, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test")
if err != nil {
t.Fatalf("Failed to drop replication slot: %v", err)
}
}