Remove AfterClose() and Conn() from Tx and Rows
This commit is contained in:
+3
-13
@@ -31,8 +31,6 @@ type ConnPool struct {
|
|||||||
preparedStatements map[string]*PreparedStatement
|
preparedStatements map[string]*PreparedStatement
|
||||||
acquireTimeout time.Duration
|
acquireTimeout time.Duration
|
||||||
connInfo *pgtype.ConnInfo
|
connInfo *pgtype.ConnInfo
|
||||||
txAfterClose func(tx *Tx)
|
|
||||||
rowsAfterClose func(rows *Rows)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConnPoolStat struct {
|
type ConnPoolStat struct {
|
||||||
@@ -75,14 +73,6 @@ func NewConnPool(config ConnPoolConfig) (p *ConnPool, err error) {
|
|||||||
p.logLevel = LogLevelNone
|
p.logLevel = LogLevelNone
|
||||||
}
|
}
|
||||||
|
|
||||||
p.txAfterClose = func(tx *Tx) {
|
|
||||||
p.Release(tx.Conn())
|
|
||||||
}
|
|
||||||
|
|
||||||
p.rowsAfterClose = func(rows *Rows) {
|
|
||||||
p.Release(rows.Conn())
|
|
||||||
}
|
|
||||||
|
|
||||||
p.allConnections = make([]*Conn, 0, p.maxConnections)
|
p.allConnections = make([]*Conn, 0, p.maxConnections)
|
||||||
p.availableConnections = make([]*Conn, 0, p.maxConnections)
|
p.availableConnections = make([]*Conn, 0, p.maxConnections)
|
||||||
p.preparedStatements = make(map[string]*PreparedStatement)
|
p.preparedStatements = make(map[string]*PreparedStatement)
|
||||||
@@ -381,7 +371,7 @@ func (p *ConnPool) Query(sql string, args ...interface{}) (*Rows, error) {
|
|||||||
return rows, err
|
return rows, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rows.AfterClose(p.rowsAfterClose)
|
rows.connPool = p
|
||||||
|
|
||||||
return rows, nil
|
return rows, nil
|
||||||
}
|
}
|
||||||
@@ -399,7 +389,7 @@ func (p *ConnPool) QueryEx(ctx context.Context, sql string, options *QueryExOpti
|
|||||||
return rows, err
|
return rows, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rows.AfterClose(p.rowsAfterClose)
|
rows.connPool = p
|
||||||
|
|
||||||
return rows, nil
|
return rows, nil
|
||||||
}
|
}
|
||||||
@@ -531,7 +521,7 @@ func (p *ConnPool) BeginEx(txOptions *TxOptions) (*Tx, error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.AfterClose(p.txAfterClose)
|
tx.connPool = p
|
||||||
return tx, nil
|
return tx, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ func (r *Row) Scan(dest ...interface{}) (err error) {
|
|||||||
// calling Next() until it returns false, or when a fatal error occurs.
|
// calling Next() until it returns false, or when a fatal error occurs.
|
||||||
type Rows struct {
|
type Rows struct {
|
||||||
conn *Conn
|
conn *Conn
|
||||||
|
connPool *ConnPool
|
||||||
values [][]byte
|
values [][]byte
|
||||||
fields []FieldDescription
|
fields []FieldDescription
|
||||||
rowCount int
|
rowCount int
|
||||||
@@ -50,7 +51,6 @@ type Rows struct {
|
|||||||
startTime time.Time
|
startTime time.Time
|
||||||
sql string
|
sql string
|
||||||
args []interface{}
|
args []interface{}
|
||||||
afterClose func(*Rows)
|
|
||||||
unlockConn bool
|
unlockConn bool
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
@@ -84,8 +84,8 @@ func (rows *Rows) Close() {
|
|||||||
rows.conn.log(LogLevelError, "Query", map[string]interface{}{"sql": rows.sql, "args": logQueryArgs(rows.args)})
|
rows.conn.log(LogLevelError, "Query", map[string]interface{}{"sql": rows.sql, "args": logQueryArgs(rows.args)})
|
||||||
}
|
}
|
||||||
|
|
||||||
if rows.afterClose != nil {
|
if rows.connPool != nil {
|
||||||
rows.afterClose(rows)
|
rows.connPool.Release(rows.conn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,11 +156,6 @@ func (rows *Rows) Next() bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Conn returns the *Conn this *Rows is using.
|
|
||||||
func (rows *Rows) Conn() *Conn {
|
|
||||||
return rows.conn
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rows *Rows) nextColumn() ([]byte, *FieldDescription, bool) {
|
func (rows *Rows) nextColumn() ([]byte, *FieldDescription, bool) {
|
||||||
if rows.closed {
|
if rows.closed {
|
||||||
return nil, nil, false
|
return nil, nil, false
|
||||||
@@ -321,20 +316,6 @@ func (rows *Rows) Values() ([]interface{}, error) {
|
|||||||
return values, rows.Err()
|
return values, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// AfterClose adds f to a LILO queue of functions that will be called when
|
|
||||||
// rows is closed.
|
|
||||||
func (rows *Rows) AfterClose(f func(*Rows)) {
|
|
||||||
if rows.afterClose == nil {
|
|
||||||
rows.afterClose = f
|
|
||||||
} else {
|
|
||||||
prevFn := rows.afterClose
|
|
||||||
rows.afterClose = func(rows *Rows) {
|
|
||||||
f(rows)
|
|
||||||
prevFn(rows)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query executes sql with args. If there is an error the returned *Rows will
|
// Query executes sql with args. If there is an error the returned *Rows will
|
||||||
// be returned in an error state. So it is allowed to ignore the error returned
|
// be returned in an error state. So it is allowed to ignore the error returned
|
||||||
// from Query and handle it in *Rows.
|
// from Query and handle it in *Rows.
|
||||||
|
|||||||
@@ -94,10 +94,10 @@ func (c *Conn) BeginEx(txOptions *TxOptions) (*Tx, error) {
|
|||||||
// All Tx methods return ErrTxClosed if Commit or Rollback has already been
|
// All Tx methods return ErrTxClosed if Commit or Rollback has already been
|
||||||
// called on the Tx.
|
// called on the Tx.
|
||||||
type Tx struct {
|
type Tx struct {
|
||||||
conn *Conn
|
conn *Conn
|
||||||
afterClose func(*Tx)
|
connPool *ConnPool
|
||||||
err error
|
err error
|
||||||
status int8
|
status int8
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit commits the transaction
|
// Commit commits the transaction
|
||||||
@@ -117,9 +117,10 @@ func (tx *Tx) Commit() error {
|
|||||||
tx.err = err
|
tx.err = err
|
||||||
}
|
}
|
||||||
|
|
||||||
if tx.afterClose != nil {
|
if tx.connPool != nil {
|
||||||
tx.afterClose(tx)
|
tx.connPool.Release(tx.conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.err
|
return tx.err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,9 +140,10 @@ func (tx *Tx) Rollback() error {
|
|||||||
tx.status = TxStatusRollbackFailure
|
tx.status = TxStatusRollbackFailure
|
||||||
}
|
}
|
||||||
|
|
||||||
if tx.afterClose != nil {
|
if tx.connPool != nil {
|
||||||
tx.afterClose(tx)
|
tx.connPool.Release(tx.conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.err
|
return tx.err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,11 +196,6 @@ func (tx *Tx) CopyFrom(tableName Identifier, columnNames []string, rowSrc CopyFr
|
|||||||
return tx.conn.CopyFrom(tableName, columnNames, rowSrc)
|
return tx.conn.CopyFrom(tableName, columnNames, rowSrc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Conn returns the *Conn this transaction is using.
|
|
||||||
func (tx *Tx) Conn() *Conn {
|
|
||||||
return tx.conn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Status returns the status of the transaction from the set of
|
// Status returns the status of the transaction from the set of
|
||||||
// pgx.TxStatus* constants.
|
// pgx.TxStatus* constants.
|
||||||
func (tx *Tx) Status() int8 {
|
func (tx *Tx) Status() int8 {
|
||||||
@@ -209,17 +206,3 @@ func (tx *Tx) Status() int8 {
|
|||||||
func (tx *Tx) Err() error {
|
func (tx *Tx) Err() error {
|
||||||
return tx.err
|
return tx.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// AfterClose adds f to a LILO queue of functions that will be called when
|
|
||||||
// the transaction is closed (either Commit or Rollback).
|
|
||||||
func (tx *Tx) AfterClose(f func(*Tx)) {
|
|
||||||
if tx.afterClose == nil {
|
|
||||||
tx.afterClose = f
|
|
||||||
} else {
|
|
||||||
prevFn := tx.afterClose
|
|
||||||
tx.afterClose = func(tx *Tx) {
|
|
||||||
f(tx)
|
|
||||||
prevFn(tx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
+2
-37
@@ -1,9 +1,9 @@
|
|||||||
package pgx_test
|
package pgx_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/jackc/pgx"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
"github.com/jackc/pgx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTransactionSuccessfulCommit(t *testing.T) {
|
func TestTransactionSuccessfulCommit(t *testing.T) {
|
||||||
@@ -226,41 +226,6 @@ func TestBeginExReadOnly(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTxAfterClose(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
tx, err := conn.Begin()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var zeroTime, t1, t2 time.Time
|
|
||||||
tx.AfterClose(func(tx *pgx.Tx) {
|
|
||||||
t1 = time.Now()
|
|
||||||
})
|
|
||||||
|
|
||||||
tx.AfterClose(func(tx *pgx.Tx) {
|
|
||||||
t2 = time.Now()
|
|
||||||
})
|
|
||||||
|
|
||||||
tx.Rollback()
|
|
||||||
|
|
||||||
if t1 == zeroTime {
|
|
||||||
t.Error("First Tx.AfterClose callback not called")
|
|
||||||
}
|
|
||||||
|
|
||||||
if t2 == zeroTime {
|
|
||||||
t.Error("Second Tx.AfterClose callback not called")
|
|
||||||
}
|
|
||||||
|
|
||||||
if t1.Before(t2) {
|
|
||||||
t.Errorf("AfterClose callbacks called out of order: %v, %v", t1, t2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTxStatus(t *testing.T) {
|
func TestTxStatus(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,14 @@ ConnPool.Close no longer waits for all acquired connections to be released. Inst
|
|||||||
|
|
||||||
Removed Rows.Fatal(error)
|
Removed Rows.Fatal(error)
|
||||||
|
|
||||||
|
Removed Rows.AfterClose()
|
||||||
|
|
||||||
|
Removed Rows.Conn()
|
||||||
|
|
||||||
|
Removed Tx.AfterClose()
|
||||||
|
|
||||||
|
Removed Tx.Conn()
|
||||||
|
|
||||||
## TODO / Possible / Investigate
|
## TODO / Possible / Investigate
|
||||||
|
|
||||||
Organize errors better
|
Organize errors better
|
||||||
|
|||||||
Reference in New Issue
Block a user