From 357e5c47358f8d705dd2a4cf86d7332eff7a6daa Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 29 Jun 2013 16:39:07 -0500 Subject: [PATCH] Refactor to using bytes.Buffer --- connection.go | 85 ++++++++++++++++++++++++++++------------------- message_reader.go | 4 +-- 2 files changed, 53 insertions(+), 36 deletions(-) diff --git a/connection.go b/connection.go index e30ce681..6499779d 100644 --- a/connection.go +++ b/connection.go @@ -1,6 +1,7 @@ package pgx import ( + "bytes" "crypto/md5" "encoding/binary" "encoding/hex" @@ -21,7 +22,7 @@ type ConnectionParameters struct { type Connection struct { conn net.Conn // the underlying TCP or unix domain socket connection - buf []byte // work buffer to avoid constant alloc and dealloc + buf *bytes.Buffer // work buffer to avoid constant alloc and dealloc pid int32 // backend pid secretKey int32 // key to use to send a cancel query message to the server runtimeParams map[string]string // parameters that have been reported by the server @@ -46,6 +47,8 @@ func (e UnexpectedColumnCountError) Error() string { return fmt.Sprintf("Expected result to have %d column(s), instead it has %d", e.ExpectedCount, e.ActualCount) } +const sharedBufferSize = 1024 + func Connect(parameters ConnectionParameters) (c *Connection, err error) { c = new(Connection) @@ -66,7 +69,7 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) { } } - c.buf = make([]byte, 1024) + c.buf = bytes.NewBuffer(make([]byte, sharedBufferSize)) c.runtimeParams = make(map[string]string) msg := newStartupMessage() @@ -104,9 +107,9 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) { func (c *Connection) Close() (err error) { buf := c.getBuf(5) - buf[0] = 'X' - binary.BigEndian.PutUint32(buf[1:], 4) - _, err = c.conn.Write(buf) + buf.WriteByte('X') + binary.Write(buf, binary.BigEndian, 4) + _, err = buf.WriteTo(c.conn) return } @@ -215,12 +218,12 @@ func (c *Connection) SelectValues(sql string) (values []interface{}, err error) func (c *Connection) sendSimpleQuery(sql string) (err error) { bufSize := 5 + len(sql) + 1 // message identifier (1), message size (4), null string terminator (1) buf := c.getBuf(bufSize) - buf[0] = 'Q' - binary.BigEndian.PutUint32(buf[1:5], uint32(bufSize-1)) - copy(buf[5:], sql) - buf[bufSize-1] = 0 + buf.WriteByte('Q') + binary.Write(buf, binary.BigEndian, uint32(bufSize-1)) + buf.WriteString(sql) + buf.WriteByte(0) - _, err = c.conn.Write(buf) + _, err = buf.WriteTo(c.conn) return err } @@ -280,7 +283,7 @@ func (c *Connection) rxMsg() (t byte, r *MessageReader, err error) { return } - var body []byte + var body *bytes.Buffer if body, err = c.rxMsgBody(bodySize); err != nil { return } @@ -290,19 +293,23 @@ func (c *Connection) rxMsg() (t byte, r *MessageReader, err error) { } func (c *Connection) rxMsgHeader() (t byte, bodySize int32, err error) { - buf := c.buf[:5] - if _, err = io.ReadFull(c.conn, buf); err != nil { + buf := c.getBuf(5) + if _, err = io.CopyN(buf, c.conn, 5); err != nil { return 0, 0, err } - t = buf[0] - bodySize = int32(binary.BigEndian.Uint32(buf[1:5])) - 4 - return t, bodySize, nil + t, err = buf.ReadByte() + if err != nil { + return + } + err = binary.Read(buf, binary.BigEndian, &bodySize) + bodySize -= 4 // remove self from size + return } -func (c *Connection) rxMsgBody(bodySize int32) (buf []byte, err error) { +func (c *Connection) rxMsgBody(bodySize int32) (buf *bytes.Buffer, err error) { buf = c.getBuf(int(bodySize)) - _, err = io.ReadFull(c.conn, buf) + _, err = io.CopyN(buf, c.conn, int64(bodySize)) return } @@ -401,22 +408,32 @@ func (c *Connection) txStartupMessage(msg *startupMessage) (err error) { func (c *Connection) txPasswordMessage(password string) (err error) { bufSize := 5 + len(password) + 1 // message identifier (1), message size (4), password, null string terminator (1) buf := c.getBuf(bufSize) - buf[0] = 'p' - binary.BigEndian.PutUint32(buf[1:5], uint32(bufSize-1)) - copy(buf[5:], password) - buf[bufSize-1] = 0 - - _, err = c.conn.Write(buf) - return err -} - -// Gets a []byte of n length. If possible it will reuse the connection buffer -// otherwise it will allocate a new buffer -func (c *Connection) getBuf(n int) (buf []byte) { - if n <= cap(c.buf) { - buf = c.buf[:n] - } else { - buf = make([]byte, n) + err = buf.WriteByte('p') + if err != nil { + return } + err = binary.Write(buf, binary.BigEndian, int32(bufSize-1)) + if err != nil { + return + } + _, err = buf.WriteString(password) + if err != nil { + return + } + buf.WriteByte(0) + if err != nil { + return + } + _, err = buf.WriteTo(c.conn) return } + +// Gets a buffer of up to n bytes. If it is a large request it will return a new buffer, if it is small enough it will return the shared connection buffer +func (c *Connection) getBuf(n int) *bytes.Buffer { + if n < sharedBufferSize { + c.buf.Reset() + return c.buf + } else { + return bytes.NewBuffer(make([]byte, n)) + } +} diff --git a/message_reader.go b/message_reader.go index 932e7ed1..7e2c74c6 100644 --- a/message_reader.go +++ b/message_reader.go @@ -7,8 +7,8 @@ import ( type MessageReader []byte -func newMessageReader(buf []byte) *MessageReader { - r := MessageReader(buf) +func newMessageReader(buf *bytes.Buffer) *MessageReader { + r := MessageReader(buf.Bytes()) return &r }