From aff1eee892985ea0d0aa3713e93d06167c0e8668 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 6 Apr 2013 14:52:08 -0400 Subject: [PATCH] Basic query functionality --- conn.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++ conn_test.go | 19 ++++++++++++ message_reader.go | 45 +++++++++++++++++++++++++++ messages.go | 20 ++++++++++++ 4 files changed, 161 insertions(+) create mode 100644 message_reader.go diff --git a/conn.go b/conn.go index 0a20f866..04b26e35 100644 --- a/conn.go +++ b/conn.go @@ -12,6 +12,7 @@ import ( type conn struct { conn net.Conn // the underlying TCP or unix domain socket connection + rowDesc rowDescription // current query rowDescription buf []byte // work buffer to avoid constant alloc and dealloc } @@ -61,6 +62,35 @@ func (c *conn) Close() (err error) { return } +func (c *conn) Query(sql string) (rows []map[string]string, err error) { + bufSize := 5 + len(sql) + 1 // message identifier (1), message size (4), null string terminator (1) + buf := c.getBuf(bufSize) + buf[0] = 'Q' + binary.BigEndian.PutUint32(buf[1:5], uint32(bufSize-1)) + copy(buf[5:], sql) + buf[bufSize-1] = 0 + + _, err = c.conn.Write(buf) + if err != nil { + return nil, err + } + + var response interface{} + for { + response, err = c.rxMsg() + if err != nil { + fmt.Println(err) + return nil, err + } + fmt.Println(response) + if _, ok := response.(*readyForQuery); ok { + break + } + } + + return nil, err +} + func (c *conn) rxMsg() (msg interface{}, err error) { var t byte var bodySize int32 @@ -83,6 +113,12 @@ func (c *conn) rxMsg() (msg interface{}, err error) { return c.rxParameterStatus(buf) case 'Z': return c.rxReadyForQuery(buf), nil + case 'T': + return c.rxRowDescription(buf) + case 'D': + return c.rxDataRow(buf) + case 'C': + return c.rxCommandComplete(buf), nil default: return nil, fmt.Errorf("Received unknown message type: %c", t) } @@ -145,6 +181,47 @@ func (c *conn) rxReadyForQuery(buf []byte) (msg *readyForQuery) { return } +func (c *conn) rxRowDescription(buf []byte) (msg *rowDescription, err error) { + r := newMessageReader(buf) + fieldCount := r.readInt16() + c.rowDesc.fields = make([]fieldDescription, fieldCount) + for i := int16(0); i < fieldCount; i++ { + f := &c.rowDesc.fields[i] + f.name = r.readString() + f.table = r.readOid() + f.attributeNumber = r.readInt16() + f.dataType = r.readOid() + f.dataTypeSize = r.readInt16() + f.modifier = r.readInt32() + f.formatCode = r.readInt16() + } + return +} + +func (c *conn) rxDataRow(buf []byte) (row map[string]string, err error) { + r := newMessageReader(buf) + fieldCount := r.readInt16() + + if fieldCount != int16(len(c.rowDesc.fields)) { + return nil, fmt.Errorf("Received DataRow with %d fields, expected %d fields", fieldCount, c.rowDesc.fields) + } + + row = make(map[string]string, fieldCount) + for i := int16(0); i < fieldCount; i++ { + // TODO - handle nulls + size := r.readInt32() + fmt.Println(size) + row[c.rowDesc.fields[i].name] = r.readByteString(size) + } + return +} + + +func (c *conn) rxCommandComplete(buf []byte) string { + r := newMessageReader(buf) + return r.readString() +} + func (c *conn) txStartupMessage(msg *startupMessage) (err error) { _, err = c.conn.Write(msg.Bytes()) return diff --git a/conn_test.go b/conn_test.go index 75122ee6..3329dab3 100644 --- a/conn_test.go +++ b/conn_test.go @@ -15,3 +15,22 @@ func TestConnect(t *testing.T) { t.Fatal("Unable to close connection") } } + + +func TestQuery(t *testing.T) { + conn, err := Connect(map[string]string{"socket": "/private/tmp/.s.PGSQL.5432"}) + if err != nil { + t.Fatal("Unable to establish connection") + } + + // var rows []map[string]string + _, err = conn.Query("SELECT * FROM people") + if err != nil { + t.Fatal("Query failed") + } + + err = conn.Close() + if err != nil { + t.Fatal("Unable to close connection") + } +} \ No newline at end of file diff --git a/message_reader.go b/message_reader.go new file mode 100644 index 00000000..f1b07fb0 --- /dev/null +++ b/message_reader.go @@ -0,0 +1,45 @@ +package pqx + +import ( + "encoding/binary" + "bytes" +) + +type messageReader []byte + +func newMessageReader(buf []byte) *messageReader { + r := messageReader(buf) + return &r +} + +func (r *messageReader) readInt16() int16 { + n := int16(binary.BigEndian.Uint16((*r)[:2])) + *r = (*r)[2:] + return n +} + +func (r *messageReader) readInt32() int32 { + n := int32(binary.BigEndian.Uint32((*r)[:4])) + *r = (*r)[4:] + return n +} + +func (r *messageReader) readOid() oid { + n := oid(binary.BigEndian.Uint32((*r)[:4])) + *r = (*r)[4:] + return n +} + +func (r *messageReader) readString() string { + n := bytes.IndexByte(*r, 0) + s := (*r)[:n] + *r = (*r)[n+1:] + return string(s) +} + +// Read count bytes and return as string +func (r *messageReader) readByteString(count int32) string { + s := (*r)[:count] + *r = (*r)[count:] + return string(s) +} diff --git a/messages.go b/messages.go index 5d24c22c..370ad3ca 100644 --- a/messages.go +++ b/messages.go @@ -63,3 +63,23 @@ type readyForQuery struct { func (self *readyForQuery) String() string { return fmt.Sprintf("ReadyForQuery txStatus: %c", self.txStatus) } + +type oid int32 + +type fieldDescription struct { + name string + table oid + attributeNumber int16 + dataType oid + dataTypeSize int16 + modifier int32 + formatCode int16 +} + +type rowDescription struct { + fields []fieldDescription +} + +func (self *rowDescription) String() string { + return fmt.Sprintf("RowDescription field count: %d", len(self.fields)) +}