Replace QueryFunc with ForEachScannedRow
This commit is contained in:
@@ -134,6 +134,11 @@ allows arbitrary rewriting of query SQL and arguments.
|
|||||||
## RowScanner Interface
|
## RowScanner Interface
|
||||||
|
|
||||||
The `RowScanner` interface allows a single argument to Rows.Scan to scan the entire row.
|
The `RowScanner` interface allows a single argument to Rows.Scan to scan the entire row.
|
||||||
|
|
||||||
|
## QueryFunc Replaced
|
||||||
|
|
||||||
|
`QueryFunc` has been replaced by using `ForEachScannedRow`.
|
||||||
|
|
||||||
## 3rd Party Logger Integration
|
## 3rd Party Logger Integration
|
||||||
|
|
||||||
All integrations with 3rd party loggers have been extracted to separate repositories. This trims the pgx dependency
|
All integrations with 3rd party loggers have been extracted to separate repositories. This trims the pgx dependency
|
||||||
|
|||||||
@@ -42,9 +42,6 @@ type BatchResults interface {
|
|||||||
// QueryRow reads the results from the next query in the batch as if the query has been sent with Conn.QueryRow.
|
// QueryRow reads the results from the next query in the batch as if the query has been sent with Conn.QueryRow.
|
||||||
QueryRow() Row
|
QueryRow() Row
|
||||||
|
|
||||||
// QueryFunc reads the results from the next query in the batch as if the query has been sent with Conn.QueryFunc.
|
|
||||||
QueryFunc(scans []any, f func(QueryFuncRow) error) (pgconn.CommandTag, error)
|
|
||||||
|
|
||||||
// Close closes the batch operation. This must be called before the underlying connection can be used again. Any error
|
// Close closes the batch operation. This must be called before the underlying connection can be used again. Any error
|
||||||
// that occurred during a batch operation may have made it impossible to resyncronize the connection with the server.
|
// that occurred during a batch operation may have made it impossible to resyncronize the connection with the server.
|
||||||
// In this case the underlying connection will have been closed. Close is safe to call multiple times.
|
// In this case the underlying connection will have been closed. Close is safe to call multiple times.
|
||||||
@@ -148,37 +145,6 @@ func (br *batchResults) Query() (Rows, error) {
|
|||||||
return rows, nil
|
return rows, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryFunc reads the results from the next query in the batch as if the query has been sent with Conn.QueryFunc.
|
|
||||||
func (br *batchResults) QueryFunc(scans []any, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
if br.closed {
|
|
||||||
return pgconn.CommandTag{}, fmt.Errorf("batch already closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := br.Query()
|
|
||||||
if err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
err = rows.Scan(scans...)
|
|
||||||
if err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = f(rows)
|
|
||||||
if err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return rows.CommandTag(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow.
|
// QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow.
|
||||||
func (br *batchResults) QueryRow() Row {
|
func (br *batchResults) QueryRow() Row {
|
||||||
rows, _ := br.Query()
|
rows, _ := br.Query()
|
||||||
|
|||||||
+2
-1
@@ -108,7 +108,8 @@ func TestConnSendBatch(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
rowCount = 0
|
rowCount = 0
|
||||||
_, err = br.QueryFunc([]any{&id, &description, &amount}, func(pgx.QueryFuncRow) error {
|
rows, _ = br.Query()
|
||||||
|
_, err = pgx.ForEachScannedRow(rows, []any{&id, &description, &amount}, func() error {
|
||||||
if id != selectFromLedgerExpectedRows[rowCount].id {
|
if id != selectFromLedgerExpectedRows[rowCount].id {
|
||||||
t.Errorf("id => %v, want %v", id, selectFromLedgerExpectedRows[rowCount].id)
|
t.Errorf("id => %v, want %v", id, selectFromLedgerExpectedRows[rowCount].id)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import (
|
|||||||
"github.com/jackc/pgx/v5/internal/sanitize"
|
"github.com/jackc/pgx/v5/internal/sanitize"
|
||||||
"github.com/jackc/pgx/v5/internal/stmtcache"
|
"github.com/jackc/pgx/v5/internal/stmtcache"
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/jackc/pgx/v5/pgproto3"
|
|
||||||
"github.com/jackc/pgx/v5/pgtype"
|
"github.com/jackc/pgx/v5/pgtype"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -764,48 +763,6 @@ func (c *Conn) QueryRow(ctx context.Context, sql string, args ...any) Row {
|
|||||||
return (*connRow)(rows.(*baseRows))
|
return (*connRow)(rows.(*baseRows))
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryFuncRow is the argument to the QueryFunc callback function.
|
|
||||||
//
|
|
||||||
// QueryFuncRow is an interface instead of a struct to allow tests to mock QueryFunc. However, adding a method to an
|
|
||||||
// interface is technically a breaking change. Because of this the QueryFuncRow interface is partially excluded from
|
|
||||||
// semantic version requirements. Methods will not be removed or changed, but new methods may be added.
|
|
||||||
type QueryFuncRow interface {
|
|
||||||
FieldDescriptions() []pgproto3.FieldDescription
|
|
||||||
|
|
||||||
// RawValues returns the unparsed bytes of the row values. The returned data is only valid during the current
|
|
||||||
// function call.
|
|
||||||
RawValues() [][]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueryFunc executes sql with args. For each row returned by the query the values will scanned into the elements of
|
|
||||||
// scans and f will be called. If any row fails to scan or f returns an error the query will be aborted and the error
|
|
||||||
// will be returned.
|
|
||||||
func (c *Conn) QueryFunc(ctx context.Context, sql string, args []any, scans []any, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
rows, err := c.Query(ctx, sql, args...)
|
|
||||||
if err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
err = rows.Scan(scans...)
|
|
||||||
if err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = f(rows)
|
|
||||||
if err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return rows.CommandTag(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendBatch sends all queued queries to the server at once. All queries are run in an implicit transaction unless
|
// SendBatch sends all queued queries to the server at once. All queries are run in an implicit transaction unless
|
||||||
// explicit transaction control statements are executed. The returned BatchResults must be closed before the connection
|
// explicit transaction control statements are executed. The returned BatchResults must be closed before the connection
|
||||||
// is used again.
|
// is used again.
|
||||||
@@ -1038,20 +995,20 @@ func (c *Conn) getCompositeFields(ctx context.Context, oid uint32) ([]pgtype.Com
|
|||||||
var fields []pgtype.CompositeCodecField
|
var fields []pgtype.CompositeCodecField
|
||||||
var fieldName string
|
var fieldName string
|
||||||
var fieldOID uint32
|
var fieldOID uint32
|
||||||
_, err = c.QueryFunc(ctx, `select attname, atttypid
|
rows, _ := c.Query(ctx, `select attname, atttypid
|
||||||
from pg_attribute
|
from pg_attribute
|
||||||
where attrelid=$1
|
where attrelid=$1
|
||||||
order by attnum`,
|
order by attnum`,
|
||||||
[]any{typrelid},
|
typrelid,
|
||||||
[]any{&fieldName, &fieldOID},
|
)
|
||||||
func(qfr QueryFuncRow) error {
|
_, err = ForEachScannedRow(rows, []any{&fieldName, &fieldOID}, func() error {
|
||||||
dt, ok := c.TypeMap().TypeForOID(fieldOID)
|
dt, ok := c.TypeMap().TypeForOID(fieldOID)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("unknown composite type field OID: %v", fieldOID)
|
return fmt.Errorf("unknown composite type field OID: %v", fieldOID)
|
||||||
}
|
}
|
||||||
fields = append(fields, pgtype.CompositeCodecField{Name: fieldName, Type: dt})
|
fields = append(fields, pgtype.CompositeCodecField{Name: fieldName, Type: dt})
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,6 +63,18 @@ pgx implements Query and Scan in the familiar database/sql style.
|
|||||||
|
|
||||||
// No errors found - do something with sum
|
// No errors found - do something with sum
|
||||||
|
|
||||||
|
ForEachScannedRow can be used to execute a callback function for every row. This is often easier than iterating over rows directly.
|
||||||
|
|
||||||
|
var sum, n int32
|
||||||
|
rows, _ := conn.Query(context.Background(), "select generate_series(1,$1)", 10)
|
||||||
|
_, err := pgx.ForEachScannedRow(rows, []any{&n}, func(pgx.QueryFuncRow) error {
|
||||||
|
sum += n
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
pgx also implements QueryRow in the same style as database/sql.
|
pgx also implements QueryRow in the same style as database/sql.
|
||||||
|
|
||||||
var name string
|
var name string
|
||||||
@@ -82,23 +94,6 @@ Use Exec to execute a query that does not return a result set.
|
|||||||
return errors.New("No row found to delete")
|
return errors.New("No row found to delete")
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryFunc can be used to execute a callback function for every row. This is often easier to use than Query.
|
|
||||||
|
|
||||||
var sum, n int32
|
|
||||||
_, err = conn.QueryFunc(
|
|
||||||
context.Background(),
|
|
||||||
"select generate_series(1,$1)",
|
|
||||||
[]any{10},
|
|
||||||
[]any{&n},
|
|
||||||
func(pgx.QueryFuncRow) error {
|
|
||||||
sum += n
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
Base Type Mapping
|
Base Type Mapping
|
||||||
|
|
||||||
pgx maps between all common base types directly between Go and PostgreSQL. In particular:
|
pgx maps between all common base types directly between Go and PostgreSQL. In particular:
|
||||||
|
|||||||
+2
-2
@@ -17,8 +17,8 @@ type BytesValuer interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DriverBytes is a byte slice that holds a reference to memory owned by the driver. It is only valid from the time it
|
// DriverBytes is a byte slice that holds a reference to memory owned by the driver. It is only valid from the time it
|
||||||
// is scanned until Rows.Next or Rows.Close is called. It is safe to use in a function passed to QueryFunc. It is never
|
// is scanned until Rows.Next or Rows.Close is called. It is never safe to use DriverBytes with QueryRow as Row.Scan
|
||||||
// safe to use DriverBytes with QueryRow as Row.Scan internally calls Rows.Close before returning.
|
// internally calls Rows.Close before returning.
|
||||||
type DriverBytes []byte
|
type DriverBytes []byte
|
||||||
|
|
||||||
func (b *DriverBytes) ScanBytes(v []byte) error {
|
func (b *DriverBytes) ScanBytes(v []byte) error {
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -22,13 +22,12 @@ func BenchmarkQuery<%= format_name %>FormatDecode_PG_<%= pg_type %>_to_Go_<%= go
|
|||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
var v [<%= columns %>]<%= go_type %>
|
var v [<%= columns %>]<%= go_type %>
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
_, err := conn.QueryFunc(
|
rows, _ := conn.Query(
|
||||||
ctx,
|
ctx,
|
||||||
`select <% columns.times do |col_idx| %><% if col_idx != 0 %>, <% end %>n::<%= pg_type %> + <%= col_idx%><% end %> from generate_series(1, <%= rows %>) n`,
|
`select <% columns.times do |col_idx| %><% if col_idx != 0 %>, <% end %>n::<%= pg_type %> + <%= col_idx%><% end %> from generate_series(1, <%= rows %>) n`,
|
||||||
[]any{pgx.QueryResultFormats{<%= format_code %>}},
|
[]any{pgx.QueryResultFormats{<%= format_code %>}},
|
||||||
[]any{<% columns.times do |col_idx| %><% if col_idx != 0 %>, <% end %>&v[<%= col_idx%>]<% end %>},
|
|
||||||
func(pgx.QueryFuncRow) error { return nil },
|
|
||||||
)
|
)
|
||||||
|
_, err := pgx.ForEachScannedRow(rows, []any{<% columns.times do |col_idx| %><% if col_idx != 0 %>, <% end %>&v[<%= col_idx%>]<% end %>}, func() error { return nil })
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -47,13 +46,12 @@ func BenchmarkQuery<%= format_name %>FormatDecode_PG_Int4Array_With_Go_Int4Array
|
|||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
var v []int32
|
var v []int32
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
_, err := conn.QueryFunc(
|
rows, _ := conn.Query(
|
||||||
ctx,
|
ctx,
|
||||||
`select array_agg(n) from generate_series(1, <%= array_size %>) n`,
|
`select array_agg(n) from generate_series(1, <%= array_size %>) n`,
|
||||||
[]any{pgx.QueryResultFormats{<%= format_code %>}},
|
[]any{pgx.QueryResultFormats{<%= format_code %>}},
|
||||||
[]any{&v},
|
|
||||||
func(pgx.QueryFuncRow) error { return nil },
|
|
||||||
)
|
)
|
||||||
|
_, err := pgx.ForEachScannedRow(rows, []any{&v}, func() error { return nil })
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,10 +17,6 @@ func (br errBatchResults) Query() (pgx.Rows, error) {
|
|||||||
return errRows{err: br.err}, br.err
|
return errRows{err: br.err}, br.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br errBatchResults) QueryFunc(scans []any, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
return pgconn.CommandTag{}, br.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (br errBatchResults) QueryRow() pgx.Row {
|
func (br errBatchResults) QueryRow() pgx.Row {
|
||||||
return errRow{err: br.err}
|
return errRow{err: br.err}
|
||||||
}
|
}
|
||||||
@@ -42,10 +38,6 @@ func (br *poolBatchResults) Query() (pgx.Rows, error) {
|
|||||||
return br.br.Query()
|
return br.br.Query()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *poolBatchResults) QueryFunc(scans []any, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
return br.br.QueryFunc(scans, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (br *poolBatchResults) QueryRow() pgx.Row {
|
func (br *poolBatchResults) QueryRow() pgx.Row {
|
||||||
return br.br.QueryRow()
|
return br.br.QueryRow()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,10 +74,6 @@ func (c *Conn) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
|
|||||||
return c.Conn().QueryRow(ctx, sql, args...)
|
return c.Conn().QueryRow(ctx, sql, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) QueryFunc(ctx context.Context, sql string, args []any, scans []any, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
return c.Conn().QueryFunc(ctx, sql, args, scans, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
func (c *Conn) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
||||||
return c.Conn().SendBatch(ctx, b)
|
return c.Conn().SendBatch(ctx, b)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -533,16 +533,6 @@ func (p *Pool) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
|
|||||||
return c.getPoolRow(row)
|
return c.getPoolRow(row)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) QueryFunc(ctx context.Context, sql string, args []any, scans []any, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
c, err := p.Acquire(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return pgconn.CommandTag{}, err
|
|
||||||
}
|
|
||||||
defer c.Release()
|
|
||||||
|
|
||||||
return c.QueryFunc(ctx, sql, args, scans, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
||||||
c, err := p.Acquire(ctx)
|
c, err := p.Acquire(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -81,10 +81,6 @@ func (tx *Tx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
|
|||||||
return tx.t.QueryRow(ctx, sql, args...)
|
return tx.t.QueryRow(ctx, sql, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) QueryFunc(ctx context.Context, sql string, args []any, scans []any, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
return tx.t.QueryFunc(ctx, sql, args, scans, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tx *Tx) Conn() *pgx.Conn {
|
func (tx *Tx) Conn() *pgx.Conn {
|
||||||
return tx.t.Conn()
|
return tx.t.Conn()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1898,102 +1898,3 @@ func TestQueryWithQueryRewriter(t *testing.T) {
|
|||||||
require.NoError(t, rows.Err())
|
require.NoError(t, rows.Err())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConnQueryFunc(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
pgxtest.RunWithQueryExecModes(context.Background(), t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
|
|
||||||
var actualResults []any
|
|
||||||
|
|
||||||
var a, b int
|
|
||||||
ct, err := conn.QueryFunc(
|
|
||||||
context.Background(),
|
|
||||||
"select n, n * 2 from generate_series(1, $1) n",
|
|
||||||
[]any{3},
|
|
||||||
[]any{&a, &b},
|
|
||||||
func(pgx.QueryFuncRow) error {
|
|
||||||
actualResults = append(actualResults, []any{a, b})
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
expectedResults := []any{
|
|
||||||
[]any{1, 2},
|
|
||||||
[]any{2, 4},
|
|
||||||
[]any{3, 6},
|
|
||||||
}
|
|
||||||
require.Equal(t, expectedResults, actualResults)
|
|
||||||
require.EqualValues(t, 3, ct.RowsAffected())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnQueryFuncScanError(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
pgxtest.RunWithQueryExecModes(context.Background(), t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
|
|
||||||
var actualResults []any
|
|
||||||
|
|
||||||
var a, b int
|
|
||||||
ct, err := conn.QueryFunc(
|
|
||||||
context.Background(),
|
|
||||||
"select 'foo', 'bar' from generate_series(1, $1) n",
|
|
||||||
[]any{3},
|
|
||||||
[]any{&a, &b},
|
|
||||||
func(pgx.QueryFuncRow) error {
|
|
||||||
actualResults = append(actualResults, []any{a, b})
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
require.EqualError(t, err, "can't scan into dest[0]: cannot scan OID 25 in text format into *int")
|
|
||||||
require.Equal(t, pgconn.CommandTag{}, ct)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnQueryFuncAbort(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
pgxtest.RunWithQueryExecModes(context.Background(), t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
|
|
||||||
var a, b int
|
|
||||||
ct, err := conn.QueryFunc(
|
|
||||||
context.Background(),
|
|
||||||
"select n, n * 2 from generate_series(1, $1) n",
|
|
||||||
[]any{3},
|
|
||||||
[]any{&a, &b},
|
|
||||||
func(pgx.QueryFuncRow) error {
|
|
||||||
return errors.New("abort")
|
|
||||||
},
|
|
||||||
)
|
|
||||||
require.EqualError(t, err, "abort")
|
|
||||||
require.Equal(t, pgconn.CommandTag{}, ct)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func ExampleConn_QueryFunc() {
|
|
||||||
conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Unable to establish connection: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var a, b int
|
|
||||||
_, err = conn.QueryFunc(
|
|
||||||
context.Background(),
|
|
||||||
"select n, n * 2 from generate_series(1, $1) n",
|
|
||||||
[]any{3},
|
|
||||||
[]any{&a, &b},
|
|
||||||
func(pgx.QueryFuncRow) error {
|
|
||||||
fmt.Printf("%v, %v\n", a, b)
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("QueryFunc error: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Output:
|
|
||||||
// 1, 2
|
|
||||||
// 2, 4
|
|
||||||
// 3, 6
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -361,3 +361,28 @@ func RowsFromResultReader(typeMap *pgtype.Map, resultReader *pgconn.ResultReader
|
|||||||
resultReader: resultReader,
|
resultReader: resultReader,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ForEachScannedRow iterates through rows. For each row it scans into the elements of scans and calls fn. If any row
|
||||||
|
// fails to scan or fn returns an error the query will be aborted and the error will be returned. Rows will be closed
|
||||||
|
// when ForEachScannedRow returns.
|
||||||
|
func ForEachScannedRow(rows Rows, scans []any, fn func() error) (pgconn.CommandTag, error) {
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
err := rows.Scan(scans...)
|
||||||
|
if err != nil {
|
||||||
|
return pgconn.CommandTag{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = fn()
|
||||||
|
if err != nil {
|
||||||
|
return pgconn.CommandTag{}, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return pgconn.CommandTag{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rows.CommandTag(), nil
|
||||||
|
}
|
||||||
|
|||||||
+100
@@ -2,9 +2,14 @@ package pgx_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
|
"github.com/jackc/pgx/v5/pgxtest"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -28,3 +33,98 @@ func TestRowScanner(t *testing.T) {
|
|||||||
require.Equal(t, int32(72), s.age)
|
require.Equal(t, int32(72), s.age)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestForEachScannedRow(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
pgxtest.RunWithQueryExecModes(context.Background(), t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
|
||||||
|
var actualResults []any
|
||||||
|
|
||||||
|
rows, _ := conn.Query(
|
||||||
|
context.Background(),
|
||||||
|
"select n, n * 2 from generate_series(1, $1) n",
|
||||||
|
3,
|
||||||
|
)
|
||||||
|
var a, b int
|
||||||
|
ct, err := pgx.ForEachScannedRow(rows, []any{&a, &b}, func() error {
|
||||||
|
actualResults = append(actualResults, []any{a, b})
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
expectedResults := []any{
|
||||||
|
[]any{1, 2},
|
||||||
|
[]any{2, 4},
|
||||||
|
[]any{3, 6},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedResults, actualResults)
|
||||||
|
require.EqualValues(t, 3, ct.RowsAffected())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForEachScannedRowScanError(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
pgxtest.RunWithQueryExecModes(context.Background(), t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
|
||||||
|
var actualResults []any
|
||||||
|
|
||||||
|
rows, _ := conn.Query(
|
||||||
|
context.Background(),
|
||||||
|
"select 'foo', 'bar' from generate_series(1, $1) n",
|
||||||
|
3,
|
||||||
|
)
|
||||||
|
var a, b int
|
||||||
|
ct, err := pgx.ForEachScannedRow(rows, []any{&a, &b}, func() error {
|
||||||
|
actualResults = append(actualResults, []any{a, b})
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.EqualError(t, err, "can't scan into dest[0]: cannot scan OID 25 in text format into *int")
|
||||||
|
require.Equal(t, pgconn.CommandTag{}, ct)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForEachScannedRowAbort(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
pgxtest.RunWithQueryExecModes(context.Background(), t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
|
||||||
|
rows, _ := conn.Query(
|
||||||
|
context.Background(),
|
||||||
|
"select n, n * 2 from generate_series(1, $1) n",
|
||||||
|
3,
|
||||||
|
)
|
||||||
|
var a, b int
|
||||||
|
ct, err := pgx.ForEachScannedRow(rows, []any{&a, &b}, func() error {
|
||||||
|
return errors.New("abort")
|
||||||
|
})
|
||||||
|
require.EqualError(t, err, "abort")
|
||||||
|
require.Equal(t, pgconn.CommandTag{}, ct)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleForEachScannedRow() {
|
||||||
|
conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Unable to establish connection: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, _ := conn.Query(
|
||||||
|
context.Background(),
|
||||||
|
"select n, n * 2 from generate_series(1, $1) n",
|
||||||
|
3,
|
||||||
|
)
|
||||||
|
var a, b int
|
||||||
|
_, err = pgx.ForEachScannedRow(rows, []any{&a, &b}, func() error {
|
||||||
|
fmt.Printf("%v, %v\n", a, b)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("ForEachScannedRow error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Output:
|
||||||
|
// 1, 2
|
||||||
|
// 2, 4
|
||||||
|
// 3, 6
|
||||||
|
}
|
||||||
|
|||||||
@@ -163,7 +163,6 @@ type Tx interface {
|
|||||||
Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
|
Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
|
||||||
Query(ctx context.Context, sql string, args ...any) (Rows, error)
|
Query(ctx context.Context, sql string, args ...any) (Rows, error)
|
||||||
QueryRow(ctx context.Context, sql string, args ...any) Row
|
QueryRow(ctx context.Context, sql string, args ...any) Row
|
||||||
QueryFunc(ctx context.Context, sql string, args []any, scans []any, f func(QueryFuncRow) error) (pgconn.CommandTag, error)
|
|
||||||
|
|
||||||
// Conn returns the underlying *Conn that on which this transaction is executing.
|
// Conn returns the underlying *Conn that on which this transaction is executing.
|
||||||
Conn() *Conn
|
Conn() *Conn
|
||||||
@@ -293,15 +292,6 @@ func (tx *dbTx) QueryRow(ctx context.Context, sql string, args ...any) Row {
|
|||||||
return (*connRow)(rows.(*baseRows))
|
return (*connRow)(rows.(*baseRows))
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryFunc delegates to the underlying *Conn.
|
|
||||||
func (tx *dbTx) QueryFunc(ctx context.Context, sql string, args []any, scans []any, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
if tx.closed {
|
|
||||||
return pgconn.CommandTag{}, ErrTxClosed
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.conn.QueryFunc(ctx, sql, args, scans, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CopyFrom delegates to the underlying *Conn
|
// CopyFrom delegates to the underlying *Conn
|
||||||
func (tx *dbTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
|
func (tx *dbTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
|
||||||
if tx.closed {
|
if tx.closed {
|
||||||
@@ -412,15 +402,6 @@ func (sp *dbSimulatedNestedTx) QueryRow(ctx context.Context, sql string, args ..
|
|||||||
return (*connRow)(rows.(*baseRows))
|
return (*connRow)(rows.(*baseRows))
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryFunc delegates to the underlying Tx.
|
|
||||||
func (sp *dbSimulatedNestedTx) QueryFunc(ctx context.Context, sql string, args []any, scans []any, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
|
|
||||||
if sp.closed {
|
|
||||||
return pgconn.CommandTag{}, ErrTxClosed
|
|
||||||
}
|
|
||||||
|
|
||||||
return sp.tx.QueryFunc(ctx, sql, args, scans, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CopyFrom delegates to the underlying *Conn
|
// CopyFrom delegates to the underlying *Conn
|
||||||
func (sp *dbSimulatedNestedTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
|
func (sp *dbSimulatedNestedTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
|
||||||
if sp.closed {
|
if sp.closed {
|
||||||
|
|||||||
Reference in New Issue
Block a user