diff --git a/connection.go b/connection.go index f55f5602..0a1d04b7 100644 --- a/connection.go +++ b/connection.go @@ -278,6 +278,109 @@ func (c *Connection) SelectValue(sql string, arguments ...interface{}) (v interf return } +// SelectValueTo executes sql that returns a single value and writes that value to w. +// No type conversions will be performed. The raw bytes will be written directly to w. +// This is ideal for returning JSON, files, or large text values directly over HTTP. + +// sql can be either a prepared statement name or an SQL string. arguments will be +// sanitized before being interpolated into sql strings. arguments should be +// referenced positionally from the sql string as $1, $2, etc. +// +// Returns a UnexpectedColumnCountError if exactly one column is not found +// Returns a NotSingleRowError if exactly one row is not found +func (c *Connection) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) { + if err = c.sendQuery(sql, arguments...); err != nil { + return + } + + var numRowsFound int64 + + for { + if t, bodySize, rxErr := c.rxMsgHeader(); rxErr == nil { + if t == dataRow { + numRowsFound++ + + if numRowsFound > 1 { + err = NotSingleRowError{RowCount: numRowsFound} + } + + if err != nil { + c.rxMsgBody(bodySize) // Read and discard rest of message + continue + } + + err = c.rxDataRowValueTo(w, bodySize) + } else { + var body *bytes.Buffer + if body, rxErr = c.rxMsgBody(bodySize); rxErr == nil { + r := newMessageReader(body) + switch t { + case readyForQuery: + c.rxReadyForQuery(r) + return + case rowDescription: + case commandComplete: + case bindComplete: + default: + if e := c.processContextFreeMsg(t, r); e != nil && err == nil { + err = e + } + } + } else { + return rxErr + } + } + } else { + return rxErr + } + } + return +} + +func (c *Connection) rxDataRowValueTo(w io.Writer, bodySize int32) (err error) { + buf := c.getBuf() + if _, err = io.CopyN(buf, c.conn, 6); err != nil { + c.die(err) + return + } + + var columnCount int16 + err = binary.Read(buf, binary.BigEndian, &columnCount) + if err != nil { + c.die(err) + return + } + + if columnCount != 1 { + // Read the rest of the data row so it can be discarded + if _, err = io.CopyN(buf, c.conn, int64(bodySize-6)); err != nil { + c.die(err) + return + } + err = UnexpectedColumnCountError{ExpectedCount: 1, ActualCount: columnCount} + return + } + + var valueSize int32 + err = binary.Read(buf, binary.BigEndian, &valueSize) + if err != nil { + c.die(err) + return + } + + if valueSize == -1 { + err = errors.New("SelectValueTo cannot handle null") + return + } + + _, err = io.CopyN(w, c.conn, int64(valueSize)) + if err != nil { + c.die(err) + } + + return +} + // SelectValues executes sql and returns a slice of values. sql can be either a prepared // statement name or an SQL string. arguments will be sanitized before being // interpolated into sql strings. arguments should be referenced positionally from diff --git a/connection_pool.go b/connection_pool.go index 06fedfda..9b8e50f3 100644 --- a/connection_pool.go +++ b/connection_pool.go @@ -1,6 +1,7 @@ package pgx import ( + "io" "sync" ) @@ -189,6 +190,17 @@ func (p *ConnectionPool) SelectValue(sql string, arguments ...interface{}) (v in return c.SelectValue(sql, arguments...) } +// SelectValueTo acquires a connection, delegates the call to that connection, and releases the connection +func (p *ConnectionPool) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) { + var c *Connection + if c, err = p.Acquire(); err != nil { + return + } + defer p.Release(c) + + return c.SelectValueTo(w, sql, arguments...) +} + // SelectValues acquires a connection, delegates the call to that connection, and releases the connection func (p *ConnectionPool) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) { var c *Connection diff --git a/connection_test.go b/connection_test.go index 10ce147a..f87f69a2 100644 --- a/connection_test.go +++ b/connection_test.go @@ -319,6 +319,53 @@ func TestConnectionSelectValue(t *testing.T) { } } +func TestConnectionSelectValueTo(t *testing.T) { + conn := getSharedConnection(t) + var err error + + var buf bytes.Buffer + if err := conn.SelectValueTo(&buf, "select '[1,2,3,4,5]'::json"); err != nil { + t.Fatalf("SelectValueTo failed: %v", err) + } + if bytes.Compare(buf.Bytes(), []byte("[1,2,3,4,5]")) != 0 { + t.Fatalf("SelectValueTo did not write expected data: %v", string(buf.Bytes())) + } + + // NotSingleRowError + err = conn.SelectValueTo(&buf, "select * from (values ('Matthew'), ('Mark'), ('Luke')) t") + if _, ok := err.(pgx.NotSingleRowError); !ok { + t.Fatalf("Multiple matching rows should have returned NotSingleRowError: %#v", err) + } + if conn.IsAlive() { + mustSelectValue(t, conn, "select 1") // ensure it really is alive and usable + } else { + t.Fatal("SelectValueTo NotSingleRowError should not have killed connection") + } + + // UnexpectedColumnCountError + err = conn.SelectValueTo(&buf, "select * from (values ('Matthew', 'Mark', 'Luke')) t") + if _, ok := err.(pgx.UnexpectedColumnCountError); !ok { + t.Fatalf("Multiple matching rows should have returned UnexpectedColumnCountError: %#v", err) + } + if conn.IsAlive() { + mustSelectValue(t, conn, "select 1") // ensure it really is alive and usable + } else { + t.Fatal("SelectValueTo UnexpectedColumnCountError should not have killed connection") + } + + // Null + err = conn.SelectValueTo(&buf, "select null") + if err == nil || err.Error() != "SelectValueTo cannot handle null" { + t.Fatalf("Expected null error: %#v", err) + } + if conn.IsAlive() { + mustSelectValue(t, conn, "select 1") // ensure it really is alive and usable + } else { + t.Fatal("SelectValueTo null error should not have killed connection") + } + +} + func TestSelectValues(t *testing.T) { conn := getSharedConnection(t)