2
0
Files
pgx/copy_to_test.go
T
Jack Christensen 743b98b298 Name PG types as words
Though this doesn't follow Go naming conventions exactly it makes names more
consistent with PostgreSQL and it is easier to read. For example, TIDOID becomes
TidOid. In addition this is one less breaking change in the move to V3.
2017-03-11 17:03:23 -06:00

429 lines
9.7 KiB
Go

package pgx_test
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/jackc/pgx"
)
func TestConnCopyToSmall(t *testing.T) {
t.Parallel()
conn := mustConnect(t, *defaultConnConfig)
defer closeConn(t, conn)
mustExec(t, conn, `create temporary table foo(
a int2,
b int4,
c int8,
d varchar,
e text,
f date,
g timestamptz
)`)
inputRows := [][]interface{}{
{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)},
{nil, nil, nil, nil, nil, nil, nil},
}
copyCount, err := conn.CopyTo("foo", []string{"a", "b", "c", "d", "e", "f", "g"}, pgx.CopyToRows(inputRows))
if err != nil {
t.Errorf("Unexpected error for CopyTo: %v", err)
}
if copyCount != len(inputRows) {
t.Errorf("Expected CopyTo to return %d copied rows, but got %d", len(inputRows), copyCount)
}
rows, err := conn.Query("select * from foo")
if err != nil {
t.Errorf("Unexpected error for Query: %v", err)
}
var outputRows [][]interface{}
for rows.Next() {
row, err := rows.Values()
if err != nil {
t.Errorf("Unexpected error for rows.Values(): %v", err)
}
outputRows = append(outputRows, row)
}
if rows.Err() != nil {
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
}
if !reflect.DeepEqual(inputRows, outputRows) {
t.Errorf("Input rows and output rows do not equal: %v -> %v", inputRows, outputRows)
}
ensureConnValid(t, conn)
}
func TestConnCopyToLarge(t *testing.T) {
t.Parallel()
conn := mustConnect(t, *defaultConnConfig)
defer closeConn(t, conn)
mustExec(t, conn, `create temporary table foo(
a int2,
b int4,
c int8,
d varchar,
e text,
f date,
g timestamptz,
h bytea
)`)
inputRows := [][]interface{}{}
for i := 0; i < 10000; i++ {
inputRows = append(inputRows, []interface{}{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local), []byte{111, 111, 111, 111}})
}
copyCount, err := conn.CopyTo("foo", []string{"a", "b", "c", "d", "e", "f", "g", "h"}, pgx.CopyToRows(inputRows))
if err != nil {
t.Errorf("Unexpected error for CopyTo: %v", err)
}
if copyCount != len(inputRows) {
t.Errorf("Expected CopyTo to return %d copied rows, but got %d", len(inputRows), copyCount)
}
rows, err := conn.Query("select * from foo")
if err != nil {
t.Errorf("Unexpected error for Query: %v", err)
}
var outputRows [][]interface{}
for rows.Next() {
row, err := rows.Values()
if err != nil {
t.Errorf("Unexpected error for rows.Values(): %v", err)
}
outputRows = append(outputRows, row)
}
if rows.Err() != nil {
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
}
if !reflect.DeepEqual(inputRows, outputRows) {
t.Errorf("Input rows and output rows do not equal")
}
ensureConnValid(t, conn)
}
func TestConnCopyToJSON(t *testing.T) {
t.Parallel()
conn := mustConnect(t, *defaultConnConfig)
defer closeConn(t, conn)
for _, oid := range []pgx.Oid{pgx.JsonOid, pgx.JsonbOid} {
if _, ok := conn.PgTypes[oid]; !ok {
return // No JSON/JSONB type -- must be running against old PostgreSQL
}
}
mustExec(t, conn, `create temporary table foo(
a json,
b jsonb
)`)
inputRows := [][]interface{}{
{map[string]interface{}{"foo": "bar"}, map[string]interface{}{"bar": "quz"}},
{nil, nil},
}
copyCount, err := conn.CopyTo("foo", []string{"a", "b"}, pgx.CopyToRows(inputRows))
if err != nil {
t.Errorf("Unexpected error for CopyTo: %v", err)
}
if copyCount != len(inputRows) {
t.Errorf("Expected CopyTo to return %d copied rows, but got %d", len(inputRows), copyCount)
}
rows, err := conn.Query("select * from foo")
if err != nil {
t.Errorf("Unexpected error for Query: %v", err)
}
var outputRows [][]interface{}
for rows.Next() {
row, err := rows.Values()
if err != nil {
t.Errorf("Unexpected error for rows.Values(): %v", err)
}
outputRows = append(outputRows, row)
}
if rows.Err() != nil {
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
}
if !reflect.DeepEqual(inputRows, outputRows) {
t.Errorf("Input rows and output rows do not equal: %v -> %v", inputRows, outputRows)
}
ensureConnValid(t, conn)
}
func TestConnCopyToFailServerSideMidway(t *testing.T) {
t.Parallel()
conn := mustConnect(t, *defaultConnConfig)
defer closeConn(t, conn)
mustExec(t, conn, `create temporary table foo(
a int4,
b varchar not null
)`)
inputRows := [][]interface{}{
{int32(1), "abc"},
{int32(2), nil}, // this row should trigger a failure
{int32(3), "def"},
}
copyCount, err := conn.CopyTo("foo", []string{"a", "b"}, pgx.CopyToRows(inputRows))
if err == nil {
t.Errorf("Expected CopyTo return error, but it did not")
}
if _, ok := err.(pgx.PgError); !ok {
t.Errorf("Expected CopyTo return pgx.PgError, but instead it returned: %v", err)
}
if copyCount != 0 {
t.Errorf("Expected CopyTo to return 0 copied rows, but got %d", copyCount)
}
rows, err := conn.Query("select * from foo")
if err != nil {
t.Errorf("Unexpected error for Query: %v", err)
}
var outputRows [][]interface{}
for rows.Next() {
row, err := rows.Values()
if err != nil {
t.Errorf("Unexpected error for rows.Values(): %v", err)
}
outputRows = append(outputRows, row)
}
if rows.Err() != nil {
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
}
if len(outputRows) != 0 {
t.Errorf("Expected 0 rows, but got %v", outputRows)
}
ensureConnValid(t, conn)
}
type failSource struct {
count int
}
func (fs *failSource) Next() bool {
time.Sleep(time.Millisecond * 100)
fs.count++
return fs.count < 100
}
func (fs *failSource) Values() ([]interface{}, error) {
if fs.count == 3 {
return []interface{}{nil}, nil
}
return []interface{}{make([]byte, 100000)}, nil
}
func (fs *failSource) Err() error {
return nil
}
func TestConnCopyToFailServerSideMidwayAbortsWithoutWaiting(t *testing.T) {
t.Parallel()
conn := mustConnect(t, *defaultConnConfig)
defer closeConn(t, conn)
mustExec(t, conn, `create temporary table foo(
a bytea not null
)`)
startTime := time.Now()
copyCount, err := conn.CopyTo("foo", []string{"a"}, &failSource{})
if err == nil {
t.Errorf("Expected CopyTo return error, but it did not")
}
if _, ok := err.(pgx.PgError); !ok {
t.Errorf("Expected CopyTo return pgx.PgError, but instead it returned: %v", err)
}
if copyCount != 0 {
t.Errorf("Expected CopyTo to return 0 copied rows, but got %d", copyCount)
}
endTime := time.Now()
copyTime := endTime.Sub(startTime)
if copyTime > time.Second {
t.Errorf("Failing CopyTo shouldn't have taken so long: %v", copyTime)
}
rows, err := conn.Query("select * from foo")
if err != nil {
t.Errorf("Unexpected error for Query: %v", err)
}
var outputRows [][]interface{}
for rows.Next() {
row, err := rows.Values()
if err != nil {
t.Errorf("Unexpected error for rows.Values(): %v", err)
}
outputRows = append(outputRows, row)
}
if rows.Err() != nil {
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
}
if len(outputRows) != 0 {
t.Errorf("Expected 0 rows, but got %v", outputRows)
}
ensureConnValid(t, conn)
}
type clientFailSource struct {
count int
err error
}
func (cfs *clientFailSource) Next() bool {
cfs.count++
return cfs.count < 100
}
func (cfs *clientFailSource) Values() ([]interface{}, error) {
if cfs.count == 3 {
cfs.err = fmt.Errorf("client error")
return nil, cfs.err
}
return []interface{}{make([]byte, 100000)}, nil
}
func (cfs *clientFailSource) Err() error {
return cfs.err
}
func TestConnCopyToCopyToSourceErrorMidway(t *testing.T) {
t.Parallel()
conn := mustConnect(t, *defaultConnConfig)
defer closeConn(t, conn)
mustExec(t, conn, `create temporary table foo(
a bytea not null
)`)
copyCount, err := conn.CopyTo("foo", []string{"a"}, &clientFailSource{})
if err == nil {
t.Errorf("Expected CopyTo return error, but it did not")
}
if copyCount != 0 {
t.Errorf("Expected CopyTo to return 0 copied rows, but got %d", copyCount)
}
rows, err := conn.Query("select * from foo")
if err != nil {
t.Errorf("Unexpected error for Query: %v", err)
}
var outputRows [][]interface{}
for rows.Next() {
row, err := rows.Values()
if err != nil {
t.Errorf("Unexpected error for rows.Values(): %v", err)
}
outputRows = append(outputRows, row)
}
if rows.Err() != nil {
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
}
if len(outputRows) != 0 {
t.Errorf("Expected 0 rows, but got %v", outputRows)
}
ensureConnValid(t, conn)
}
type clientFinalErrSource struct {
count int
}
func (cfs *clientFinalErrSource) Next() bool {
cfs.count++
return cfs.count < 5
}
func (cfs *clientFinalErrSource) Values() ([]interface{}, error) {
return []interface{}{make([]byte, 100000)}, nil
}
func (cfs *clientFinalErrSource) Err() error {
return fmt.Errorf("final error")
}
func TestConnCopyToCopyToSourceErrorEnd(t *testing.T) {
t.Parallel()
conn := mustConnect(t, *defaultConnConfig)
defer closeConn(t, conn)
mustExec(t, conn, `create temporary table foo(
a bytea not null
)`)
copyCount, err := conn.CopyTo("foo", []string{"a"}, &clientFinalErrSource{})
if err == nil {
t.Errorf("Expected CopyTo return error, but it did not")
}
if copyCount != 0 {
t.Errorf("Expected CopyTo to return 0 copied rows, but got %d", copyCount)
}
rows, err := conn.Query("select * from foo")
if err != nil {
t.Errorf("Unexpected error for Query: %v", err)
}
var outputRows [][]interface{}
for rows.Next() {
row, err := rows.Values()
if err != nil {
t.Errorf("Unexpected error for rows.Values(): %v", err)
}
outputRows = append(outputRows, row)
}
if rows.Err() != nil {
t.Errorf("Unexpected error for rows.Err(): %v", rows.Err())
}
if len(outputRows) != 0 {
t.Errorf("Expected 0 rows, but got %v", outputRows)
}
ensureConnValid(t, conn)
}