Replace BeginIso with BeginEx
Adds support for read/write mode and deferrable modes.
This commit is contained in:
+6
-6
@@ -383,7 +383,7 @@ func (p *ConnPool) QueryRow(sql string, args ...interface{}) *Row {
|
|||||||
// Begin acquires a connection and begins a transaction on it. When the
|
// Begin acquires a connection and begins a transaction on it. When the
|
||||||
// transaction is closed the connection will be automatically released.
|
// transaction is closed the connection will be automatically released.
|
||||||
func (p *ConnPool) Begin() (*Tx, error) {
|
func (p *ConnPool) Begin() (*Tx, error) {
|
||||||
return p.BeginIso("")
|
return p.BeginEx(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare creates a prepared statement on a connection in the pool to test the
|
// Prepare creates a prepared statement on a connection in the pool to test the
|
||||||
@@ -469,17 +469,17 @@ func (p *ConnPool) Deallocate(name string) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BeginIso acquires a connection and begins a transaction in isolation mode iso
|
// BeginEx acquires a connection and starts a transaction with txOptions
|
||||||
// on it. When the transaction is closed the connection will be automatically
|
// determining the transaction mode. When the transaction is closed the
|
||||||
// released.
|
// connection will be automatically released.
|
||||||
func (p *ConnPool) BeginIso(iso string) (*Tx, error) {
|
func (p *ConnPool) BeginEx(txOptions *TxOptions) (*Tx, error) {
|
||||||
for {
|
for {
|
||||||
c, err := p.Acquire()
|
c, err := p.Acquire()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := c.BeginIso(iso)
|
tx, err := c.BeginEx(txOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
alive := c.IsAlive()
|
alive := c.IsAlive()
|
||||||
p.Release(c)
|
p.Release(c)
|
||||||
|
|||||||
+2
-2
@@ -560,9 +560,9 @@ func TestConnPoolTransactionIso(t *testing.T) {
|
|||||||
pool := createConnPool(t, 2)
|
pool := createConnPool(t, 2)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
tx, err := pool.BeginIso(pgx.Serializable)
|
tx, err := pool.BeginEx(&pgx.TxOptions{IsoLevel: pgx.Serializable})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("pool.Begin failed: %v", err)
|
t.Fatalf("pool.BeginEx failed: %v", err)
|
||||||
}
|
}
|
||||||
defer tx.Rollback()
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
|||||||
@@ -1,16 +1,35 @@
|
|||||||
package pgx
|
package pgx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type TxIsoLevel string
|
||||||
|
|
||||||
// Transaction isolation levels
|
// Transaction isolation levels
|
||||||
const (
|
const (
|
||||||
Serializable = "serializable"
|
Serializable = TxIsoLevel("serializable")
|
||||||
RepeatableRead = "repeatable read"
|
RepeatableRead = TxIsoLevel("repeatable read")
|
||||||
ReadCommitted = "read committed"
|
ReadCommitted = TxIsoLevel("read committed")
|
||||||
ReadUncommitted = "read uncommitted"
|
ReadUncommitted = TxIsoLevel("read uncommitted")
|
||||||
|
)
|
||||||
|
|
||||||
|
type TxAccessMode string
|
||||||
|
|
||||||
|
// Transaction access modes
|
||||||
|
const (
|
||||||
|
ReadWrite = TxAccessMode("read write")
|
||||||
|
ReadOnly = TxAccessMode("read only")
|
||||||
|
)
|
||||||
|
|
||||||
|
type TxDeferrableMode string
|
||||||
|
|
||||||
|
// Transaction deferrable modes
|
||||||
|
const (
|
||||||
|
Deferrable = TxDeferrableMode("deferrable")
|
||||||
|
NotDeferrable = TxDeferrableMode("not deferrable")
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -21,6 +40,12 @@ const (
|
|||||||
TxStatusRollbackSuccess = 2
|
TxStatusRollbackSuccess = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type TxOptions struct {
|
||||||
|
IsoLevel TxIsoLevel
|
||||||
|
AccessMode TxAccessMode
|
||||||
|
DeferrableMode TxDeferrableMode
|
||||||
|
}
|
||||||
|
|
||||||
var ErrTxClosed = errors.New("tx is closed")
|
var ErrTxClosed = errors.New("tx is closed")
|
||||||
|
|
||||||
// ErrTxCommitRollback occurs when an error has occurred in a transaction and
|
// ErrTxCommitRollback occurs when an error has occurred in a transaction and
|
||||||
@@ -28,30 +53,32 @@ var ErrTxClosed = errors.New("tx is closed")
|
|||||||
// it is treated as ROLLBACK.
|
// it is treated as ROLLBACK.
|
||||||
var ErrTxCommitRollback = errors.New("commit unexpectedly resulted in rollback")
|
var ErrTxCommitRollback = errors.New("commit unexpectedly resulted in rollback")
|
||||||
|
|
||||||
// Begin starts a transaction with the default isolation level for the current
|
// Begin starts a transaction with the default transaction mode for the
|
||||||
// connection. To use a specific isolation level see BeginIso.
|
// current connection. To use a specific transaction mode see BeginEx.
|
||||||
func (c *Conn) Begin() (*Tx, error) {
|
func (c *Conn) Begin() (*Tx, error) {
|
||||||
return c.begin("")
|
return c.BeginEx(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BeginIso starts a transaction with isoLevel as the transaction isolation
|
// BeginEx starts a transaction with txOptions determining the transaction
|
||||||
// level.
|
// mode.
|
||||||
//
|
func (c *Conn) BeginEx(txOptions *TxOptions) (*Tx, error) {
|
||||||
// Valid isolation levels (and their constants) are:
|
|
||||||
// serializable (pgx.Serializable)
|
|
||||||
// repeatable read (pgx.RepeatableRead)
|
|
||||||
// read committed (pgx.ReadCommitted)
|
|
||||||
// read uncommitted (pgx.ReadUncommitted)
|
|
||||||
func (c *Conn) BeginIso(isoLevel string) (*Tx, error) {
|
|
||||||
return c.begin(isoLevel)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) begin(isoLevel string) (*Tx, error) {
|
|
||||||
var beginSQL string
|
var beginSQL string
|
||||||
if isoLevel == "" {
|
if txOptions == nil {
|
||||||
beginSQL = "begin"
|
beginSQL = "begin"
|
||||||
} else {
|
} else {
|
||||||
beginSQL = fmt.Sprintf("begin isolation level %s", isoLevel)
|
buf := &bytes.Buffer{}
|
||||||
|
buf.WriteString("begin")
|
||||||
|
if txOptions.IsoLevel != "" {
|
||||||
|
fmt.Fprintf(buf, " isolation level %s", txOptions.IsoLevel)
|
||||||
|
}
|
||||||
|
if txOptions.AccessMode != "" {
|
||||||
|
fmt.Fprintf(buf, " %s", txOptions.AccessMode)
|
||||||
|
}
|
||||||
|
if txOptions.DeferrableMode != "" {
|
||||||
|
fmt.Fprintf(buf, " %s", txOptions.DeferrableMode)
|
||||||
|
}
|
||||||
|
|
||||||
|
beginSQL = buf.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.Exec(beginSQL)
|
_, err := c.Exec(beginSQL)
|
||||||
|
|||||||
+27
-9
@@ -107,15 +107,15 @@ func TestTxCommitSerializationFailure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer pool.Exec(`drop table tx_serializable_sums`)
|
defer pool.Exec(`drop table tx_serializable_sums`)
|
||||||
|
|
||||||
tx1, err := pool.BeginIso(pgx.Serializable)
|
tx1, err := pool.BeginEx(&pgx.TxOptions{IsoLevel: pgx.Serializable})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("BeginIso failed: %v", err)
|
t.Fatalf("BeginEx failed: %v", err)
|
||||||
}
|
}
|
||||||
defer tx1.Rollback()
|
defer tx1.Rollback()
|
||||||
|
|
||||||
tx2, err := pool.BeginIso(pgx.Serializable)
|
tx2, err := pool.BeginEx(&pgx.TxOptions{IsoLevel: pgx.Serializable})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("BeginIso failed: %v", err)
|
t.Fatalf("BeginEx failed: %v", err)
|
||||||
}
|
}
|
||||||
defer tx2.Rollback()
|
defer tx2.Rollback()
|
||||||
|
|
||||||
@@ -182,20 +182,20 @@ func TestTransactionSuccessfulRollback(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBeginIso(t *testing.T) {
|
func TestBeginExIsoLevels(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
conn := mustConnect(t, *defaultConnConfig)
|
conn := mustConnect(t, *defaultConnConfig)
|
||||||
defer closeConn(t, conn)
|
defer closeConn(t, conn)
|
||||||
|
|
||||||
isoLevels := []string{pgx.Serializable, pgx.RepeatableRead, pgx.ReadCommitted, pgx.ReadUncommitted}
|
isoLevels := []pgx.TxIsoLevel{pgx.Serializable, pgx.RepeatableRead, pgx.ReadCommitted, pgx.ReadUncommitted}
|
||||||
for _, iso := range isoLevels {
|
for _, iso := range isoLevels {
|
||||||
tx, err := conn.BeginIso(iso)
|
tx, err := conn.BeginEx(&pgx.TxOptions{IsoLevel: iso})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("conn.BeginIso failed: %v", err)
|
t.Fatalf("conn.BeginEx failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var level string
|
var level pgx.TxIsoLevel
|
||||||
conn.QueryRow("select current_setting('transaction_isolation')").Scan(&level)
|
conn.QueryRow("select current_setting('transaction_isolation')").Scan(&level)
|
||||||
if level != iso {
|
if level != iso {
|
||||||
t.Errorf("Expected to be in isolation level %v but was %v", iso, level)
|
t.Errorf("Expected to be in isolation level %v but was %v", iso, level)
|
||||||
@@ -208,6 +208,24 @@ func TestBeginIso(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBeginExReadOnly(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
conn := mustConnect(t, *defaultConnConfig)
|
||||||
|
defer closeConn(t, conn)
|
||||||
|
|
||||||
|
tx, err := conn.BeginEx(&pgx.TxOptions{AccessMode: pgx.ReadOnly})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("conn.BeginEx failed: %v", err)
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
_, err = conn.Exec("create table foo(id serial primary key)")
|
||||||
|
if pgErr, ok := err.(pgx.PgError); !ok || pgErr.Code != "25006" {
|
||||||
|
t.Errorf("Expected error SQLSTATE 25006, but got %#v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestTxAfterClose(t *testing.T) {
|
func TestTxAfterClose(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|||||||
@@ -12,6 +12,10 @@ Rename Uuid to UUID in accordance with Go naming conventions.
|
|||||||
|
|
||||||
Logger interface reduced to single Log method.
|
Logger interface reduced to single Log method.
|
||||||
|
|
||||||
|
Replace BeginIso with BeginEx. BeginEx adds support for read/write mode and deferrable mode.
|
||||||
|
|
||||||
|
Transaction isolation level constants are now typed strings instead of bare strings.
|
||||||
|
|
||||||
## TODO / Possible / Investigate
|
## TODO / Possible / Investigate
|
||||||
|
|
||||||
Organize errors better
|
Organize errors better
|
||||||
|
|||||||
Reference in New Issue
Block a user