diff --git a/conn_test.go b/conn_test.go index 147a925c..5256d328 100644 --- a/conn_test.go +++ b/conn_test.go @@ -907,6 +907,109 @@ func TestListenNotify(t *testing.T) { } } +func TestListenNotifyWhileBusyIsSafe(t *testing.T) { + t.Parallel() + + listenerDone := make(chan bool) + go func() { + conn := mustConnect(t, *defaultConnConfig) + defer closeConn(t, conn) + defer func() { + listenerDone <- true + }() + + if err := conn.Listen("busysafe"); err != nil { + t.Fatalf("Unable to start listening: %v", err) + } + + for i := 0; i < 5000; i++ { + var sum int32 + var rowCount int32 + + rows, err := conn.Query("select generate_series(1,$1)", 100) + if err != nil { + t.Fatalf("conn.Query failed: ", err) + } + + for rows.Next() { + var n int32 + rows.Scan(&n) + sum += n + rowCount++ + } + + if rows.Err() != nil { + t.Fatalf("conn.Query failed: ", err) + } + + if sum != 5050 { + t.Fatalf("Wrong rows sum: %v", sum) + } + + if rowCount != 100 { + t.Fatalf("Wrong number of rows: %v", rowCount) + } + + time.Sleep(1 * time.Microsecond) + } + }() + + notifierDone := make(chan bool) + go func() { + conn := mustConnect(t, *defaultConnConfig) + defer closeConn(t, conn) + defer func() { + notifierDone <- true + }() + + for i := 0; i < 100000; i++ { + mustExec(t, conn, "notify busysafe, 'hello'") + time.Sleep(1 * time.Microsecond) + } + }() + + <-listenerDone +} + +func TestListenNotifySelfNotification(t *testing.T) { + t.Parallel() + + conn := mustConnect(t, *defaultConnConfig) + defer closeConn(t, conn) + + if err := conn.Listen("self"); err != nil { + t.Fatalf("Unable to start listening: %v", err) + } + + // Notify self and WaitForNotification immediately + mustExec(t, conn, "notify self") + + notification, err := conn.WaitForNotification(time.Second) + if err != nil { + t.Fatalf("Unexpected error on WaitForNotification: %v", err) + } + if notification.Channel != "self" { + t.Errorf("Did not receive notification on expected channel: %v", notification.Channel) + } + + // Notify self and do something else before WaitForNotification + mustExec(t, conn, "notify self") + + rows, _ := conn.Query("select 1") + rows.Close() + if rows.Err() != nil { + t.Fatalf("Unexpected error on Query: %v", rows.Err()) + } + + notification, err = conn.WaitForNotification(time.Second) + if err != nil { + t.Fatalf("Unexpected error on WaitForNotification: %v", err) + } + if notification.Channel != "self" { + t.Errorf("Did not receive notification on expected channel: %v", notification.Channel) + } +} + func TestFatalRxError(t *testing.T) { t.Parallel()