2
0

Add CopyFrom to Tx and ConnPool

fixes #248
This commit is contained in:
Jack Christensen
2017-03-17 14:32:35 -05:00
parent 97c01fb524
commit b5bb05877f
3 changed files with 35 additions and 13 deletions
+11 -11
View File
@@ -485,27 +485,27 @@ const benchmarkWriteTableInsertSQL = `insert into t(
$13::bool $13::bool
)` )`
type benchmarkWriteTableCopyToSrc struct { type benchmarkWriteTableCopyFromSrc struct {
count int count int
idx int idx int
row []interface{} row []interface{}
} }
func (s *benchmarkWriteTableCopyToSrc) Next() bool { func (s *benchmarkWriteTableCopyFromSrc) Next() bool {
s.idx++ s.idx++
return s.idx < s.count return s.idx < s.count
} }
func (s *benchmarkWriteTableCopyToSrc) Values() ([]interface{}, error) { func (s *benchmarkWriteTableCopyFromSrc) Values() ([]interface{}, error) {
return s.row, nil return s.row, nil
} }
func (s *benchmarkWriteTableCopyToSrc) Err() error { func (s *benchmarkWriteTableCopyFromSrc) Err() error {
return nil return nil
} }
func newBenchmarkWriteTableCopyToSrc(count int) pgx.CopyToSource { func newBenchmarkWriteTableCopyFromSrc(count int) pgx.CopyFromSource {
return &benchmarkWriteTableCopyToSrc{ return &benchmarkWriteTableCopyFromSrc{
count: count, count: count,
row: []interface{}{ row: []interface{}{
"varchar_1", "varchar_1",
@@ -538,7 +538,7 @@ func benchmarkWriteNRowsViaInsert(b *testing.B, n int) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
src := newBenchmarkWriteTableCopyToSrc(n) src := newBenchmarkWriteTableCopyFromSrc(n)
tx, err := conn.Begin() tx, err := conn.Begin()
if err != nil { if err != nil {
@@ -561,7 +561,7 @@ func benchmarkWriteNRowsViaInsert(b *testing.B, n int) {
// note this function is only used for benchmarks -- it doesn't escape tableName // note this function is only used for benchmarks -- it doesn't escape tableName
// or columnNames // or columnNames
func multiInsert(conn *pgx.Conn, tableName string, columnNames []string, rowSrc pgx.CopyToSource) (int, error) { func multiInsert(conn *pgx.Conn, tableName string, columnNames []string, rowSrc pgx.CopyFromSource) (int, error) {
maxRowsPerInsert := 65535 / len(columnNames) maxRowsPerInsert := 65535 / len(columnNames)
rowsThisInsert := 0 rowsThisInsert := 0
rowCount := 0 rowCount := 0
@@ -649,7 +649,7 @@ func benchmarkWriteNRowsViaMultiInsert(b *testing.B, n int) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
src := newBenchmarkWriteTableCopyToSrc(n) src := newBenchmarkWriteTableCopyFromSrc(n)
_, err := multiInsert(conn, "t", _, err := multiInsert(conn, "t",
[]string{"varchar_1", []string{"varchar_1",
@@ -681,9 +681,9 @@ func benchmarkWriteNRowsViaCopy(b *testing.B, n int) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
src := newBenchmarkWriteTableCopyToSrc(n) src := newBenchmarkWriteTableCopyFromSrc(n)
_, err := conn.CopyTo("t", _, err := conn.CopyFrom(pgx.Identifier{"t"},
[]string{"varchar_1", []string{"varchar_1",
"varchar_2", "varchar_2",
"varchar_null_1", "varchar_null_1",
+14 -1
View File
@@ -504,7 +504,8 @@ func (p *ConnPool) BeginIso(iso string) (*Tx, error) {
} }
} }
// CopyTo acquires a connection, delegates the call to that connection, and releases the connection // Deprecated. Use CopyFrom instead. CopyTo acquires a connection, delegates the
// call to that connection, and releases the connection.
func (p *ConnPool) CopyTo(tableName string, columnNames []string, rowSrc CopyToSource) (int, error) { func (p *ConnPool) CopyTo(tableName string, columnNames []string, rowSrc CopyToSource) (int, error) {
c, err := p.Acquire() c, err := p.Acquire()
if err != nil { if err != nil {
@@ -514,3 +515,15 @@ func (p *ConnPool) CopyTo(tableName string, columnNames []string, rowSrc CopyToS
return c.CopyTo(tableName, columnNames, rowSrc) return c.CopyTo(tableName, columnNames, rowSrc)
} }
// CopyFrom acquires a connection, delegates the call to that connection, and
// releases the connection.
func (p *ConnPool) CopyFrom(tableName Identifier, columnNames []string, rowSrc CopyToSource) (int, error) {
c, err := p.Acquire()
if err != nil {
return 0, err
}
defer p.Release(c)
return c.CopyFrom(tableName, columnNames, rowSrc)
}
+10 -1
View File
@@ -158,7 +158,7 @@ func (tx *Tx) QueryRow(sql string, args ...interface{}) *Row {
return (*Row)(rows) return (*Row)(rows)
} }
// CopyTo delegates to the underlying *Conn // Deprecated. Use CopyFrom instead. CopyTo delegates to the underlying *Conn
func (tx *Tx) CopyTo(tableName string, columnNames []string, rowSrc CopyToSource) (int, error) { func (tx *Tx) CopyTo(tableName string, columnNames []string, rowSrc CopyToSource) (int, error) {
if tx.status != TxStatusInProgress { if tx.status != TxStatusInProgress {
return 0, ErrTxClosed return 0, ErrTxClosed
@@ -167,6 +167,15 @@ func (tx *Tx) CopyTo(tableName string, columnNames []string, rowSrc CopyToSource
return tx.conn.CopyTo(tableName, columnNames, rowSrc) return tx.conn.CopyTo(tableName, columnNames, rowSrc)
} }
// CopyFrom delegates to the underlying *Conn
func (tx *Tx) CopyFrom(tableName Identifier, columnNames []string, rowSrc CopyToSource) (int, error) {
if tx.status != TxStatusInProgress {
return 0, ErrTxClosed
}
return tx.conn.CopyFrom(tableName, columnNames, rowSrc)
}
// Conn returns the *Conn this transaction is using. // Conn returns the *Conn this transaction is using.
func (tx *Tx) Conn() *Conn { func (tx *Tx) Conn() *Conn {
return tx.conn return tx.conn