2
0

Use result readers in next/get fashion

This commit is contained in:
Jack Christensen
2019-01-01 14:10:16 -06:00
parent b12b579814
commit 0330052b0a
3 changed files with 43 additions and 39 deletions
+16 -11
View File
@@ -538,10 +538,9 @@ type PgResultReader struct {
cleanupContext func() cleanupContext func()
} }
// GetResult returns a PgResultReader for the next result. If all results are consumed it returns nil. If an error // NextResult reads until a result is ready to be read or no results are pending. Returns true if a result is available.
// occurs it will be reported on the returned PgResultReader. Returned PgResultReader is only valid until next call of // Use ResultReader() to acquire a reader for the result.
// GetResult. func (pgConn *PgConn) NextResult(ctx context.Context) bool {
func (pgConn *PgConn) GetResult(ctx context.Context) *PgResultReader {
cleanupContext := contextDoneToConnDeadline(ctx, pgConn.conn) cleanupContext := contextDoneToConnDeadline(ctx, pgConn.conn)
for pgConn.pendingReadyForQueryCount > 0 { for pgConn.pendingReadyForQueryCount > 0 {
@@ -549,29 +548,34 @@ func (pgConn *PgConn) GetResult(ctx context.Context) *PgResultReader {
if err != nil { if err != nil {
cleanupContext() cleanupContext()
pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, err: preferContextOverNetTimeoutError(ctx, err), complete: true} pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, err: preferContextOverNetTimeoutError(ctx, err), complete: true}
return &pgConn.resultReader return true
} }
switch msg := msg.(type) { switch msg := msg.(type) {
case *pgproto3.RowDescription: case *pgproto3.RowDescription:
pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, cleanupContext: cleanupContext, fieldDescriptions: msg.Fields} pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, cleanupContext: cleanupContext, fieldDescriptions: msg.Fields}
return &pgConn.resultReader return true
case *pgproto3.DataRow: case *pgproto3.DataRow:
pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, cleanupContext: cleanupContext, rowValues: msg.Values, preloadedRowValues: true} pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, cleanupContext: cleanupContext, rowValues: msg.Values, preloadedRowValues: true}
return &pgConn.resultReader return true
case *pgproto3.CommandComplete: case *pgproto3.CommandComplete:
cleanupContext() cleanupContext()
pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, commandTag: CommandTag(msg.CommandTag), complete: true} pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, commandTag: CommandTag(msg.CommandTag), complete: true}
return &pgConn.resultReader return true
case *pgproto3.ErrorResponse: case *pgproto3.ErrorResponse:
cleanupContext() cleanupContext()
pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, err: errorResponseToPgError(msg), complete: true} pgConn.resultReader = PgResultReader{pgConn: pgConn, ctx: ctx, err: errorResponseToPgError(msg), complete: true}
return &pgConn.resultReader return true
} }
} }
cleanupContext() cleanupContext()
return nil return false
}
// ResultReader returns the result reader prepared by next result. It is only valid until the result is completed.
func (pgConn *PgConn) ResultReader() *PgResultReader {
return &pgConn.resultReader
} }
// NextRow returns advances the PgResultReader to the next row and returns true if a row is available. // NextRow returns advances the PgResultReader to the next row and returns true if a row is available.
@@ -806,7 +810,8 @@ func (pgConn *PgConn) Exec(ctx context.Context, sql string) (*PgResult, error) {
func (pgConn *PgConn) bufferLastResult(ctx context.Context) (*PgResult, error) { func (pgConn *PgConn) bufferLastResult(ctx context.Context) (*PgResult, error) {
var result *PgResult var result *PgResult
for resultReader := pgConn.GetResult(ctx); resultReader != nil; resultReader = pgConn.GetResult(ctx) { for pgConn.NextResult(ctx) {
resultReader := pgConn.ResultReader()
rows := [][][]byte{} rows := [][][]byte{}
for resultReader.NextRow() { for resultReader.NextRow() {
row := make([][]byte, len(resultReader.Values())) row := make([][]byte, len(resultReader.Values()))
+14 -15
View File
@@ -84,10 +84,10 @@ func stressBatch(pgConn *pgconn.PgConn) error {
} }
// Query 1 // Query 1
resultReader := pgConn.GetResult(context.Background()) if !pgConn.NextResult(context.Background()) {
if resultReader == nil { return errors.New("missing result")
return errors.New("missing resultReader")
} }
resultReader := pgConn.ResultReader()
for resultReader.NextRow() { for resultReader.NextRow() {
} }
@@ -97,10 +97,10 @@ func stressBatch(pgConn *pgconn.PgConn) error {
} }
// Query 2 // Query 2
resultReader = pgConn.GetResult(context.Background()) if !pgConn.NextResult(context.Background()) {
if resultReader == nil { return errors.New("missing result")
return errors.New("missing resultReader")
} }
resultReader = pgConn.ResultReader()
for resultReader.NextRow() { for resultReader.NextRow() {
} }
@@ -110,8 +110,7 @@ func stressBatch(pgConn *pgconn.PgConn) error {
} }
// No more // No more
resultReader = pgConn.GetResult(context.Background()) if pgConn.NextResult(context.Background()) {
if resultReader != nil {
return errors.New("unexpected result reader") return errors.New("unexpected result reader")
} }
@@ -162,10 +161,10 @@ func stressBatchCanceled(pgConn *pgconn.PgConn) error {
} }
// Query 1 // Query 1
resultReader := pgConn.GetResult(context.Background()) if !pgConn.NextResult(context.Background()) {
if resultReader == nil { return errors.New("missing result")
return errors.New("missing resultReader")
} }
resultReader := pgConn.ResultReader()
for resultReader.NextRow() { for resultReader.NextRow() {
} }
@@ -176,11 +175,11 @@ func stressBatchCanceled(pgConn *pgconn.PgConn) error {
// Query 2 // Query 2
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
resultReader = pgConn.GetResult(ctx) if !pgConn.NextResult(ctx) {
cancel() return errors.New("missing result")
if resultReader == nil {
return errors.New("missing resultReader")
} }
cancel()
resultReader = pgConn.ResultReader()
for resultReader.NextRow() { for resultReader.NextRow() {
} }
+13 -13
View File
@@ -373,8 +373,8 @@ func TestConnBatchedQueries(t *testing.T) {
err = pgConn.Flush(context.Background()) err = pgConn.Flush(context.Background())
// "select 'SendExec 1'" // "select 'SendExec 1'"
resultReader := pgConn.GetResult(context.Background()) require.True(t, pgConn.NextResult(context.Background()))
require.NotNil(t, resultReader) resultReader := pgConn.ResultReader()
rows := [][][]byte{} rows := [][][]byte{}
for resultReader.NextRow() { for resultReader.NextRow() {
@@ -391,8 +391,8 @@ func TestConnBatchedQueries(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
// "SendExecParams 1" // "SendExecParams 1"
resultReader = pgConn.GetResult(context.Background()) require.True(t, pgConn.NextResult(context.Background()))
require.NotNil(t, resultReader) resultReader = pgConn.ResultReader()
rows = [][][]byte{} rows = [][][]byte{}
for resultReader.NextRow() { for resultReader.NextRow() {
@@ -409,8 +409,8 @@ func TestConnBatchedQueries(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
// "SendExecPrepared 1" // "SendExecPrepared 1"
resultReader = pgConn.GetResult(context.Background()) require.True(t, pgConn.NextResult(context.Background()))
require.NotNil(t, resultReader) resultReader = pgConn.ResultReader()
rows = [][][]byte{} rows = [][][]byte{}
for resultReader.NextRow() { for resultReader.NextRow() {
@@ -427,8 +427,8 @@ func TestConnBatchedQueries(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
// "SendExec 2" // "SendExec 2"
resultReader = pgConn.GetResult(context.Background()) require.True(t, pgConn.NextResult(context.Background()))
require.NotNil(t, resultReader) resultReader = pgConn.ResultReader()
rows = [][][]byte{} rows = [][][]byte{}
for resultReader.NextRow() { for resultReader.NextRow() {
@@ -445,8 +445,8 @@ func TestConnBatchedQueries(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
// "SendExecParams 2" // "SendExecParams 2"
resultReader = pgConn.GetResult(context.Background()) require.True(t, pgConn.NextResult(context.Background()))
require.NotNil(t, resultReader) resultReader = pgConn.ResultReader()
rows = [][][]byte{} rows = [][][]byte{}
for resultReader.NextRow() { for resultReader.NextRow() {
@@ -463,8 +463,7 @@ func TestConnBatchedQueries(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
// Done // Done
resultReader = pgConn.GetResult(context.Background()) require.False(t, pgConn.NextResult(context.Background()))
assert.Nil(t, resultReader)
} }
func TestConnRecoverFromTimeout(t *testing.T) { func TestConnRecoverFromTimeout(t *testing.T) {
@@ -505,7 +504,8 @@ func TestConnCancelQuery(t *testing.T) {
err = pgConn.CancelRequest(context.Background()) err = pgConn.CancelRequest(context.Background())
require.Nil(t, err) require.Nil(t, err)
_, err = pgConn.GetResult(context.Background()).Close() require.True(t, pgConn.NextResult(context.Background()))
_, err = pgConn.ResultReader().Close()
if err, ok := err.(*pgconn.PgError); ok { if err, ok := err.(*pgconn.PgError); ok {
assert.Equal(t, "57014", err.Code) assert.Equal(t, "57014", err.Code)
} else { } else {