2
0

Reduce allocations and copies in pgproto3

Altered chunkreader to never reuse memory.

Altered pgproto3 to to copy memory when decoding. Renamed UnmarshalBinary to
Decode because of changed semantics.
This commit is contained in:
Jack Christensen
2017-04-29 11:55:14 -05:00
parent de9bb7e6d8
commit eff55451cf
24 changed files with 70 additions and 108 deletions
+1 -1
View File
@@ -21,7 +21,7 @@ type Authentication struct {
func (*Authentication) Backend() {}
func (dst *Authentication) UnmarshalBinary(src []byte) error {
func (dst *Authentication) Decode(src []byte) error {
*dst = Authentication{Type: binary.BigEndian.Uint32(src[:4])}
switch dst.Type {
+1 -1
View File
@@ -13,7 +13,7 @@ type BackendKeyData struct {
func (*BackendKeyData) Backend() {}
func (dst *BackendKeyData) UnmarshalBinary(src []byte) error {
func (dst *BackendKeyData) Decode(src []byte) error {
if len(src) != 8 {
return &invalidMessageLenErr{messageType: "BackendKeyData", expectedLen: 8, actualLen: len(src)}
}
+1 -1
View File
@@ -8,7 +8,7 @@ type BindComplete struct{}
func (*BindComplete) Backend() {}
func (dst *BindComplete) UnmarshalBinary(src []byte) error {
func (dst *BindComplete) Decode(src []byte) error {
if len(src) != 0 {
return &invalidMessageLenErr{messageType: "BindComplete", expectedLen: 0, actualLen: len(src)}
}
+1 -1
View File
@@ -8,7 +8,7 @@ type CloseComplete struct{}
func (*CloseComplete) Backend() {}
func (dst *CloseComplete) UnmarshalBinary(src []byte) error {
func (dst *CloseComplete) Decode(src []byte) error {
if len(src) != 0 {
return &invalidMessageLenErr{messageType: "CloseComplete", expectedLen: 0, actualLen: len(src)}
}
+6 -7
View File
@@ -11,14 +11,13 @@ type CommandComplete struct {
func (*CommandComplete) Backend() {}
func (dst *CommandComplete) UnmarshalBinary(src []byte) error {
buf := bytes.NewBuffer(src)
b, err := buf.ReadBytes(0)
if err != nil {
return err
func (dst *CommandComplete) Decode(src []byte) error {
idx := bytes.IndexByte(src, 0)
if idx != len(src)-1 {
return &invalidMessageFormatErr{messageType: "CommandComplete"}
}
dst.CommandTag = string(b[:len(b)-1])
dst.CommandTag = string(src[:idx])
return nil
}
+1 -1
View File
@@ -13,7 +13,7 @@ type CopyBothResponse struct {
func (*CopyBothResponse) Backend() {}
func (dst *CopyBothResponse) UnmarshalBinary(src []byte) error {
func (dst *CopyBothResponse) Decode(src []byte) error {
buf := bytes.NewBuffer(src)
if buf.Len() < 3 {
+2 -3
View File
@@ -13,9 +13,8 @@ type CopyData struct {
func (*CopyData) Backend() {}
func (*CopyData) Frontend() {}
func (dst *CopyData) UnmarshalBinary(src []byte) error {
dst.Data = make([]byte, len(src))
copy(dst.Data, src)
func (dst *CopyData) Decode(src []byte) error {
dst.Data = src
return nil
}
+1 -1
View File
@@ -13,7 +13,7 @@ type CopyInResponse struct {
func (*CopyInResponse) Backend() {}
func (dst *CopyInResponse) UnmarshalBinary(src []byte) error {
func (dst *CopyInResponse) Decode(src []byte) error {
buf := bytes.NewBuffer(src)
if buf.Len() < 3 {
+1 -1
View File
@@ -13,7 +13,7 @@ type CopyOutResponse struct {
func (*CopyOutResponse) Backend() {}
func (dst *CopyOutResponse) UnmarshalBinary(src []byte) error {
func (dst *CopyOutResponse) Decode(src []byte) error {
buf := bytes.NewBuffer(src)
if buf.Len() < 3 {
+23 -16
View File
@@ -13,35 +13,42 @@ type DataRow struct {
func (*DataRow) Backend() {}
func (dst *DataRow) UnmarshalBinary(src []byte) error {
buf := bytes.NewBuffer(src)
if buf.Len() < 2 {
func (dst *DataRow) Decode(src []byte) error {
if len(src) < 2 {
return &invalidMessageFormatErr{messageType: "DataRow"}
}
fieldCount := int(binary.BigEndian.Uint16(buf.Next(2)))
rp := 0
fieldCount := int(binary.BigEndian.Uint16(src[rp:]))
rp += 2
dst.Values = make([][]byte, fieldCount)
// If the capacity of the values slice is too small OR substantially too
// large reallocate. This is too avoid one row with many columns from
// permanently allocating memory.
if cap(dst.Values) < fieldCount || cap(dst.Values)-fieldCount > 32 {
dst.Values = make([][]byte, fieldCount, 32)
} else {
dst.Values = dst.Values[:fieldCount]
}
for i := 0; i < fieldCount; i++ {
if buf.Len() < 4 {
if len(src[rp:]) < 4 {
return &invalidMessageFormatErr{messageType: "DataRow"}
}
msgSize := int(int32(binary.BigEndian.Uint32(buf.Next(4))))
msgSize := int(int32(binary.BigEndian.Uint32(src[rp:])))
rp += 4
// null
if msgSize == -1 {
continue
}
dst.Values[i] = nil
} else {
if len(src[rp:]) < msgSize {
return &invalidMessageFormatErr{messageType: "DataRow"}
}
value := make([]byte, msgSize)
_, err := buf.Read(value)
if err != nil {
return err
dst.Values[i] = src[rp : rp+msgSize]
rp += msgSize
}
dst.Values[i] = value
}
return nil
+1 -1
View File
@@ -8,7 +8,7 @@ type EmptyQueryResponse struct{}
func (*EmptyQueryResponse) Backend() {}
func (dst *EmptyQueryResponse) UnmarshalBinary(src []byte) error {
func (dst *EmptyQueryResponse) Decode(src []byte) error {
if len(src) != 0 {
return &invalidMessageLenErr{messageType: "EmptyQueryResponse", expectedLen: 0, actualLen: len(src)}
}
+1 -1
View File
@@ -30,7 +30,7 @@ type ErrorResponse struct {
func (*ErrorResponse) Backend() {}
func (dst *ErrorResponse) UnmarshalBinary(src []byte) error {
func (dst *ErrorResponse) Decode(src []byte) error {
*dst = ErrorResponse{}
buf := bytes.NewBuffer(src)
+1 -1
View File
@@ -108,6 +108,6 @@ func (b *Frontend) Receive() (BackendMessage, error) {
return nil, err
}
err = msg.UnmarshalBinary(msgBody)
err = msg.Decode(msgBody)
return msg, err
}
+13 -9
View File
@@ -13,20 +13,24 @@ type FunctionCallResponse struct {
func (*FunctionCallResponse) Backend() {}
func (dst *FunctionCallResponse) UnmarshalBinary(src []byte) error {
buf := bytes.NewBuffer(src)
if buf.Len() < 4 {
func (dst *FunctionCallResponse) Decode(src []byte) error {
if len(src) < 4 {
return &invalidMessageFormatErr{messageType: "FunctionCallResponse"}
}
resultSize := int(binary.BigEndian.Uint32(buf.Next(4)))
if buf.Len() != resultSize {
rp := 0
resultSize := int(binary.BigEndian.Uint32(src[rp:]))
rp += 4
if resultSize == -1 {
dst.Result = nil
return nil
}
if len(src[rp:]) != resultSize {
return &invalidMessageFormatErr{messageType: "FunctionCallResponse"}
}
dst.Result = make([]byte, resultSize)
copy(dst.Result, buf.Bytes())
dst.Result = src[rp:]
return nil
}
+1 -1
View File
@@ -8,7 +8,7 @@ type NoData struct{}
func (*NoData) Backend() {}
func (dst *NoData) UnmarshalBinary(src []byte) error {
func (dst *NoData) Decode(src []byte) error {
if len(src) != 0 {
return &invalidMessageLenErr{messageType: "NoData", expectedLen: 0, actualLen: len(src)}
}
+2 -2
View File
@@ -4,8 +4,8 @@ type NoticeResponse ErrorResponse
func (*NoticeResponse) Backend() {}
func (dst *NoticeResponse) UnmarshalBinary(src []byte) error {
return (*ErrorResponse)(dst).UnmarshalBinary(src)
func (dst *NoticeResponse) Decode(src []byte) error {
return (*ErrorResponse)(dst).Decode(src)
}
func (src *NoticeResponse) MarshalBinary() ([]byte, error) {
+1 -1
View File
@@ -14,7 +14,7 @@ type NotificationResponse struct {
func (*NotificationResponse) Backend() {}
func (dst *NotificationResponse) UnmarshalBinary(src []byte) error {
func (dst *NotificationResponse) Decode(src []byte) error {
buf := bytes.NewBuffer(src)
pid := binary.BigEndian.Uint32(buf.Next(4))
+1 -1
View File
@@ -12,7 +12,7 @@ type ParameterDescription struct {
func (*ParameterDescription) Backend() {}
func (dst *ParameterDescription) UnmarshalBinary(src []byte) error {
func (dst *ParameterDescription) Decode(src []byte) error {
buf := bytes.NewBuffer(src)
if buf.Len() < 2 {
+1 -1
View File
@@ -13,7 +13,7 @@ type ParameterStatus struct {
func (*ParameterStatus) Backend() {}
func (dst *ParameterStatus) UnmarshalBinary(src []byte) error {
func (dst *ParameterStatus) Decode(src []byte) error {
buf := bytes.NewBuffer(src)
b, err := buf.ReadBytes(0)
+1 -1
View File
@@ -8,7 +8,7 @@ type ParseComplete struct{}
func (*ParseComplete) Backend() {}
func (dst *ParseComplete) UnmarshalBinary(src []byte) error {
func (dst *ParseComplete) Decode(src []byte) error {
if len(src) != 0 {
return &invalidMessageLenErr{messageType: "ParseComplete", expectedLen: 0, actualLen: len(src)}
}
+6 -53
View File
@@ -2,8 +2,13 @@ package pgproto3
import "fmt"
// Message is the interface implemented by an object that can decode and encode
// a particular PostgreSQL message.
//
// Decode is allowed and expected to retain a reference to data after
// returning (unlike encoding.BinaryUnmarshaler).
type Message interface {
UnmarshalBinary(data []byte) error
Decode(data []byte) error
MarshalBinary() (data []byte, err error)
}
@@ -17,58 +22,6 @@ type BackendMessage interface {
Backend() // no-op method to distinguish frontend from backend methods
}
// func ParseBackend(typeByte byte, body []byte) (BackendMessage, error) {
// switch typeByte {
// case '1':
// return ParseParseComplete(body)
// case '2':
// return ParseBindComplete(body)
// case 'C':
// return ParseCommandComplete(body)
// case 'D':
// return ParseDataRow(body)
// case 'E':
// return ParseErrorResponse(body)
// case 'K':
// return ParseBackendKeyData(body)
// case 'R':
// return ParseAuthentication(body)
// case 'S':
// return ParseParameterStatus(body)
// case 'T':
// return ParseRowDescription(body)
// case 't':
// return ParseParameterDescription(body)
// case 'Z':
// return ParseReadyForQuery(body)
// default:
// return ParseUnknownMessage(typeByte, body)
// }
// }
// func ParseFrontend(typeByte byte, body []byte) (FrontendMessage, error) {
// switch typeByte {
// case 'B':
// return ParseBind(body)
// case 'D':
// return ParseDescribe(body)
// case 'E':
// return ParseExecute(body)
// case 'P':
// return ParseParse(body)
// case 'p':
// return ParsePasswordMessage(body)
// case 'Q':
// return ParseQuery(body)
// case 'S':
// return ParseSync(body)
// case 'X':
// return ParseTerminate(body)
// default:
// return ParseUnknownMessage(typeByte, body)
// }
// }
type invalidMessageLenErr struct {
messageType string
expectedLen int
+1 -1
View File
@@ -11,7 +11,7 @@ type Query struct {
func (*Query) Frontend() {}
func (dst *Query) UnmarshalBinary(src []byte) error {
func (dst *Query) Decode(src []byte) error {
i := bytes.IndexByte(src, 0)
if i != len(src)-1 {
return &invalidMessageFormatErr{messageType: "Query"}
+1 -1
View File
@@ -10,7 +10,7 @@ type ReadyForQuery struct {
func (*ReadyForQuery) Backend() {}
func (dst *ReadyForQuery) UnmarshalBinary(src []byte) error {
func (dst *ReadyForQuery) Decode(src []byte) error {
if len(src) != 1 {
return &invalidMessageLenErr{messageType: "ReadyForQuery", expectedLen: 1, actualLen: len(src)}
}
+1 -1
View File
@@ -27,7 +27,7 @@ type RowDescription struct {
func (*RowDescription) Backend() {}
func (dst *RowDescription) UnmarshalBinary(src []byte) error {
func (dst *RowDescription) Decode(src []byte) error {
buf := bytes.NewBuffer(src)
if buf.Len() < 2 {