Fix reading interrupted messages
When an message is received and a timeout occurs after reading the header but before reading the entire body the connection state could be corrupted due to the header being consumed. The next read would consider the body of the previous message as the header for the next. fixes #348
This commit is contained in:
+18
-9
@@ -24,6 +24,10 @@ type Backend struct {
|
||||
startupMessage StartupMessage
|
||||
sync Sync
|
||||
terminate Terminate
|
||||
|
||||
bodyLen int
|
||||
msgType byte
|
||||
partialMsg bool
|
||||
}
|
||||
|
||||
func NewBackend(r io.Reader, w io.Writer) (*Backend, error) {
|
||||
@@ -57,16 +61,19 @@ func (b *Backend) ReceiveStartupMessage() (*StartupMessage, error) {
|
||||
}
|
||||
|
||||
func (b *Backend) Receive() (FrontendMessage, error) {
|
||||
header, err := b.cr.Next(5)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if !b.partialMsg {
|
||||
header, err := b.cr.Next(5)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.msgType = header[0]
|
||||
b.bodyLen = int(binary.BigEndian.Uint32(header[1:])) - 4
|
||||
b.partialMsg = true
|
||||
}
|
||||
|
||||
msgType := header[0]
|
||||
bodyLen := int(binary.BigEndian.Uint32(header[1:])) - 4
|
||||
|
||||
var msg FrontendMessage
|
||||
switch msgType {
|
||||
switch b.msgType {
|
||||
case 'B':
|
||||
msg = &b.bind
|
||||
case 'C':
|
||||
@@ -88,14 +95,16 @@ func (b *Backend) Receive() (FrontendMessage, error) {
|
||||
case 'X':
|
||||
msg = &b.terminate
|
||||
default:
|
||||
return nil, errors.Errorf("unknown message type: %c", msgType)
|
||||
return nil, errors.Errorf("unknown message type: %c", b.msgType)
|
||||
}
|
||||
|
||||
msgBody, err := b.cr.Next(bodyLen)
|
||||
msgBody, err := b.cr.Next(b.bodyLen)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.partialMsg = false
|
||||
|
||||
err = msg.Decode(msgBody)
|
||||
return msg, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user