2
0

Pass *messageReader to rx* instead of raw buffer

This commit is contained in:
Jack Christensen
2013-04-08 20:37:09 -05:00
parent d6ca7af1e3
commit 006a98d1a2
2 changed files with 38 additions and 30 deletions
+32 -30
View File
@@ -101,20 +101,22 @@ func (c *conn) sendSimpleQuery(sql string) (err error) {
func (c *conn) processMsg() (msg interface{}, err error) { func (c *conn) processMsg() (msg interface{}, err error) {
var t byte var t byte
var body []byte var r *messageReader
t, body, err = c.rxMsg() t, r, err = c.rxMsg()
if err != nil { if err != nil {
return return
} }
return c.parseMsg(t, body) return c.parseMsg(t, r)
} }
// Processes messages that are not exclusive to one context such as authentication sdakl sdafj sda sda sd ds ds sad sd sa sd sad dsfsd af // Processes messages that are not exclusive to one context such as
func (c *conn) processContextFreeMsg(t byte, body []byte) (err error) { // authentication or query response. The response to these messages
// is the same regardless of when they occur.
func (c *conn) processContextFreeMsg(t byte, r *messageReader) (err error) {
switch t { switch t {
case 'S': case 'S':
c.rxParameterStatus(body) c.rxParameterStatus(r)
return nil return nil
default: default:
return fmt.Errorf("Received unknown message type: %c", t) return fmt.Errorf("Received unknown message type: %c", t)
@@ -124,36 +126,41 @@ func (c *conn) processContextFreeMsg(t byte, body []byte) (err error) {
} }
func (c *conn) parseMsg(t byte, body []byte) (msg interface{}, err error) { func (c *conn) parseMsg(t byte, r *messageReader) (msg interface{}, err error) {
switch t { switch t {
case 'K': case 'K':
c.rxBackendKeyData(body) c.rxBackendKeyData(r)
return nil, nil return nil, nil
case 'R': case 'R':
return c.rxAuthenticationX(body) return c.rxAuthenticationX(r)
case 'Z': case 'Z':
return c.rxReadyForQuery(body), nil return c.rxReadyForQuery(r), nil
case 'T': case 'T':
return c.rxRowDescription(body) return c.rxRowDescription(r)
case 'D': case 'D':
return c.rxDataRow(body) return c.rxDataRow(r)
case 'C': case 'C':
return c.rxCommandComplete(body), nil return c.rxCommandComplete(r), nil
default: default:
return nil, c.processContextFreeMsg(t, body) return nil, c.processContextFreeMsg(t, r)
} }
panic("Unreachable") panic("Unreachable")
} }
func (c *conn) rxMsg() (t byte, body []byte, err error) { func (c *conn) rxMsg() (t byte, r *messageReader, err error) {
var bodySize int32 var bodySize int32
t, bodySize, err = c.rxMsgHeader() t, bodySize, err = c.rxMsgHeader()
if err != nil { if err != nil {
return return
} }
body, err = c.rxMsgBody(bodySize) var body []byte
if body, err = c.rxMsgBody(bodySize); err != nil {
return
}
r = newMessageReader(body)
return return
} }
@@ -174,8 +181,8 @@ func (c *conn) rxMsgBody(bodySize int32) (buf []byte, err error) {
return return
} }
func (c *conn) rxAuthenticationX(buf []byte) (msg interface{}, err error) { func (c *conn) rxAuthenticationX(r *messageReader) (msg interface{}, err error) {
code := binary.BigEndian.Uint32(buf[:4]) code := r.readInt32()
switch code { switch code {
case 0: case 0:
return &authenticationOk{}, nil return &authenticationOk{}, nil
@@ -186,27 +193,24 @@ func (c *conn) rxAuthenticationX(buf []byte) (msg interface{}, err error) {
panic("Unreachable") panic("Unreachable")
} }
func (c *conn) rxParameterStatus(buf []byte) { func (c *conn) rxParameterStatus(r *messageReader) {
r := newMessageReader(buf)
key := r.readString() key := r.readString()
value := r.readString() value := r.readString()
c.runtimeParams[key] = value c.runtimeParams[key] = value
} }
func (c *conn) rxBackendKeyData(buf []byte) { func (c *conn) rxBackendKeyData(r *messageReader) {
r := newMessageReader(buf)
c.pid = r.readInt32() c.pid = r.readInt32()
c.secretKey = r.readInt32() c.secretKey = r.readInt32()
} }
func (c *conn) rxReadyForQuery(buf []byte) (msg *readyForQuery) { func (c *conn) rxReadyForQuery(r *messageReader) (msg *readyForQuery) {
msg = new(readyForQuery) msg = new(readyForQuery)
msg.txStatus = buf[0] msg.txStatus = r.readByte()
return return
} }
func (c *conn) rxRowDescription(buf []byte) (msg *rowDescription, err error) { func (c *conn) rxRowDescription(r *messageReader) (msg *rowDescription, err error) {
r := newMessageReader(buf)
fieldCount := r.readInt16() fieldCount := r.readInt16()
c.rowDesc.fields = make([]fieldDescription, fieldCount) c.rowDesc.fields = make([]fieldDescription, fieldCount)
for i := int16(0); i < fieldCount; i++ { for i := int16(0); i < fieldCount; i++ {
@@ -222,8 +226,7 @@ func (c *conn) rxRowDescription(buf []byte) (msg *rowDescription, err error) {
return return
} }
func (c *conn) rxDataRow(buf []byte) (row map[string]string, err error) { func (c *conn) rxDataRow(r *messageReader) (row map[string]string, err error) {
r := newMessageReader(buf)
fieldCount := r.readInt16() fieldCount := r.readInt16()
if fieldCount != int16(len(c.rowDesc.fields)) { if fieldCount != int16(len(c.rowDesc.fields)) {
@@ -240,8 +243,7 @@ func (c *conn) rxDataRow(buf []byte) (row map[string]string, err error) {
return return
} }
func (c *conn) rxCommandComplete(buf []byte) string { func (c *conn) rxCommandComplete(r *messageReader) string {
r := newMessageReader(buf)
return r.readString() return r.readString()
} }
+6
View File
@@ -12,6 +12,12 @@ func newMessageReader(buf []byte) *messageReader {
return &r return &r
} }
func (r *messageReader) readByte() byte {
b := (*r)[0]
*r = (*r)[1:]
return b
}
func (r *messageReader) readInt16() int16 { func (r *messageReader) readInt16() int16 {
n := int16(binary.BigEndian.Uint16((*r)[:2])) n := int16(binary.BigEndian.Uint16((*r)[:2]))
*r = (*r)[2:] *r = (*r)[2:]