From 087df120bbdc0e79b94e174062509a1dc14e90a9 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sat, 11 Apr 2020 10:38:23 +0100 Subject: [PATCH 01/18] Refactor lowlevel record field iteration --- record.go | 89 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 26 deletions(-) diff --git a/record.go b/record.go index 5c9d7a02..76da5ad0 100644 --- a/record.go +++ b/record.go @@ -78,57 +78,94 @@ func (src *Record) AssignTo(dst interface{}) error { return errors.Errorf("cannot decode %#v into %T", src, dst) } +type fieldIter struct { + rp int + fieldCount int + src []byte +} + +func newFieldIterator(src []byte) (fieldIter, error) { + rp := 0 + if len(src[rp:]) < 4 { + return fieldIter{}, errors.Errorf("Record incomplete %v", src) + } + + fieldCount := int(int32(binary.BigEndian.Uint32(src[rp:]))) + rp += 4 + + return fieldIter{ + rp: rp, + fieldCount: fieldCount, + src: src, + }, nil +} + +func (fi *fieldIter) next() (fieldOID uint32, buf []byte, eof bool, err error) { + if fi.rp == len(fi.src) { + eof = true + return + } + + if len(fi.src[fi.rp:]) < 8 { + err = errors.Errorf("Record incomplete %v", fi.src) + return + } + fieldOID = binary.BigEndian.Uint32(fi.src[fi.rp:]) + fi.rp += 4 + + fieldLen := int(int32(binary.BigEndian.Uint32(fi.src[fi.rp:]))) + fi.rp += 4 + + if fieldLen >= 0 { + if len(fi.src[fi.rp:]) < fieldLen { + err = errors.Errorf("Record incomplete rp=%d src=%v", fi.rp, fi.src) + return + } + buf = fi.src[fi.rp : fi.rp+fieldLen] + fi.rp += fieldLen + } + + return +} + func (dst *Record) DecodeBinary(ci *ConnInfo, src []byte) error { if src == nil { *dst = Record{Status: Null} return nil } - rp := 0 - - if len(src[rp:]) < 4 { - return errors.Errorf("Record incomplete %v", src) + fieldIter, err := newFieldIterator(src) + if err != nil { + return err } - fieldCount := int(int32(binary.BigEndian.Uint32(src[rp:]))) - rp += 4 - fields := make([]Value, fieldCount) + fields := make([]Value, fieldIter.fieldCount) + fieldOID, fieldBytes, eof, err := fieldIter.next() - for i := 0; i < fieldCount; i++ { - if len(src[rp:]) < 8 { - return errors.Errorf("Record incomplete %v", src) + for i := 0; !eof; i++ { + if err != nil { + return err } - fieldOID := binary.BigEndian.Uint32(src[rp:]) - rp += 4 - - fieldLen := int(int32(binary.BigEndian.Uint32(src[rp:]))) - rp += 4 - var binaryDecoder BinaryDecoder + if dt, ok := ci.DataTypeForOID(fieldOID); ok { binaryDecoder, _ = dt.Value.(BinaryDecoder) - } - if binaryDecoder == nil { + } else { return errors.Errorf("unknown oid while decoding record: %v", fieldOID) } - var fieldBytes []byte - if fieldLen >= 0 { - if len(src[rp:]) < fieldLen { - return errors.Errorf("Record incomplete %v", src) - } - fieldBytes = src[rp : rp+fieldLen] - rp += fieldLen + if binaryDecoder == nil { + return errors.Errorf("no binary decoder registered for: %v", fieldOID) } // Duplicate struct to scan into binaryDecoder = reflect.New(reflect.ValueOf(binaryDecoder).Elem().Type()).Interface().(BinaryDecoder) - if err := binaryDecoder.DecodeBinary(ci, fieldBytes); err != nil { return err } fields[i] = binaryDecoder.(Value) + fieldOID, fieldBytes, eof, err = fieldIter.next() } *dst = Record{Fields: fields, Status: Present} From 9a869c8359bd2845aaa54ee5db56c930a348a063 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sat, 11 Apr 2020 11:08:53 +0100 Subject: [PATCH 02/18] Refactor record field binary decoder preparation --- record.go | 36 +++++++++++++++++++++++------------- record_test.go | 46 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 60 insertions(+), 22 deletions(-) diff --git a/record.go b/record.go index 76da5ad0..08603140 100644 --- a/record.go +++ b/record.go @@ -128,6 +128,25 @@ func (fi *fieldIter) next() (fieldOID uint32, buf []byte, eof bool, err error) { return } +func prepareNewBinaryDecoder(ci *ConnInfo, fieldOID uint32, v *Value) (BinaryDecoder, error) { + var binaryDecoder BinaryDecoder + + if dt, ok := ci.DataTypeForOID(fieldOID); ok { + binaryDecoder, _ = dt.Value.(BinaryDecoder) + } else { + return nil, errors.Errorf("unknown oid while decoding record: %v", fieldOID) + } + + if binaryDecoder == nil { + return nil, errors.Errorf("no binary decoder registered for: %v", fieldOID) + } + + // Duplicate struct to scan into + binaryDecoder = reflect.New(reflect.ValueOf(binaryDecoder).Elem().Type()).Interface().(BinaryDecoder) + *v = binaryDecoder.(Value) + return binaryDecoder, nil +} + func (dst *Record) DecodeBinary(ci *ConnInfo, src []byte) error { if src == nil { *dst = Record{Status: Null} @@ -146,25 +165,16 @@ func (dst *Record) DecodeBinary(ci *ConnInfo, src []byte) error { if err != nil { return err } - var binaryDecoder BinaryDecoder - if dt, ok := ci.DataTypeForOID(fieldOID); ok { - binaryDecoder, _ = dt.Value.(BinaryDecoder) - } else { - return errors.Errorf("unknown oid while decoding record: %v", fieldOID) + binaryDecoder, err := prepareNewBinaryDecoder(ci, fieldOID, &fields[i]) + if err != nil { + return err } - if binaryDecoder == nil { - return errors.Errorf("no binary decoder registered for: %v", fieldOID) - } - - // Duplicate struct to scan into - binaryDecoder = reflect.New(reflect.ValueOf(binaryDecoder).Elem().Type()).Interface().(BinaryDecoder) - if err := binaryDecoder.DecodeBinary(ci, fieldBytes); err != nil { + if err = binaryDecoder.DecodeBinary(ci, fieldBytes); err != nil { return err } - fields[i] = binaryDecoder.(Value) fieldOID, fieldBytes, eof, err = fieldIter.next() } diff --git a/record_test.go b/record_test.go index 71a2f702..c8d9097d 100644 --- a/record_test.go +++ b/record_test.go @@ -83,22 +83,50 @@ func TestRecordTranscode(t *testing.T) { }, } - for i, tt := range tests { + for i := 0; i < len(tests); i++ { + tt := tests[i] psName := fmt.Sprintf("test%d", i) _, err := conn.Prepare(context.Background(), psName, tt.sql) if err != nil { t.Fatal(err) } - var result pgtype.Record - if err := conn.QueryRow(context.Background(), psName, pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result); err != nil { - t.Errorf("%d: %v", i, err) - continue - } + t.Run(fmt.Sprintf("scan %d", i), func(t *testing.T) { + var result pgtype.Record + if err := conn.QueryRow(context.Background(), psName, pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result); err != nil { + t.Errorf("%v", err) + return + } + + if !reflect.DeepEqual(tt.expected, result) { + t.Errorf("expected %#v, got %#v", tt.expected, result) + } + }) + + t.Run(fmt.Sprintf("scan MatchFields %d", i), func(t *testing.T) { + tt.expected.MatchFields = true + + fieldsCopy := make([]pgtype.Value, len(tt.expected.Fields)) + reflect.Copy(reflect.ValueOf(fieldsCopy), reflect.ValueOf(tt.expected.Fields)) + + if err := conn.QueryRow(context.Background(), psName, pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&tt.expected); err != nil { + t.Errorf("%d: %v", i, err) + return + } + + if !reflect.DeepEqual(tt.expected.Fields, fieldsCopy) { + t.Errorf("Matching scan succeeded, but modified predefined fields. %d: expected %#v, got %#v", i, tt.expected.Fields, fieldsCopy) + } + + // borrow fields from a neighbor test, this makes scan always fail + tt.expected.Fields = tests[(i+1)%len(tests)].expected.Fields + reflect.Copy(reflect.ValueOf(fieldsCopy), reflect.ValueOf(tt.expected.Fields)) + if err := conn.QueryRow(context.Background(), psName, pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&tt.expected); err == nil { + t.Errorf("Matching scan didn't fail, despite fields not mathchin query result. %d: %v", i, err) + return + } + }) - if !reflect.DeepEqual(tt.expected, result) { - t.Errorf("%d: expected %#v, got %#v", i, tt.expected, result) - } } } From ff95f82f7057c17f1bc55e01c6a2a04da70b1f4f Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sat, 11 Apr 2020 12:20:43 +0100 Subject: [PATCH 03/18] Add ScanRowValue helper function ScanRowValue is useful when reading ROW() values with known field types as well as composite types. It accepts pgtype.Value arguments, where ROW() fields are written to on successfull scan. --- convert.go | 38 +++++++++ record_test.go | 207 ++++++++++++++++++++++++++----------------------- 2 files changed, 150 insertions(+), 95 deletions(-) diff --git a/convert.go b/convert.go index cc5c10ab..a0c38c5b 100644 --- a/convert.go +++ b/convert.go @@ -433,6 +433,44 @@ func GetAssignToDstType(dst interface{}) (interface{}, bool) { return nil, false } +// ScanRowValue assigns ROW()'s fields to destination Values. +// Argument types are checked and error is returned if SQL field value +// can't be assigned to corresponding destionation Value without loss +// of information. Number of fields have to match number of destination values. +// +// Values must implement BinaryDecoder interface otherwise error is returned. +// ScanRowValue takes ownership of src, caller MUST not use it after call +func ScanRowValue(ci *ConnInfo, src []byte, dst ...Value) error { + fieldIter, err := newFieldIterator(src) + if err != nil { + return err + } + + if len(dst) != fieldIter.fieldCount { + return errors.Errorf("can't scan row value, number of fields don't match: row fields count=%d desired fields count=%d", fieldIter.fieldCount, len(dst)) + } + + _, fieldBytes, eof, err := fieldIter.next() + for i := 0; !eof; i++ { + if err != nil { + return err + } + + binaryDecoder, ok := dst[i].(BinaryDecoder) + if !ok { + return errors.Errorf("record field doesn't implement binary decoding: %s", reflect.TypeOf(dst[i]).Name()) + } + + if err = binaryDecoder.DecodeBinary(ci, fieldBytes); err != nil { + return err + } + + _, fieldBytes, eof, err = fieldIter.next() + } + + return nil +} + func init() { kindTypes = map[reflect.Kind]reflect.Type{ reflect.Bool: reflect.TypeOf(false), diff --git a/record_test.go b/record_test.go index c8d9097d..af2105c7 100644 --- a/record_test.go +++ b/record_test.go @@ -11,87 +11,128 @@ import ( "github.com/jackc/pgx/v4" ) +var recordTests = []struct { + sql string + expected pgtype.Record +}{ + { + sql: `select row()`, + expected: pgtype.Record{ + Fields: []pgtype.Value{}, + Status: pgtype.Present, + }, + }, + { + sql: `select row('foo'::text, 42::int4)`, + expected: pgtype.Record{ + Fields: []pgtype.Value{ + &pgtype.Text{String: "foo", Status: pgtype.Present}, + &pgtype.Int4{Int: 42, Status: pgtype.Present}, + }, + Status: pgtype.Present, + }, + }, + { + sql: `select row(100.0::float4, 1.09::float4)`, + expected: pgtype.Record{ + Fields: []pgtype.Value{ + &pgtype.Float4{Float: 100, Status: pgtype.Present}, + &pgtype.Float4{Float: 1.09, Status: pgtype.Present}, + }, + Status: pgtype.Present, + }, + }, + { + sql: `select row('foo'::text, array[1, 2, null, 4]::int4[], 42::int4)`, + expected: pgtype.Record{ + Fields: []pgtype.Value{ + &pgtype.Text{String: "foo", Status: pgtype.Present}, + &pgtype.Int4Array{ + Elements: []pgtype.Int4{ + {Int: 1, Status: pgtype.Present}, + {Int: 2, Status: pgtype.Present}, + {Status: pgtype.Null}, + {Int: 4, Status: pgtype.Present}, + }, + Dimensions: []pgtype.ArrayDimension{{Length: 4, LowerBound: 1}}, + Status: pgtype.Present, + }, + &pgtype.Int4{Int: 42, Status: pgtype.Present}, + }, + Status: pgtype.Present, + }, + }, + { + sql: `select row(null)`, + expected: pgtype.Record{ + Fields: []pgtype.Value{ + &pgtype.Unknown{Status: pgtype.Null}, + }, + Status: pgtype.Present, + }, + }, + { + sql: `select null::record`, + expected: pgtype.Record{ + Status: pgtype.Null, + }, + }, +} + +// row values are binary compatible with records, so we test our helper +// routines here +func TestScanRowValue(t *testing.T) { + conn := testutil.MustConnectPgx(t) + defer testutil.MustCloseContext(t, conn) + + for i := 0; i < len(recordTests); i++ { + tt := recordTests[i] + psName := fmt.Sprintf("test%d", i) + _, err := conn.Prepare(context.Background(), psName, tt.sql) + if err != nil { + t.Fatal(err) + } + t.Run(tt.sql, func(t *testing.T) { + desc := append([]pgtype.Value(nil), tt.expected.Fields...) + + var raw pgtype.GenericBinary + + if err := conn.QueryRow(context.Background(), psName, pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&raw); err != nil { + t.Error(err) + return + } + + if raw.Status == pgtype.Null { + // ScanRowValue deals with complete rows only, NULL values (but NOT null fields) + // should be handled by the calling code + return + } + + if err := pgtype.ScanRowValue(conn.ConnInfo(), raw.Bytes, desc...); err != nil { + t.Error(err) + } + + // borrow fields from a neighbor test, this makes scan always fail + desc = append([]pgtype.Value(nil), recordTests[(i+1)%len(recordTests)].expected.Fields...) + if err := pgtype.ScanRowValue(conn.ConnInfo(), raw.Bytes, desc...); err == nil { + t.Error("Matching scan didn't fail, despite fields not mathching query result") + } + }) + } +} + func TestRecordTranscode(t *testing.T) { conn := testutil.MustConnectPgx(t) defer testutil.MustCloseContext(t, conn) - tests := []struct { - sql string - expected pgtype.Record - }{ - { - sql: `select row()`, - expected: pgtype.Record{ - Fields: []pgtype.Value{}, - Status: pgtype.Present, - }, - }, - { - sql: `select row('foo'::text, 42::int4)`, - expected: pgtype.Record{ - Fields: []pgtype.Value{ - &pgtype.Text{String: "foo", Status: pgtype.Present}, - &pgtype.Int4{Int: 42, Status: pgtype.Present}, - }, - Status: pgtype.Present, - }, - }, - { - sql: `select row(100.0::float4, 1.09::float4)`, - expected: pgtype.Record{ - Fields: []pgtype.Value{ - &pgtype.Float4{Float: 100, Status: pgtype.Present}, - &pgtype.Float4{Float: 1.09, Status: pgtype.Present}, - }, - Status: pgtype.Present, - }, - }, - { - sql: `select row('foo'::text, array[1, 2, null, 4]::int4[], 42::int4)`, - expected: pgtype.Record{ - Fields: []pgtype.Value{ - &pgtype.Text{String: "foo", Status: pgtype.Present}, - &pgtype.Int4Array{ - Elements: []pgtype.Int4{ - {Int: 1, Status: pgtype.Present}, - {Int: 2, Status: pgtype.Present}, - {Status: pgtype.Null}, - {Int: 4, Status: pgtype.Present}, - }, - Dimensions: []pgtype.ArrayDimension{{Length: 4, LowerBound: 1}}, - Status: pgtype.Present, - }, - &pgtype.Int4{Int: 42, Status: pgtype.Present}, - }, - Status: pgtype.Present, - }, - }, - { - sql: `select row(null)`, - expected: pgtype.Record{ - Fields: []pgtype.Value{ - &pgtype.Unknown{Status: pgtype.Null}, - }, - Status: pgtype.Present, - }, - }, - { - sql: `select null::record`, - expected: pgtype.Record{ - Status: pgtype.Null, - }, - }, - } - - for i := 0; i < len(tests); i++ { - tt := tests[i] + for i, tt := range recordTests { psName := fmt.Sprintf("test%d", i) _, err := conn.Prepare(context.Background(), psName, tt.sql) if err != nil { t.Fatal(err) } - t.Run(fmt.Sprintf("scan %d", i), func(t *testing.T) { + t.Run(tt.sql, func(t *testing.T) { var result pgtype.Record if err := conn.QueryRow(context.Background(), psName, pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result); err != nil { t.Errorf("%v", err) @@ -103,30 +144,6 @@ func TestRecordTranscode(t *testing.T) { } }) - t.Run(fmt.Sprintf("scan MatchFields %d", i), func(t *testing.T) { - tt.expected.MatchFields = true - - fieldsCopy := make([]pgtype.Value, len(tt.expected.Fields)) - reflect.Copy(reflect.ValueOf(fieldsCopy), reflect.ValueOf(tt.expected.Fields)) - - if err := conn.QueryRow(context.Background(), psName, pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&tt.expected); err != nil { - t.Errorf("%d: %v", i, err) - return - } - - if !reflect.DeepEqual(tt.expected.Fields, fieldsCopy) { - t.Errorf("Matching scan succeeded, but modified predefined fields. %d: expected %#v, got %#v", i, tt.expected.Fields, fieldsCopy) - } - - // borrow fields from a neighbor test, this makes scan always fail - tt.expected.Fields = tests[(i+1)%len(tests)].expected.Fields - reflect.Copy(reflect.ValueOf(fieldsCopy), reflect.ValueOf(tt.expected.Fields)) - if err := conn.QueryRow(context.Background(), psName, pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&tt.expected); err == nil { - t.Errorf("Matching scan didn't fail, despite fields not mathchin query result. %d: %v", i, err) - return - } - }) - } } From 71ed747f3a7786d1f1c1dc336d4c661d08da6e6a Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 12 Apr 2020 15:52:37 +0100 Subject: [PATCH 04/18] Add example of CompositeType handling with ScanRowValue helper --- composite_test.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 composite_test.go diff --git a/composite_test.go b/composite_test.go new file mode 100644 index 00000000..d51cb579 --- /dev/null +++ b/composite_test.go @@ -0,0 +1,77 @@ +package pgtype_test + +import ( + "context" + "fmt" + "os" + + "github.com/jackc/pgtype" + pgx "github.com/jackc/pgx/v4" + errors "golang.org/x/xerrors" +) + +type MyType struct { + a int32 // NULL will cause decoding error + b *string // there can be NULL in this position in SQL +} + +func (dst *MyType) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error { + if src == nil { + return errors.New("NULL values can't be decoded. Scan into a &*MyType to handle NULLs") + } + + a := pgtype.Int4{} + b := pgtype.Text{} + + if err := pgtype.ScanRowValue(ci, src, &a, &b); err != nil { + return err + } + + // type compatibility is checked by AssignTo + // only lossless assignments will succeed + if err := a.AssignTo(&dst.a); err != nil { + return err + } + + // AssignTo also deals with null value handling + if err := b.AssignTo(&dst.b); err != nil { + return err + } + + return nil +} + +func Example_compositeTypes() { + conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + if err != nil { + panic(err) + } + defer conn.Close(context.Background()) + _, err = conn.Exec(context.Background(), `drop type if exists mytype; + +create type mytype as ( + a int4, + b text +);`) + if err != nil { + panic(err) + } + defer conn.Exec(context.Background(), "drop type mytype") + + var result *MyType + if err = conn.QueryRow(context.Background(), "select (1,'foo')::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result); err != nil { + panic(err) + } + + fmt.Printf("First row: a=%d b=%s\n", result.a, *result.b) + + // Because we scan into &*MyType, NULLs are handled generically by assigning nil to result + if err = conn.QueryRow(context.Background(), "select NULL::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result); err != nil { + panic(err) + } + + fmt.Printf("Second row: %v\n", result) + // Output: + // First row: a=1 b=foo + // Second row: +} From 368295d3ee4d8f08f30d5f8cb1841461cd4f14a6 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 12 Apr 2020 18:40:52 +0100 Subject: [PATCH 05/18] Create ROW helper for adhoc decoding of records --- composite_test.go | 13 +++++++++++++ convert.go | 28 ++++++++++++++++++++++++++++ pgtype.go | 9 +++++++++ 3 files changed, 50 insertions(+) diff --git a/composite_test.go b/composite_test.go index d51cb579..ffa7d479 100644 --- a/composite_test.go +++ b/composite_test.go @@ -71,7 +71,20 @@ create type mytype as ( } fmt.Printf("Second row: %v\n", result) + + // Adhoc rows can be decoded inplace without boilerplate (works with composite types too) + var isNull bool + var a int + var b *string + + if err = conn.QueryRow(context.Background(), "select (2, 'bar')::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(pgtype.ROW(&isNull, &a, &b)); err != nil { + panic(err) + } + + fmt.Printf("Adhoc: isNull=%v a=%d b=%s", isNull, a, *b) + // Output: // First row: a=1 b=foo // Second row: + // Adhoc: isNull=false a=2 b=bar } diff --git a/convert.go b/convert.go index a0c38c5b..8157358b 100644 --- a/convert.go +++ b/convert.go @@ -471,6 +471,34 @@ func ScanRowValue(ci *ConnInfo, src []byte, dst ...Value) error { return nil } +// ROW allows deconstructing row values (records and composite types) into +// fields directly without creating your own type and implementing decoder interfaces +func ROW(isNull *bool, fields ...interface{}) BinaryDecoderFunc { + return func(ci *ConnInfo, src []byte) error { + var record Record + if err := record.DecodeBinary(ci, src); err != nil { + return err + } + + if record.Status == Null { + *isNull = true + return nil + } + + if len(record.Fields) != len(fields) { + return errors.Errorf("can't scan row value, number of fields don't match: row fields count=%d desired fields count=%d", len(record.Fields), len(fields)) + } + + for i, f := range record.Fields { + if err := f.AssignTo(fields[i]); err != nil { + return err + } + } + + return nil + } +} + func init() { kindTypes = map[reflect.Kind]reflect.Type{ reflect.Bool: reflect.TypeOf(false), diff --git a/pgtype.go b/pgtype.go index 914e02d2..1749c8c2 100644 --- a/pgtype.go +++ b/pgtype.go @@ -158,6 +158,15 @@ type TextEncoder interface { EncodeText(ci *ConnInfo, buf []byte) (newBuf []byte, err error) } +//The BinaryDecoderFunc type is an adapter to allow the use of ordinary functions as BinaryDecoder types. +// If f is a function with the appropriate signature, BinaryDecoderFunc(f) is a BinaryDecoder that calls f. +type BinaryDecoderFunc func(ci *ConnInfo, src []byte) error + +// DecodeBinary calls f(ci, src) +func (f BinaryDecoderFunc) DecodeBinary(ci *ConnInfo, src []byte) error { + return f(ci, src) +} + var errUndefined = errors.New("cannot encode status undefined") var errBadStatus = errors.New("invalid status") From 8ae83b19f7d6a2a27ac3b1a2664dc3c61a90cf46 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 12 Apr 2020 22:33:33 +0100 Subject: [PATCH 06/18] Add EncodeRow helpers Also extend example to show how EncodeRow can be used to create binary encoders for composite type --- composite_test.go | 47 +++++++++++++++++++++++++++++++++-------------- convert.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 14 deletions(-) diff --git a/composite_test.go b/composite_test.go index ffa7d479..d0c48f6e 100644 --- a/composite_test.go +++ b/composite_test.go @@ -41,11 +41,32 @@ func (dst *MyType) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error { return nil } -func Example_compositeTypes() { - conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) +func (src MyType) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) (newBuf []byte, err error) { + a := pgtype.Int4{src.a, pgtype.Present} + var b pgtype.Text + if src.b != nil { + b = pgtype.Text{*src.b, pgtype.Present} + } else { + b = pgtype.Text{Status: pgtype.Null} + } + + return pgtype.EncodeRow(ci, buf, &a, &b) +} + +func ptrS(s string) *string { + return &s +} + +func E(err error) { if err != nil { panic(err) } +} + +func Example_compositeTypes() { + conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + E(err) + defer conn.Close(context.Background()) _, err = conn.Exec(context.Background(), `drop type if exists mytype; @@ -53,22 +74,21 @@ create type mytype as ( a int4, b text );`) - if err != nil { - panic(err) - } + E(err) defer conn.Exec(context.Background(), "drop type mytype") var result *MyType - if err = conn.QueryRow(context.Background(), "select (1,'foo')::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result); err != nil { - panic(err) - } + + // Demonstrates both passing and reading back composite values + err = conn.QueryRow(context.Background(), "select $1::mytype", + pgx.QueryResultFormats{pgx.BinaryFormatCode}, MyType{1, ptrS("foo")}).Scan(&result) + E(err) fmt.Printf("First row: a=%d b=%s\n", result.a, *result.b) // Because we scan into &*MyType, NULLs are handled generically by assigning nil to result - if err = conn.QueryRow(context.Background(), "select NULL::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result); err != nil { - panic(err) - } + err = conn.QueryRow(context.Background(), "select NULL::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result) + E(err) fmt.Printf("Second row: %v\n", result) @@ -77,9 +97,8 @@ create type mytype as ( var a int var b *string - if err = conn.QueryRow(context.Background(), "select (2, 'bar')::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(pgtype.ROW(&isNull, &a, &b)); err != nil { - panic(err) - } + err = conn.QueryRow(context.Background(), "select (2, 'bar')::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(pgtype.ROW(&isNull, &a, &b)) + E(err) fmt.Printf("Adhoc: isNull=%v a=%d b=%s", isNull, a, *b) diff --git a/convert.go b/convert.go index 8157358b..d22a714f 100644 --- a/convert.go +++ b/convert.go @@ -5,6 +5,7 @@ import ( "reflect" "time" + "github.com/jackc/pgio" errors "golang.org/x/xerrors" ) @@ -471,6 +472,38 @@ func ScanRowValue(ci *ConnInfo, src []byte, dst ...Value) error { return nil } +// EncodeRow builds a binary representation of row values (row(), composite types) +func EncodeRow(ci *ConnInfo, buf []byte, fields ...Value) (newBuf []byte, err error) { + fieldBytes := make([]byte, 0, 128) + + newBuf = pgio.AppendUint32(buf, uint32(len(fields))) + for _, f := range fields { + dt, ok := ci.DataTypeForValue(f) + if !ok { + return nil, errors.Errorf("Unknown OID for %s", f) + } + newBuf = pgio.AppendUint32(newBuf, dt.OID) + + if f.Get() != nil { + binaryEncoder, ok := f.(BinaryEncoder) + if !ok { + return nil, errors.Errorf("record field doesn't implement binary encoding: %s", reflect.TypeOf(f).Name()) + } + fieldBytes, err = binaryEncoder.EncodeBinary(ci, fieldBytes[:0]) + if err != nil { + return nil, err + } + + newBuf = pgio.AppendUint32(newBuf, uint32(len(fieldBytes))) + newBuf = append(newBuf, fieldBytes...) + } else { + newBuf = pgio.AppendInt32(newBuf, int32(-1)) + } + + } + return +} + // ROW allows deconstructing row values (records and composite types) into // fields directly without creating your own type and implementing decoder interfaces func ROW(isNull *bool, fields ...interface{}) BinaryDecoderFunc { From 3ce29f9e055b46203c43c51744f536888942c018 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Mon, 13 Apr 2020 01:52:06 +0100 Subject: [PATCH 07/18] Add Composite type for inplace row() values handling Composite() function returns a private type, which should be registered with ConnInfo.RegisterDataType for the composite type's OID. All subsequent interaction with Composite types is to be done via Row(...) function. Function return value can be either passed as a query argument to build SQL composite value out of individual fields or passed to Scan to read SQL composite value back. When passed to Scan, Row() should have first argument of type *bool to flag NULL values returned from query. --- composite.go | 128 ++++++++++++++++++++++++++++++++++++++++++++++ composite_test.go | 16 ++++-- convert.go | 28 ---------- pgtype.go | 9 ++++ 4 files changed, 150 insertions(+), 31 deletions(-) create mode 100644 composite.go diff --git a/composite.go b/composite.go new file mode 100644 index 00000000..1caa24d6 --- /dev/null +++ b/composite.go @@ -0,0 +1,128 @@ +package pgtype + +import ( + errors "golang.org/x/xerrors" +) + +type composite struct { + fields []Value + status Status +} + +// helper struct to act both as a scanning target and query argument +type rowValue struct { + args []interface{} +} + +// Row helper function builds a value which can be both used to +// "assemble" composite quiery arguments and to scan results back. +// +// When passed as an argument to query, values from Row args will +// be assigned to corresponding fields in a composite type and a single +// composite type will be passed to the PostgreSQL. Composite type need +// to be registered in ConnInfo first. This is required so that pgx +// can know which SQL types to use when constructing SQL composite argument +// +// When passed to Scan individual fields from composite query result +// are assigned to corresponding Row arguments. First argument MUST +// be of type *bool to flag when NULL value received. So total number +// of Row arguments, when passed to Scan should be number of composite +// fields you expect to read + 1 +func Row(fields ...interface{}) rowValue { + return rowValue{fields} +} + +// Composite types is meant to be passed to ConnInfo.RegisterDataType only, +// so it is made private on purpose. Once registered, it allows Row +// function to correctly pass query arguments. +func Composite(fields ...Value) *composite { + return &composite{fields, Undefined} +} + +func (src composite) Get() interface{} { + switch src.status { + case Present: + return src + case Null: + return nil + default: + return src.status + } +} + +// Set is called internally when passing query arguments. +// Only valid src is a result of pgtype.Row() or nil +func (dst *composite) Set(src interface{}) error { + if src == nil { + *dst = composite{status: Null} + return nil + } + + switch value := src.(type) { + case rowValue: + if len(value.args) != len(dst.fields) { + return errors.Errorf("Number of fields don't match. Composite has %d fields", len(dst.fields)) + } + for i, v := range value.args { + if err := dst.fields[i].Set(v); err != nil { + return err + } + } + dst.status = Present + default: + return errors.Errorf("Use pgtype.Row() as query parameter") + } + + return nil +} + +// AssignTo is never called on composite value directly, it is here +// to satisfy Valuer interface +func (src composite) AssignTo(dst interface{}) error { + return errors.New("BUG: should never be called, because pgtype.composite doesn't support decoding") +} + +func (src composite) EncodeBinary(ci *ConnInfo, buf []byte) (newBuf []byte, err error) { + return EncodeRow(ci, buf, src.fields...) +} + +// DecodeBinary here is just to make pgx use binary result format by default. +// Users should be using Row function or their own types to scan composites +func (src composite) DecodeBinary(ci *ConnInfo, buf []byte) (err error) { + return errors.New("Pass pgtype.Row() to Scan to deconstruct Composite") +} + +// DecodeBinary is called when pgtype.Row() is passed to Scan() to +// deconstruct composite value +func (r rowValue) DecodeBinary(ci *ConnInfo, src []byte) error { + if len(r.args) == 0 { + return errors.New("pgtype.Row must have 'isNull *bool' as a first argument when used in Scan") + } + + isNull, ok := r.args[0].(*bool) + if !ok { + return errors.New("pgtype.Row must have 'isNull *bool' as a first argument when used in Scan") + } + args := r.args[1:] + + var record Record + if err := record.DecodeBinary(ci, src); err != nil { + return err + } + + if record.Status == Null { + *isNull = true + return nil + } + + if len(record.Fields) != len(args) { + return errors.Errorf("SQL composite can't be read, 'pgtype.Row' has wrong field cout. %d != %d", len(record.Fields), len(args)) + } + + for i, f := range record.Fields { + if err := f.AssignTo(args[i]); err != nil { + return err + } + } + return nil +} diff --git a/composite_test.go b/composite_test.go index d0c48f6e..b38cdd45 100644 --- a/composite_test.go +++ b/composite_test.go @@ -81,7 +81,8 @@ create type mytype as ( // Demonstrates both passing and reading back composite values err = conn.QueryRow(context.Background(), "select $1::mytype", - pgx.QueryResultFormats{pgx.BinaryFormatCode}, MyType{1, ptrS("foo")}).Scan(&result) + pgx.QueryResultFormats{pgx.BinaryFormatCode}, MyType{1, ptrS("foo")}). + Scan(&result) E(err) fmt.Printf("First row: a=%d b=%s\n", result.a, *result.b) @@ -92,12 +93,21 @@ create type mytype as ( fmt.Printf("Second row: %v\n", result) - // Adhoc rows can be decoded inplace without boilerplate (works with composite types too) + //WIP + q, err := conn.Prepare(context.Background(), "z", "select $1::mytype") + E(err) + conn.ConnInfo().RegisterDataType(pgtype.DataType{pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}), "mytype", q.ParamOIDs[0]}) + + // Adhoc rows can be decoded inplace without boilerplate + // Composite types can be encoded/decoded inplace + var isNull bool var a int var b *string - err = conn.QueryRow(context.Background(), "select (2, 'bar')::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(pgtype.ROW(&isNull, &a, &b)) + err = conn.QueryRow(context.Background(), "select row(($1::mytype).a, ($1).b)", + pgx.QueryResultFormats{pgx.BinaryFormatCode}, pgtype.Row(2, "bar")). + Scan(pgtype.Row(&isNull, &a, &b)) E(err) fmt.Printf("Adhoc: isNull=%v a=%d b=%s", isNull, a, *b) diff --git a/convert.go b/convert.go index d22a714f..134e123d 100644 --- a/convert.go +++ b/convert.go @@ -504,34 +504,6 @@ func EncodeRow(ci *ConnInfo, buf []byte, fields ...Value) (newBuf []byte, err er return } -// ROW allows deconstructing row values (records and composite types) into -// fields directly without creating your own type and implementing decoder interfaces -func ROW(isNull *bool, fields ...interface{}) BinaryDecoderFunc { - return func(ci *ConnInfo, src []byte) error { - var record Record - if err := record.DecodeBinary(ci, src); err != nil { - return err - } - - if record.Status == Null { - *isNull = true - return nil - } - - if len(record.Fields) != len(fields) { - return errors.Errorf("can't scan row value, number of fields don't match: row fields count=%d desired fields count=%d", len(record.Fields), len(fields)) - } - - for i, f := range record.Fields { - if err := f.AssignTo(fields[i]); err != nil { - return err - } - } - - return nil - } -} - func init() { kindTypes = map[reflect.Kind]reflect.Type{ reflect.Bool: reflect.TypeOf(false), diff --git a/pgtype.go b/pgtype.go index 1749c8c2..e86255f4 100644 --- a/pgtype.go +++ b/pgtype.go @@ -167,6 +167,15 @@ func (f BinaryDecoderFunc) DecodeBinary(ci *ConnInfo, src []byte) error { return f(ci, src) } +//The BinaryEncoderFunc type is an adapter to allow the use of ordinary functions as BinaryDecoder types. +// If f is a function with the appropriate signature, BinaryEncoderFunc(f) is a BinaryDecoder that calls f. +type BinaryEncoderFunc func(ci *ConnInfo, buf []byte) ([]byte, error) + +// EncodeBinary calls f(ci, buf) +func (f BinaryEncoderFunc) EncodeBinary(ci *ConnInfo, buf []byte) (newBuf []byte, err error) { + return f(ci, buf) +} + var errUndefined = errors.New("cannot encode status undefined") var errBadStatus = errors.New("invalid status") From a6747b513f7e839171908923e91ad8c13ca8c51d Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Mon, 13 Apr 2020 17:44:02 +0100 Subject: [PATCH 08/18] Split composite examples --- composite_test.go | 99 ++++++++------------------------------ custom_composite_test.go | 101 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 80 deletions(-) create mode 100644 custom_composite_test.go diff --git a/composite_test.go b/composite_test.go index b38cdd45..3e63151c 100644 --- a/composite_test.go +++ b/composite_test.go @@ -7,63 +7,11 @@ import ( "github.com/jackc/pgtype" pgx "github.com/jackc/pgx/v4" - errors "golang.org/x/xerrors" ) -type MyType struct { - a int32 // NULL will cause decoding error - b *string // there can be NULL in this position in SQL -} - -func (dst *MyType) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error { - if src == nil { - return errors.New("NULL values can't be decoded. Scan into a &*MyType to handle NULLs") - } - - a := pgtype.Int4{} - b := pgtype.Text{} - - if err := pgtype.ScanRowValue(ci, src, &a, &b); err != nil { - return err - } - - // type compatibility is checked by AssignTo - // only lossless assignments will succeed - if err := a.AssignTo(&dst.a); err != nil { - return err - } - - // AssignTo also deals with null value handling - if err := b.AssignTo(&dst.b); err != nil { - return err - } - - return nil -} - -func (src MyType) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) (newBuf []byte, err error) { - a := pgtype.Int4{src.a, pgtype.Present} - var b pgtype.Text - if src.b != nil { - b = pgtype.Text{*src.b, pgtype.Present} - } else { - b = pgtype.Text{Status: pgtype.Null} - } - - return pgtype.EncodeRow(ci, buf, &a, &b) -} - -func ptrS(s string) *string { - return &s -} - -func E(err error) { - if err != nil { - panic(err) - } -} - -func Example_compositeTypes() { +//ExampleComposite demonstrates use of Row() function to pass and receive +// back composite types without creating boilderplate custom types. +func Example_composite() { conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) E(err) @@ -77,43 +25,34 @@ create type mytype as ( E(err) defer conn.Exec(context.Background(), "drop type mytype") - var result *MyType - - // Demonstrates both passing and reading back composite values - err = conn.QueryRow(context.Background(), "select $1::mytype", - pgx.QueryResultFormats{pgx.BinaryFormatCode}, MyType{1, ptrS("foo")}). - Scan(&result) - E(err) - - fmt.Printf("First row: a=%d b=%s\n", result.a, *result.b) - - // Because we scan into &*MyType, NULLs are handled generically by assigning nil to result - err = conn.QueryRow(context.Background(), "select NULL::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result) - E(err) - - fmt.Printf("Second row: %v\n", result) - //WIP q, err := conn.Prepare(context.Background(), "z", "select $1::mytype") E(err) conn.ConnInfo().RegisterDataType(pgtype.DataType{pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}), "mytype", q.ParamOIDs[0]}) - // Adhoc rows can be decoded inplace without boilerplate - // Composite types can be encoded/decoded inplace - var isNull bool var a int var b *string - err = conn.QueryRow(context.Background(), "select row(($1::mytype).a, ($1).b)", - pgx.QueryResultFormats{pgx.BinaryFormatCode}, pgtype.Row(2, "bar")). + err = conn.QueryRow(context.Background(), "select $1::mytype", + pgtype.Row(2, "bar")). Scan(pgtype.Row(&isNull, &a, &b)) E(err) - fmt.Printf("Adhoc: isNull=%v a=%d b=%s", isNull, a, *b) + fmt.Printf("First: isNull=%v a=%d b=%s\n", isNull, a, *b) + + err = conn.QueryRow(context.Background(), "select (1, NULL)::mytype").Scan(pgtype.Row(&isNull, &a, &b)) + E(err) + + fmt.Printf("Second: isNull=%v a=%d b=%v\n", isNull, a, b) + + err = conn.QueryRow(context.Background(), "select NULL::mytype").Scan(pgtype.Row(&isNull, &a, &b)) + E(err) + + fmt.Printf("Third: isNull=%v\n", isNull) // Output: - // First row: a=1 b=foo - // Second row: - // Adhoc: isNull=false a=2 b=bar + // First: isNull=false a=2 b=bar + // Second: isNull=false a=1 b= + // Third: isNull=true } diff --git a/custom_composite_test.go b/custom_composite_test.go new file mode 100644 index 00000000..61ea91c5 --- /dev/null +++ b/custom_composite_test.go @@ -0,0 +1,101 @@ +package pgtype_test + +import ( + "context" + "fmt" + "os" + + "github.com/jackc/pgtype" + pgx "github.com/jackc/pgx/v4" + errors "golang.org/x/xerrors" +) + +type MyType struct { + a int32 // NULL will cause decoding error + b *string // there can be NULL in this position in SQL +} + +func (dst *MyType) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error { + if src == nil { + return errors.New("NULL values can't be decoded. Scan into a &*MyType to handle NULLs") + } + + a := pgtype.Int4{} + b := pgtype.Text{} + + if err := pgtype.ScanRowValue(ci, src, &a, &b); err != nil { + return err + } + + // type compatibility is checked by AssignTo + // only lossless assignments will succeed + if err := a.AssignTo(&dst.a); err != nil { + return err + } + + // AssignTo also deals with null value handling + if err := b.AssignTo(&dst.b); err != nil { + return err + } + + return nil +} + +func (src MyType) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) (newBuf []byte, err error) { + a := pgtype.Int4{src.a, pgtype.Present} + var b pgtype.Text + if src.b != nil { + b = pgtype.Text{*src.b, pgtype.Present} + } else { + b = pgtype.Text{Status: pgtype.Null} + } + + return pgtype.EncodeRow(ci, buf, &a, &b) +} + +func ptrS(s string) *string { + return &s +} + +func E(err error) { + if err != nil { + panic(err) + } +} + +// ExampleCustomCompositeTypes demonstrates how support for custom types mappable to SQL +// composites can be added. +func Example_customCompositeTypes() { + conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + E(err) + + defer conn.Close(context.Background()) + _, err = conn.Exec(context.Background(), `drop type if exists mytype; + +create type mytype as ( + a int4, + b text +);`) + E(err) + defer conn.Exec(context.Background(), "drop type mytype") + + var result *MyType + + // Demonstrates both passing and reading back composite values + err = conn.QueryRow(context.Background(), "select $1::mytype", + pgx.QueryResultFormats{pgx.BinaryFormatCode}, MyType{1, ptrS("foo")}). + Scan(&result) + E(err) + + fmt.Printf("First row: a=%d b=%s\n", result.a, *result.b) + + // Because we scan into &*MyType, NULLs are handled generically by assigning nil to result + err = conn.QueryRow(context.Background(), "select NULL::mytype", pgx.QueryResultFormats{pgx.BinaryFormatCode}).Scan(&result) + E(err) + + fmt.Printf("Second row: %v\n", result) + + // Output: + // First row: a=1 b=foo + // Second row: +} From 2e13f2fe7691a7c99f55a85fdb2e8934da7a9582 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Thu, 16 Apr 2020 20:59:07 +0100 Subject: [PATCH 09/18] Move lowlevel binary routines into own package --- binary/record.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++++ convert.go | 22 ++++++-------- record.go | 61 ++++--------------------------------- 3 files changed, 93 insertions(+), 68 deletions(-) create mode 100644 binary/record.go diff --git a/binary/record.go b/binary/record.go new file mode 100644 index 00000000..72b688a8 --- /dev/null +++ b/binary/record.go @@ -0,0 +1,78 @@ +package binary + +import ( + "encoding/binary" + + "github.com/jackc/pgio" + errors "golang.org/x/xerrors" +) + +type RecordFieldIter struct { + rp int + src []byte +} + +// NewRecordFieldIterator creates iterator over binary representation +// of record, aka ROW(), aka Composite +func NewRecordFieldIterator(src []byte) (RecordFieldIter, int, error) { + rp := 0 + if len(src[rp:]) < 4 { + return RecordFieldIter{}, 0, errors.Errorf("Record incomplete %v", src) + } + + fieldCount := int(int32(binary.BigEndian.Uint32(src[rp:]))) + rp += 4 + + return RecordFieldIter{ + rp: rp, + src: src, + }, fieldCount, nil +} + +// Next returns next field decoded from record. eof is returned if no +// more fields left to decode. +func (fi *RecordFieldIter) Next() (fieldOID uint32, buf []byte, eof bool, err error) { + if fi.rp == len(fi.src) { + eof = true + return + } + + if len(fi.src[fi.rp:]) < 8 { + err = errors.Errorf("Record incomplete %v", fi.src) + return + } + fieldOID = binary.BigEndian.Uint32(fi.src[fi.rp:]) + fi.rp += 4 + + fieldLen := int(int32(binary.BigEndian.Uint32(fi.src[fi.rp:]))) + fi.rp += 4 + + if fieldLen >= 0 { + if len(fi.src[fi.rp:]) < fieldLen { + err = errors.Errorf("Record incomplete rp=%d src=%v", fi.rp, fi.src) + return + } + buf = fi.src[fi.rp : fi.rp+fieldLen] + fi.rp += fieldLen + } + + return +} + +// RecordStart adds record header to the buf +func RecordStart(buf []byte, fieldCount int) []byte { + return pgio.AppendUint32(buf, uint32(fieldCount)) +} + +// RecordAdd adds record field to the buf +func RecordAdd(buf []byte, oid uint32, fieldBytes []byte) []byte { + buf = pgio.AppendUint32(buf, oid) + buf = pgio.AppendUint32(buf, uint32(len(fieldBytes))) + buf = append(buf, fieldBytes...) + return buf +} + +// RecordAddNull adds null value as a field to the buf +func RecordAddNull(buf []byte, oid uint32) []byte { + return pgio.AppendInt32(buf, int32(-1)) +} diff --git a/convert.go b/convert.go index 134e123d..6d5ea0c9 100644 --- a/convert.go +++ b/convert.go @@ -5,7 +5,7 @@ import ( "reflect" "time" - "github.com/jackc/pgio" + "github.com/jackc/pgtype/binary" errors "golang.org/x/xerrors" ) @@ -442,16 +442,16 @@ func GetAssignToDstType(dst interface{}) (interface{}, bool) { // Values must implement BinaryDecoder interface otherwise error is returned. // ScanRowValue takes ownership of src, caller MUST not use it after call func ScanRowValue(ci *ConnInfo, src []byte, dst ...Value) error { - fieldIter, err := newFieldIterator(src) + fieldIter, fieldCount, err := binary.NewRecordFieldIterator(src) if err != nil { return err } - if len(dst) != fieldIter.fieldCount { - return errors.Errorf("can't scan row value, number of fields don't match: row fields count=%d desired fields count=%d", fieldIter.fieldCount, len(dst)) + if len(dst) != fieldCount { + return errors.Errorf("can't scan row value, number of fields don't match: row fields count=%d desired fields count=%d", fieldCount, len(dst)) } - _, fieldBytes, eof, err := fieldIter.next() + _, fieldBytes, eof, err := fieldIter.Next() for i := 0; !eof; i++ { if err != nil { return err @@ -466,7 +466,7 @@ func ScanRowValue(ci *ConnInfo, src []byte, dst ...Value) error { return err } - _, fieldBytes, eof, err = fieldIter.next() + _, fieldBytes, eof, err = fieldIter.Next() } return nil @@ -476,14 +476,12 @@ func ScanRowValue(ci *ConnInfo, src []byte, dst ...Value) error { func EncodeRow(ci *ConnInfo, buf []byte, fields ...Value) (newBuf []byte, err error) { fieldBytes := make([]byte, 0, 128) - newBuf = pgio.AppendUint32(buf, uint32(len(fields))) + newBuf = binary.RecordStart(buf, len(fields)) for _, f := range fields { dt, ok := ci.DataTypeForValue(f) if !ok { return nil, errors.Errorf("Unknown OID for %s", f) } - newBuf = pgio.AppendUint32(newBuf, dt.OID) - if f.Get() != nil { binaryEncoder, ok := f.(BinaryEncoder) if !ok { @@ -493,11 +491,9 @@ func EncodeRow(ci *ConnInfo, buf []byte, fields ...Value) (newBuf []byte, err er if err != nil { return nil, err } - - newBuf = pgio.AppendUint32(newBuf, uint32(len(fieldBytes))) - newBuf = append(newBuf, fieldBytes...) + newBuf = binary.RecordAdd(newBuf, dt.OID, fieldBytes) } else { - newBuf = pgio.AppendInt32(newBuf, int32(-1)) + newBuf = binary.RecordAddNull(newBuf, dt.OID) } } diff --git a/record.go b/record.go index 08603140..4e39f92a 100644 --- a/record.go +++ b/record.go @@ -1,9 +1,10 @@ package pgtype import ( - "encoding/binary" "reflect" + "github.com/jackc/pgtype/binary" + errors "golang.org/x/xerrors" ) @@ -78,56 +79,6 @@ func (src *Record) AssignTo(dst interface{}) error { return errors.Errorf("cannot decode %#v into %T", src, dst) } -type fieldIter struct { - rp int - fieldCount int - src []byte -} - -func newFieldIterator(src []byte) (fieldIter, error) { - rp := 0 - if len(src[rp:]) < 4 { - return fieldIter{}, errors.Errorf("Record incomplete %v", src) - } - - fieldCount := int(int32(binary.BigEndian.Uint32(src[rp:]))) - rp += 4 - - return fieldIter{ - rp: rp, - fieldCount: fieldCount, - src: src, - }, nil -} - -func (fi *fieldIter) next() (fieldOID uint32, buf []byte, eof bool, err error) { - if fi.rp == len(fi.src) { - eof = true - return - } - - if len(fi.src[fi.rp:]) < 8 { - err = errors.Errorf("Record incomplete %v", fi.src) - return - } - fieldOID = binary.BigEndian.Uint32(fi.src[fi.rp:]) - fi.rp += 4 - - fieldLen := int(int32(binary.BigEndian.Uint32(fi.src[fi.rp:]))) - fi.rp += 4 - - if fieldLen >= 0 { - if len(fi.src[fi.rp:]) < fieldLen { - err = errors.Errorf("Record incomplete rp=%d src=%v", fi.rp, fi.src) - return - } - buf = fi.src[fi.rp : fi.rp+fieldLen] - fi.rp += fieldLen - } - - return -} - func prepareNewBinaryDecoder(ci *ConnInfo, fieldOID uint32, v *Value) (BinaryDecoder, error) { var binaryDecoder BinaryDecoder @@ -153,13 +104,13 @@ func (dst *Record) DecodeBinary(ci *ConnInfo, src []byte) error { return nil } - fieldIter, err := newFieldIterator(src) + fieldIter, fieldCount, err := binary.NewRecordFieldIterator(src) if err != nil { return err } - fields := make([]Value, fieldIter.fieldCount) - fieldOID, fieldBytes, eof, err := fieldIter.next() + fields := make([]Value, fieldCount) + fieldOID, fieldBytes, eof, err := fieldIter.Next() for i := 0; !eof; i++ { if err != nil { @@ -175,7 +126,7 @@ func (dst *Record) DecodeBinary(ci *ConnInfo, src []byte) error { return err } - fieldOID, fieldBytes, eof, err = fieldIter.next() + fieldOID, fieldBytes, eof, err = fieldIter.Next() } *dst = Record{Fields: fields, Status: Present} From 54a03cb143744322b83bfc5ba36bc77cf93644a6 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Thu, 16 Apr 2020 22:24:40 +0100 Subject: [PATCH 10/18] Add benchmark for various composite encoder implementations ``` BenchmarkBinaryEncodingManual-12 824053234 28.9 ns/op 0 B/op 0 allocs/op BenchmarkBinaryEncodingHelper-12 76815436 314 ns/op 192 B/op 5 allocs/op BenchmarkBinaryEncodingRow-12 65302958 364 ns/op 192 B/op 5 allocs/op ``` --- composite_bench_test.go | 70 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 composite_bench_test.go diff --git a/composite_bench_test.go b/composite_bench_test.go new file mode 100644 index 00000000..a1eba72b --- /dev/null +++ b/composite_bench_test.go @@ -0,0 +1,70 @@ +package pgtype_test + +import ( + "testing" + + "github.com/jackc/pgtype" + "github.com/jackc/pgtype/binary" +) + +type MyCompositeRaw struct { + a int32 + b *string +} + +func (src MyCompositeRaw) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) (newBuf []byte, err error) { + a := pgtype.Int4{src.a, pgtype.Present} + + fieldBytes := make([]byte, 0, 64) + fieldBytes, _ = a.EncodeBinary(ci, fieldBytes[:0]) + + newBuf = binary.RecordStart(buf, 2) + newBuf = binary.RecordAdd(newBuf, pgtype.Int4OID, fieldBytes) + + if src.b != nil { + fieldBytes, _ = pgtype.Text{*src.b, pgtype.Present}.EncodeBinary(ci, fieldBytes[:0]) + newBuf = binary.RecordAdd(newBuf, pgtype.TextOID, fieldBytes) + } else { + newBuf = binary.RecordAddNull(newBuf, pgtype.TextOID) + } + return +} + +var x []byte + +func BenchmarkBinaryEncodingManual(b *testing.B) { + buf := make([]byte, 0, 128) + ci := pgtype.NewConnInfo() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + v := MyCompositeRaw{4, ptrS("ABCDEFG")} + buf, _ = v.EncodeBinary(ci, buf[:0]) + } + x = buf +} + +func BenchmarkBinaryEncodingHelper(b *testing.B) { + buf := make([]byte, 0, 128) + ci := pgtype.NewConnInfo() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + v := MyType{4, ptrS("ABCDEFG")} + buf, _ = v.EncodeBinary(ci, buf[:0]) + } + x = buf +} + +func BenchmarkBinaryEncodingRow(b *testing.B) { + buf := make([]byte, 0, 128) + ci := pgtype.NewConnInfo() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + c := pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}) + c.Set(pgtype.Row(2, "bar")) + buf, _ = c.EncodeBinary(ci, buf[:0]) + } + x = buf +} From b88a3e07653f3db164be10edf86edd1497bd56e7 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sat, 18 Apr 2020 14:08:28 +0100 Subject: [PATCH 11/18] Tighten ScanRowValue input types ScanRowValue needs not Value, but BinaryEncoder --- convert.go | 20 ++++++++------------ record_test.go | 10 ++++++++-- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/convert.go b/convert.go index 6d5ea0c9..45f117bc 100644 --- a/convert.go +++ b/convert.go @@ -434,14 +434,15 @@ func GetAssignToDstType(dst interface{}) (interface{}, bool) { return nil, false } -// ScanRowValue assigns ROW()'s fields to destination Values. -// Argument types are checked and error is returned if SQL field value -// can't be assigned to corresponding destionation Value without loss -// of information. Number of fields have to match number of destination values. +// ScanRowValue decodes ROW()'s and composite type +// from src argument using provided decoders. Decoders should match +// order and count of fields of record being decoded. +// +// In practice you can pass pgtype.Value types as decoders, as +// most of them implement BinaryDecoder interface. // -// Values must implement BinaryDecoder interface otherwise error is returned. // ScanRowValue takes ownership of src, caller MUST not use it after call -func ScanRowValue(ci *ConnInfo, src []byte, dst ...Value) error { +func ScanRowValue(ci *ConnInfo, src []byte, dst ...BinaryDecoder) error { fieldIter, fieldCount, err := binary.NewRecordFieldIterator(src) if err != nil { return err @@ -457,12 +458,7 @@ func ScanRowValue(ci *ConnInfo, src []byte, dst ...Value) error { return err } - binaryDecoder, ok := dst[i].(BinaryDecoder) - if !ok { - return errors.Errorf("record field doesn't implement binary decoding: %s", reflect.TypeOf(dst[i]).Name()) - } - - if err = binaryDecoder.DecodeBinary(ci, fieldBytes); err != nil { + if err = dst[i].DecodeBinary(ci, fieldBytes); err != nil { return err } diff --git a/record_test.go b/record_test.go index af2105c7..9516612e 100644 --- a/record_test.go +++ b/record_test.go @@ -93,7 +93,10 @@ func TestScanRowValue(t *testing.T) { t.Fatal(err) } t.Run(tt.sql, func(t *testing.T) { - desc := append([]pgtype.Value(nil), tt.expected.Fields...) + desc := []pgtype.BinaryDecoder{} + for _, f := range tt.expected.Fields { + desc = append(desc, f.(pgtype.BinaryDecoder)) + } var raw pgtype.GenericBinary @@ -113,7 +116,10 @@ func TestScanRowValue(t *testing.T) { } // borrow fields from a neighbor test, this makes scan always fail - desc = append([]pgtype.Value(nil), recordTests[(i+1)%len(recordTests)].expected.Fields...) + desc = desc[:0] + for _, f := range recordTests[(i+1)%len(recordTests)].expected.Fields { + desc = append(desc, f.(pgtype.BinaryDecoder)) + } if err := pgtype.ScanRowValue(conn.ConnInfo(), raw.Bytes, desc...); err == nil { t.Error("Matching scan didn't fail, despite fields not mathching query result") } From 53e0f25a4e17a0bd0ad643e92d1f62e172fe6921 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sat, 18 Apr 2020 19:29:08 +0000 Subject: [PATCH 12/18] Make ScanRowValue error message clearer --- convert.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/convert.go b/convert.go index 45f117bc..8008d677 100644 --- a/convert.go +++ b/convert.go @@ -449,7 +449,7 @@ func ScanRowValue(ci *ConnInfo, src []byte, dst ...BinaryDecoder) error { } if len(dst) != fieldCount { - return errors.Errorf("can't scan row value, number of fields don't match: row fields count=%d desired fields count=%d", fieldCount, len(dst)) + return errors.Errorf("can't scan row value, number of fields don't match: found=%d expected=%d", fieldCount, len(dst)) } _, fieldBytes, eof, err := fieldIter.Next() From 72680d61f8072c85cb6e03ef51ac1be204736fc3 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 19 Apr 2020 11:30:21 +0000 Subject: [PATCH 13/18] Move value createion outside of encoding benchmark --- composite_bench_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/composite_bench_test.go b/composite_bench_test.go index a1eba72b..154b2e26 100644 --- a/composite_bench_test.go +++ b/composite_bench_test.go @@ -35,10 +35,10 @@ var x []byte func BenchmarkBinaryEncodingManual(b *testing.B) { buf := make([]byte, 0, 128) ci := pgtype.NewConnInfo() + v := MyCompositeRaw{4, ptrS("ABCDEFG")} b.ResetTimer() for n := 0; n < b.N; n++ { - v := MyCompositeRaw{4, ptrS("ABCDEFG")} buf, _ = v.EncodeBinary(ci, buf[:0]) } x = buf @@ -47,10 +47,10 @@ func BenchmarkBinaryEncodingManual(b *testing.B) { func BenchmarkBinaryEncodingHelper(b *testing.B) { buf := make([]byte, 0, 128) ci := pgtype.NewConnInfo() + v := MyType{4, ptrS("ABCDEFG")} b.ResetTimer() for n := 0; n < b.N; n++ { - v := MyType{4, ptrS("ABCDEFG")} buf, _ = v.EncodeBinary(ci, buf[:0]) } x = buf @@ -59,11 +59,13 @@ func BenchmarkBinaryEncodingHelper(b *testing.B) { func BenchmarkBinaryEncodingRow(b *testing.B) { buf := make([]byte, 0, 128) ci := pgtype.NewConnInfo() + f1 := 2 + f2 := ptrS("bar") b.ResetTimer() for n := 0; n < b.N; n++ { c := pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}) - c.Set(pgtype.Row(2, "bar")) + c.Set(pgtype.Row(f1, f2)) buf, _ = c.EncodeBinary(ci, buf[:0]) } x = buf From 04ff904ff59c7cdbb8bd3b7189c1b90bc02d3958 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 19 Apr 2020 15:46:10 +0000 Subject: [PATCH 14/18] Add binary decoding benchmarks ``` BenchmarkBinaryDecodingManual-4 10479085 106 ns/op 40 B/op 2 allocs/op BenchmarkBinaryDecodingHelpers-4 4485451 263 ns/op 64 B/op 4 allocs/op BenchmarkBinaryDecodingRow-4 1999726 587 ns/op 96 B/op 5 allocs/op ``` --- composite_bench_test.go | 89 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/composite_bench_test.go b/composite_bench_test.go index 154b2e26..30e48ae7 100644 --- a/composite_bench_test.go +++ b/composite_bench_test.go @@ -5,6 +5,7 @@ import ( "github.com/jackc/pgtype" "github.com/jackc/pgtype/binary" + errors "golang.org/x/xerrors" ) type MyCompositeRaw struct { @@ -30,6 +31,45 @@ func (src MyCompositeRaw) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) (newBuf return } +func (dst *MyCompositeRaw) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error { + a := pgtype.Int4{} + b := pgtype.Text{} + + fieldIter, fieldCount, err := binary.NewRecordFieldIterator(src) + if err != nil { + return err + } + + if 2 != fieldCount { + return errors.Errorf("can't scan row value, number of fields don't match: found=%d expected=2", fieldCount) + } + + _, fieldBytes, eof, err := fieldIter.Next() + if eof || err != nil { + return errors.New("Bad record") + } + if err = a.DecodeBinary(ci, fieldBytes); err != nil { + return err + } + + _, fieldBytes, eof, err = fieldIter.Next() + if eof || err != nil { + return errors.New("Bad record") + } + if err = b.DecodeBinary(ci, fieldBytes); err != nil { + return err + } + + dst.a = a.Int + if b.Status == pgtype.Present { + dst.b = &b.String + } else { + dst.b = nil + } + + return nil +} + var x []byte func BenchmarkBinaryEncodingManual(b *testing.B) { @@ -70,3 +110,52 @@ func BenchmarkBinaryEncodingRow(b *testing.B) { } x = buf } + +var dstRaw MyCompositeRaw + +func BenchmarkBinaryDecodingManual(b *testing.B) { + ci := pgtype.NewConnInfo() + buf, _ := MyType{4, ptrS("ABCDEFG")}.EncodeBinary(ci, nil) + dst := MyCompositeRaw{} + + b.ResetTimer() + for n := 0; n < b.N; n++ { + err := dst.DecodeBinary(ci, buf) + E(err) + } + dstRaw = dst +} + +var dstMyType MyType + +func BenchmarkBinaryDecodingHelpers(b *testing.B) { + ci := pgtype.NewConnInfo() + buf, _ := MyType{4, ptrS("ABCDEFG")}.EncodeBinary(ci, nil) + dst := MyType{} + + b.ResetTimer() + for n := 0; n < b.N; n++ { + err := dst.DecodeBinary(ci, buf) + E(err) + } + dstMyType = dst +} + +var gf1 int +var gf2 *string + +func BenchmarkBinaryDecodingRow(b *testing.B) { + ci := pgtype.NewConnInfo() + buf, _ := MyType{4, ptrS("ABCDEFG")}.EncodeBinary(ci, nil) + var isNull bool + var f1 int + var f2 *string + + b.ResetTimer() + for n := 0; n < b.N; n++ { + err := pgtype.Row(&isNull, &f1, &f2).DecodeBinary(ci, buf) + E(err) + } + gf1 = f1 + gf2 = f2 +} From e283f322e1082cff623f19ac046fe1b5ee2b81ed Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Mon, 20 Apr 2020 22:38:20 +0000 Subject: [PATCH 15/18] Composite().Row() helper for working with composites without registration --- composite.go | 18 ++++++++++++++++++ composite_bench_test.go | 12 ++++++++++++ 2 files changed, 30 insertions(+) diff --git a/composite.go b/composite.go index 1caa24d6..d9f47d92 100644 --- a/composite.go +++ b/composite.go @@ -92,6 +92,24 @@ func (src composite) DecodeBinary(ci *ConnInfo, buf []byte) (err error) { return errors.New("Pass pgtype.Row() to Scan to deconstruct Composite") } +// Row method creates composite BinaryEncoder. It's main purpose +// is to build composite query argument inplace without registering +// pgtype.Composite in ConnInfo first +func (src composite) Row(values ...interface{}) BinaryEncoderFunc { + return func(ci *ConnInfo, buf []byte) ([]byte, error) { + if len(values) != len(src.fields) { + return nil, errors.Errorf("Number of fields don't match. Composite has %d fields", len(src.fields)) + } + for i, v := range values { + if err := src.fields[i].Set(v); err != nil { + return nil, err + } + } + src.status = Present + return src.EncodeBinary(ci, buf) + } +} + // DecodeBinary is called when pgtype.Row() is passed to Scan() to // deconstruct composite value func (r rowValue) DecodeBinary(ci *ConnInfo, src []byte) error { diff --git a/composite_bench_test.go b/composite_bench_test.go index 30e48ae7..67dcf1fd 100644 --- a/composite_bench_test.go +++ b/composite_bench_test.go @@ -110,6 +110,18 @@ func BenchmarkBinaryEncodingRow(b *testing.B) { } x = buf } +func BenchmarkBinaryEncodingRowInplace(b *testing.B) { + buf := make([]byte, 0, 128) + ci := pgtype.NewConnInfo() + f1 := 2 + f2 := ptrS("bar") + + b.ResetTimer() + for n := 0; n < b.N; n++ { + buf, _ = pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}).Row(f1, f2).EncodeBinary(ci, buf[:0]) + } + x = buf +} var dstRaw MyCompositeRaw From 5f0d5f42557769b5794e256eaf52566d10602b66 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Mon, 27 Apr 2020 00:40:29 +0100 Subject: [PATCH 16/18] Remove pgtype.Row(), introduce Composite.Scan() pgtype.Row() was optimized for a single line use without much ceremony at a cost of OID registration, which is cumbersome. In practice it so much incovnenience to create new Composite just before making a query. So now there is just a Composite type and 2 helper methods: - SetFields sets composite fields to values passed. This assignment fails if types passed are not assignable to Values pgtype is made of. - Scan acts exactly like query.Scan, but for a composite value. Passed values are set to values from SQL composite. --- composite.go | 195 +++++++++++++++++++++------------------- composite_bench_test.go | 22 ++--- composite_test.go | 17 ++-- 3 files changed, 114 insertions(+), 120 deletions(-) diff --git a/composite.go b/composite.go index d9f47d92..61034262 100644 --- a/composite.go +++ b/composite.go @@ -1,146 +1,153 @@ package pgtype import ( + "github.com/jackc/pgtype/binary" errors "golang.org/x/xerrors" ) -type composite struct { +type Composite struct { fields []Value - status Status + Status Status } -// helper struct to act both as a scanning target and query argument -type rowValue struct { - args []interface{} +// NewComposite creates a Composite object, which acts as a "schema" for +// SQL composite values. +// To pass Composite as SQL parameter first set it's fields, either by +// passing initialized Value{} instances to NewComposite or by calling +// SetFields method +// To read composite fields back pass result of Scan() method +// to query Scan function. +func NewComposite(fields ...Value) *Composite { + return &Composite{fields, Present} } -// Row helper function builds a value which can be both used to -// "assemble" composite quiery arguments and to scan results back. -// -// When passed as an argument to query, values from Row args will -// be assigned to corresponding fields in a composite type and a single -// composite type will be passed to the PostgreSQL. Composite type need -// to be registered in ConnInfo first. This is required so that pgx -// can know which SQL types to use when constructing SQL composite argument -// -// When passed to Scan individual fields from composite query result -// are assigned to corresponding Row arguments. First argument MUST -// be of type *bool to flag when NULL value received. So total number -// of Row arguments, when passed to Scan should be number of composite -// fields you expect to read + 1 -func Row(fields ...interface{}) rowValue { - return rowValue{fields} -} - -// Composite types is meant to be passed to ConnInfo.RegisterDataType only, -// so it is made private on purpose. Once registered, it allows Row -// function to correctly pass query arguments. -func Composite(fields ...Value) *composite { - return &composite{fields, Undefined} -} - -func (src composite) Get() interface{} { - switch src.status { +func (src Composite) Get() interface{} { + switch src.Status { case Present: return src case Null: return nil default: - return src.status + return src.Status } } // Set is called internally when passing query arguments. -// Only valid src is a result of pgtype.Row() or nil -func (dst *composite) Set(src interface{}) error { +func (dst *Composite) Set(src interface{}) error { if src == nil { - *dst = composite{status: Null} + *dst = Composite{Status: Null} return nil } switch value := src.(type) { - case rowValue: - if len(value.args) != len(dst.fields) { + case []Value: + if len(value) != len(dst.fields) { return errors.Errorf("Number of fields don't match. Composite has %d fields", len(dst.fields)) } - for i, v := range value.args { + for i, v := range value { if err := dst.fields[i].Set(v); err != nil { return err } } - dst.status = Present + dst.Status = Present default: - return errors.Errorf("Use pgtype.Row() as query parameter") + return errors.Errorf("Can not convert %v to Composite", src) } return nil } -// AssignTo is never called on composite value directly, it is here -// to satisfy Valuer interface -func (src composite) AssignTo(dst interface{}) error { - return errors.New("BUG: should never be called, because pgtype.composite doesn't support decoding") +// AssignTo should never be called on composite value directly +func (src Composite) AssignTo(dst interface{}) error { + return errors.New("Pass Composite.Scan() to deconstruct composite") } -func (src composite) EncodeBinary(ci *ConnInfo, buf []byte) (newBuf []byte, err error) { +func (src Composite) EncodeBinary(ci *ConnInfo, buf []byte) (newBuf []byte, err error) { + switch src.Status { + case Null: + return nil, nil + case Undefined: + return nil, errUndefined + } return EncodeRow(ci, buf, src.fields...) } -// DecodeBinary here is just to make pgx use binary result format by default. -// Users should be using Row function or their own types to scan composites -func (src composite) DecodeBinary(ci *ConnInfo, buf []byte) (err error) { - return errors.New("Pass pgtype.Row() to Scan to deconstruct Composite") -} - -// Row method creates composite BinaryEncoder. It's main purpose -// is to build composite query argument inplace without registering -// pgtype.Composite in ConnInfo first -func (src composite) Row(values ...interface{}) BinaryEncoderFunc { - return func(ci *ConnInfo, buf []byte) ([]byte, error) { - if len(values) != len(src.fields) { - return nil, errors.Errorf("Number of fields don't match. Composite has %d fields", len(src.fields)) - } - for i, v := range values { - if err := src.fields[i].Set(v); err != nil { - return nil, err - } - } - src.status = Present - return src.EncodeBinary(ci, buf) - } -} - -// DecodeBinary is called when pgtype.Row() is passed to Scan() to -// deconstruct composite value -func (r rowValue) DecodeBinary(ci *ConnInfo, src []byte) error { - if len(r.args) == 0 { - return errors.New("pgtype.Row must have 'isNull *bool' as a first argument when used in Scan") - } - - isNull, ok := r.args[0].(*bool) - if !ok { - return errors.New("pgtype.Row must have 'isNull *bool' as a first argument when used in Scan") - } - args := r.args[1:] - - var record Record - if err := record.DecodeBinary(ci, src); err != nil { - return err - } - - if record.Status == Null { - *isNull = true +// DecodeBinary implements BinaryDecoder interface. +// Opposite to Record, fields in a composite act as a "schema" +// and decoding fails if SQL value can't be assigned due to +// type mismatch +func (dst *Composite) DecodeBinary(ci *ConnInfo, buf []byte) (err error) { + if buf == nil { + dst.Status = Null return nil } - if len(record.Fields) != len(args) { - return errors.Errorf("SQL composite can't be read, 'pgtype.Row' has wrong field cout. %d != %d", len(record.Fields), len(args)) + fieldIter, fieldCount, err := binary.NewRecordFieldIterator(buf) + if err != nil { + return err + } else if len(dst.fields) != fieldCount { + return errors.Errorf("SQL composite can't be read, field count mismatch. expected %d , found %d", len(dst.fields), fieldCount) } - for i, f := range record.Fields { - if err := f.AssignTo(args[i]); err != nil { + _, fieldBytes, eof, err := fieldIter.Next() + + for i := 0; !eof; i++ { + if err != nil { + return err + } + + binaryDecoder, ok := dst.fields[i].(BinaryDecoder) + if !ok { + return errors.New("Composite field doesn't support binary protocol") + } + + if err = binaryDecoder.DecodeBinary(ci, fieldBytes); err != nil { + return err + } + + _, fieldBytes, eof, err = fieldIter.Next() + } + dst.Status = Present + + return nil +} + +// Scan is a helper function to perform "nested" scan of +// a composite value when scanning a query result row. +// isNull is set if scanned value is NULL +// Rest of arguments are set in the order of fields in the composite +// +// Use of Scan method doesn't modify original composite +func (src Composite) Scan(isNull *bool, dst ...interface{}) BinaryDecoderFunc { + return func(ci *ConnInfo, buf []byte) error { + if err := src.DecodeBinary(ci, buf); err != nil { + return err + } + + if src.Status == Null { + *isNull = true + return nil + } + + for i, f := range src.fields { + if err := f.AssignTo(dst[i]); err != nil { + return err + } + } + return nil + } +} + +// SetFields sets Composite's fields to corresponding values +func (dst *Composite) SetFields(values ...interface{}) error { + if len(values) != len(dst.fields) { + return errors.Errorf("Number of fields don't match. Composite has %d fields", len(dst.fields)) + } + for i, v := range values { + if err := dst.fields[i].Set(v); err != nil { return err } } + dst.Status = Present return nil } diff --git a/composite_bench_test.go b/composite_bench_test.go index 67dcf1fd..323c3179 100644 --- a/composite_bench_test.go +++ b/composite_bench_test.go @@ -101,27 +101,15 @@ func BenchmarkBinaryEncodingRow(b *testing.B) { ci := pgtype.NewConnInfo() f1 := 2 f2 := ptrS("bar") + c := pgtype.NewComposite(&pgtype.Int4{}, &pgtype.Text{}) b.ResetTimer() for n := 0; n < b.N; n++ { - c := pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}) - c.Set(pgtype.Row(f1, f2)) + c.SetFields(f1, f2) buf, _ = c.EncodeBinary(ci, buf[:0]) } x = buf } -func BenchmarkBinaryEncodingRowInplace(b *testing.B) { - buf := make([]byte, 0, 128) - ci := pgtype.NewConnInfo() - f1 := 2 - f2 := ptrS("bar") - - b.ResetTimer() - for n := 0; n < b.N; n++ { - buf, _ = pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}).Row(f1, f2).EncodeBinary(ci, buf[:0]) - } - x = buf -} var dstRaw MyCompositeRaw @@ -156,16 +144,18 @@ func BenchmarkBinaryDecodingHelpers(b *testing.B) { var gf1 int var gf2 *string -func BenchmarkBinaryDecodingRow(b *testing.B) { +func BenchmarkBinaryDecodingCompositeScan(b *testing.B) { ci := pgtype.NewConnInfo() buf, _ := MyType{4, ptrS("ABCDEFG")}.EncodeBinary(ci, nil) var isNull bool var f1 int var f2 *string + c := pgtype.NewComposite(&pgtype.Int4{}, &pgtype.Text{}) + b.ResetTimer() for n := 0; n < b.N; n++ { - err := pgtype.Row(&isNull, &f1, &f2).DecodeBinary(ci, buf) + err := c.Scan(&isNull, &f1, &f2).DecodeBinary(ci, buf) E(err) } gf1 = f1 diff --git a/composite_test.go b/composite_test.go index 3e63151c..666de054 100644 --- a/composite_test.go +++ b/composite_test.go @@ -25,28 +25,25 @@ create type mytype as ( E(err) defer conn.Exec(context.Background(), "drop type mytype") - //WIP - q, err := conn.Prepare(context.Background(), "z", "select $1::mytype") - E(err) - conn.ConnInfo().RegisterDataType(pgtype.DataType{pgtype.Composite(&pgtype.Int4{}, &pgtype.Text{}), "mytype", q.ParamOIDs[0]}) - var isNull bool var a int var b *string - err = conn.QueryRow(context.Background(), "select $1::mytype", - pgtype.Row(2, "bar")). - Scan(pgtype.Row(&isNull, &a, &b)) + c := pgtype.NewComposite(&pgtype.Int4{}, &pgtype.Text{}) + c.SetFields(2, "bar") + + err = conn.QueryRow(context.Background(), "select $1::mytype", c). + Scan(c.Scan(&isNull, &a, &b)) E(err) fmt.Printf("First: isNull=%v a=%d b=%s\n", isNull, a, *b) - err = conn.QueryRow(context.Background(), "select (1, NULL)::mytype").Scan(pgtype.Row(&isNull, &a, &b)) + err = conn.QueryRow(context.Background(), "select (1, NULL)::mytype").Scan(c.Scan(&isNull, &a, &b)) E(err) fmt.Printf("Second: isNull=%v a=%d b=%v\n", isNull, a, b) - err = conn.QueryRow(context.Background(), "select NULL::mytype").Scan(pgtype.Row(&isNull, &a, &b)) + err = conn.QueryRow(context.Background(), "select NULL::mytype").Scan(c.Scan(&isNull, &a, &b)) E(err) fmt.Printf("Third: isNull=%v\n", isNull) From 700df0d05a8316577c60d3120b6f4f41895fc522 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Fri, 1 May 2020 23:35:58 +0100 Subject: [PATCH 17/18] Request binary format in Composite tests --- .vscode/settings.json | 6 ++++++ composite_test.go | 8 +++++--- 2 files changed, 11 insertions(+), 3 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..a32b4d68 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "go.inferGopath": false, + "go.testEnvVars": { + "PGX_TEST_DATABASE": "user=postgres database=pgx_test host=127.0.0.1" + }, +} \ No newline at end of file diff --git a/composite_test.go b/composite_test.go index 666de054..ac0eb4d0 100644 --- a/composite_test.go +++ b/composite_test.go @@ -25,6 +25,8 @@ create type mytype as ( E(err) defer conn.Exec(context.Background(), "drop type mytype") + qrf := pgx.QueryResultFormats{pgx.BinaryFormatCode} + var isNull bool var a int var b *string @@ -32,18 +34,18 @@ create type mytype as ( c := pgtype.NewComposite(&pgtype.Int4{}, &pgtype.Text{}) c.SetFields(2, "bar") - err = conn.QueryRow(context.Background(), "select $1::mytype", c). + err = conn.QueryRow(context.Background(), "select $1::mytype", qrf, c). Scan(c.Scan(&isNull, &a, &b)) E(err) fmt.Printf("First: isNull=%v a=%d b=%s\n", isNull, a, *b) - err = conn.QueryRow(context.Background(), "select (1, NULL)::mytype").Scan(c.Scan(&isNull, &a, &b)) + err = conn.QueryRow(context.Background(), "select (1, NULL)::mytype", qrf).Scan(c.Scan(&isNull, &a, &b)) E(err) fmt.Printf("Second: isNull=%v a=%d b=%v\n", isNull, a, b) - err = conn.QueryRow(context.Background(), "select NULL::mytype").Scan(c.Scan(&isNull, &a, &b)) + err = conn.QueryRow(context.Background(), "select NULL::mytype", qrf).Scan(c.Scan(&isNull, &a, &b)) E(err) fmt.Printf("Third: isNull=%v\n", isNull) From 63c5d350a366a7d538ae5815b352f828134636d8 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sat, 2 May 2020 10:54:19 +0100 Subject: [PATCH 18/18] Add JSON benchmarks --- composite_bench_test.go | 51 +++++++++++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/composite_bench_test.go b/composite_bench_test.go index 323c3179..429ce9b3 100644 --- a/composite_bench_test.go +++ b/composite_bench_test.go @@ -9,12 +9,12 @@ import ( ) type MyCompositeRaw struct { - a int32 - b *string + A int32 + B *string } func (src MyCompositeRaw) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) (newBuf []byte, err error) { - a := pgtype.Int4{src.a, pgtype.Present} + a := pgtype.Int4{src.A, pgtype.Present} fieldBytes := make([]byte, 0, 64) fieldBytes, _ = a.EncodeBinary(ci, fieldBytes[:0]) @@ -22,8 +22,8 @@ func (src MyCompositeRaw) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) (newBuf newBuf = binary.RecordStart(buf, 2) newBuf = binary.RecordAdd(newBuf, pgtype.Int4OID, fieldBytes) - if src.b != nil { - fieldBytes, _ = pgtype.Text{*src.b, pgtype.Present}.EncodeBinary(ci, fieldBytes[:0]) + if src.B != nil { + fieldBytes, _ = pgtype.Text{*src.B, pgtype.Present}.EncodeBinary(ci, fieldBytes[:0]) newBuf = binary.RecordAdd(newBuf, pgtype.TextOID, fieldBytes) } else { newBuf = binary.RecordAddNull(newBuf, pgtype.TextOID) @@ -60,11 +60,11 @@ func (dst *MyCompositeRaw) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error { return err } - dst.a = a.Int + dst.A = a.Int if b.Status == pgtype.Present { - dst.b = &b.String + dst.B = &b.String } else { - dst.b = nil + dst.B = nil } return nil @@ -96,7 +96,7 @@ func BenchmarkBinaryEncodingHelper(b *testing.B) { x = buf } -func BenchmarkBinaryEncodingRow(b *testing.B) { +func BenchmarkBinaryEncodingComposite(b *testing.B) { buf := make([]byte, 0, 128) ci := pgtype.NewConnInfo() f1 := 2 @@ -111,6 +111,20 @@ func BenchmarkBinaryEncodingRow(b *testing.B) { x = buf } +func BenchmarkBinaryEncodingJSON(b *testing.B) { + buf := make([]byte, 0, 128) + ci := pgtype.NewConnInfo() + v := MyCompositeRaw{4, ptrS("ABCDEFG")} + j := pgtype.JSON{} + + b.ResetTimer() + for n := 0; n < b.N; n++ { + j.Set(v) + buf, _ = j.EncodeBinary(ci, buf[:0]) + } + x = buf +} + var dstRaw MyCompositeRaw func BenchmarkBinaryDecodingManual(b *testing.B) { @@ -161,3 +175,22 @@ func BenchmarkBinaryDecodingCompositeScan(b *testing.B) { gf1 = f1 gf2 = f2 } + +func BenchmarkBinaryDecodingJSON(b *testing.B) { + ci := pgtype.NewConnInfo() + j := pgtype.JSON{} + j.Set(MyCompositeRaw{4, ptrS("ABCDEFG")}) + buf, _ := j.EncodeBinary(ci, nil) + + j = pgtype.JSON{} + dst := MyCompositeRaw{} + + b.ResetTimer() + for n := 0; n < b.N; n++ { + err := j.DecodeBinary(ci, buf) + E(err) + err = j.AssignTo(&dst) + E(err) + } + dstRaw = dst +}