From fb90fb27295133aa4dc9cf597be0e78fa00b2e84 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sun, 4 Jun 2017 21:18:26 -0500 Subject: [PATCH] Add notification response hook refs #239 --- conn.go | 41 +++++++++++++++++++++++++++++++++++++++++ conn_test.go | 27 +++++++++++++++++++++++++++ messages.go | 4 ++++ v3.md | 2 ++ 4 files changed, 74 insertions(+) diff --git a/conn.go b/conn.go index 223808c5..20a56807 100644 --- a/conn.go +++ b/conn.go @@ -47,6 +47,13 @@ func init() { }) } +// NoticeHandler is a function that can handle notices received from the +// PostgreSQL server. Notices can be received at any time, usually during +// handling of a query response. The *Conn is provided so the handler is aware +// of the origin of the notice, but it must not invoke any query method. Be +// aware that this is distinct from LISTEN/NOTIFY notification. +type NoticeHandler func(*Conn, *Notice) + // DialFunc is a function that can be used to connect to a PostgreSQL server type DialFunc func(network, addr string) (net.Conn, error) @@ -64,6 +71,7 @@ type ConnConfig struct { LogLevel int Dial DialFunc RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name) + OnNotice NoticeHandler // Callback function called when a notice response is received. } func (cc *ConnConfig) networkAddress() (network, address string) { @@ -102,6 +110,7 @@ type Conn struct { fp *fastpath poolResetCount int preallocatedRows []Rows + onNotice NoticeHandler mux sync.Mutex status byte // One of connStatus* constants @@ -235,6 +244,8 @@ func connect(config ConnConfig, connInfo *pgtype.ConnInfo) (c *Conn, err error) } } + c.onNotice = config.OnNotice + network, address := c.config.networkAddress() if c.config.Dial == nil { c.config.Dial = (&net.Dialer{KeepAlive: 5 * time.Minute}).Dial @@ -1079,6 +1090,8 @@ func (c *Conn) processContextFreeMsg(msg pgproto3.BackendMessage) (err error) { switch msg := msg.(type) { case *pgproto3.ErrorResponse: return c.rxErrorResponse(msg) + case *pgproto3.NoticeResponse: + c.rxNoticeResponse(msg) case *pgproto3.NotificationResponse: c.rxNotificationResponse(msg) case *pgproto3.ReadyForQuery: @@ -1163,6 +1176,34 @@ func (c *Conn) rxErrorResponse(msg *pgproto3.ErrorResponse) PgError { return err } +func (c *Conn) rxNoticeResponse(msg *pgproto3.NoticeResponse) { + if c.onNotice == nil { + return + } + + notice := &Notice{ + Severity: msg.Severity, + Code: msg.Code, + Message: msg.Message, + Detail: msg.Detail, + Hint: msg.Hint, + Position: msg.Position, + InternalPosition: msg.InternalPosition, + InternalQuery: msg.InternalQuery, + Where: msg.Where, + SchemaName: msg.SchemaName, + TableName: msg.TableName, + ColumnName: msg.ColumnName, + DataTypeName: msg.DataTypeName, + ConstraintName: msg.ConstraintName, + File: msg.File, + Line: msg.Line, + Routine: msg.Routine, + } + + c.onNotice(c, notice) +} + func (c *Conn) rxBackendKeyData(msg *pgproto3.BackendKeyData) { c.pid = msg.ProcessID c.secretKey = msg.SecretKey diff --git a/conn_test.go b/conn_test.go index 8ec3c131..d9369a1a 100644 --- a/conn_test.go +++ b/conn_test.go @@ -1898,3 +1898,30 @@ func TestIdentifierSanitize(t *testing.T) { } } } + +func TestConnOnNotice(t *testing.T) { + t.Parallel() + + var msg string + + connConfig := *defaultConnConfig + connConfig.OnNotice = func(c *pgx.Conn, notice *pgx.Notice) { + msg = notice.Message + } + conn := mustConnect(t, connConfig) + defer closeConn(t, conn) + + _, err := conn.Exec(`do $$ +begin + raise notice 'hello, world'; +end$$;`) + if err != nil { + t.Fatal(err) + } + + if msg != "hello, world" { + t.Errorf("msg => %v, want %v", msg, "hello, world") + } + + ensureConnValid(t, conn) +} diff --git a/messages.go b/messages.go index 841aa286..53a5a67c 100644 --- a/messages.go +++ b/messages.go @@ -49,6 +49,10 @@ func (pe PgError) Error() string { return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")" } +// Notice represents a notice response message reported by the PostgreSQL +// server. Be aware that this is distinct from LISTEN/NOTIFY notification. +type Notice PgError + // appendParse appends a PostgreSQL wire protocol parse message to buf and returns it. func appendParse(buf []byte, name string, query string, parameterOIDs []pgtype.OID) []byte { buf = append(buf, 'P') diff --git a/v3.md b/v3.md index a30ca474..993f9e24 100644 --- a/v3.md +++ b/v3.md @@ -54,6 +54,8 @@ Added batch operations Use Go casing convention for OID, UUID, JSON(B), ACLItem, CID, TID, XID, and CIDR +Add OnNotice + ## TODO / Possible / Investigate Organize errors better