Refactor to using bytes.Buffer
This commit is contained in:
+51
-34
@@ -1,6 +1,7 @@
|
||||
package pgx
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
@@ -21,7 +22,7 @@ type ConnectionParameters struct {
|
||||
|
||||
type Connection struct {
|
||||
conn net.Conn // the underlying TCP or unix domain socket connection
|
||||
buf []byte // work buffer to avoid constant alloc and dealloc
|
||||
buf *bytes.Buffer // work buffer to avoid constant alloc and dealloc
|
||||
pid int32 // backend pid
|
||||
secretKey int32 // key to use to send a cancel query message to the server
|
||||
runtimeParams map[string]string // parameters that have been reported by the server
|
||||
@@ -46,6 +47,8 @@ func (e UnexpectedColumnCountError) Error() string {
|
||||
return fmt.Sprintf("Expected result to have %d column(s), instead it has %d", e.ExpectedCount, e.ActualCount)
|
||||
}
|
||||
|
||||
const sharedBufferSize = 1024
|
||||
|
||||
func Connect(parameters ConnectionParameters) (c *Connection, err error) {
|
||||
c = new(Connection)
|
||||
|
||||
@@ -66,7 +69,7 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
c.buf = make([]byte, 1024)
|
||||
c.buf = bytes.NewBuffer(make([]byte, sharedBufferSize))
|
||||
c.runtimeParams = make(map[string]string)
|
||||
|
||||
msg := newStartupMessage()
|
||||
@@ -104,9 +107,9 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) {
|
||||
|
||||
func (c *Connection) Close() (err error) {
|
||||
buf := c.getBuf(5)
|
||||
buf[0] = 'X'
|
||||
binary.BigEndian.PutUint32(buf[1:], 4)
|
||||
_, err = c.conn.Write(buf)
|
||||
buf.WriteByte('X')
|
||||
binary.Write(buf, binary.BigEndian, 4)
|
||||
_, err = buf.WriteTo(c.conn)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -215,12 +218,12 @@ func (c *Connection) SelectValues(sql string) (values []interface{}, err error)
|
||||
func (c *Connection) sendSimpleQuery(sql string) (err error) {
|
||||
bufSize := 5 + len(sql) + 1 // message identifier (1), message size (4), null string terminator (1)
|
||||
buf := c.getBuf(bufSize)
|
||||
buf[0] = 'Q'
|
||||
binary.BigEndian.PutUint32(buf[1:5], uint32(bufSize-1))
|
||||
copy(buf[5:], sql)
|
||||
buf[bufSize-1] = 0
|
||||
buf.WriteByte('Q')
|
||||
binary.Write(buf, binary.BigEndian, uint32(bufSize-1))
|
||||
buf.WriteString(sql)
|
||||
buf.WriteByte(0)
|
||||
|
||||
_, err = c.conn.Write(buf)
|
||||
_, err = buf.WriteTo(c.conn)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -280,7 +283,7 @@ func (c *Connection) rxMsg() (t byte, r *MessageReader, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
var body []byte
|
||||
var body *bytes.Buffer
|
||||
if body, err = c.rxMsgBody(bodySize); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -290,19 +293,23 @@ func (c *Connection) rxMsg() (t byte, r *MessageReader, err error) {
|
||||
}
|
||||
|
||||
func (c *Connection) rxMsgHeader() (t byte, bodySize int32, err error) {
|
||||
buf := c.buf[:5]
|
||||
if _, err = io.ReadFull(c.conn, buf); err != nil {
|
||||
buf := c.getBuf(5)
|
||||
if _, err = io.CopyN(buf, c.conn, 5); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
t = buf[0]
|
||||
bodySize = int32(binary.BigEndian.Uint32(buf[1:5])) - 4
|
||||
return t, bodySize, nil
|
||||
t, err = buf.ReadByte()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = binary.Read(buf, binary.BigEndian, &bodySize)
|
||||
bodySize -= 4 // remove self from size
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Connection) rxMsgBody(bodySize int32) (buf []byte, err error) {
|
||||
func (c *Connection) rxMsgBody(bodySize int32) (buf *bytes.Buffer, err error) {
|
||||
buf = c.getBuf(int(bodySize))
|
||||
_, err = io.ReadFull(c.conn, buf)
|
||||
_, err = io.CopyN(buf, c.conn, int64(bodySize))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -401,22 +408,32 @@ func (c *Connection) txStartupMessage(msg *startupMessage) (err error) {
|
||||
func (c *Connection) txPasswordMessage(password string) (err error) {
|
||||
bufSize := 5 + len(password) + 1 // message identifier (1), message size (4), password, null string terminator (1)
|
||||
buf := c.getBuf(bufSize)
|
||||
buf[0] = 'p'
|
||||
binary.BigEndian.PutUint32(buf[1:5], uint32(bufSize-1))
|
||||
copy(buf[5:], password)
|
||||
buf[bufSize-1] = 0
|
||||
|
||||
_, err = c.conn.Write(buf)
|
||||
return err
|
||||
}
|
||||
|
||||
// Gets a []byte of n length. If possible it will reuse the connection buffer
|
||||
// otherwise it will allocate a new buffer
|
||||
func (c *Connection) getBuf(n int) (buf []byte) {
|
||||
if n <= cap(c.buf) {
|
||||
buf = c.buf[:n]
|
||||
} else {
|
||||
buf = make([]byte, n)
|
||||
err = buf.WriteByte('p')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = binary.Write(buf, binary.BigEndian, int32(bufSize-1))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = buf.WriteString(password)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
buf.WriteByte(0)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = buf.WriteTo(c.conn)
|
||||
return
|
||||
}
|
||||
|
||||
// Gets a buffer of up to n bytes. If it is a large request it will return a new buffer, if it is small enough it will return the shared connection buffer
|
||||
func (c *Connection) getBuf(n int) *bytes.Buffer {
|
||||
if n < sharedBufferSize {
|
||||
c.buf.Reset()
|
||||
return c.buf
|
||||
} else {
|
||||
return bytes.NewBuffer(make([]byte, n))
|
||||
}
|
||||
}
|
||||
|
||||
+2
-2
@@ -7,8 +7,8 @@ import (
|
||||
|
||||
type MessageReader []byte
|
||||
|
||||
func newMessageReader(buf []byte) *MessageReader {
|
||||
r := MessageReader(buf)
|
||||
func newMessageReader(buf *bytes.Buffer) *MessageReader {
|
||||
r := MessageReader(buf.Bytes())
|
||||
return &r
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user