2
0

Rows and Row are now interfaces

This commit is contained in:
Jack Christensen
2019-04-11 17:53:52 -05:00
parent 5ea8191003
commit 938ee9f434
11 changed files with 117 additions and 87 deletions
+3 -3
View File
@@ -144,7 +144,7 @@ func (b *Batch) ExecResults() (pgconn.CommandTag, error) {
// QueryResults reads the results from the next query in the batch as if the // QueryResults reads the results from the next query in the batch as if the
// query has been sent with Query. // query has been sent with Query.
func (b *Batch) QueryResults() (*Rows, error) { func (b *Batch) QueryResults() (Rows, error) {
rows := b.conn.getRows("batch query", nil) rows := b.conn.getRows("batch query", nil)
if !b.mrr.NextResult() { if !b.mrr.NextResult() {
@@ -162,9 +162,9 @@ func (b *Batch) QueryResults() (*Rows, error) {
// QueryRowResults reads the results from the next query in the batch as if the // QueryRowResults reads the results from the next query in the batch as if the
// query has been sent with QueryRow. // query has been sent with QueryRow.
func (b *Batch) QueryRowResults() *Row { func (b *Batch) QueryRowResults() Row {
rows, _ := b.QueryResults() rows, _ := b.QueryResults()
return (*Row)(rows) return (*connRow)(rows.(*connRows))
} }
+2 -2
View File
@@ -70,7 +70,7 @@ type Conn struct {
logLevel LogLevel logLevel LogLevel
fp *fastpath fp *fastpath
poolResetCount int poolResetCount int
preallocatedRows []Rows preallocatedRows []connRows
mux sync.Mutex mux sync.Mutex
status byte // One of connStatus* constants status byte // One of connStatus* constants
@@ -681,7 +681,7 @@ func (c *Conn) Ping(ctx context.Context) error {
return err return err
} }
func connInfoFromRows(rows *Rows, err error) (map[string]pgtype.OID, error) { func connInfoFromRows(rows Rows, err error) (map[string]pgtype.OID, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
+1 -1
View File
@@ -25,7 +25,7 @@ func (f *fastpath) addFunction(name string, oid pgtype.OID) {
f.fns[name] = oid f.fns[name] = oid
} }
func (f *fastpath) addFunctions(rows *Rows) error { func (f *fastpath) addFunctions(rows Rows) error {
for rows.Next() { for rows.Next() {
var name string var name string
var oid pgtype.OID var oid pgtype.OID
+3 -3
View File
@@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/jackc/pgconn" "github.com/jackc/pgconn"
"github.com/jackc/pgx/pool" "github.com/jackc/pgx"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -37,7 +37,7 @@ func testExec(t *testing.T, db execer) {
} }
type queryer interface { type queryer interface {
Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (*pool.Rows, error) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error)
} }
func testQuery(t *testing.T, db queryer) { func testQuery(t *testing.T, db queryer) {
@@ -59,7 +59,7 @@ func testQuery(t *testing.T, db queryer) {
} }
type queryRower interface { type queryRower interface {
QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) *pool.Row QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) pgx.Row
} }
func testQueryRow(t *testing.T, db queryRower) { func testQueryRow(t *testing.T, db queryRower) {
+6 -10
View File
@@ -50,23 +50,19 @@ func (c *Conn) Release() {
} }
func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) { func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
conn := c.res.Value().(*pgx.Conn) return c.Conn().Exec(ctx, sql, arguments...)
return conn.Exec(ctx, sql, arguments...)
} }
func (c *Conn) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (*Rows, error) { func (c *Conn) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error) {
r, err := c.res.Value().(*pgx.Conn).Query(ctx, sql, optionsAndArgs...) return c.Conn().Query(ctx, sql, optionsAndArgs...)
rows := &Rows{r: r, err: err}
return rows, err
} }
func (c *Conn) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) *Row { func (c *Conn) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) pgx.Row {
r := c.res.Value().(*pgx.Conn).QueryRow(ctx, sql, optionsAndArgs...) return c.Conn().QueryRow(ctx, sql, optionsAndArgs...)
return &Row{r: r}
} }
func (c *Conn) Begin() (*pgx.Tx, error) { func (c *Conn) Begin() (*pgx.Tx, error) {
return c.res.Value().(*pgx.Conn).Begin() return c.Conn().Begin()
} }
func (c *Conn) Conn() *pgx.Conn { func (c *Conn) Conn() *pgx.Conn {
+8 -10
View File
@@ -68,31 +68,29 @@ func (p *Pool) Exec(ctx context.Context, sql string, arguments ...interface{}) (
return c.Exec(ctx, sql, arguments...) return c.Exec(ctx, sql, arguments...)
} }
func (p *Pool) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (*Rows, error) { func (p *Pool) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error) {
c, err := p.Acquire(ctx) c, err := p.Acquire(ctx)
if err != nil { if err != nil {
return &Rows{err: err}, err return errRows{err: err}, err
} }
rows, err := c.Query(ctx, sql, optionsAndArgs...) rows, err := c.Query(ctx, sql, optionsAndArgs...)
if err == nil { if err != nil {
rows.c = c
} else {
c.Release() c.Release()
return errRows{err: err}, err
} }
return rows, err return &poolRows{r: rows, c: c}, nil
} }
func (p *Pool) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) *Row { func (p *Pool) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) pgx.Row {
c, err := p.Acquire(ctx) c, err := p.Acquire(ctx)
if err != nil { if err != nil {
return &Row{err: err} return errRow{err: err}
} }
row := c.QueryRow(ctx, sql, optionsAndArgs...) row := c.QueryRow(ctx, sql, optionsAndArgs...)
row.c = c return &poolRow{r: row, c: c}
return row
} }
func (p *Pool) Begin() (*Tx, error) { func (p *Pool) Begin() (*Tx, error) {
+28 -11
View File
@@ -4,13 +4,30 @@ import (
"github.com/jackc/pgx" "github.com/jackc/pgx"
) )
type Rows struct { type errRows struct {
r *pgx.Rows err error
}
func (errRows) Close() {}
func (e errRows) Err() error { return e.err }
func (errRows) FieldDescriptions() []pgx.FieldDescription { return nil }
func (errRows) Next() bool { return false }
func (e errRows) Scan(dest ...interface{}) error { return e.err }
func (e errRows) Values() ([]interface{}, error) { return nil, e.err }
type errRow struct {
err error
}
func (e errRow) Scan(dest ...interface{}) error { return e.err }
type poolRows struct {
r pgx.Rows
c *Conn c *Conn
err error err error
} }
func (rows *Rows) Close() { func (rows *poolRows) Close() {
rows.r.Close() rows.r.Close()
if rows.c != nil { if rows.c != nil {
rows.c.Release() rows.c.Release()
@@ -18,18 +35,18 @@ func (rows *Rows) Close() {
} }
} }
func (rows *Rows) Err() error { func (rows *poolRows) Err() error {
if rows.err != nil { if rows.err != nil {
return rows.err return rows.err
} }
return rows.r.Err() return rows.r.Err()
} }
func (rows *Rows) FieldDescriptions() []pgx.FieldDescription { func (rows *poolRows) FieldDescriptions() []pgx.FieldDescription {
return rows.r.FieldDescriptions() return rows.r.FieldDescriptions()
} }
func (rows *Rows) Next() bool { func (rows *poolRows) Next() bool {
if rows.err != nil { if rows.err != nil {
return false return false
} }
@@ -41,7 +58,7 @@ func (rows *Rows) Next() bool {
return n return n
} }
func (rows *Rows) Scan(dest ...interface{}) error { func (rows *poolRows) Scan(dest ...interface{}) error {
err := rows.r.Scan(dest...) err := rows.r.Scan(dest...)
if err != nil { if err != nil {
rows.Close() rows.Close()
@@ -49,7 +66,7 @@ func (rows *Rows) Scan(dest ...interface{}) error {
return err return err
} }
func (rows *Rows) Values() ([]interface{}, error) { func (rows *poolRows) Values() ([]interface{}, error) {
values, err := rows.r.Values() values, err := rows.r.Values()
if err != nil { if err != nil {
rows.Close() rows.Close()
@@ -57,13 +74,13 @@ func (rows *Rows) Values() ([]interface{}, error) {
return values, err return values, err
} }
type Row struct { type poolRow struct {
r *pgx.Row r pgx.Row
c *Conn c *Conn
err error err error
} }
func (row *Row) Scan(dest ...interface{}) error { func (row *poolRow) Scan(dest ...interface{}) error {
if row.err != nil { if row.err != nil {
return row.err return row.err
} }
+2 -2
View File
@@ -38,10 +38,10 @@ func (tx *Tx) Exec(ctx context.Context, sql string, arguments ...interface{}) (p
return tx.c.Exec(ctx, sql, arguments...) return tx.c.Exec(ctx, sql, arguments...)
} }
func (tx *Tx) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (*Rows, error) { func (tx *Tx) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (pgx.Rows, error) {
return tx.c.Query(ctx, sql, optionsAndArgs...) return tx.c.Query(ctx, sql, optionsAndArgs...)
} }
func (tx *Tx) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) *Row { func (tx *Tx) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) pgx.Row {
return tx.c.QueryRow(ctx, sql, optionsAndArgs...) return tx.c.QueryRow(ctx, sql, optionsAndArgs...)
} }
+58 -39
View File
@@ -14,14 +14,45 @@ import (
"github.com/jackc/pgx/pgtype" "github.com/jackc/pgx/pgtype"
) )
// Row is a convenience wrapper over Rows that is returned by QueryRow. // Rows is the result set returned from *Conn.Query. Rows must be closed before
type Row Rows // the *Conn can be used again. Rows are closed by explicitly calling Close(),
// calling Next() until it returns false, or when a fatal error occurs.
type Rows interface {
// Close closes the rows, making the connection ready for use again. It is safe
// to call Close after rows is already closed.
Close()
// Scan works the same as (*Rows Scan) with the following exceptions. If no Err() error
// rows were found it returns ErrNoRows. If multiple rows are returned it FieldDescriptions() []FieldDescription
// ignores all but the first.
func (r *Row) Scan(dest ...interface{}) (err error) { // Next prepares the next row for reading. It returns true if there is another
rows := (*Rows)(r) // row and false if no more rows are available. It automatically closes rows
// when all rows are read.
Next() bool
// Scan reads the values from the current row into dest values positionally.
// dest can include pointers to core types, values implementing the Scanner
// interface, []byte, and nil. []byte will skip the decoding process and directly
// copy the raw bytes received from PostgreSQL. nil will skip the value entirely.
Scan(dest ...interface{}) error
// Values returns an array of the row values
Values() ([]interface{}, error)
}
// Row is a convenience wrapper over Rows that is returned by QueryRow.
type Row interface {
// Scan works the same as Rows. with the following exceptions. If no
// rows were found it returns ErrNoRows. If multiple rows are returned it
// ignores all but the first.
Scan(dest ...interface{}) error
}
// connRow implements the Row interface for Conn.QueryRow.
type connRow connRows
func (r *connRow) Scan(dest ...interface{}) (err error) {
rows := (*connRows)(r)
if rows.Err() != nil { if rows.Err() != nil {
return rows.Err() return rows.Err()
@@ -39,10 +70,8 @@ func (r *Row) Scan(dest ...interface{}) (err error) {
return rows.Err() return rows.Err()
} }
// Rows is the result set returned from *Conn.Query. Rows must be closed before // connRows implements the Rows interface for Conn.Query.
// the *Conn can be used again. Rows are closed by explicitly calling Close(), type connRows struct {
// calling Next() until it returns false, or when a fatal error occurs.
type Rows struct {
conn *Conn conn *Conn
batch *Batch batch *Batch
values [][]byte values [][]byte
@@ -60,13 +89,11 @@ type Rows struct {
multiResultReader *pgconn.MultiResultReader multiResultReader *pgconn.MultiResultReader
} }
func (rows *Rows) FieldDescriptions() []FieldDescription { func (rows *connRows) FieldDescriptions() []FieldDescription {
return rows.fields return rows.fields
} }
// Close closes the rows, making the connection ready for use again. It is safe func (rows *connRows) Close() {
// to call Close after rows is already closed.
func (rows *Rows) Close() {
if rows.closed { if rows.closed {
return return
} }
@@ -106,13 +133,13 @@ func (rows *Rows) Close() {
} }
} }
func (rows *Rows) Err() error { func (rows *connRows) Err() error {
return rows.err return rows.err
} }
// fatal signals an error occurred after the query was sent to the server. It // fatal signals an error occurred after the query was sent to the server. It
// closes the rows automatically. // closes the rows automatically.
func (rows *Rows) fatal(err error) { func (rows *connRows) fatal(err error) {
if rows.err != nil { if rows.err != nil {
return return
} }
@@ -121,10 +148,7 @@ func (rows *Rows) fatal(err error) {
rows.Close() rows.Close()
} }
// Next prepares the next row for reading. It returns true if there is another func (rows *connRows) Next() bool {
// row and false if no more rows are available. It automatically closes rows
// when all rows are read.
func (rows *Rows) Next() bool {
if rows.closed { if rows.closed {
return false return false
} }
@@ -147,7 +171,7 @@ func (rows *Rows) Next() bool {
} }
} }
func (rows *Rows) nextColumn() ([]byte, *FieldDescription, bool) { func (rows *connRows) nextColumn() ([]byte, *FieldDescription, bool) {
if rows.closed { if rows.closed {
return nil, nil, false return nil, nil, false
} }
@@ -162,11 +186,7 @@ func (rows *Rows) nextColumn() ([]byte, *FieldDescription, bool) {
return buf, fd, true return buf, fd, true
} }
// Scan reads the values from the current row into dest values positionally. func (rows *connRows) Scan(dest ...interface{}) (err error) {
// dest can include pointers to core types, values implementing the Scanner
// interface, []byte, and nil. []byte will skip the decoding process and directly
// copy the raw bytes received from PostgreSQL. nil will skip the value entirely.
func (rows *Rows) Scan(dest ...interface{}) (err error) {
if len(rows.fields) != len(dest) { if len(rows.fields) != len(dest) {
err = errors.Errorf("Scan received wrong number of arguments, got %d but expected %d", len(dest), len(rows.fields)) err = errors.Errorf("Scan received wrong number of arguments, got %d but expected %d", len(dest), len(rows.fields))
rows.fatal(err) rows.fatal(err)
@@ -243,8 +263,7 @@ func (rows *Rows) Scan(dest ...interface{}) (err error) {
return nil return nil
} }
// Values returns an array of the row values func (rows *connRows) Values() ([]interface{}, error) {
func (rows *Rows) Values() ([]interface{}, error) {
if rows.closed { if rows.closed {
return nil, errors.New("rows is closed") return nil, errors.New("rows is closed")
} }
@@ -307,9 +326,9 @@ func (e scanArgError) Error() string {
return fmt.Sprintf("can't scan into dest[%d]: %v", e.col, e.err) return fmt.Sprintf("can't scan into dest[%d]: %v", e.col, e.err)
} }
func (c *Conn) getRows(sql string, args []interface{}) *Rows { func (c *Conn) getRows(sql string, args []interface{}) *connRows {
if len(c.preallocatedRows) == 0 { if len(c.preallocatedRows) == 0 {
c.preallocatedRows = make([]Rows, 64) c.preallocatedRows = make([]connRows, 64)
} }
r := &c.preallocatedRows[len(c.preallocatedRows)-1] r := &c.preallocatedRows[len(c.preallocatedRows)-1]
@@ -333,10 +352,9 @@ type QueryExOptions struct {
SimpleProtocol bool SimpleProtocol bool
} }
// Query executes sql with args. If there is an error the returned *Rows will // Query executes sql with args. If there is an error the returned Rows will be returned in an error state. So it is
// be returned in an error state. So it is allowed to ignore the error returned // allowed to ignore the error returned from Query and handle it in Rows.
// from Query and handle it in *Rows. func (c *Conn) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (Rows, error) {
func (c *Conn) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (rows *Rows, err error) {
c.lastStmtSent = false c.lastStmtSent = false
// rows = c.getRows(sql, args) // rows = c.getRows(sql, args)
@@ -349,7 +367,7 @@ func (c *Conn) Query(ctx context.Context, sql string, optionsAndArgs ...interfac
} }
} }
rows = &Rows{ rows := &connRows{
conn: c, conn: c,
startTime: time.Now(), startTime: time.Now(),
sql: sql, sql: sql,
@@ -368,6 +386,7 @@ func (c *Conn) Query(ctx context.Context, sql string, optionsAndArgs ...interfac
// return rows, rows.err // return rows, rows.err
// } // }
var err error
if (options == nil && c.config.PreferSimpleProtocol) || (options != nil && options.SimpleProtocol) { if (options == nil && c.config.PreferSimpleProtocol) || (options != nil && options.SimpleProtocol) {
sql, err = c.sanitizeForSimpleQuery(sql, args...) sql, err = c.sanitizeForSimpleQuery(sql, args...)
if err != nil { if err != nil {
@@ -519,9 +538,9 @@ func (c *Conn) sanitizeForSimpleQuery(sql string, args ...interface{}) (string,
} }
// QueryRow is a convenience wrapper over Query. Any error that occurs while // QueryRow is a convenience wrapper over Query. Any error that occurs while
// querying is deferred until calling Scan on the returned *Row. That *Row will // querying is deferred until calling Scan on the returned Row. That Row will
// error with ErrNoRows if no rows are returned. // error with ErrNoRows if no rows are returned.
func (c *Conn) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) *Row { func (c *Conn) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) Row {
rows, _ := c.Query(ctx, sql, optionsAndArgs...) rows, _ := c.Query(ctx, sql, optionsAndArgs...)
return (*Row)(rows) return (*connRow)(rows.(*connRows))
} }
+2 -2
View File
@@ -340,7 +340,7 @@ func (rc *ReplicationConn) WaitForReplicationMessage(ctx context.Context) (*Repl
// NOTE: Because this is a replication mode connection, we don't have // NOTE: Because this is a replication mode connection, we don't have
// type names, so the field descriptions in the result will have only // type names, so the field descriptions in the result will have only
// OIDs and no DataTypeName values // OIDs and no DataTypeName values
func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) { func (rc *ReplicationConn) IdentifySystem() (r Rows, err error) {
return nil, errors.New("TODO") return nil, errors.New("TODO")
// return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM") // return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM")
} }
@@ -356,7 +356,7 @@ func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) {
// NOTE: Because this is a replication mode connection, we don't have // NOTE: Because this is a replication mode connection, we don't have
// type names, so the field descriptions in the result will have only // type names, so the field descriptions in the result will have only
// OIDs and no DataTypeName values // OIDs and no DataTypeName values
func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) { func (rc *ReplicationConn) TimelineHistory(timeline int) (r Rows, err error) {
return nil, errors.New("TODO") return nil, errors.New("TODO")
// return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline)) // return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline))
} }
+4 -4
View File
@@ -172,20 +172,20 @@ func (tx *Tx) PrepareEx(ctx context.Context, name, sql string, opts *PrepareExOp
} }
// Query delegates to the underlying *Conn // Query delegates to the underlying *Conn
func (tx *Tx) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (*Rows, error) { func (tx *Tx) Query(ctx context.Context, sql string, optionsAndArgs ...interface{}) (Rows, error) {
if tx.status != TxStatusInProgress { if tx.status != TxStatusInProgress {
// Because checking for errors can be deferred to the *Rows, build one with the error // Because checking for errors can be deferred to the *Rows, build one with the error
err := ErrTxClosed err := ErrTxClosed
return &Rows{closed: true, err: err}, err return &connRows{closed: true, err: err}, err
} }
return tx.conn.Query(ctx, sql, optionsAndArgs...) return tx.conn.Query(ctx, sql, optionsAndArgs...)
} }
// QueryRow delegates to the underlying *Conn // QueryRow delegates to the underlying *Conn
func (tx *Tx) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) *Row { func (tx *Tx) QueryRow(ctx context.Context, sql string, optionsAndArgs ...interface{}) Row {
rows, _ := tx.Query(ctx, sql, optionsAndArgs...) rows, _ := tx.Query(ctx, sql, optionsAndArgs...)
return (*Row)(rows) return (*connRow)(rows.(*connRows))
} }
// CopyFrom delegates to the underlying *Conn // CopyFrom delegates to the underlying *Conn