Add additional testing around listen/notify
This commit is contained in:
+103
@@ -907,6 +907,109 @@ func TestListenNotify(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListenNotifyWhileBusyIsSafe(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
listenerDone := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
conn := mustConnect(t, *defaultConnConfig)
|
||||||
|
defer closeConn(t, conn)
|
||||||
|
defer func() {
|
||||||
|
listenerDone <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := conn.Listen("busysafe"); err != nil {
|
||||||
|
t.Fatalf("Unable to start listening: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
var sum int32
|
||||||
|
var rowCount int32
|
||||||
|
|
||||||
|
rows, err := conn.Query("select generate_series(1,$1)", 100)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("conn.Query failed: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var n int32
|
||||||
|
rows.Scan(&n)
|
||||||
|
sum += n
|
||||||
|
rowCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
if rows.Err() != nil {
|
||||||
|
t.Fatalf("conn.Query failed: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if sum != 5050 {
|
||||||
|
t.Fatalf("Wrong rows sum: %v", sum)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rowCount != 100 {
|
||||||
|
t.Fatalf("Wrong number of rows: %v", rowCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Microsecond)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
notifierDone := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
conn := mustConnect(t, *defaultConnConfig)
|
||||||
|
defer closeConn(t, conn)
|
||||||
|
defer func() {
|
||||||
|
notifierDone <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
for i := 0; i < 100000; i++ {
|
||||||
|
mustExec(t, conn, "notify busysafe, 'hello'")
|
||||||
|
time.Sleep(1 * time.Microsecond)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-listenerDone
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListenNotifySelfNotification(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
conn := mustConnect(t, *defaultConnConfig)
|
||||||
|
defer closeConn(t, conn)
|
||||||
|
|
||||||
|
if err := conn.Listen("self"); err != nil {
|
||||||
|
t.Fatalf("Unable to start listening: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify self and WaitForNotification immediately
|
||||||
|
mustExec(t, conn, "notify self")
|
||||||
|
|
||||||
|
notification, err := conn.WaitForNotification(time.Second)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error on WaitForNotification: %v", err)
|
||||||
|
}
|
||||||
|
if notification.Channel != "self" {
|
||||||
|
t.Errorf("Did not receive notification on expected channel: %v", notification.Channel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify self and do something else before WaitForNotification
|
||||||
|
mustExec(t, conn, "notify self")
|
||||||
|
|
||||||
|
rows, _ := conn.Query("select 1")
|
||||||
|
rows.Close()
|
||||||
|
if rows.Err() != nil {
|
||||||
|
t.Fatalf("Unexpected error on Query: %v", rows.Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
notification, err = conn.WaitForNotification(time.Second)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error on WaitForNotification: %v", err)
|
||||||
|
}
|
||||||
|
if notification.Channel != "self" {
|
||||||
|
t.Errorf("Did not receive notification on expected channel: %v", notification.Channel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestFatalRxError(t *testing.T) {
|
func TestFatalRxError(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user