2
0

Remove ensureConnectionReadyForQuery

This commit is contained in:
Jack Christensen
2019-02-02 13:00:31 -06:00
parent 577bc57ba5
commit bd181764bf
4 changed files with 3 additions and 62 deletions
-4
View File
@@ -90,10 +90,6 @@ func (b *Batch) Send(ctx context.Context) error {
return err return err
} }
if err := b.conn.ensureConnectionReadyForQuery(); err != nil {
return err
}
batch := &pgconn.Batch{} batch := &pgconn.Batch{}
for _, bi := range b.items { for _, bi := range b.items {
+3 -46
View File
@@ -78,9 +78,8 @@ type Conn struct {
status byte // One of connStatus* constants status byte // One of connStatus* constants
causeOfDeath error causeOfDeath error
pendingReadyForQueryCount int // number of ReadyForQuery messages expected cancelQueryCompleted chan struct{}
cancelQueryCompleted chan struct{} lastStmtSent bool
lastStmtSent bool
// context support // context support
ctxInProgress bool ctxInProgress bool
@@ -498,10 +497,6 @@ func (c *Conn) PrepareEx(ctx context.Context, name, sql string, opts *PrepareExO
} }
} }
if err := c.ensureConnectionReadyForQuery(); err != nil {
return nil, err
}
if c.shouldLog(LogLevelError) { if c.shouldLog(LogLevelError) {
defer func() { defer func() {
if err != nil { if err != nil {
@@ -569,10 +564,6 @@ func (c *Conn) deallocateContext(ctx context.Context, name string) (err error) {
err = c.termContext(err) err = c.termContext(err)
}() }()
if err := c.ensureConnectionReadyForQuery(); err != nil {
return err
}
delete(c.preparedStatements, name) delete(c.preparedStatements, name)
_, err = c.pgConn.Exec(ctx, "deallocate "+quoteIdentifier(name)).ReadAll() _, err = c.pgConn.Exec(ctx, "deallocate "+quoteIdentifier(name)).ReadAll()
@@ -635,8 +626,6 @@ func (c *Conn) processContextFreeMsg(msg pgproto3.BackendMessage) (err error) {
switch msg := msg.(type) { switch msg := msg.(type) {
case *pgproto3.ErrorResponse: case *pgproto3.ErrorResponse:
return c.rxErrorResponse(msg) return c.rxErrorResponse(msg)
case *pgproto3.ReadyForQuery:
c.rxReadyForQuery(msg)
} }
return nil return nil
@@ -670,10 +659,6 @@ func (c *Conn) rxErrorResponse(msg *pgproto3.ErrorResponse) *pgconn.PgError {
return err return err
} }
func (c *Conn) rxReadyForQuery(msg *pgproto3.ReadyForQuery) {
c.pendingReadyForQueryCount--
}
func (c *Conn) rxRowDescription(msg *pgproto3.RowDescription) []FieldDescription { func (c *Conn) rxRowDescription(msg *pgproto3.RowDescription) []FieldDescription {
fields := make([]FieldDescription, len(msg.Fields)) fields := make([]FieldDescription, len(msg.Fields))
for i := 0; i < len(fields); i++ { for i := 0; i < len(fields); i++ {
@@ -872,7 +857,7 @@ func (c *Conn) WaitUntilReady(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
return c.ensureConnectionReadyForQuery() return nil
} }
func (c *Conn) waitForPreviousCancelQuery(ctx context.Context) error { func (c *Conn) waitForPreviousCancelQuery(ctx context.Context) error {
@@ -891,30 +876,6 @@ func (c *Conn) waitForPreviousCancelQuery(ctx context.Context) error {
} }
} }
func (c *Conn) ensureConnectionReadyForQuery() error {
for c.pendingReadyForQueryCount > 0 {
msg, err := c.pgConn.ReceiveMessage()
if err != nil {
return err
}
switch msg := msg.(type) {
case *pgproto3.ErrorResponse:
pgErr := c.rxErrorResponse(msg)
if pgErr.Severity == "FATAL" {
return pgErr
}
default:
err = c.processContextFreeMsg(msg)
if err != nil {
return err
}
}
}
return nil
}
func connInfoFromRows(rows *Rows, err error) (map[string]pgtype.OID, error) { func connInfoFromRows(rows *Rows, err error) (map[string]pgtype.OID, error) {
if err != nil { if err != nil {
return nil, err return nil, err
@@ -964,10 +925,6 @@ func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (
} }
defer c.unlock() defer c.unlock()
if err := c.ensureConnectionReadyForQuery(); err != nil {
return "", err
}
startTime := time.Now() startTime := time.Now()
commandTag, err := c.exec(ctx, sql, arguments...) commandTag, err := c.exec(ctx, sql, arguments...)
-7
View File
@@ -52,10 +52,6 @@ func fpInt64Arg(n int64) fpArg {
} }
func (f *fastpath) Call(oid pgtype.OID, args []fpArg) (res []byte, err error) { func (f *fastpath) Call(oid pgtype.OID, args []fpArg) (res []byte, err error) {
if err := f.cn.ensureConnectionReadyForQuery(); err != nil {
return nil, err
}
buf := f.cn.wbuf buf := f.cn.wbuf
buf = append(buf, 'F') // function call buf = append(buf, 'F') // function call
sp := len(buf) sp := len(buf)
@@ -76,8 +72,6 @@ func (f *fastpath) Call(oid pgtype.OID, args []fpArg) (res []byte, err error) {
return nil, err return nil, err
} }
f.cn.pendingReadyForQueryCount++
for { for {
msg, err := f.cn.pgConn.ReceiveMessage() msg, err := f.cn.pgConn.ReceiveMessage()
if err != nil { if err != nil {
@@ -88,7 +82,6 @@ func (f *fastpath) Call(oid pgtype.OID, args []fpArg) (res []byte, err error) {
res = make([]byte, len(msg.Result)) res = make([]byte, len(msg.Result))
copy(res, msg.Result) copy(res, msg.Result)
case *pgproto3.ReadyForQuery: case *pgproto3.ReadyForQuery:
f.cn.rxReadyForQuery(msg)
// done // done
return res, err return res, err
default: default:
-5
View File
@@ -361,11 +361,6 @@ func (c *Conn) QueryEx(ctx context.Context, sql string, options *QueryExOptions,
return rows, err return rows, err
} }
if err := c.ensureConnectionReadyForQuery(); err != nil {
rows.fatal(err)
return rows, err
}
if err := c.lock(); err != nil { if err := c.lock(); err != nil {
rows.fatal(err) rows.fatal(err)
return rows, err return rows, err