2
0
Files
pgx/replication_test.go
T
Jack Christensen 11b82b3ca4 msgReader implemented in terms of ChunkReader
This should substantially reduce memory allocations and memory copies.

It also means that PostgreSQL messages are always entirely buffered in memory
before processing begins. This simplifies the message processing code.

In particular, Conn.WaitForNotification is dramatically simplified by this
change.
2017-02-13 20:45:42 -06:00

330 lines
9.0 KiB
Go

package pgx_test
import (
"context"
"fmt"
"github.com/jackc/pgx"
"reflect"
"strconv"
"strings"
"testing"
"time"
)
// getConfirmedFlushLsnFor returns the confirmed_flush_lsn of the named
// replication slot as reported by pg_replication_slots, or "" when no slot
// with that name exists. The confirmed_flush_lsn column is specific to
// PostgreSQL 9.6 and later.
//
// The test fails immediately if the query, a row scan, or row iteration
// reports an error. slot is interpolated directly into the SQL text, so it
// must be a trusted literal (callers in this file pass fixed slot names).
func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string {
	rows, err := conn.Query(fmt.Sprintf("select confirmed_flush_lsn from pg_replication_slots where slot_name='%s'", slot))
	if err != nil {
		t.Fatalf("conn.Query failed: %v", err)
	}
	defer rows.Close()

	var confirmedFlushLsn string
	for rows.Next() {
		if err := rows.Scan(&confirmedFlushLsn); err != nil {
			t.Fatalf("rows.Scan failed: %v", err)
		}
	}
	if err := rows.Err(); err != nil {
		t.Fatalf("rows.Err failed: %v", err)
	}
	return confirmedFlushLsn
}
// TestSimpleReplicationConnection is a battleship test that (at least
// somewhat by necessity) does several things all at once in a single run. It:
//   - Establishes a replication connection & slot
//   - Does a series of operations to create some known WAL entries
//   - Replicates the entries down, and checks that the rows it
//     created come down in order
//   - Sends a standby status message to update the server with the
//     wal position of the slot
//   - Checks the wal position of the slot on the server to make sure
//     the update succeeded
//   - Drops the slot and checks that it no longer reports a flush LSN
func TestSimpleReplicationConnection(t *testing.T) {
	t.Parallel()

	var err error

	if replicationConnConfig == nil {
		t.Skip("Skipping due to undefined replicationConnConfig")
	}

	conn := mustConnect(t, *replicationConnConfig)
	defer closeConn(t, conn)

	replicationConn := mustReplicationConnect(t, *replicationConnConfig)
	defer closeReplicationConn(t, replicationConn)

	// Only logged, not fatal: presumably the slot can already exist from an
	// earlier run.
	err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding")
	if err != nil {
		t.Logf("replication slot create failed: %v", err)
	}

	// Do a simple change so we can get some wal data
	_, err = conn.Exec("create table if not exists replication_test (a integer)")
	if err != nil {
		t.Fatalf("Failed to create table: %v", err)
	}

	err = replicationConn.StartReplication("pgx_test", 0, -1)
	if err != nil {
		t.Fatalf("Failed to start replication: %v", err)
	}

	var insertedTimes []int64
	for i := 0; i < 5; i++ {
		var ct pgx.CommandTag
		currentTime := time.Now().Unix()
		insertedTimes = append(insertedTimes, currentTime)
		ct, err = conn.Exec("insert into replication_test(a) values($1)", currentTime)
		if err != nil {
			t.Fatalf("Insert failed: %v", err)
		}
		t.Logf("Inserted %d rows", ct.RowsAffected())
	}

	var foundTimes []int64
	var foundCount int
	var maxWal uint64
	// timeouts counts one-second waits that produced no message; the stream
	// is treated as drained once more than 3 of them have occurred.
	var timeouts int
	for {
		var message *pgx.ReplicationMessage
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		message, err = replicationConn.WaitForReplicationMessage(ctx)
		// Release the context's resources now rather than leaking them until
		// the deadline fires.
		cancel()
		if err != nil && err != context.DeadlineExceeded {
			t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err))
		}

		if message != nil {
			if message.WalMessage != nil {
				// The waldata payload with the test_decoding plugin looks like:
				// public.replication_test: INSERT: a[integer]:2
				// What we wanna do here is check that once we find one of our inserted times,
				// that they occur in the wal stream in the order we executed them.
				walString := string(message.WalMessage.WalData)
				if strings.Contains(walString, "public.replication_test: INSERT") {
					stringParts := strings.Split(walString, ":")
					offset, parseErr := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64)
					if parseErr != nil {
						t.Fatalf("Failed to parse walString %s: %v", walString, parseErr)
					}
					if foundCount > 0 || offset == insertedTimes[0] {
						foundTimes = append(foundTimes, offset)
						foundCount++
					}
				}
				if message.WalMessage.WalStart > maxWal {
					maxWal = message.WalMessage.WalStart
				}
			}
			if message.ServerHeartbeat != nil {
				t.Logf("Got heartbeat: %s", message.ServerHeartbeat)
			}
		} else {
			t.Log("Timed out waiting for wal message")
			timeouts++
		}

		if timeouts > 3 {
			t.Log("Actual timeout")
			break
		}
	}

	if foundCount != len(insertedTimes) {
		t.Fatalf("Failed to find all inserted time values in WAL stream (found %d expected %d)", foundCount, len(insertedTimes))
	}

	for i := range insertedTimes {
		if foundTimes[i] != insertedTimes[i] {
			t.Fatalf("Found %d expected %d", foundTimes[i], insertedTimes[i])
		}
	}

	t.Logf("Found %d times, as expected", len(foundTimes))

	// Before closing our connection, let's send a standby status to update our wal
	// position, which should then be reflected if we fetch out our current wal position
	// for the slot
	status, err := pgx.NewStandbyStatus(maxWal)
	if err != nil {
		// Fatal: the status is needed for the send below.
		t.Fatalf("Failed to create standby status %v", err)
	}
	err = replicationConn.SendStandbyStatus(status)
	if err != nil {
		t.Fatalf("Failed to send standby status: %v", err)
	}

	restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
	integerRestartLsn, err := pgx.ParseLSN(restartLsn)
	if err != nil {
		t.Fatalf("Failed to parse confirmed flush LSN %q: %v", restartLsn, err)
	}
	if integerRestartLsn != maxWal {
		t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn)
	}

	// Close the first replication connection explicitly before dropping the
	// slot from a second connection.
	closeReplicationConn(t, replicationConn)

	replicationConn2 := mustReplicationConnect(t, *replicationConnConfig)
	defer closeReplicationConn(t, replicationConn2)

	err = replicationConn2.DropReplicationSlot("pgx_test")
	if err != nil {
		t.Fatalf("Failed to drop replication slot: %v", err)
	}

	droppedLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
	if droppedLsn != "" {
		t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn)
	}
}
// TestReplicationConn_DropReplicationSlot creates and drops the
// "pgx_slot_test" slot, then re-creates and drops it again. The re-create is
// the actual check that the first drop removed the slot, so its failure is
// fatal.
func TestReplicationConn_DropReplicationSlot(t *testing.T) {
	if replicationConnConfig == nil {
		t.Skip("Skipping due to undefined replicationConnConfig")
	}

	replicationConn := mustReplicationConnect(t, *replicationConnConfig)
	defer closeReplicationConn(t, replicationConn)

	// The initial create is only logged on failure: presumably the slot may
	// already exist from an earlier run, and the drop below handles that.
	err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
	if err != nil {
		t.Logf("replication slot create failed: %v", err)
	}
	err = replicationConn.DropReplicationSlot("pgx_slot_test")
	if err != nil {
		t.Fatalf("Failed to drop replication slot: %v", err)
	}

	// We re-create to ensure the drop worked. A failure here means the slot
	// survived the drop, so unlike the first create it must fail the test.
	err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
	if err != nil {
		t.Fatalf("replication slot re-create after drop failed: %v", err)
	}

	// And finally we drop to ensure we don't leave dirty state
	err = replicationConn.DropReplicationSlot("pgx_slot_test")
	if err != nil {
		t.Fatalf("Failed to drop replication slot: %v", err)
	}
}
// TestIdentifySystem runs IDENTIFY_SYSTEM over a replication connection,
// logs the returned field descriptions and row values, and fails if the
// command or any row reports an error or if no rows come back.
func TestIdentifySystem(t *testing.T) {
	if replicationConnConfig == nil {
		t.Skip("Skipping due to undefined replicationConnConfig")
	}

	replicationConn2 := mustReplicationConnect(t, *replicationConnConfig)
	defer closeReplicationConn(t, replicationConn2)

	r, err := replicationConn2.IdentifySystem()
	// Guard against a nil *Rows being returned alongside an error.
	if r != nil {
		defer r.Close()
	}
	if err != nil {
		// Stop here instead of iterating rows from a failed command.
		t.Fatal(err)
	}

	for _, fd := range r.FieldDescriptions() {
		t.Logf("Field: %s of type %v", fd.Name, fd.DataType)
	}

	var rowCount int
	for r.Next() {
		rowCount++
		values, err := r.Values()
		if err != nil {
			t.Error(err)
		}
		t.Logf("Row values: %v", values)
	}
	if r.Err() != nil {
		t.Error(r.Err())
	}

	if rowCount == 0 {
		t.Errorf("Failed to find any rows: %d", rowCount)
	}
}
// getCurrentTimeline runs IDENTIFY_SYSTEM on rc and returns the timeline
// reported in the second column of the first row. The test fails
// immediately if the command, row iteration or row decoding fails, if no row
// or fewer than two columns come back, or if the value is not an integer.
func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int {
	r, err := rc.IdentifySystem()
	// Guard against a nil *Rows being returned alongside an error.
	if r != nil {
		defer r.Close()
	}
	if err != nil {
		t.Fatalf("IdentifySystem failed: %v", err)
	}

	if !r.Next() {
		if err := r.Err(); err != nil {
			t.Fatalf("IdentifySystem rows failed: %v", err)
		}
		t.Fatal("Failed to read timeline")
	}

	values, err := r.Values()
	if err != nil {
		t.Fatalf("Failed to read IdentifySystem row: %v", err)
	}
	if len(values) < 2 {
		t.Fatalf("IdentifySystem returned %d columns, expected at least 2", len(values))
	}

	// Normalize the column to text before parsing instead of asserting a
	// single concrete Go type, so a timeline decoded as a string, raw bytes
	// or an integer value all parse rather than panicking.
	var raw string
	switch v := values[1].(type) {
	case string:
		raw = v
	case []byte:
		raw = string(v)
	default:
		raw = fmt.Sprint(v)
	}

	timeline, err := strconv.Atoi(raw)
	if err != nil {
		t.Fatalf("Failed to parse timeline %q: %v", raw, err)
	}
	return timeline
}
// TestGetTimelineHistory runs TIMELINE_HISTORY for the server's current
// timeline and logs the returned field descriptions and rows. An error
// containing "No such file or directory" is accepted. This applies whether
// TimelineHistory returns it directly or the rows report it afterwards. The
// error means the timeline has no history, which is the common case for a
// test db that has only one timeline.
func TestGetTimelineHistory(t *testing.T) {
	if replicationConnConfig == nil {
		t.Skip("Skipping due to undefined replicationConnConfig")
	}

	replicationConn := mustReplicationConnect(t, *replicationConnConfig)
	defer closeReplicationConn(t, replicationConn)

	// noHistory reports whether err means the current timeline has no
	// history file.
	noHistory := func(err error) bool {
		return strings.Contains(err.Error(), "No such file or directory")
	}

	timeline := getCurrentTimeline(t, replicationConn)

	r, err := replicationConn.TimelineHistory(timeline)
	// Guard against a nil *Rows being returned alongside an error.
	if r != nil {
		defer r.Close()
	}
	if err != nil {
		if noHistory(err) {
			return
		}
		t.Fatalf("%#v", err)
	}

	for _, fd := range r.FieldDescriptions() {
		t.Logf("Field: %s of type %v", fd.Name, fd.DataType)
	}

	var rowCount int
	for r.Next() {
		rowCount++
		values, err := r.Values()
		if err != nil {
			t.Error(err)
		}
		t.Logf("Row values: %v", values)
	}
	if r.Err() != nil {
		if noHistory(r.Err()) {
			// This is normal, this means the timeline we're on has no
			// history, which is the common case in a test db that
			// has only one timeline
			return
		}
		t.Error(r.Err())
	}

	// If we have a timeline history (see above) there should have been
	// rows emitted
	if rowCount == 0 {
		t.Errorf("Failed to find any rows: %d", rowCount)
	}
}
// TestStandbyStatusParsing checks pgx.NewStandbyStatus argument handling.
// Five WAL positions must be rejected with an error. Three positions must be
// assigned to the flush, apply and write positions, in that order.
func TestStandbyStatusParsing(t *testing.T) {
	// Let's push the boundary conditions of the standby status and ensure it errors correctly
	status, err := pgx.NewStandbyStatus(0, 1, 2, 3, 4)
	if err == nil {
		t.Errorf("Expected error from new standby status, got %v", status)
	}

	// And if you provide 3 args, ensure the right fields are set
	status, err = pgx.NewStandbyStatus(1, 2, 3)
	if err != nil {
		// Fatal rather than Error: the field checks below dereference status.
		t.Fatalf("Failed to create test status: %v", err)
	}
	if status.WalFlushPosition != 1 {
		t.Errorf("Unexpected flush position %d", status.WalFlushPosition)
	}
	if status.WalApplyPosition != 2 {
		t.Errorf("Unexpected apply position %d", status.WalApplyPosition)
	}
	if status.WalWritePosition != 3 {
		t.Errorf("Unexpected write position %d", status.WalWritePosition)
	}
}