Batch uses statement cache.
This streamlines Queue's interface as well.
This commit is contained in:
@@ -715,57 +715,74 @@ func (c *Conn) QueryRow(ctx context.Context, sql string, args ...interface{}) Ro
|
||||
// SendBatch sends all queued queries to the server at once. All queries are run in an implicit transaction unless
|
||||
// explicit transaction control statements are executed.
|
||||
func (c *Conn) SendBatch(ctx context.Context, b *Batch) BatchResults {
|
||||
distinctUnpreparedQueries := map[string]struct{}{}
|
||||
|
||||
for _, bi := range b.items {
|
||||
if _, ok := c.preparedStatements[bi.query]; ok {
|
||||
continue
|
||||
}
|
||||
distinctUnpreparedQueries[bi.query] = struct{}{}
|
||||
}
|
||||
|
||||
var stmtCache stmtcache.Cache
|
||||
if c.stmtcache != nil && c.stmtcache.Cap() >= len(distinctUnpreparedQueries) {
|
||||
stmtCache = c.stmtcache
|
||||
} else {
|
||||
stmtCache = stmtcache.New(c.pgConn, stmtcache.ModeDescribe, len(distinctUnpreparedQueries))
|
||||
}
|
||||
|
||||
for sql, _ := range distinctUnpreparedQueries {
|
||||
_, err := stmtCache.Get(ctx, sql)
|
||||
if err != nil {
|
||||
return &batchResults{ctx: ctx, conn: c, err: err}
|
||||
}
|
||||
}
|
||||
|
||||
batch := &pgconn.Batch{}
|
||||
|
||||
for _, bi := range b.items {
|
||||
c.eqb.Reset()
|
||||
|
||||
var parameterOIDs []uint32
|
||||
sd := c.preparedStatements[bi.query]
|
||||
if sd == nil {
|
||||
var err error
|
||||
sd, err = stmtCache.Get(ctx, bi.query)
|
||||
if err != nil {
|
||||
// the stmtCache was prefilled from distinctUnpreparedQueries above so we are guaranteed no errors
|
||||
panic("BUG: unexpected error from stmtCache")
|
||||
}
|
||||
}
|
||||
|
||||
if sd != nil {
|
||||
parameterOIDs = sd.ParamOIDs
|
||||
} else {
|
||||
parameterOIDs = bi.parameterOIDs
|
||||
if len(sd.ParamOIDs) != len(bi.arguments) {
|
||||
return &batchResults{ctx: ctx, conn: c, err: errors.Errorf("mismatched param and argument count")}
|
||||
}
|
||||
|
||||
args, err := convertDriverValuers(bi.arguments)
|
||||
if err != nil {
|
||||
return &batchResults{err: err}
|
||||
return &batchResults{ctx: ctx, conn: c, err: err}
|
||||
}
|
||||
|
||||
for i := range args {
|
||||
err = c.eqb.AppendParam(c.ConnInfo, parameterOIDs[i], args[i])
|
||||
err = c.eqb.AppendParam(c.ConnInfo, sd.ParamOIDs[i], args[i])
|
||||
if err != nil {
|
||||
return &batchResults{err: err}
|
||||
return &batchResults{ctx: ctx, conn: c, err: err}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if sd != nil {
|
||||
resultFormats := bi.resultFormatCodes
|
||||
if resultFormats == nil {
|
||||
|
||||
for i := range sd.Fields {
|
||||
if dt, ok := c.ConnInfo.DataTypeForOID(uint32(sd.Fields[i].DataTypeOID)); ok {
|
||||
if _, ok := dt.Value.(pgtype.BinaryDecoder); ok {
|
||||
c.eqb.AppendResultFormat(BinaryFormatCode)
|
||||
} else {
|
||||
c.eqb.AppendResultFormat(TextFormatCode)
|
||||
}
|
||||
}
|
||||
for i := range sd.Fields {
|
||||
if dt, ok := c.ConnInfo.DataTypeForOID(uint32(sd.Fields[i].DataTypeOID)); ok {
|
||||
if _, ok := dt.Value.(pgtype.BinaryDecoder); ok {
|
||||
c.eqb.AppendResultFormat(BinaryFormatCode)
|
||||
} else {
|
||||
c.eqb.AppendResultFormat(TextFormatCode)
|
||||
}
|
||||
|
||||
resultFormats = c.eqb.resultFormats
|
||||
}
|
||||
}
|
||||
|
||||
batch.ExecPrepared(sd.Name, c.eqb.paramValues, c.eqb.paramFormats, resultFormats)
|
||||
if sd.Name == "" {
|
||||
batch.ExecParams(bi.query, c.eqb.paramValues, sd.ParamOIDs, c.eqb.paramFormats, c.eqb.resultFormats)
|
||||
} else {
|
||||
oids := make([]uint32, len(parameterOIDs))
|
||||
for i := 0; i < len(parameterOIDs); i++ {
|
||||
oids[i] = uint32(parameterOIDs[i])
|
||||
}
|
||||
batch.ExecParams(bi.query, c.eqb.paramValues, oids, c.eqb.paramFormats, bi.resultFormatCodes)
|
||||
batch.ExecPrepared(sd.Name, c.eqb.paramValues, c.eqb.paramFormats, c.eqb.resultFormats)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user