Reuse one MultiResultReader per connection
Using a PgConn while locked now panics. i.e. You must Close any ResultReader or MultiResultReader.
This commit is contained in:
@@ -99,8 +99,9 @@ type PgConn struct {
|
||||
bufferingReceiveErr error
|
||||
|
||||
// Reusable / preallocated resources
|
||||
wbuf []byte // write buffer
|
||||
resultReader ResultReader
|
||||
wbuf []byte // write buffer
|
||||
resultReader ResultReader
|
||||
multiResultReader MultiResultReader
|
||||
}
|
||||
|
||||
// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format)
|
||||
@@ -411,24 +412,18 @@ func (pgConn *PgConn) IsAlive() bool {
|
||||
return !pgConn.closed
|
||||
}
|
||||
|
||||
// lock locks the connection. It returns an error if the connection is already locked or is closed.
|
||||
func (pgConn *PgConn) lock() error {
|
||||
// lock locks the connection. It panics if the connection is already locked or is closed.
|
||||
func (pgConn *PgConn) lock() {
|
||||
if pgConn.locked {
|
||||
return errors.New("connection busy")
|
||||
}
|
||||
|
||||
if pgConn.closed {
|
||||
return errors.New("connection closed")
|
||||
panic("connection busy") // This only should be possible in case of an application bug.
|
||||
}
|
||||
|
||||
pgConn.locked = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pgConn *PgConn) unlock() {
|
||||
if !pgConn.locked {
|
||||
panic("BUG: cannot unlock unlocked connection")
|
||||
panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package.
|
||||
}
|
||||
|
||||
pgConn.locked = false
|
||||
@@ -505,9 +500,7 @@ type PreparedStatementDescription struct {
|
||||
|
||||
// Prepare creates a prepared statement.
|
||||
func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*PreparedStatementDescription, error) {
|
||||
if err := pgConn.lock(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pgConn.lock()
|
||||
defer pgConn.unlock()
|
||||
|
||||
select {
|
||||
@@ -626,9 +619,7 @@ func (pgConn *PgConn) CancelRequest(ctx context.Context) error {
|
||||
// WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not
|
||||
// received.
|
||||
func (pgConn *PgConn) WaitForNotification(ctx context.Context) error {
|
||||
if err := pgConn.lock(); err != nil {
|
||||
return err
|
||||
}
|
||||
pgConn.lock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -659,17 +650,14 @@ func (pgConn *PgConn) WaitForNotification(ctx context.Context) error {
|
||||
//
|
||||
// Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries.
|
||||
func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader {
|
||||
multiResult := &MultiResultReader{
|
||||
pgConn.lock()
|
||||
|
||||
pgConn.multiResultReader = MultiResultReader{
|
||||
pgConn: pgConn,
|
||||
ctx: ctx,
|
||||
cleanupContextDeadline: func() {},
|
||||
}
|
||||
|
||||
if err := pgConn.lock(); err != nil {
|
||||
multiResult.closed = true
|
||||
multiResult.err = err
|
||||
return multiResult
|
||||
}
|
||||
multiResult := &pgConn.multiResultReader
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -758,6 +746,8 @@ func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramVa
|
||||
}
|
||||
|
||||
func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]byte) *ResultReader {
|
||||
pgConn.lock()
|
||||
|
||||
pgConn.resultReader = ResultReader{
|
||||
pgConn: pgConn,
|
||||
ctx: ctx,
|
||||
@@ -765,12 +755,6 @@ func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]by
|
||||
}
|
||||
result := &pgConn.resultReader
|
||||
|
||||
if err := pgConn.lock(); err != nil {
|
||||
result.concludeCommand("", err)
|
||||
result.closed = true
|
||||
return result
|
||||
}
|
||||
|
||||
if len(paramValues) > math.MaxUint16 {
|
||||
result.concludeCommand("", fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16))
|
||||
result.closed = true
|
||||
@@ -808,9 +792,7 @@ func (pgConn *PgConn) execExtendedSuffix(buf []byte, result *ResultReader) {
|
||||
|
||||
// CopyTo executes the copy command sql and copies the results to w.
|
||||
func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (CommandTag, error) {
|
||||
if err := pgConn.lock(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
pgConn.lock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -867,9 +849,7 @@ func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (Comm
|
||||
// Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r
|
||||
// could still block.
|
||||
func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) {
|
||||
if err := pgConn.lock(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
pgConn.lock()
|
||||
defer pgConn.unlock()
|
||||
|
||||
select {
|
||||
@@ -1251,17 +1231,14 @@ func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFor
|
||||
// ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a
|
||||
// transaction is already in progress or SQL contains transaction control statements.
|
||||
func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader {
|
||||
multiResult := &MultiResultReader{
|
||||
pgConn.lock()
|
||||
|
||||
pgConn.multiResultReader = MultiResultReader{
|
||||
pgConn: pgConn,
|
||||
ctx: ctx,
|
||||
cleanupContextDeadline: func() {},
|
||||
}
|
||||
|
||||
if err := pgConn.lock(); err != nil {
|
||||
multiResult.closed = true
|
||||
multiResult.err = ctx.Err()
|
||||
return multiResult
|
||||
}
|
||||
multiResult := &pgConn.multiResultReader
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
+2
-4
@@ -690,11 +690,9 @@ func TestConnLocking(t *testing.T) {
|
||||
defer closeConn(t, pgConn)
|
||||
|
||||
mrr := pgConn.Exec(context.Background(), "select 'Hello, world'")
|
||||
results, err := pgConn.Exec(context.Background(), "select 'Hello, world'").ReadAll()
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, "connection busy", err.Error())
|
||||
require.Panics(t, func() { pgConn.Exec(context.Background(), "select 'Hello, world'") })
|
||||
|
||||
results, err = mrr.ReadAll()
|
||||
results, err := mrr.ReadAll()
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, results, 1)
|
||||
assert.Nil(t, results[0].Err)
|
||||
|
||||
Reference in New Issue
Block a user