Remove pgtype.Row(), introduce Composite.Scan()
pgtype.Row() was optimized for a single line use without much ceremony at a cost of OID registration, which is cumbersome. In practice it so much incovnenience to create new Composite just before making a query. So now there is just a Composite type and 2 helper methods: - SetFields sets composite fields to values passed. This assignment fails if types passed are not assignable to Values pgtype is made of. - Scan acts exactly like query.Scan, but for a composite value. Passed values are set to values from SQL composite.
This commit is contained in:
+101
-94
@@ -1,146 +1,153 @@
|
|||||||
package pgtype
|
package pgtype
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/jackc/pgtype/binary"
|
||||||
errors "golang.org/x/xerrors"
|
errors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type composite struct {
|
type Composite struct {
|
||||||
fields []Value
|
fields []Value
|
||||||
status Status
|
Status Status
|
||||||
}
|
}
|
||||||
|
|
||||||
// helper struct to act both as a scanning target and query argument
|
// NewComposite creates a Composite object, which acts as a "schema" for
|
||||||
type rowValue struct {
|
// SQL composite values.
|
||||||
args []interface{}
|
// To pass Composite as SQL parameter first set it's fields, either by
|
||||||
|
// passing initialized Value{} instances to NewComposite or by calling
|
||||||
|
// SetFields method
|
||||||
|
// To read composite fields back pass result of Scan() method
|
||||||
|
// to query Scan function.
|
||||||
|
func NewComposite(fields ...Value) *Composite {
|
||||||
|
return &Composite{fields, Present}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Row helper function builds a value which can be both used to
|
func (src Composite) Get() interface{} {
|
||||||
// "assemble" composite quiery arguments and to scan results back.
|
switch src.Status {
|
||||||
//
|
|
||||||
// When passed as an argument to query, values from Row args will
|
|
||||||
// be assigned to corresponding fields in a composite type and a single
|
|
||||||
// composite type will be passed to the PostgreSQL. Composite type need
|
|
||||||
// to be registered in ConnInfo first. This is required so that pgx
|
|
||||||
// can know which SQL types to use when constructing SQL composite argument
|
|
||||||
//
|
|
||||||
// When passed to Scan individual fields from composite query result
|
|
||||||
// are assigned to corresponding Row arguments. First argument MUST
|
|
||||||
// be of type *bool to flag when NULL value received. So total number
|
|
||||||
// of Row arguments, when passed to Scan should be number of composite
|
|
||||||
// fields you expect to read + 1
|
|
||||||
func Row(fields ...interface{}) rowValue {
|
|
||||||
return rowValue{fields}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Composite types is meant to be passed to ConnInfo.RegisterDataType only,
|
|
||||||
// so it is made private on purpose. Once registered, it allows Row
|
|
||||||
// function to correctly pass query arguments.
|
|
||||||
func Composite(fields ...Value) *composite {
|
|
||||||
return &composite{fields, Undefined}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (src composite) Get() interface{} {
|
|
||||||
switch src.status {
|
|
||||||
case Present:
|
case Present:
|
||||||
return src
|
return src
|
||||||
case Null:
|
case Null:
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
return src.status
|
return src.Status
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set is called internally when passing query arguments.
|
// Set is called internally when passing query arguments.
|
||||||
// Only valid src is a result of pgtype.Row() or nil
|
func (dst *Composite) Set(src interface{}) error {
|
||||||
func (dst *composite) Set(src interface{}) error {
|
|
||||||
if src == nil {
|
if src == nil {
|
||||||
*dst = composite{status: Null}
|
*dst = Composite{Status: Null}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
switch value := src.(type) {
|
switch value := src.(type) {
|
||||||
case rowValue:
|
case []Value:
|
||||||
if len(value.args) != len(dst.fields) {
|
if len(value) != len(dst.fields) {
|
||||||
return errors.Errorf("Number of fields don't match. Composite has %d fields", len(dst.fields))
|
return errors.Errorf("Number of fields don't match. Composite has %d fields", len(dst.fields))
|
||||||
}
|
}
|
||||||
for i, v := range value.args {
|
for i, v := range value {
|
||||||
if err := dst.fields[i].Set(v); err != nil {
|
if err := dst.fields[i].Set(v); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dst.status = Present
|
dst.Status = Present
|
||||||
default:
|
default:
|
||||||
return errors.Errorf("Use pgtype.Row() as query parameter")
|
return errors.Errorf("Can not convert %v to Composite", src)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AssignTo is never called on composite value directly, it is here
|
// AssignTo should never be called on composite value directly
|
||||||
// to satisfy Valuer interface
|
func (src Composite) AssignTo(dst interface{}) error {
|
||||||
func (src composite) AssignTo(dst interface{}) error {
|
return errors.New("Pass Composite.Scan() to deconstruct composite")
|
||||||
return errors.New("BUG: should never be called, because pgtype.composite doesn't support decoding")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (src composite) EncodeBinary(ci *ConnInfo, buf []byte) (newBuf []byte, err error) {
|
func (src Composite) EncodeBinary(ci *ConnInfo, buf []byte) (newBuf []byte, err error) {
|
||||||
|
switch src.Status {
|
||||||
|
case Null:
|
||||||
|
return nil, nil
|
||||||
|
case Undefined:
|
||||||
|
return nil, errUndefined
|
||||||
|
}
|
||||||
return EncodeRow(ci, buf, src.fields...)
|
return EncodeRow(ci, buf, src.fields...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DecodeBinary here is just to make pgx use binary result format by default.
|
// DecodeBinary implements BinaryDecoder interface.
|
||||||
// Users should be using Row function or their own types to scan composites
|
// Opposite to Record, fields in a composite act as a "schema"
|
||||||
func (src composite) DecodeBinary(ci *ConnInfo, buf []byte) (err error) {
|
// and decoding fails if SQL value can't be assigned due to
|
||||||
return errors.New("Pass pgtype.Row() to Scan to deconstruct Composite")
|
// type mismatch
|
||||||
}
|
func (dst *Composite) DecodeBinary(ci *ConnInfo, buf []byte) (err error) {
|
||||||
|
if buf == nil {
|
||||||
// Row method creates composite BinaryEncoder. It's main purpose
|
dst.Status = Null
|
||||||
// is to build composite query argument inplace without registering
|
|
||||||
// pgtype.Composite in ConnInfo first
|
|
||||||
func (src composite) Row(values ...interface{}) BinaryEncoderFunc {
|
|
||||||
return func(ci *ConnInfo, buf []byte) ([]byte, error) {
|
|
||||||
if len(values) != len(src.fields) {
|
|
||||||
return nil, errors.Errorf("Number of fields don't match. Composite has %d fields", len(src.fields))
|
|
||||||
}
|
|
||||||
for i, v := range values {
|
|
||||||
if err := src.fields[i].Set(v); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
src.status = Present
|
|
||||||
return src.EncodeBinary(ci, buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// DecodeBinary is called when pgtype.Row() is passed to Scan() to
|
|
||||||
// deconstruct composite value
|
|
||||||
func (r rowValue) DecodeBinary(ci *ConnInfo, src []byte) error {
|
|
||||||
if len(r.args) == 0 {
|
|
||||||
return errors.New("pgtype.Row must have 'isNull *bool' as a first argument when used in Scan")
|
|
||||||
}
|
|
||||||
|
|
||||||
isNull, ok := r.args[0].(*bool)
|
|
||||||
if !ok {
|
|
||||||
return errors.New("pgtype.Row must have 'isNull *bool' as a first argument when used in Scan")
|
|
||||||
}
|
|
||||||
args := r.args[1:]
|
|
||||||
|
|
||||||
var record Record
|
|
||||||
if err := record.DecodeBinary(ci, src); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if record.Status == Null {
|
|
||||||
*isNull = true
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(record.Fields) != len(args) {
|
fieldIter, fieldCount, err := binary.NewRecordFieldIterator(buf)
|
||||||
return errors.Errorf("SQL composite can't be read, 'pgtype.Row' has wrong field cout. %d != %d", len(record.Fields), len(args))
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if len(dst.fields) != fieldCount {
|
||||||
|
return errors.Errorf("SQL composite can't be read, field count mismatch. expected %d , found %d", len(dst.fields), fieldCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, f := range record.Fields {
|
_, fieldBytes, eof, err := fieldIter.Next()
|
||||||
if err := f.AssignTo(args[i]); err != nil {
|
|
||||||
|
for i := 0; !eof; i++ {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
binaryDecoder, ok := dst.fields[i].(BinaryDecoder)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("Composite field doesn't support binary protocol")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = binaryDecoder.DecodeBinary(ci, fieldBytes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, fieldBytes, eof, err = fieldIter.Next()
|
||||||
|
}
|
||||||
|
dst.Status = Present
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scan is a helper function to perform "nested" scan of
|
||||||
|
// a composite value when scanning a query result row.
|
||||||
|
// isNull is set if scanned value is NULL
|
||||||
|
// Rest of arguments are set in the order of fields in the composite
|
||||||
|
//
|
||||||
|
// Use of Scan method doesn't modify original composite
|
||||||
|
func (src Composite) Scan(isNull *bool, dst ...interface{}) BinaryDecoderFunc {
|
||||||
|
return func(ci *ConnInfo, buf []byte) error {
|
||||||
|
if err := src.DecodeBinary(ci, buf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if src.Status == Null {
|
||||||
|
*isNull = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, f := range src.fields {
|
||||||
|
if err := f.AssignTo(dst[i]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetFields sets Composite's fields to corresponding values
|
||||||
|
func (dst *Composite) SetFields(values ...interface{}) error {
|
||||||
|
if len(values) != len(dst.fields) {
|
||||||
|
return errors.Errorf("Number of fields don't match. Composite has %d fields", len(dst.fields))
|
||||||
|
}
|
||||||
|
for i, v := range values {
|
||||||
|
if err := dst.fields[i].Set(v); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
dst.Status = Present
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
+6
-16
@@ -101,27 +101,15 @@ func BenchmarkBinaryEncodingRow(b *testing.B) {
|
|||||||
ci := pgtype.NewConnInfo()
|
ci := pgtype.NewConnInfo()
|
||||||
f1 := 2
|
f1 := 2
|
||||||
f2 := ptrS("bar")
|
f2 := ptrS("bar")
|
||||||
|
c := pgtype.NewComposite(&pgtype.Int4{}, &pgtype.Text{})
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
c := pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{})
|
c.SetFields(f1, f2)
|
||||||
c.Set(pgtype.Row(f1, f2))
|
|
||||||
buf, _ = c.EncodeBinary(ci, buf[:0])
|
buf, _ = c.EncodeBinary(ci, buf[:0])
|
||||||
}
|
}
|
||||||
x = buf
|
x = buf
|
||||||
}
|
}
|
||||||
func BenchmarkBinaryEncodingRowInplace(b *testing.B) {
|
|
||||||
buf := make([]byte, 0, 128)
|
|
||||||
ci := pgtype.NewConnInfo()
|
|
||||||
f1 := 2
|
|
||||||
f2 := ptrS("bar")
|
|
||||||
|
|
||||||
b.ResetTimer()
|
|
||||||
for n := 0; n < b.N; n++ {
|
|
||||||
buf, _ = pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}).Row(f1, f2).EncodeBinary(ci, buf[:0])
|
|
||||||
}
|
|
||||||
x = buf
|
|
||||||
}
|
|
||||||
|
|
||||||
var dstRaw MyCompositeRaw
|
var dstRaw MyCompositeRaw
|
||||||
|
|
||||||
@@ -156,16 +144,18 @@ func BenchmarkBinaryDecodingHelpers(b *testing.B) {
|
|||||||
var gf1 int
|
var gf1 int
|
||||||
var gf2 *string
|
var gf2 *string
|
||||||
|
|
||||||
func BenchmarkBinaryDecodingRow(b *testing.B) {
|
func BenchmarkBinaryDecodingCompositeScan(b *testing.B) {
|
||||||
ci := pgtype.NewConnInfo()
|
ci := pgtype.NewConnInfo()
|
||||||
buf, _ := MyType{4, ptrS("ABCDEFG")}.EncodeBinary(ci, nil)
|
buf, _ := MyType{4, ptrS("ABCDEFG")}.EncodeBinary(ci, nil)
|
||||||
var isNull bool
|
var isNull bool
|
||||||
var f1 int
|
var f1 int
|
||||||
var f2 *string
|
var f2 *string
|
||||||
|
|
||||||
|
c := pgtype.NewComposite(&pgtype.Int4{}, &pgtype.Text{})
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
err := pgtype.Row(&isNull, &f1, &f2).DecodeBinary(ci, buf)
|
err := c.Scan(&isNull, &f1, &f2).DecodeBinary(ci, buf)
|
||||||
E(err)
|
E(err)
|
||||||
}
|
}
|
||||||
gf1 = f1
|
gf1 = f1
|
||||||
|
|||||||
+7
-10
@@ -25,28 +25,25 @@ create type mytype as (
|
|||||||
E(err)
|
E(err)
|
||||||
defer conn.Exec(context.Background(), "drop type mytype")
|
defer conn.Exec(context.Background(), "drop type mytype")
|
||||||
|
|
||||||
//WIP
|
|
||||||
q, err := conn.Prepare(context.Background(), "z", "select $1::mytype")
|
|
||||||
E(err)
|
|
||||||
conn.ConnInfo().RegisterDataType(pgtype.DataType{pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}), "mytype", q.ParamOIDs[0]})
|
|
||||||
|
|
||||||
var isNull bool
|
var isNull bool
|
||||||
var a int
|
var a int
|
||||||
var b *string
|
var b *string
|
||||||
|
|
||||||
err = conn.QueryRow(context.Background(), "select $1::mytype",
|
c := pgtype.NewComposite(&pgtype.Int4{}, &pgtype.Text{})
|
||||||
pgtype.Row(2, "bar")).
|
c.SetFields(2, "bar")
|
||||||
Scan(pgtype.Row(&isNull, &a, &b))
|
|
||||||
|
err = conn.QueryRow(context.Background(), "select $1::mytype", c).
|
||||||
|
Scan(c.Scan(&isNull, &a, &b))
|
||||||
E(err)
|
E(err)
|
||||||
|
|
||||||
fmt.Printf("First: isNull=%v a=%d b=%s\n", isNull, a, *b)
|
fmt.Printf("First: isNull=%v a=%d b=%s\n", isNull, a, *b)
|
||||||
|
|
||||||
err = conn.QueryRow(context.Background(), "select (1, NULL)::mytype").Scan(pgtype.Row(&isNull, &a, &b))
|
err = conn.QueryRow(context.Background(), "select (1, NULL)::mytype").Scan(c.Scan(&isNull, &a, &b))
|
||||||
E(err)
|
E(err)
|
||||||
|
|
||||||
fmt.Printf("Second: isNull=%v a=%d b=%v\n", isNull, a, b)
|
fmt.Printf("Second: isNull=%v a=%d b=%v\n", isNull, a, b)
|
||||||
|
|
||||||
err = conn.QueryRow(context.Background(), "select NULL::mytype").Scan(pgtype.Row(&isNull, &a, &b))
|
err = conn.QueryRow(context.Background(), "select NULL::mytype").Scan(c.Scan(&isNull, &a, &b))
|
||||||
E(err)
|
E(err)
|
||||||
|
|
||||||
fmt.Printf("Third: isNull=%v\n", isNull)
|
fmt.Printf("Third: isNull=%v\n", isNull)
|
||||||
|
|||||||
Reference in New Issue
Block a user