Expose pgx functionality for manual integration with pgconn
This is primarily useful for using pipeline mode.
This commit is contained in:
@@ -75,7 +75,7 @@ type Conn struct {
|
||||
typeMap *pgtype.Map
|
||||
|
||||
wbuf []byte
|
||||
eqb extendedQueryBuilder
|
||||
eqb ExtendedQueryBuilder
|
||||
}
|
||||
|
||||
// Identifier a PostgreSQL identifier or name. Identifiers can be composed of
|
||||
@@ -485,49 +485,25 @@ func (c *Conn) execSimpleProtocol(ctx context.Context, sql string, arguments []a
|
||||
return commandTag, err
|
||||
}
|
||||
|
||||
func (c *Conn) execParamsAndPreparedPrefix(sd *pgconn.StatementDescription, args []any) error {
|
||||
if len(sd.ParamOIDs) != len(args) {
|
||||
return fmt.Errorf("expected %d arguments, got %d", len(sd.ParamOIDs), len(args))
|
||||
}
|
||||
|
||||
c.eqb.Reset()
|
||||
|
||||
anynil.NormalizeSlice(args)
|
||||
|
||||
for i := range args {
|
||||
err := c.eqb.AppendParam(c.typeMap, sd.ParamOIDs[i], args[i])
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to encode args[%d]: %v", i, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for i := range sd.Fields {
|
||||
c.eqb.AppendResultFormat(c.TypeMap().FormatCodeForOID(sd.Fields[i].DataTypeOID))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) execParams(ctx context.Context, sd *pgconn.StatementDescription, arguments []any) (pgconn.CommandTag, error) {
|
||||
err := c.execParamsAndPreparedPrefix(sd, arguments)
|
||||
err := c.eqb.Build(c.typeMap, sd, arguments)
|
||||
if err != nil {
|
||||
return pgconn.CommandTag{}, err
|
||||
}
|
||||
|
||||
result := c.pgConn.ExecParams(ctx, sd.SQL, c.eqb.paramValues, sd.ParamOIDs, c.eqb.paramFormats, c.eqb.resultFormats).Read()
|
||||
c.eqb.Reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
result := c.pgConn.ExecParams(ctx, sd.SQL, c.eqb.ParamValues, sd.ParamOIDs, c.eqb.ParamFormats, c.eqb.ResultFormats).Read()
|
||||
c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
return result.CommandTag, result.Err
|
||||
}
|
||||
|
||||
func (c *Conn) execPrepared(ctx context.Context, sd *pgconn.StatementDescription, arguments []any) (pgconn.CommandTag, error) {
|
||||
err := c.execParamsAndPreparedPrefix(sd, arguments)
|
||||
err := c.eqb.Build(c.typeMap, sd, arguments)
|
||||
if err != nil {
|
||||
return pgconn.CommandTag{}, err
|
||||
}
|
||||
|
||||
result := c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.paramValues, c.eqb.paramFormats, c.eqb.resultFormats).Read()
|
||||
c.eqb.Reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
result := c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats).Read()
|
||||
c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
return result.CommandTag, result.Err
|
||||
}
|
||||
|
||||
@@ -540,79 +516,18 @@ func (e *unknownArgumentTypeQueryExecModeExecError) Error() string {
|
||||
}
|
||||
|
||||
func (c *Conn) execSQLParams(ctx context.Context, sql string, args []any) (pgconn.CommandTag, error) {
|
||||
c.eqb.Reset()
|
||||
|
||||
anynil.NormalizeSlice(args)
|
||||
err := c.appendParamsForQueryExecModeExec(args)
|
||||
err := c.eqb.Build(c.typeMap, nil, args)
|
||||
if err != nil {
|
||||
return pgconn.CommandTag{}, err
|
||||
}
|
||||
|
||||
result := c.pgConn.ExecParams(ctx, sql, c.eqb.paramValues, nil, c.eqb.paramFormats, c.eqb.resultFormats).Read()
|
||||
c.eqb.Reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
result := c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats).Read()
|
||||
c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
return result.CommandTag, result.Err
|
||||
}
|
||||
|
||||
// appendParamsForQueryExecModeExec appends the args to c.eqb.
|
||||
//
|
||||
// Parameters must be encoded in the text format because of differences in type conversion between timestamps and
|
||||
// dates. In QueryExecModeExec we don't know what the actual PostgreSQL type is. To determine the type we use the
|
||||
// Go type to OID type mapping registered by RegisterDefaultPgType. However, the Go time.Time represents both
|
||||
// PostgreSQL timestamp[tz] and date. To use the binary format we would need to also specify what the PostgreSQL
|
||||
// type OID is. But that would mean telling PostgreSQL that we have sent a timestamp[tz] when what is needed is a date.
|
||||
// This means that the value is converted from text to timestamp[tz] to date. This means it does a time zone conversion
|
||||
// before converting it to date. This means that dates can be shifted by one day. In text format without that double
|
||||
// type conversion it takes the date directly and ignores time zone (i.e. it works).
|
||||
//
|
||||
// Given that the whole point of QueryExecModeExec is to operate without having to know the PostgreSQL types there is
|
||||
// no way to safely use binary or to specify the parameter OIDs.
|
||||
func (c *Conn) appendParamsForQueryExecModeExec(args []any) error {
|
||||
for _, arg := range args {
|
||||
if arg == nil {
|
||||
err := c.eqb.AppendParamFormat(c.typeMap, 0, TextFormatCode, arg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
dt, ok := c.TypeMap().TypeForValue(arg)
|
||||
if !ok {
|
||||
var tv pgtype.TextValuer
|
||||
if tv, ok = arg.(pgtype.TextValuer); ok {
|
||||
t, err := tv.TextValue()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dt, ok = c.TypeMap().TypeForOID(pgtype.TextOID)
|
||||
if ok {
|
||||
arg = t
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
var str fmt.Stringer
|
||||
if str, ok = arg.(fmt.Stringer); ok {
|
||||
dt, ok = c.TypeMap().TypeForOID(pgtype.TextOID)
|
||||
if ok {
|
||||
arg = str.String()
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
return &unknownArgumentTypeQueryExecModeExecError{arg: arg}
|
||||
}
|
||||
err := c.eqb.AppendParamFormat(c.typeMap, dt.OID, TextFormatCode, arg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) getRows(ctx context.Context, sql string, args []any) *connRows {
|
||||
r := &connRows{}
|
||||
func (c *Conn) getRows(ctx context.Context, sql string, args []any) *baseRows {
|
||||
r := &baseRows{}
|
||||
|
||||
r.ctx = ctx
|
||||
r.logger = c
|
||||
@@ -735,7 +650,7 @@ optionLoop:
|
||||
sql, args = queryRewriter.RewriteQuery(ctx, c, sql, args)
|
||||
}
|
||||
|
||||
c.eqb.Reset()
|
||||
c.eqb.reset()
|
||||
anynil.NormalizeSlice(args)
|
||||
rows := c.getRows(ctx, sql, args)
|
||||
|
||||
@@ -782,13 +697,10 @@ optionLoop:
|
||||
|
||||
rows.sql = sd.SQL
|
||||
|
||||
for i := range args {
|
||||
err = c.eqb.AppendParam(c.typeMap, sd.ParamOIDs[i], args[i])
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to encode args[%d]: %v", i, err)
|
||||
rows.fatal(err)
|
||||
return rows, rows.err
|
||||
}
|
||||
err = c.eqb.Build(c.typeMap, sd, args)
|
||||
if err != nil {
|
||||
rows.fatal(err)
|
||||
return rows, rows.err
|
||||
}
|
||||
|
||||
if resultFormatsByOID != nil {
|
||||
@@ -799,26 +711,22 @@ optionLoop:
|
||||
}
|
||||
|
||||
if resultFormats == nil {
|
||||
for i := range sd.Fields {
|
||||
c.eqb.AppendResultFormat(c.TypeMap().FormatCodeForOID(sd.Fields[i].DataTypeOID))
|
||||
}
|
||||
|
||||
resultFormats = c.eqb.resultFormats
|
||||
resultFormats = c.eqb.ResultFormats
|
||||
}
|
||||
|
||||
if !explicitPreparedStatement && mode == QueryExecModeCacheDescribe {
|
||||
rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.paramValues, sd.ParamOIDs, c.eqb.paramFormats, resultFormats)
|
||||
rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, sd.ParamOIDs, c.eqb.ParamFormats, resultFormats)
|
||||
} else {
|
||||
rows.resultReader = c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.paramValues, c.eqb.paramFormats, resultFormats)
|
||||
rows.resultReader = c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, resultFormats)
|
||||
}
|
||||
} else if mode == QueryExecModeExec {
|
||||
err := c.appendParamsForQueryExecModeExec(args)
|
||||
err := c.eqb.Build(c.typeMap, nil, args)
|
||||
if err != nil {
|
||||
rows.fatal(err)
|
||||
return rows, rows.err
|
||||
}
|
||||
|
||||
rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.paramValues, nil, c.eqb.paramFormats, c.eqb.resultFormats)
|
||||
rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats)
|
||||
} else if mode == QueryExecModeSimpleProtocol {
|
||||
sql, err = c.sanitizeForSimpleQuery(sql, args...)
|
||||
if err != nil {
|
||||
@@ -843,7 +751,7 @@ optionLoop:
|
||||
return rows, rows.err
|
||||
}
|
||||
|
||||
c.eqb.Reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
|
||||
return rows, rows.err
|
||||
}
|
||||
@@ -853,7 +761,7 @@ optionLoop:
|
||||
// error with ErrNoRows if no rows are returned.
|
||||
func (c *Conn) QueryRow(ctx context.Context, sql string, args ...any) Row {
|
||||
rows, _ := c.Query(ctx, sql, args...)
|
||||
return (*connRow)(rows.(*connRows))
|
||||
return (*connRow)(rows.(*baseRows))
|
||||
}
|
||||
|
||||
// QueryFuncRow is the argument to the QueryFunc callback function.
|
||||
@@ -954,34 +862,23 @@ func (c *Conn) SendBatch(ctx context.Context, b *Batch) BatchResults {
|
||||
|
||||
if mode == QueryExecModeExec {
|
||||
for _, bi := range b.items {
|
||||
c.eqb.Reset()
|
||||
c.eqb.reset()
|
||||
anynil.NormalizeSlice(bi.arguments)
|
||||
|
||||
sd := c.preparedStatements[bi.query]
|
||||
if sd != nil {
|
||||
if len(sd.ParamOIDs) != len(bi.arguments) {
|
||||
return &batchResults{ctx: ctx, conn: c, err: fmt.Errorf("mismatched param and argument count")}
|
||||
}
|
||||
|
||||
for i := range bi.arguments {
|
||||
err := c.eqb.AppendParam(c.typeMap, sd.ParamOIDs[i], bi.arguments[i])
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to encode args[%d]: %v", i, err)
|
||||
return &batchResults{ctx: ctx, conn: c, err: err}
|
||||
}
|
||||
}
|
||||
|
||||
for i := range sd.Fields {
|
||||
c.eqb.AppendResultFormat(c.TypeMap().FormatCodeForOID(sd.Fields[i].DataTypeOID))
|
||||
}
|
||||
|
||||
batch.ExecPrepared(sd.Name, c.eqb.paramValues, c.eqb.paramFormats, c.eqb.resultFormats)
|
||||
} else {
|
||||
err := c.appendParamsForQueryExecModeExec(bi.arguments)
|
||||
err := c.eqb.Build(c.typeMap, sd, bi.arguments)
|
||||
if err != nil {
|
||||
return &batchResults{ctx: ctx, conn: c, err: err}
|
||||
}
|
||||
batch.ExecParams(bi.query, c.eqb.paramValues, nil, c.eqb.paramFormats, c.eqb.resultFormats)
|
||||
|
||||
batch.ExecPrepared(sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats)
|
||||
} else {
|
||||
err := c.eqb.Build(c.typeMap, nil, bi.arguments)
|
||||
if err != nil {
|
||||
return &batchResults{ctx: ctx, conn: c, err: err}
|
||||
}
|
||||
batch.ExecParams(bi.query, c.eqb.ParamValues, nil, c.eqb.ParamFormats, c.eqb.ResultFormats)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1014,7 +911,7 @@ func (c *Conn) SendBatch(ctx context.Context, b *Batch) BatchResults {
|
||||
}
|
||||
|
||||
for _, bi := range b.items {
|
||||
c.eqb.Reset()
|
||||
c.eqb.reset()
|
||||
|
||||
sd := c.preparedStatements[bi.query]
|
||||
if sd == nil {
|
||||
@@ -1029,29 +926,20 @@ func (c *Conn) SendBatch(ctx context.Context, b *Batch) BatchResults {
|
||||
return &batchResults{ctx: ctx, conn: c, err: fmt.Errorf("mismatched param and argument count")}
|
||||
}
|
||||
|
||||
anynil.NormalizeSlice(bi.arguments)
|
||||
|
||||
for i := range bi.arguments {
|
||||
err := c.eqb.AppendParam(c.typeMap, sd.ParamOIDs[i], bi.arguments[i])
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to encode args[%d]: %v", i, err)
|
||||
return &batchResults{ctx: ctx, conn: c, err: err}
|
||||
}
|
||||
}
|
||||
|
||||
for i := range sd.Fields {
|
||||
c.eqb.AppendResultFormat(c.TypeMap().FormatCodeForOID(sd.Fields[i].DataTypeOID))
|
||||
err := c.eqb.Build(c.typeMap, sd, bi.arguments)
|
||||
if err != nil {
|
||||
return &batchResults{ctx: ctx, conn: c, err: err}
|
||||
}
|
||||
|
||||
if sd.Name == "" {
|
||||
batch.ExecParams(bi.query, c.eqb.paramValues, sd.ParamOIDs, c.eqb.paramFormats, c.eqb.resultFormats)
|
||||
batch.ExecParams(bi.query, c.eqb.ParamValues, sd.ParamOIDs, c.eqb.ParamFormats, c.eqb.ResultFormats)
|
||||
} else {
|
||||
batch.ExecPrepared(sd.Name, c.eqb.paramValues, c.eqb.paramFormats, c.eqb.resultFormats)
|
||||
batch.ExecPrepared(sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.eqb.Reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
|
||||
|
||||
mrr := c.pgConn.ExecBatch(ctx, batch)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user