2
0

Reduce allocations and copies in pgproto3

Altered chunkreader to never reuse memory.

Altered pgproto3 to to copy memory when decoding. Renamed UnmarshalBinary to
Decode because of changed semantics.
This commit is contained in:
Jack Christensen
2017-04-29 11:55:14 -05:00
parent e1eda90e29
commit 61026b7c21
2 changed files with 10 additions and 59 deletions
+4 -21
View File
@@ -9,14 +9,12 @@ type ChunkReader struct {
buf []byte buf []byte
rp, wp int // buf read position and write position rp, wp int // buf read position and write position
taken bool
options Options options Options
} }
type Options struct { type Options struct {
MinBufLen int // Minimum buffer length MinBufLen int // Minimum buffer length
BlockLen int // Increments to expand buffer (e.g. a 8000 byte request with a BlockLen of 1024 would yield a buffer len of 8192)
} }
func NewChunkReader(r io.Reader) *ChunkReader { func NewChunkReader(r io.Reader) *ChunkReader {
@@ -32,9 +30,6 @@ func NewChunkReaderEx(r io.Reader, options Options) (*ChunkReader, error) {
if options.MinBufLen == 0 { if options.MinBufLen == 0 {
options.MinBufLen = 4096 options.MinBufLen = 4096
} }
if options.BlockLen == 0 {
options.BlockLen = 512
}
return &ChunkReader{ return &ChunkReader{
r: r, r: r,
@@ -43,8 +38,8 @@ func NewChunkReaderEx(r io.Reader, options Options) (*ChunkReader, error) {
}, nil }, nil
} }
// Next returns buf filled with the next n bytes. buf is only valid until the // Next returns buf filled with the next n bytes. If an error occurs, buf will
// next call to Next. If an error occurs, buf will be nil. // be nil.
func (r *ChunkReader) Next(n int) (buf []byte, err error) { func (r *ChunkReader) Next(n int) (buf []byte, err error) {
// n bytes already in buf // n bytes already in buf
if (r.wp - r.rp) >= n { if (r.wp - r.rp) >= n {
@@ -56,17 +51,12 @@ func (r *ChunkReader) Next(n int) (buf []byte, err error) {
// available space in buf is less than n // available space in buf is less than n
if len(r.buf) < n { if len(r.buf) < n {
r.copyBufContents(r.newBuf(n)) r.copyBufContents(r.newBuf(n))
r.taken = false
} }
// buf is large enough, but need to shift filled area to start to make enough contiguous space // buf is large enough, but need to shift filled area to start to make enough contiguous space
minReadCount := n - (r.wp - r.rp) minReadCount := n - (r.wp - r.rp)
if (len(r.buf) - r.wp) < minReadCount { if (len(r.buf) - r.wp) < minReadCount {
newBuf := r.buf newBuf := r.newBuf(n)
if r.taken {
newBuf = r.newBuf(n)
r.taken = false
}
r.copyBufContents(newBuf) r.copyBufContents(newBuf)
} }
@@ -79,20 +69,13 @@ func (r *ChunkReader) Next(n int) (buf []byte, err error) {
return buf, nil return buf, nil
} }
// KeepLast prevents the last data retrieved by Next from being reused by the
// ChunkReader.
func (r *ChunkReader) KeepLast() {
r.taken = true
}
func (r *ChunkReader) appendAtLeast(fillLen int) error { func (r *ChunkReader) appendAtLeast(fillLen int) error {
n, err := io.ReadAtLeast(r.r, r.buf[r.wp:], fillLen) n, err := io.ReadAtLeast(r.r, r.buf[r.wp:], fillLen)
r.wp += n r.wp += n
return err return err
} }
func (r *ChunkReader) newBuf(min int) []byte { func (r *ChunkReader) newBuf(size int) []byte {
size := ((min / r.options.BlockLen) + 1) * r.options.BlockLen
if size < r.options.MinBufLen { if size < r.options.MinBufLen {
size = r.options.MinBufLen size = r.options.MinBufLen
} }
+6 -38
View File
@@ -7,7 +7,7 @@ import (
func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) { func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) {
server := &bytes.Buffer{} server := &bytes.Buffer{}
r, err := NewChunkReaderEx(server, Options{MinBufLen: 4, BlockLen: 2}) r, err := NewChunkReaderEx(server, Options{MinBufLen: 4})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -44,7 +44,7 @@ func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) {
func TestChunkReaderNextExpandsBufAsNeeded(t *testing.T) { func TestChunkReaderNextExpandsBufAsNeeded(t *testing.T) {
server := &bytes.Buffer{} server := &bytes.Buffer{}
r, err := NewChunkReaderEx(server, Options{MinBufLen: 4, BlockLen: 2}) r, err := NewChunkReaderEx(server, Options{MinBufLen: 4})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -59,14 +59,14 @@ func TestChunkReaderNextExpandsBufAsNeeded(t *testing.T) {
if bytes.Compare(n1, src[0:5]) != 0 { if bytes.Compare(n1, src[0:5]) != 0 {
t.Fatalf("Expected read bytes to be %v, but they were %v", src[0:5], n1) t.Fatalf("Expected read bytes to be %v, but they were %v", src[0:5], n1)
} }
if len(r.buf) != 6 { if len(r.buf) != 5 {
t.Fatalf("Expected len(r.buf) to be %v, but it was %v", 6, len(r.buf)) t.Fatalf("Expected len(r.buf) to be %v, but it was %v", 5, len(r.buf))
} }
} }
func TestChunkReaderNextReusesBuf(t *testing.T) { func TestChunkReaderDoesNotReuseBuf(t *testing.T) {
server := &bytes.Buffer{} server := &bytes.Buffer{}
r, err := NewChunkReaderEx(server, Options{MinBufLen: 4, BlockLen: 1}) r, err := NewChunkReaderEx(server, Options{MinBufLen: 4})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -90,38 +90,6 @@ func TestChunkReaderNextReusesBuf(t *testing.T) {
t.Fatalf("Expected read bytes to be %v, but they were %v", src[4:8], n2) t.Fatalf("Expected read bytes to be %v, but they were %v", src[4:8], n2)
} }
if bytes.Compare(n1, src[4:8]) != 0 {
t.Fatalf("Expected Next to have reused buf, %v found instead of %v", src[4:8], n1)
}
}
func TestChunkReaderKeepLastPreventsBufReuse(t *testing.T) {
server := &bytes.Buffer{}
r, err := NewChunkReaderEx(server, Options{MinBufLen: 4, BlockLen: 1})
if err != nil {
t.Fatal(err)
}
src := []byte{1, 2, 3, 4, 5, 6, 7, 8}
server.Write(src)
n1, err := r.Next(4)
if err != nil {
t.Fatal(err)
}
if bytes.Compare(n1, src[0:4]) != 0 {
t.Fatalf("Expected read bytes to be %v, but they were %v", src[0:4], n1)
}
r.KeepLast()
n2, err := r.Next(4)
if err != nil {
t.Fatal(err)
}
if bytes.Compare(n2, src[4:8]) != 0 {
t.Fatalf("Expected read bytes to be %v, but they were %v", src[4:8], n2)
}
if bytes.Compare(n1, src[0:4]) != 0 { if bytes.Compare(n1, src[0:4]) != 0 {
t.Fatalf("Expected KeepLast to prevent Next from overwriting buf, expected %v but it was %v", src[0:4], n1) t.Fatalf("Expected KeepLast to prevent Next from overwriting buf, expected %v but it was %v", src[0:4], n1)
} }