2
0

Finish core batch operations

This commit is contained in:
Jack Christensen
2017-06-03 11:49:27 -05:00
parent fe0af9b357
commit 73f496d7de
7 changed files with 572 additions and 75 deletions
+106 -74
View File
@@ -14,60 +14,31 @@ type batchItem struct {
resultFormatCodes []int16
}
// Batch queries are a way of bundling multiple queries together to avoid
// unnecessary network round trips.
type Batch struct {
conn *Conn
connPool *ConnPool
items []*batchItem
resultsRead int
sent bool
ctx context.Context
err error
}
// Begin starts a transaction with the default transaction mode for the
// current connection. To use a specific transaction mode see BeginEx.
// BeginBatch returns a *Batch query for c.
func (c *Conn) BeginBatch() *Batch {
// TODO - the type stuff below
// err = c.waitForPreviousCancelQuery(ctx)
// if err != nil {
// return nil, err
// }
// if err := c.ensureConnectionReadyForQuery(); err != nil {
// return nil, err
// }
// c.lastActivityTime = time.Now()
// rows = c.getRows(sql, args)
// if err := c.lock(); err != nil {
// rows.fatal(err)
// return rows, err
// }
// rows.unlockConn = true
// err = c.initContext(ctx)
// if err != nil {
// rows.fatal(err)
// return rows, rows.err
// }
// if options != nil && options.SimpleProtocol {
// err = c.sanitizeAndSendSimpleQuery(sql, args...)
// if err != nil {
// rows.fatal(err)
// return rows, err
// }
// return rows, nil
// }
return &Batch{conn: c}
}
// Conn returns the underlying connection that b will or was performed on.
func (b *Batch) Conn() *Conn {
return b.conn
}
// Queue queues a query to batch b. parameterOids are required if there are
// parameters and query is not the name of a prepared statement.
// resultFormatCodes are required if there is a result.
func (b *Batch) Queue(query string, arguments []interface{}, parameterOids []pgtype.Oid, resultFormatCodes []int16) {
b.items = append(b.items, &batchItem{
query: query,
@@ -77,15 +48,46 @@ func (b *Batch) Queue(query string, arguments []interface{}, parameterOids []pgt
})
}
// Send sends all queued queries to the server at once. All queries are wrapped
// in a transaction. The transaction can optionally be configured with
// txOptions. The context is in effect until the Batch is closed.
func (b *Batch) Send(ctx context.Context, txOptions *TxOptions) error {
if b.err != nil {
return b.err
}
b.ctx = ctx
err := b.conn.waitForPreviousCancelQuery(ctx)
if err != nil {
return err
}
if err := b.conn.ensureConnectionReadyForQuery(); err != nil {
return err
}
err = b.conn.initContext(ctx)
if err != nil {
return err
}
buf := appendQuery(b.conn.wbuf, txOptions.beginSQL())
for _, bi := range b.items {
// TODO - don't parse if named prepared statement
buf = appendParse(buf, "", bi.query, bi.parameterOids)
var psName string
var psParameterOids []pgtype.Oid
if ps, ok := b.conn.preparedStatements[bi.query]; ok {
psName = ps.Name
psParameterOids = ps.ParameterOids
} else {
psParameterOids = bi.parameterOids
buf = appendParse(buf, "", bi.query, psParameterOids)
}
var err error
buf, err = appendBind(buf, "", "", b.conn.ConnInfo, bi.parameterOids, bi.arguments, bi.resultFormatCodes)
buf, err = appendBind(buf, "", psName, b.conn.ConnInfo, psParameterOids, bi.arguments, bi.resultFormatCodes)
if err != nil {
return err
}
@@ -129,7 +131,20 @@ func (b *Batch) Send(ctx context.Context, txOptions *TxOptions) error {
return nil
}
// ExecResults reads the results from the next query in the batch as if the
// query has been sent with Exec.
func (b *Batch) ExecResults() (CommandTag, error) {
if b.err != nil {
return "", b.err
}
select {
case <-b.ctx.Done():
b.die(b.ctx.Err())
return "", b.ctx.Err()
default:
}
b.resultsRead++
for {
@@ -149,63 +164,80 @@ func (b *Batch) ExecResults() (CommandTag, error) {
}
}
// QueryResults reads the results from the next query in the batch as if the
// query has been sent with Query.
func (b *Batch) QueryResults() (*Rows, error) {
if b.err != nil {
return nil, b.err
}
select {
case <-b.ctx.Done():
b.die(b.ctx.Err())
return nil, b.ctx.Err()
default:
}
b.resultsRead++
rows := b.conn.getRows("batch query", nil)
fieldDescriptions, err := b.conn.readUntilRowDescription()
if err != nil {
rows.fatal(err)
b.die(b.ctx.Err())
return nil, err
}
rows.batch = b
rows.fields = fieldDescriptions
return rows, nil
}
// QueryRowResults reads the results from the next query in the batch as if the
// query has been sent with QueryRow.
func (b *Batch) QueryRowResults() *Row {
rows, _ := b.QueryResults()
return (*Row)(rows)
}
func (b *Batch) Finish() error {
// Close closes the batch operation. Any error that occured during a batch
// operation may have made it impossible to resyncronize the connection with the
// server. In this case the underlying connection will have been closed.
func (b *Batch) Close() (err error) {
if b.err != nil {
return b.err
}
defer func() {
err = b.conn.termContext(err)
if b.conn != nil && b.connPool != nil {
b.connPool.Release(b.conn)
}
}()
for i := b.resultsRead; i < len(b.items); i++ {
_, err := b.ExecResults()
if err != nil {
if _, err = b.ExecResults(); err != nil {
return err
}
}
// readyForQueryCount := 0
// for {
// msg, err := b.conn.rxMsg()
// if err != nil {
// return "", err
// }
// switch msg := msg.(type) {
// case *pgproto3.ReadyForQuery:
// c.rxReadyForQuery(msg)
// default:
// if err := b.conn.processContextFreeMsg(msg); err != nil {
// return "", err
// }
// }
// }
// switch msg := msg.(type) {
// case *pgproto3.ErrorResponse:
// return c.rxErrorResponse(msg)
// case *pgproto3.NotificationResponse:
// c.rxNotificationResponse(msg)
// case *pgproto3.ReadyForQuery:
// c.rxReadyForQuery(msg)
// case *pgproto3.ParameterStatus:
// c.rxParameterStatus(msg)
// }
if err = b.conn.ensureConnectionReadyForQuery(); err != nil {
return err
}
return nil
}
func (b *Batch) die(err error) {
if b.err != nil {
return
}
b.err = err
b.conn.die(err)
if b.conn != nil && b.connPool != nil {
b.connPool.Release(b.conn)
}
}