From 006a98d1a24ad3d173e63af8dcc9954733927251 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Mon, 8 Apr 2013 20:37:09 -0500 Subject: [PATCH] Pass *messageReader to rx* instead of raw buffer --- conn.go | 62 ++++++++++++++++++++++++----------------------- message_reader.go | 6 +++++ 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/conn.go b/conn.go index 20e373ea..a1ccfe95 100644 --- a/conn.go +++ b/conn.go @@ -101,20 +101,22 @@ func (c *conn) sendSimpleQuery(sql string) (err error) { func (c *conn) processMsg() (msg interface{}, err error) { var t byte - var body []byte - t, body, err = c.rxMsg() + var r *messageReader + t, r, err = c.rxMsg() if err != nil { return } - return c.parseMsg(t, body) + return c.parseMsg(t, r) } -// Processes messages that are not exclusive to one context such as authentication sdakl sdafj sda sda sd ds ds sad sd sa sd sad dsfsd af -func (c *conn) processContextFreeMsg(t byte, body []byte) (err error) { +// Processes messages that are not exclusive to one context such as +// authentication or query response. The response to these messages +// is the same regardless of when they occur. +func (c *conn) processContextFreeMsg(t byte, r *messageReader) (err error) { switch t { case 'S': - c.rxParameterStatus(body) + c.rxParameterStatus(r) return nil default: return fmt.Errorf("Received unknown message type: %c", t) @@ -124,36 +126,41 @@ func (c *conn) processContextFreeMsg(t byte, body []byte) (err error) { } -func (c *conn) parseMsg(t byte, body []byte) (msg interface{}, err error) { +func (c *conn) parseMsg(t byte, r *messageReader) (msg interface{}, err error) { switch t { case 'K': - c.rxBackendKeyData(body) + c.rxBackendKeyData(r) return nil, nil case 'R': - return c.rxAuthenticationX(body) + return c.rxAuthenticationX(r) case 'Z': - return c.rxReadyForQuery(body), nil + return c.rxReadyForQuery(r), nil case 'T': - return c.rxRowDescription(body) + return c.rxRowDescription(r) case 'D': - return c.rxDataRow(body) + return c.rxDataRow(r) case 'C': - return c.rxCommandComplete(body), nil + return c.rxCommandComplete(r), nil default: - return nil, c.processContextFreeMsg(t, body) + return nil, c.processContextFreeMsg(t, r) } panic("Unreachable") } -func (c *conn) rxMsg() (t byte, body []byte, err error) { +func (c *conn) rxMsg() (t byte, r *messageReader, err error) { var bodySize int32 t, bodySize, err = c.rxMsgHeader() if err != nil { return } - body, err = c.rxMsgBody(bodySize) + var body []byte + if body, err = c.rxMsgBody(bodySize); err != nil { + return + } + + r = newMessageReader(body) return } @@ -174,8 +181,8 @@ func (c *conn) rxMsgBody(bodySize int32) (buf []byte, err error) { return } -func (c *conn) rxAuthenticationX(buf []byte) (msg interface{}, err error) { - code := binary.BigEndian.Uint32(buf[:4]) +func (c *conn) rxAuthenticationX(r *messageReader) (msg interface{}, err error) { + code := r.readInt32() switch code { case 0: return &authenticationOk{}, nil @@ -186,27 +193,24 @@ func (c *conn) rxAuthenticationX(buf []byte) (msg interface{}, err error) { panic("Unreachable") } -func (c *conn) rxParameterStatus(buf []byte) { - r := newMessageReader(buf) +func (c *conn) rxParameterStatus(r *messageReader) { key := r.readString() value := r.readString() c.runtimeParams[key] = value } -func (c *conn) rxBackendKeyData(buf []byte) { - r := newMessageReader(buf) +func (c *conn) rxBackendKeyData(r *messageReader) { c.pid = r.readInt32() c.secretKey = r.readInt32() } -func (c *conn) rxReadyForQuery(buf []byte) (msg *readyForQuery) { +func (c *conn) rxReadyForQuery(r *messageReader) (msg *readyForQuery) { msg = new(readyForQuery) - msg.txStatus = buf[0] + msg.txStatus = r.readByte() return } -func (c *conn) rxRowDescription(buf []byte) (msg *rowDescription, err error) { - r := newMessageReader(buf) +func (c *conn) rxRowDescription(r *messageReader) (msg *rowDescription, err error) { fieldCount := r.readInt16() c.rowDesc.fields = make([]fieldDescription, fieldCount) for i := int16(0); i < fieldCount; i++ { @@ -222,8 +226,7 @@ func (c *conn) rxRowDescription(buf []byte) (msg *rowDescription, err error) { return } -func (c *conn) rxDataRow(buf []byte) (row map[string]string, err error) { - r := newMessageReader(buf) +func (c *conn) rxDataRow(r *messageReader) (row map[string]string, err error) { fieldCount := r.readInt16() if fieldCount != int16(len(c.rowDesc.fields)) { @@ -240,8 +243,7 @@ func (c *conn) rxDataRow(buf []byte) (row map[string]string, err error) { return } -func (c *conn) rxCommandComplete(buf []byte) string { - r := newMessageReader(buf) +func (c *conn) rxCommandComplete(r *messageReader) string { return r.readString() } diff --git a/message_reader.go b/message_reader.go index 581e4536..f0712fa5 100644 --- a/message_reader.go +++ b/message_reader.go @@ -12,6 +12,12 @@ func newMessageReader(buf []byte) *messageReader { return &r } +func (r *messageReader) readByte() byte { + b := (*r)[0] + *r = (*r)[1:] + return b +} + func (r *messageReader) readInt16() int16 { n := int16(binary.BigEndian.Uint16((*r)[:2])) *r = (*r)[2:]