From b6e5b74e2c82dc3305355453ec86dc002bf577b4 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Thu, 18 Apr 2019 22:50:36 -0500 Subject: [PATCH] Reuse one MultiResultReader per connection Using a PgConn while locked now panics. i.e. You must Close any ResultReader or MultiResultReader. --- pgconn.go | 65 ++++++++++++++++---------------------------------- pgconn_test.go | 6 ++--- 2 files changed, 23 insertions(+), 48 deletions(-) diff --git a/pgconn.go b/pgconn.go index 4cf4d745..7e8909ea 100644 --- a/pgconn.go +++ b/pgconn.go @@ -99,8 +99,9 @@ type PgConn struct { bufferingReceiveErr error // Reusable / preallocated resources - wbuf []byte // write buffer - resultReader ResultReader + wbuf []byte // write buffer + resultReader ResultReader + multiResultReader MultiResultReader } // Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format) @@ -411,24 +412,18 @@ func (pgConn *PgConn) IsAlive() bool { return !pgConn.closed } -// lock locks the connection. It returns an error if the connection is already locked or is closed. -func (pgConn *PgConn) lock() error { +// lock locks the connection. It panics if the connection is already locked or is closed. +func (pgConn *PgConn) lock() { if pgConn.locked { - return errors.New("connection busy") - } - - if pgConn.closed { - return errors.New("connection closed") + panic("connection busy") // This only should be possible in case of an application bug. } pgConn.locked = true - - return nil } func (pgConn *PgConn) unlock() { if !pgConn.locked { - panic("BUG: cannot unlock unlocked connection") + panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package. } pgConn.locked = false @@ -505,9 +500,7 @@ type PreparedStatementDescription struct { // Prepare creates a prepared statement. func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*PreparedStatementDescription, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } + pgConn.lock() defer pgConn.unlock() select { @@ -626,9 +619,7 @@ func (pgConn *PgConn) CancelRequest(ctx context.Context) error { // WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not // received. func (pgConn *PgConn) WaitForNotification(ctx context.Context) error { - if err := pgConn.lock(); err != nil { - return err - } + pgConn.lock() select { case <-ctx.Done(): @@ -659,17 +650,14 @@ func (pgConn *PgConn) WaitForNotification(ctx context.Context) error { // // Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries. func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader { - multiResult := &MultiResultReader{ + pgConn.lock() + + pgConn.multiResultReader = MultiResultReader{ pgConn: pgConn, ctx: ctx, cleanupContextDeadline: func() {}, } - - if err := pgConn.lock(); err != nil { - multiResult.closed = true - multiResult.err = err - return multiResult - } + multiResult := &pgConn.multiResultReader select { case <-ctx.Done(): @@ -758,6 +746,8 @@ func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramVa } func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]byte) *ResultReader { + pgConn.lock() + pgConn.resultReader = ResultReader{ pgConn: pgConn, ctx: ctx, @@ -765,12 +755,6 @@ func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]by } result := &pgConn.resultReader - if err := pgConn.lock(); err != nil { - result.concludeCommand("", err) - result.closed = true - return result - } - if len(paramValues) > math.MaxUint16 { result.concludeCommand("", fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16)) result.closed = true @@ -808,9 +792,7 @@ func (pgConn *PgConn) execExtendedSuffix(buf []byte, result *ResultReader) { // CopyTo executes the copy command sql and copies the results to w. func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (CommandTag, error) { - if err := pgConn.lock(); err != nil { - return "", err - } + pgConn.lock() select { case <-ctx.Done(): @@ -867,9 +849,7 @@ func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (Comm // Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r // could still block. func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) { - if err := pgConn.lock(); err != nil { - return "", err - } + pgConn.lock() defer pgConn.unlock() select { @@ -1251,17 +1231,14 @@ func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFor // ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a // transaction is already in progress or SQL contains transaction control statements. func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader { - multiResult := &MultiResultReader{ + pgConn.lock() + + pgConn.multiResultReader = MultiResultReader{ pgConn: pgConn, ctx: ctx, cleanupContextDeadline: func() {}, } - - if err := pgConn.lock(); err != nil { - multiResult.closed = true - multiResult.err = ctx.Err() - return multiResult - } + multiResult := &pgConn.multiResultReader select { case <-ctx.Done(): diff --git a/pgconn_test.go b/pgconn_test.go index fd57face..3be61be8 100644 --- a/pgconn_test.go +++ b/pgconn_test.go @@ -690,11 +690,9 @@ func TestConnLocking(t *testing.T) { defer closeConn(t, pgConn) mrr := pgConn.Exec(context.Background(), "select 'Hello, world'") - results, err := pgConn.Exec(context.Background(), "select 'Hello, world'").ReadAll() - assert.Error(t, err) - assert.Equal(t, "connection busy", err.Error()) + require.Panics(t, func() { pgConn.Exec(context.Background(), "select 'Hello, world'") }) - results, err = mrr.ReadAll() + results, err := mrr.ReadAll() assert.NoError(t, err) assert.Len(t, results, 1) assert.Nil(t, results[0].Err)