Add SelectValueTo
This commit is contained in:
+103
@@ -278,6 +278,109 @@ func (c *Connection) SelectValue(sql string, arguments ...interface{}) (v interf
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SelectValueTo executes sql that returns a single value and writes that value to w.
|
||||||
|
// No type conversions will be performed. The raw bytes will be written directly to w.
|
||||||
|
// This is ideal for returning JSON, files, or large text values directly over HTTP.
|
||||||
|
|
||||||
|
// sql can be either a prepared statement name or an SQL string. arguments will be
|
||||||
|
// sanitized before being interpolated into sql strings. arguments should be
|
||||||
|
// referenced positionally from the sql string as $1, $2, etc.
|
||||||
|
//
|
||||||
|
// Returns a UnexpectedColumnCountError if exactly one column is not found
|
||||||
|
// Returns a NotSingleRowError if exactly one row is not found
|
||||||
|
func (c *Connection) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) {
|
||||||
|
if err = c.sendQuery(sql, arguments...); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var numRowsFound int64
|
||||||
|
|
||||||
|
for {
|
||||||
|
if t, bodySize, rxErr := c.rxMsgHeader(); rxErr == nil {
|
||||||
|
if t == dataRow {
|
||||||
|
numRowsFound++
|
||||||
|
|
||||||
|
if numRowsFound > 1 {
|
||||||
|
err = NotSingleRowError{RowCount: numRowsFound}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
c.rxMsgBody(bodySize) // Read and discard rest of message
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.rxDataRowValueTo(w, bodySize)
|
||||||
|
} else {
|
||||||
|
var body *bytes.Buffer
|
||||||
|
if body, rxErr = c.rxMsgBody(bodySize); rxErr == nil {
|
||||||
|
r := newMessageReader(body)
|
||||||
|
switch t {
|
||||||
|
case readyForQuery:
|
||||||
|
c.rxReadyForQuery(r)
|
||||||
|
return
|
||||||
|
case rowDescription:
|
||||||
|
case commandComplete:
|
||||||
|
case bindComplete:
|
||||||
|
default:
|
||||||
|
if e := c.processContextFreeMsg(t, r); e != nil && err == nil {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return rxErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return rxErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Connection) rxDataRowValueTo(w io.Writer, bodySize int32) (err error) {
|
||||||
|
buf := c.getBuf()
|
||||||
|
if _, err = io.CopyN(buf, c.conn, 6); err != nil {
|
||||||
|
c.die(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var columnCount int16
|
||||||
|
err = binary.Read(buf, binary.BigEndian, &columnCount)
|
||||||
|
if err != nil {
|
||||||
|
c.die(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if columnCount != 1 {
|
||||||
|
// Read the rest of the data row so it can be discarded
|
||||||
|
if _, err = io.CopyN(buf, c.conn, int64(bodySize-6)); err != nil {
|
||||||
|
c.die(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = UnexpectedColumnCountError{ExpectedCount: 1, ActualCount: columnCount}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var valueSize int32
|
||||||
|
err = binary.Read(buf, binary.BigEndian, &valueSize)
|
||||||
|
if err != nil {
|
||||||
|
c.die(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if valueSize == -1 {
|
||||||
|
err = errors.New("SelectValueTo cannot handle null")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = io.CopyN(w, c.conn, int64(valueSize))
|
||||||
|
if err != nil {
|
||||||
|
c.die(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// SelectValues executes sql and returns a slice of values. sql can be either a prepared
|
// SelectValues executes sql and returns a slice of values. sql can be either a prepared
|
||||||
// statement name or an SQL string. arguments will be sanitized before being
|
// statement name or an SQL string. arguments will be sanitized before being
|
||||||
// interpolated into sql strings. arguments should be referenced positionally from
|
// interpolated into sql strings. arguments should be referenced positionally from
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package pgx
|
package pgx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -189,6 +190,17 @@ func (p *ConnectionPool) SelectValue(sql string, arguments ...interface{}) (v in
|
|||||||
return c.SelectValue(sql, arguments...)
|
return c.SelectValue(sql, arguments...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SelectValueTo acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
|
func (p *ConnectionPool) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) {
|
||||||
|
var c *Connection
|
||||||
|
if c, err = p.Acquire(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer p.Release(c)
|
||||||
|
|
||||||
|
return c.SelectValueTo(w, sql, arguments...)
|
||||||
|
}
|
||||||
|
|
||||||
// SelectValues acquires a connection, delegates the call to that connection, and releases the connection
|
// SelectValues acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnectionPool) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) {
|
func (p *ConnectionPool) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) {
|
||||||
var c *Connection
|
var c *Connection
|
||||||
|
|||||||
@@ -319,6 +319,53 @@ func TestConnectionSelectValue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConnectionSelectValueTo(t *testing.T) {
|
||||||
|
conn := getSharedConnection(t)
|
||||||
|
var err error
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if err := conn.SelectValueTo(&buf, "select '[1,2,3,4,5]'::json"); err != nil {
|
||||||
|
t.Fatalf("SelectValueTo failed: %v", err)
|
||||||
|
}
|
||||||
|
if bytes.Compare(buf.Bytes(), []byte("[1,2,3,4,5]")) != 0 {
|
||||||
|
t.Fatalf("SelectValueTo did not write expected data: %v", string(buf.Bytes()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotSingleRowError
|
||||||
|
err = conn.SelectValueTo(&buf, "select * from (values ('Matthew'), ('Mark'), ('Luke')) t")
|
||||||
|
if _, ok := err.(pgx.NotSingleRowError); !ok {
|
||||||
|
t.Fatalf("Multiple matching rows should have returned NotSingleRowError: %#v", err)
|
||||||
|
}
|
||||||
|
if conn.IsAlive() {
|
||||||
|
mustSelectValue(t, conn, "select 1") // ensure it really is alive and usable
|
||||||
|
} else {
|
||||||
|
t.Fatal("SelectValueTo NotSingleRowError should not have killed connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnexpectedColumnCountError
|
||||||
|
err = conn.SelectValueTo(&buf, "select * from (values ('Matthew', 'Mark', 'Luke')) t")
|
||||||
|
if _, ok := err.(pgx.UnexpectedColumnCountError); !ok {
|
||||||
|
t.Fatalf("Multiple matching rows should have returned UnexpectedColumnCountError: %#v", err)
|
||||||
|
}
|
||||||
|
if conn.IsAlive() {
|
||||||
|
mustSelectValue(t, conn, "select 1") // ensure it really is alive and usable
|
||||||
|
} else {
|
||||||
|
t.Fatal("SelectValueTo UnexpectedColumnCountError should not have killed connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Null
|
||||||
|
err = conn.SelectValueTo(&buf, "select null")
|
||||||
|
if err == nil || err.Error() != "SelectValueTo cannot handle null" {
|
||||||
|
t.Fatalf("Expected null error: %#v", err)
|
||||||
|
}
|
||||||
|
if conn.IsAlive() {
|
||||||
|
mustSelectValue(t, conn, "select 1") // ensure it really is alive and usable
|
||||||
|
} else {
|
||||||
|
t.Fatal("SelectValueTo null error should not have killed connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func TestSelectValues(t *testing.T) {
|
func TestSelectValues(t *testing.T) {
|
||||||
conn := getSharedConnection(t)
|
conn := getSharedConnection(t)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user