Add basic logical replication protocol support
This commit is contained in:
+272
@@ -0,0 +1,272 @@
|
||||
package pgx
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Message type bytes used by the replication code in this file.
const (
	// copyBothResponse is the top-level message type that is received
	// once at the tail end of replication startup.
	copyBothResponse = 'W'

	// walData and senderKeepalive are the first payload byte of a
	// copyData message received from the server.
	walData         = 'w'
	senderKeepalive = 'k'

	// standbyStatusUpdate is the first payload byte of the copyData
	// message the client sends to report its WAL positions.
	standbyStatusUpdate = 'r'
)
|
||||
|
||||
// epochNano is midnight, January 1st 2000 (UTC), expressed as nanoseconds
// since the Unix epoch. Replication timestamps are microseconds relative to
// that instant, so this offset converts them to and from time.Time.
var epochNano = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
|
||||
|
||||
// FormatLsn formats the given 64 bit LSN value into the XXX/XXX format,
// which is the format reported by postgres.
//
// The two halves are the upper and lower 32 bits of the LSN, each printed as
// upper-case hexadecimal without padding. Both halves are treated as unsigned
// so that a half with its top bit set is never rendered with a minus sign.
func FormatLsn(lsn int64) string {
	bits := uint64(lsn)
	return fmt.Sprintf("%X/%X", uint32(bits>>32), uint32(bits))
}
|
||||
|
||||
// ParseLsn parses the given XXX/XXX format LSN as reported by postgres
// into a 64 bit integer as used internally by the wire protocols.
//
// Each half must be an unsigned hexadecimal number that fits in 32 bits;
// the first half becomes the upper 32 bits of the result and the second half
// the lower 32 bits. An error is returned when the input does not match the
// format or when either half is out of range.
func ParseLsn(lsn string) (outputLsn int64, err error) {
	const maxHalf = 0xFFFFFFFF

	// Scan into unsigned values so that signed input such as "-1/0" is
	// rejected by the scanner instead of producing a bogus position.
	var upperHalf, lowerHalf uint64
	nparsed, err := fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf)
	if err != nil {
		return 0, err
	}

	// A half wider than 32 bits would otherwise bleed into the other half
	// when the two are combined below.
	if nparsed != 2 || upperHalf > maxHalf || lowerHalf > maxHalf {
		return 0, errors.New("failed to parse LSN: " + lsn)
	}

	return int64(upperHalf<<32 | lowerHalf), nil
}
|
||||
|
||||
// WalMessage carries a single chunk of WAL payload data received from the
// server during replication.
type WalMessage struct {
	// WalStart is the WAL start position of this data. This is the
	// position a client needs to track.
	WalStart int64

	// ServerWalEnd and ServerTime are documented to report the server's
	// current WAL end position and clock; both appear to be unimplemented
	// in pg 9.5.
	ServerWalEnd int64
	ServerTime   int64

	// WalData is the raw, unparsed binary WAL entry. Its contents are
	// determined by the logical decoding output plugin.
	WalData []byte
}
|
||||
|
||||
func (w *WalMessage) Time() time.Time {
|
||||
return time.Unix(0, (w.ServerTime*1000)+epochNano)
|
||||
}
|
||||
|
||||
func (w *WalMessage) ByteLag() int64 {
|
||||
return (w.ServerWalEnd - w.WalStart)
|
||||
}
|
||||
|
||||
func (w *WalMessage) String() string {
|
||||
return fmt.Sprintf("Wal: %s Time: %s Lag: %d", FormatLsn(w.WalStart), w.Time(), w.ByteLag())
|
||||
}
|
||||
|
||||
// ServerHeartbeat is the keepalive message sent periodically by the server.
// It carries server status and a flag asking the client to reply.
type ServerHeartbeat struct {
	// ServerWalEnd is the current max WAL position on the server, used
	// for lag tracking.
	ServerWalEnd int64

	// ServerTime is the server time, in microseconds since Jan 1 2000.
	ServerTime int64

	// ReplyRequested, when 1, means the server is requesting that a
	// standby status message be sent immediately.
	ReplyRequested byte
}
|
||||
|
||||
func (s *ServerHeartbeat) Time() time.Time {
|
||||
return time.Unix(0, (s.ServerTime*1000)+epochNano)
|
||||
}
|
||||
|
||||
func (s *ServerHeartbeat) String() string {
|
||||
return fmt.Sprintf("WalEnd: %s ReplyRequested: %d T: %s", FormatLsn(s.ServerWalEnd), s.ReplyRequested, s.Time())
|
||||
}
|
||||
|
||||
// The replication message wraps all possible messages from the
|
||||
// server received during replication. At most one of the wal message
|
||||
// or server heartbeat will be non-nil
|
||||
type ReplicationMessage struct {
|
||||
WalMessage *WalMessage
|
||||
ServerHeartbeat *ServerHeartbeat
|
||||
}
|
||||
|
||||
// StandbyStatus is the client side heartbeat sent to the postgresql server
// to track the client's WAL positions. For practical purposes, all WAL
// positions are typically set to the same value.
type StandbyStatus struct {
	// WalWritePosition is the WAL position that has been locally written.
	WalWritePosition int64

	// WalFlushPosition is the WAL position that has been locally flushed.
	WalFlushPosition int64

	// WalApplyPosition is the WAL position that has been locally applied.
	WalApplyPosition int64

	// ClientTime is the client time in microseconds since Jan 1 2000.
	ClientTime int64

	// ReplyRequested, when 1, asks the server to immediately send a
	// server heartbeat.
	ReplyRequested byte
}
|
||||
|
||||
// Create a standby status struct, which sets all the WAL positions
|
||||
// to the given wal position, and the client time to the current time.
|
||||
func NewStandbyStatus(walPosition int64) (status *StandbyStatus) {
|
||||
status = new(StandbyStatus)
|
||||
status.WalFlushPosition = walPosition
|
||||
status.WalApplyPosition = walPosition
|
||||
status.WalWritePosition = walPosition
|
||||
status.ClientTime = (time.Now().UnixNano() - epochNano) / 1000
|
||||
return
|
||||
}
|
||||
|
||||
// Send standby status to the server, which both acts as a keepalive
|
||||
// message to the server, as well as carries the WAL position of the
|
||||
// client, which then updates the server's replication slot position.
|
||||
func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) {
|
||||
writeBuf := newWriteBuf(c, copyData)
|
||||
writeBuf.WriteByte(standbyStatusUpdate)
|
||||
writeBuf.WriteInt64(k.WalWritePosition)
|
||||
writeBuf.WriteInt64(k.WalFlushPosition)
|
||||
writeBuf.WriteInt64(k.WalApplyPosition)
|
||||
writeBuf.WriteInt64(k.ClientTime)
|
||||
writeBuf.WriteByte(k.ReplyRequested)
|
||||
|
||||
writeBuf.closeMsg()
|
||||
|
||||
_, err = c.conn.Write(writeBuf.buf)
|
||||
if err != nil {
|
||||
fmt.Printf("Error sending standby status %v\n", err)
|
||||
c.die(err)
|
||||
}
|
||||
fmt.Printf("Write complete, wal position is %s\n", FormatLsn(k.WalApplyPosition))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
|
||||
var t byte
|
||||
var reader *msgReader
|
||||
t, reader, err = c.rxMsg()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
switch t {
|
||||
case noticeResponse:
|
||||
pgError := c.rxErrorResponse(reader)
|
||||
if c.shouldLog(LogLevelInfo) {
|
||||
c.log(LogLevelInfo, pgError.Error())
|
||||
}
|
||||
case errorResponse:
|
||||
err = c.rxErrorResponse(reader)
|
||||
if c.shouldLog(LogLevelError) {
|
||||
c.log(LogLevelError, err.Error())
|
||||
}
|
||||
return
|
||||
case copyBothResponse:
|
||||
// This is the tail end of the replication process start,
|
||||
// and can be safely ignored
|
||||
return
|
||||
case copyData:
|
||||
var msgType byte
|
||||
msgType = reader.readByte()
|
||||
switch msgType {
|
||||
case walData:
|
||||
walStart := reader.readInt64()
|
||||
serverWalEnd := reader.readInt64()
|
||||
serverTime := reader.readInt64()
|
||||
walData := reader.readBytes(reader.msgBytesRemaining)
|
||||
walMessage := WalMessage{WalStart: walStart,
|
||||
ServerWalEnd: serverWalEnd,
|
||||
ServerTime: serverTime,
|
||||
WalData: walData,
|
||||
}
|
||||
|
||||
return &ReplicationMessage{WalMessage: &walMessage}, nil
|
||||
case senderKeepalive:
|
||||
serverWalEnd := reader.readInt64()
|
||||
serverTime := reader.readInt64()
|
||||
replyNow := reader.readByte()
|
||||
h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow}
|
||||
return &ReplicationMessage{ServerHeartbeat: h}, nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for a single replication message up to timeout time.
|
||||
//
|
||||
// Properly using this requires some knowledge of the postgres replication mechanisms,
|
||||
// as the client can receive both WAL data (the ultimate payload) and server heartbeat
|
||||
// updates. The caller also must send standby status updates in order to keep the connection
|
||||
// alive and working.
|
||||
//
|
||||
// There is also a condition (during startup) which can cause both the replication message
|
||||
// to return as nil as well as the error, which is a normal part of the replication protocol
|
||||
// startup. It's important the client correctly handle (ignore) this scenario.
|
||||
func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
|
||||
var zeroTime time.Time
|
||||
|
||||
deadline := time.Now().Add(timeout)
|
||||
|
||||
// Use SetReadDeadline to implement the timeout. SetReadDeadline will
|
||||
// cause operations to fail with a *net.OpError that has a Timeout()
|
||||
// of true. Because the normal pgx rxMsg path considers any error to
|
||||
// have potentially corrupted the state of the connection, it dies
|
||||
// on any errors. So to avoid timeout errors in rxMsg we set the
|
||||
// deadline and peek into the reader. If a timeout error occurs there
|
||||
// we don't break the pgx connection. If the Peek returns that data
|
||||
// is available then we turn off the read deadline before the rxMsg.
|
||||
err = c.conn.SetReadDeadline(deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wait until there is a byte available before continuing onto the normal msg reading path
|
||||
_, err = c.reader.Peek(1)
|
||||
if err != nil {
|
||||
c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline
|
||||
if err, ok := err.(*net.OpError); ok && err.Timeout() {
|
||||
return nil, ErrNotificationTimeout
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = c.conn.SetReadDeadline(zeroTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c.readReplicationMessage()
|
||||
}
|
||||
|
||||
// Start a replication connection, sending WAL data to the given replication
|
||||
// receiver. The sql string here should be a "START_REPLICATION" command, as
|
||||
// per the postgresql docs here:
|
||||
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
|
||||
//
|
||||
// A typical query would look like:
|
||||
// START_REPLICATION SLOT t LOGICAL test_decoder 0/0
|
||||
//
|
||||
// Once started, the client needs to invoke WaitForReplicationMessage() in order
|
||||
// to fetch the WAL and standby status. Also, it is the responsibility of the caller
|
||||
// to periodically send StandbyStatus messages to update the replication slot position.
|
||||
func (c *Conn) StartReplication(sql string, arguments ...interface{}) (err error) {
|
||||
if err = c.sendQuery(sql, arguments...); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user