Remove CopyTo
This commit is contained in:
+11
-11
@@ -331,27 +331,27 @@ const benchmarkWriteTableInsertSQL = `insert into t(
|
|||||||
$13::bool
|
$13::bool
|
||||||
)`
|
)`
|
||||||
|
|
||||||
type benchmarkWriteTableCopyToSrc struct {
|
type benchmarkWriteTableCopyFromSrc struct {
|
||||||
count int
|
count int
|
||||||
idx int
|
idx int
|
||||||
row []interface{}
|
row []interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *benchmarkWriteTableCopyToSrc) Next() bool {
|
func (s *benchmarkWriteTableCopyFromSrc) Next() bool {
|
||||||
s.idx++
|
s.idx++
|
||||||
return s.idx < s.count
|
return s.idx < s.count
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *benchmarkWriteTableCopyToSrc) Values() ([]interface{}, error) {
|
func (s *benchmarkWriteTableCopyFromSrc) Values() ([]interface{}, error) {
|
||||||
return s.row, nil
|
return s.row, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *benchmarkWriteTableCopyToSrc) Err() error {
|
func (s *benchmarkWriteTableCopyFromSrc) Err() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBenchmarkWriteTableCopyToSrc(count int) pgx.CopyToSource {
|
func newBenchmarkWriteTableCopyFromSrc(count int) pgx.CopyFromSource {
|
||||||
return &benchmarkWriteTableCopyToSrc{
|
return &benchmarkWriteTableCopyFromSrc{
|
||||||
count: count,
|
count: count,
|
||||||
row: []interface{}{
|
row: []interface{}{
|
||||||
"varchar_1",
|
"varchar_1",
|
||||||
@@ -384,7 +384,7 @@ func benchmarkWriteNRowsViaInsert(b *testing.B, n int) {
|
|||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
src := newBenchmarkWriteTableCopyToSrc(n)
|
src := newBenchmarkWriteTableCopyFromSrc(n)
|
||||||
|
|
||||||
tx, err := conn.Begin()
|
tx, err := conn.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -407,7 +407,7 @@ func benchmarkWriteNRowsViaInsert(b *testing.B, n int) {
|
|||||||
|
|
||||||
// note this function is only used for benchmarks -- it doesn't escape tableName
|
// note this function is only used for benchmarks -- it doesn't escape tableName
|
||||||
// or columnNames
|
// or columnNames
|
||||||
func multiInsert(conn *pgx.Conn, tableName string, columnNames []string, rowSrc pgx.CopyToSource) (int, error) {
|
func multiInsert(conn *pgx.Conn, tableName string, columnNames []string, rowSrc pgx.CopyFromSource) (int, error) {
|
||||||
maxRowsPerInsert := 65535 / len(columnNames)
|
maxRowsPerInsert := 65535 / len(columnNames)
|
||||||
rowsThisInsert := 0
|
rowsThisInsert := 0
|
||||||
rowCount := 0
|
rowCount := 0
|
||||||
@@ -495,7 +495,7 @@ func benchmarkWriteNRowsViaMultiInsert(b *testing.B, n int) {
|
|||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
src := newBenchmarkWriteTableCopyToSrc(n)
|
src := newBenchmarkWriteTableCopyFromSrc(n)
|
||||||
|
|
||||||
_, err := multiInsert(conn, "t",
|
_, err := multiInsert(conn, "t",
|
||||||
[]string{"varchar_1",
|
[]string{"varchar_1",
|
||||||
@@ -527,9 +527,9 @@ func benchmarkWriteNRowsViaCopy(b *testing.B, n int) {
|
|||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
src := newBenchmarkWriteTableCopyToSrc(n)
|
src := newBenchmarkWriteTableCopyFromSrc(n)
|
||||||
|
|
||||||
_, err := conn.CopyTo("t",
|
_, err := conn.CopyFrom(pgx.Identifier{"t"},
|
||||||
[]string{"varchar_1",
|
[]string{"varchar_1",
|
||||||
"varchar_2",
|
"varchar_2",
|
||||||
"varchar_null_1",
|
"varchar_null_1",
|
||||||
|
|||||||
+3
-3
@@ -540,13 +540,13 @@ func (p *ConnPool) BeginEx(txOptions *TxOptions) (*Tx, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CopyTo acquires a connection, delegates the call to that connection, and releases the connection
|
// CopyFrom acquires a connection, delegates the call to that connection, and releases the connection
|
||||||
func (p *ConnPool) CopyTo(tableName string, columnNames []string, rowSrc CopyToSource) (int, error) {
|
func (p *ConnPool) CopyFrom(tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int, error) {
|
||||||
c, err := p.Acquire()
|
c, err := p.Acquire()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
defer p.Release(c)
|
defer p.Release(c)
|
||||||
|
|
||||||
return c.CopyTo(tableName, columnNames, rowSrc)
|
return c.CopyFrom(tableName, columnNames, rowSrc)
|
||||||
}
|
}
|
||||||
|
|||||||
+26
-25
@@ -7,6 +7,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx"
|
"github.com/jackc/pgx"
|
||||||
|
"github.com/jackc/pgx/pgtype"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestConnCopyFromSmall(t *testing.T) {
|
func TestConnCopyFromSmall(t *testing.T) {
|
||||||
@@ -26,7 +27,7 @@ func TestConnCopyFromSmall(t *testing.T) {
|
|||||||
)`)
|
)`)
|
||||||
|
|
||||||
inputRows := [][]interface{}{
|
inputRows := [][]interface{}{
|
||||||
{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local), time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)},
|
{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)},
|
||||||
{nil, nil, nil, nil, nil, nil, nil},
|
{nil, nil, nil, nil, nil, nil, nil},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -83,7 +84,7 @@ func TestConnCopyFromLarge(t *testing.T) {
|
|||||||
inputRows := [][]interface{}{}
|
inputRows := [][]interface{}{}
|
||||||
|
|
||||||
for i := 0; i < 10000; i++ {
|
for i := 0; i < 10000; i++ {
|
||||||
inputRows = append(inputRows, []interface{}{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local), time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local), []byte{111, 111, 111, 111}})
|
inputRows = append(inputRows, []interface{}{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local), []byte{111, 111, 111, 111}})
|
||||||
}
|
}
|
||||||
|
|
||||||
copyCount, err := conn.CopyFrom(pgx.Identifier{"foo"}, []string{"a", "b", "c", "d", "e", "f", "g", "h"}, pgx.CopyFromRows(inputRows))
|
copyCount, err := conn.CopyFrom(pgx.Identifier{"foo"}, []string{"a", "b", "c", "d", "e", "f", "g", "h"}, pgx.CopyFromRows(inputRows))
|
||||||
@@ -125,7 +126,7 @@ func TestConnCopyFromJSON(t *testing.T) {
|
|||||||
conn := mustConnect(t, *defaultConnConfig)
|
conn := mustConnect(t, *defaultConnConfig)
|
||||||
defer closeConn(t, conn)
|
defer closeConn(t, conn)
|
||||||
|
|
||||||
for _, oid := range []pgx.Oid{pgx.JsonOid, pgx.JsonbOid} {
|
for _, oid := range []pgtype.Oid{pgx.JsonOid, pgx.JsonbOid} {
|
||||||
if _, ok := conn.PgTypes[oid]; !ok {
|
if _, ok := conn.PgTypes[oid]; !ok {
|
||||||
return // No JSON/JSONB type -- must be running against old PostgreSQL
|
return // No JSON/JSONB type -- must be running against old PostgreSQL
|
||||||
}
|
}
|
||||||
@@ -174,6 +175,28 @@ func TestConnCopyFromJSON(t *testing.T) {
|
|||||||
ensureConnValid(t, conn)
|
ensureConnValid(t, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type clientFailSource struct {
|
||||||
|
count int
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfs *clientFailSource) Next() bool {
|
||||||
|
cfs.count++
|
||||||
|
return cfs.count < 100
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfs *clientFailSource) Values() ([]interface{}, error) {
|
||||||
|
if cfs.count == 3 {
|
||||||
|
cfs.err = fmt.Errorf("client error")
|
||||||
|
return nil, cfs.err
|
||||||
|
}
|
||||||
|
return []interface{}{make([]byte, 100000)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfs *clientFailSource) Err() error {
|
||||||
|
return cfs.err
|
||||||
|
}
|
||||||
|
|
||||||
func TestConnCopyFromFailServerSideMidway(t *testing.T) {
|
func TestConnCopyFromFailServerSideMidway(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
@@ -302,28 +325,6 @@ func TestConnCopyFromFailServerSideMidwayAbortsWithoutWaiting(t *testing.T) {
|
|||||||
ensureConnValid(t, conn)
|
ensureConnValid(t, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientFailSource struct {
|
|
||||||
count int
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfs *clientFailSource) Next() bool {
|
|
||||||
cfs.count++
|
|
||||||
return cfs.count < 100
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfs *clientFailSource) Values() ([]interface{}, error) {
|
|
||||||
if cfs.count == 3 {
|
|
||||||
cfs.err = fmt.Errorf("client error")
|
|
||||||
return nil, cfs.err
|
|
||||||
}
|
|
||||||
return []interface{}{make([]byte, 100000)}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfs *clientFailSource) Err() error {
|
|
||||||
return cfs.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnCopyFromCopyFromSourceErrorMidway(t *testing.T) {
|
func TestConnCopyFromCopyFromSourceErrorMidway(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|||||||
-221
@@ -1,221 +0,0 @@
|
|||||||
package pgx
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Deprecated. Use CopyFromRows instead. CopyToRows returns a CopyToSource
|
|
||||||
// interface over the provided rows slice making it usable by *Conn.CopyTo.
|
|
||||||
func CopyToRows(rows [][]interface{}) CopyToSource {
|
|
||||||
return ©ToRows{rows: rows, idx: -1}
|
|
||||||
}
|
|
||||||
|
|
||||||
type copyToRows struct {
|
|
||||||
rows [][]interface{}
|
|
||||||
idx int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ctr *copyToRows) Next() bool {
|
|
||||||
ctr.idx++
|
|
||||||
return ctr.idx < len(ctr.rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ctr *copyToRows) Values() ([]interface{}, error) {
|
|
||||||
return ctr.rows[ctr.idx], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ctr *copyToRows) Err() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deprecated. Use CopyFromSource instead. CopyToSource is the interface used by
|
|
||||||
// *Conn.CopyTo as the source for copy data.
|
|
||||||
type CopyToSource interface {
|
|
||||||
// Next returns true if there is another row and makes the next row data
|
|
||||||
// available to Values(). When there are no more rows available or an error
|
|
||||||
// has occurred it returns false.
|
|
||||||
Next() bool
|
|
||||||
|
|
||||||
// Values returns the values for the current row.
|
|
||||||
Values() ([]interface{}, error)
|
|
||||||
|
|
||||||
// Err returns any error that has been encountered by the CopyToSource. If
|
|
||||||
// this is not nil *Conn.CopyTo will abort the copy.
|
|
||||||
Err() error
|
|
||||||
}
|
|
||||||
|
|
||||||
type copyTo struct {
|
|
||||||
conn *Conn
|
|
||||||
tableName string
|
|
||||||
columnNames []string
|
|
||||||
rowSrc CopyToSource
|
|
||||||
readerErrChan chan error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ct *copyTo) readUntilReadyForQuery() {
|
|
||||||
for {
|
|
||||||
t, r, err := ct.conn.rxMsg()
|
|
||||||
if err != nil {
|
|
||||||
ct.readerErrChan <- err
|
|
||||||
close(ct.readerErrChan)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
switch t {
|
|
||||||
case readyForQuery:
|
|
||||||
ct.conn.rxReadyForQuery(r)
|
|
||||||
close(ct.readerErrChan)
|
|
||||||
return
|
|
||||||
case errorResponse:
|
|
||||||
ct.readerErrChan <- ct.conn.rxErrorResponse(r)
|
|
||||||
default:
|
|
||||||
err = ct.conn.processContextFreeMsg(t, r)
|
|
||||||
if err != nil {
|
|
||||||
ct.readerErrChan <- ct.conn.processContextFreeMsg(t, r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ct *copyTo) waitForReaderDone() error {
|
|
||||||
var err error
|
|
||||||
for err = range ct.readerErrChan {
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ct *copyTo) run() (int, error) {
|
|
||||||
quotedTableName := quoteIdentifier(ct.tableName)
|
|
||||||
buf := &bytes.Buffer{}
|
|
||||||
for i, cn := range ct.columnNames {
|
|
||||||
if i != 0 {
|
|
||||||
buf.WriteString(", ")
|
|
||||||
}
|
|
||||||
buf.WriteString(quoteIdentifier(cn))
|
|
||||||
}
|
|
||||||
quotedColumnNames := buf.String()
|
|
||||||
|
|
||||||
ps, err := ct.conn.Prepare("", fmt.Sprintf("select %s from %s", quotedColumnNames, quotedTableName))
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ct.conn.sendSimpleQuery(fmt.Sprintf("copy %s ( %s ) from stdin binary;", quotedTableName, quotedColumnNames))
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ct.conn.readUntilCopyInResponse()
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
go ct.readUntilReadyForQuery()
|
|
||||||
defer ct.waitForReaderDone()
|
|
||||||
|
|
||||||
wbuf := newWriteBuf(ct.conn, copyData)
|
|
||||||
|
|
||||||
wbuf.WriteBytes([]byte("PGCOPY\n\377\r\n\000"))
|
|
||||||
wbuf.WriteInt32(0)
|
|
||||||
wbuf.WriteInt32(0)
|
|
||||||
|
|
||||||
var sentCount int
|
|
||||||
|
|
||||||
for ct.rowSrc.Next() {
|
|
||||||
select {
|
|
||||||
case err = <-ct.readerErrChan:
|
|
||||||
return 0, err
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(wbuf.buf) > 65536 {
|
|
||||||
wbuf.closeMsg()
|
|
||||||
_, err = ct.conn.conn.Write(wbuf.buf)
|
|
||||||
if err != nil {
|
|
||||||
ct.conn.die(err)
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Directly manipulate wbuf to reset to reuse the same buffer
|
|
||||||
wbuf.buf = wbuf.buf[0:5]
|
|
||||||
wbuf.buf[0] = copyData
|
|
||||||
wbuf.sizeIdx = 1
|
|
||||||
}
|
|
||||||
|
|
||||||
sentCount++
|
|
||||||
|
|
||||||
values, err := ct.rowSrc.Values()
|
|
||||||
if err != nil {
|
|
||||||
ct.cancelCopyIn()
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
if len(values) != len(ct.columnNames) {
|
|
||||||
ct.cancelCopyIn()
|
|
||||||
return 0, fmt.Errorf("expected %d values, got %d values", len(ct.columnNames), len(values))
|
|
||||||
}
|
|
||||||
|
|
||||||
wbuf.WriteInt16(int16(len(ct.columnNames)))
|
|
||||||
for i, val := range values {
|
|
||||||
err = Encode(wbuf, ps.FieldDescriptions[i].DataType, val)
|
|
||||||
if err != nil {
|
|
||||||
ct.cancelCopyIn()
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ct.rowSrc.Err() != nil {
|
|
||||||
ct.cancelCopyIn()
|
|
||||||
return 0, ct.rowSrc.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
wbuf.WriteInt16(-1) // terminate the copy stream
|
|
||||||
|
|
||||||
wbuf.startMsg(copyDone)
|
|
||||||
wbuf.closeMsg()
|
|
||||||
_, err = ct.conn.conn.Write(wbuf.buf)
|
|
||||||
if err != nil {
|
|
||||||
ct.conn.die(err)
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ct.waitForReaderDone()
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return sentCount, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ct *copyTo) cancelCopyIn() error {
|
|
||||||
wbuf := newWriteBuf(ct.conn, copyFail)
|
|
||||||
wbuf.WriteCString("client error: abort")
|
|
||||||
wbuf.closeMsg()
|
|
||||||
_, err := ct.conn.conn.Write(wbuf.buf)
|
|
||||||
if err != nil {
|
|
||||||
ct.conn.die(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deprecated. Use CopyFrom instead. CopyTo uses the PostgreSQL copy protocol to
|
|
||||||
// perform bulk data insertion. It returns the number of rows copied and an
|
|
||||||
// error.
|
|
||||||
//
|
|
||||||
// CopyTo requires all values use the binary format. Almost all types
|
|
||||||
// implemented by pgx use the binary format by default. Types implementing
|
|
||||||
// Encoder can only be used if they encode to the binary format.
|
|
||||||
func (c *Conn) CopyTo(tableName string, columnNames []string, rowSrc CopyToSource) (int, error) {
|
|
||||||
ct := ©To{
|
|
||||||
conn: c,
|
|
||||||
tableName: tableName,
|
|
||||||
columnNames: columnNames,
|
|
||||||
rowSrc: rowSrc,
|
|
||||||
readerErrChan: make(chan error),
|
|
||||||
}
|
|
||||||
|
|
||||||
return ct.run()
|
|
||||||
}
|
|
||||||
-368
@@ -1,368 +0,0 @@
|
|||||||
package pgx_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/jackc/pgx"
|
|
||||||
"github.com/jackc/pgx/pgtype"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestConnCopyToSmall(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
mustExec(t, conn, `create temporary table foo(
|
|
||||||
a int2,
|
|
||||||
b int4,
|
|
||||||
c int8,
|
|
||||||
d varchar,
|
|
||||||
e text,
|
|
||||||
f date,
|
|
||||||
g timestamptz
|
|
||||||
)`)
|
|
||||||
|
|
||||||
inputRows := [][]interface{}{
|
|
||||||
{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)},
|
|
||||||
{nil, nil, nil, nil, nil, nil, nil},
|
|
||||||
}
|
|
||||||
|
|
||||||
copyCount, err := conn.CopyTo("foo", []string{"a", "b", "c", "d", "e", "f", "g"}, pgx.CopyToRows(inputRows))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for CopyTo: %v", err)
|
|
||||||
}
|
|
||||||
if copyCount != len(inputRows) {
|
|
||||||
t.Errorf("Expected CopyTo to return %d copied rows, but got %d", len(inputRows), copyCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := conn.Query("select * from foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for Query: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var outputRows [][]interface{}
|
|
||||||
for rows.Next() {
|
|
||||||
row, err := rows.Values()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
|
||||||
}
|
|
||||||
outputRows = append(outputRows, row)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rows.Err() != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(inputRows, outputRows) {
|
|
||||||
t.Errorf("Input rows and output rows do not equal: %v -> %v", inputRows, outputRows)
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureConnValid(t, conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnCopyToLarge(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
mustExec(t, conn, `create temporary table foo(
|
|
||||||
a int2,
|
|
||||||
b int4,
|
|
||||||
c int8,
|
|
||||||
d varchar,
|
|
||||||
e text,
|
|
||||||
f date,
|
|
||||||
g timestamptz,
|
|
||||||
h bytea
|
|
||||||
)`)
|
|
||||||
|
|
||||||
inputRows := [][]interface{}{}
|
|
||||||
|
|
||||||
for i := 0; i < 10000; i++ {
|
|
||||||
inputRows = append(inputRows, []interface{}{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local), []byte{111, 111, 111, 111}})
|
|
||||||
}
|
|
||||||
|
|
||||||
copyCount, err := conn.CopyTo("foo", []string{"a", "b", "c", "d", "e", "f", "g", "h"}, pgx.CopyToRows(inputRows))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for CopyTo: %v", err)
|
|
||||||
}
|
|
||||||
if copyCount != len(inputRows) {
|
|
||||||
t.Errorf("Expected CopyTo to return %d copied rows, but got %d", len(inputRows), copyCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := conn.Query("select * from foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for Query: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var outputRows [][]interface{}
|
|
||||||
for rows.Next() {
|
|
||||||
row, err := rows.Values()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
|
||||||
}
|
|
||||||
outputRows = append(outputRows, row)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rows.Err() != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(inputRows, outputRows) {
|
|
||||||
t.Errorf("Input rows and output rows do not equal")
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureConnValid(t, conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnCopyToJSON(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
for _, oid := range []pgtype.Oid{pgx.JsonOid, pgx.JsonbOid} {
|
|
||||||
if _, ok := conn.PgTypes[oid]; !ok {
|
|
||||||
return // No JSON/JSONB type -- must be running against old PostgreSQL
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mustExec(t, conn, `create temporary table foo(
|
|
||||||
a json,
|
|
||||||
b jsonb
|
|
||||||
)`)
|
|
||||||
|
|
||||||
inputRows := [][]interface{}{
|
|
||||||
{map[string]interface{}{"foo": "bar"}, map[string]interface{}{"bar": "quz"}},
|
|
||||||
{nil, nil},
|
|
||||||
}
|
|
||||||
|
|
||||||
copyCount, err := conn.CopyTo("foo", []string{"a", "b"}, pgx.CopyToRows(inputRows))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for CopyTo: %v", err)
|
|
||||||
}
|
|
||||||
if copyCount != len(inputRows) {
|
|
||||||
t.Errorf("Expected CopyTo to return %d copied rows, but got %d", len(inputRows), copyCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := conn.Query("select * from foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for Query: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var outputRows [][]interface{}
|
|
||||||
for rows.Next() {
|
|
||||||
row, err := rows.Values()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
|
||||||
}
|
|
||||||
outputRows = append(outputRows, row)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rows.Err() != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(inputRows, outputRows) {
|
|
||||||
t.Errorf("Input rows and output rows do not equal: %v -> %v", inputRows, outputRows)
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureConnValid(t, conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnCopyToFailServerSideMidway(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
mustExec(t, conn, `create temporary table foo(
|
|
||||||
a int4,
|
|
||||||
b varchar not null
|
|
||||||
)`)
|
|
||||||
|
|
||||||
inputRows := [][]interface{}{
|
|
||||||
{int32(1), "abc"},
|
|
||||||
{int32(2), nil}, // this row should trigger a failure
|
|
||||||
{int32(3), "def"},
|
|
||||||
}
|
|
||||||
|
|
||||||
copyCount, err := conn.CopyTo("foo", []string{"a", "b"}, pgx.CopyToRows(inputRows))
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("Expected CopyTo return error, but it did not")
|
|
||||||
}
|
|
||||||
if _, ok := err.(pgx.PgError); !ok {
|
|
||||||
t.Errorf("Expected CopyTo return pgx.PgError, but instead it returned: %v", err)
|
|
||||||
}
|
|
||||||
if copyCount != 0 {
|
|
||||||
t.Errorf("Expected CopyTo to return 0 copied rows, but got %d", copyCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := conn.Query("select * from foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for Query: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var outputRows [][]interface{}
|
|
||||||
for rows.Next() {
|
|
||||||
row, err := rows.Values()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
|
||||||
}
|
|
||||||
outputRows = append(outputRows, row)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rows.Err() != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(outputRows) != 0 {
|
|
||||||
t.Errorf("Expected 0 rows, but got %v", outputRows)
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureConnValid(t, conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnCopyToFailServerSideMidwayAbortsWithoutWaiting(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
mustExec(t, conn, `create temporary table foo(
|
|
||||||
a bytea not null
|
|
||||||
)`)
|
|
||||||
|
|
||||||
startTime := time.Now()
|
|
||||||
|
|
||||||
copyCount, err := conn.CopyTo("foo", []string{"a"}, &failSource{})
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("Expected CopyTo return error, but it did not")
|
|
||||||
}
|
|
||||||
if _, ok := err.(pgx.PgError); !ok {
|
|
||||||
t.Errorf("Expected CopyTo return pgx.PgError, but instead it returned: %v", err)
|
|
||||||
}
|
|
||||||
if copyCount != 0 {
|
|
||||||
t.Errorf("Expected CopyTo to return 0 copied rows, but got %d", copyCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
endTime := time.Now()
|
|
||||||
copyTime := endTime.Sub(startTime)
|
|
||||||
if copyTime > time.Second {
|
|
||||||
t.Errorf("Failing CopyTo shouldn't have taken so long: %v", copyTime)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := conn.Query("select * from foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for Query: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var outputRows [][]interface{}
|
|
||||||
for rows.Next() {
|
|
||||||
row, err := rows.Values()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
|
||||||
}
|
|
||||||
outputRows = append(outputRows, row)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rows.Err() != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(outputRows) != 0 {
|
|
||||||
t.Errorf("Expected 0 rows, but got %v", outputRows)
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureConnValid(t, conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnCopyToCopyToSourceErrorMidway(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
mustExec(t, conn, `create temporary table foo(
|
|
||||||
a bytea not null
|
|
||||||
)`)
|
|
||||||
|
|
||||||
copyCount, err := conn.CopyTo("foo", []string{"a"}, &clientFailSource{})
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("Expected CopyTo return error, but it did not")
|
|
||||||
}
|
|
||||||
if copyCount != 0 {
|
|
||||||
t.Errorf("Expected CopyTo to return 0 copied rows, but got %d", copyCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := conn.Query("select * from foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for Query: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var outputRows [][]interface{}
|
|
||||||
for rows.Next() {
|
|
||||||
row, err := rows.Values()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
|
||||||
}
|
|
||||||
outputRows = append(outputRows, row)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rows.Err() != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(outputRows) != 0 {
|
|
||||||
t.Errorf("Expected 0 rows, but got %v", outputRows)
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureConnValid(t, conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnCopyToCopyToSourceErrorEnd(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
mustExec(t, conn, `create temporary table foo(
|
|
||||||
a bytea not null
|
|
||||||
)`)
|
|
||||||
|
|
||||||
copyCount, err := conn.CopyTo("foo", []string{"a"}, &clientFinalErrSource{})
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("Expected CopyTo return error, but it did not")
|
|
||||||
}
|
|
||||||
if copyCount != 0 {
|
|
||||||
t.Errorf("Expected CopyTo to return 0 copied rows, but got %d", copyCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := conn.Query("select * from foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for Query: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var outputRows [][]interface{}
|
|
||||||
for rows.Next() {
|
|
||||||
row, err := rows.Values()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Values(): %v", err)
|
|
||||||
}
|
|
||||||
outputRows = append(outputRows, row)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rows.Err() != nil {
|
|
||||||
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(outputRows) != 0 {
|
|
||||||
t.Errorf("Expected 0 rows, but got %v", outputRows)
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureConnValid(t, conn)
|
|
||||||
}
|
|
||||||
@@ -185,13 +185,13 @@ func (tx *Tx) QueryRow(sql string, args ...interface{}) *Row {
|
|||||||
return (*Row)(rows)
|
return (*Row)(rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CopyTo delegates to the underlying *Conn
|
// CopyFrom delegates to the underlying *Conn
|
||||||
func (tx *Tx) CopyTo(tableName string, columnNames []string, rowSrc CopyToSource) (int, error) {
|
func (tx *Tx) CopyFrom(tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int, error) {
|
||||||
if tx.status != TxStatusInProgress {
|
if tx.status != TxStatusInProgress {
|
||||||
return 0, ErrTxClosed
|
return 0, ErrTxClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.conn.CopyTo(tableName, columnNames, rowSrc)
|
return tx.conn.CopyFrom(tableName, columnNames, rowSrc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Conn returns the *Conn this transaction is using.
|
// Conn returns the *Conn this transaction is using.
|
||||||
|
|||||||
@@ -26,6 +26,8 @@ ReplicationConn.WaitForReplicationMessage now takes context.Context instead of t
|
|||||||
|
|
||||||
Reject scanning binary format values into a string (e.g. binary encoded timestamptz to string). See https://github.com/jackc/pgx/issues/219 and https://github.com/jackc/pgx/issues/228
|
Reject scanning binary format values into a string (e.g. binary encoded timestamptz to string). See https://github.com/jackc/pgx/issues/219 and https://github.com/jackc/pgx/issues/228
|
||||||
|
|
||||||
|
Remove CopyTo
|
||||||
|
|
||||||
## TODO / Possible / Investigate
|
## TODO / Possible / Investigate
|
||||||
|
|
||||||
Organize errors better
|
Organize errors better
|
||||||
|
|||||||
Reference in New Issue
Block a user