2
0

Changed CreateReplicationSlot to return the consistent_point and snapshot_name.

This commit is contained in:
Mark Fletcher
2017-09-19 21:12:15 -07:00
parent fd7b776540
commit a1e4b1b9b5
2 changed files with 11 additions and 5 deletions
+8 -2
View File
@@ -435,8 +435,14 @@ func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, ti
} }
// Create the replication slot, using the given name and output plugin. // Create the replication slot, using the given name and output plugin.
func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) { func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) {
_, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) var dummy string
var rows *Rows
rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin))
defer rows.Close()
for rows.Next() {
rows.Scan(&dummy, &consistentPoint, &snapshotName, &dummy)
}
return return
} }
+3 -3
View File
@@ -56,7 +56,7 @@ func TestSimpleReplicationConnection(t *testing.T) {
replicationConn := mustReplicationConnect(t, *replicationConnConfig) replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn) defer closeReplicationConn(t, replicationConn)
err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding") _, _, err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding")
if err != nil { if err != nil {
t.Fatalf("replication slot create failed: %v", err) t.Fatalf("replication slot create failed: %v", err)
} }
@@ -178,7 +178,7 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) {
replicationConn := mustReplicationConnect(t, *replicationConnConfig) replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn) defer closeReplicationConn(t, replicationConn)
err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") _, _, err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
if err != nil { if err != nil {
t.Logf("replication slot create failed: %v", err) t.Logf("replication slot create failed: %v", err)
} }
@@ -188,7 +188,7 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) {
} }
// We re-create to ensure the drop worked. // We re-create to ensure the drop worked.
err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") _, _, err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
if err != nil { if err != nil {
t.Logf("replication slot create failed: %v", err) t.Logf("replication slot create failed: %v", err)
} }