From e12ba1b6b90590447e2cc597c2d62be7877c3b72 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 28 May 2022 10:59:54 -0500 Subject: [PATCH] Extract iobufpool --- internal/iobufpool/iobufpool.go | 46 ++++++++++++ internal/iobufpool/iobufpool_internal_test.go | 36 ++++++++++ internal/iobufpool/iobufpool_test.go | 35 +++++++++ pgproto3/chunkreader.go | 71 +++++-------------- pgproto3/chunkreader_test.go | 49 +------------ 5 files changed, 134 insertions(+), 103 deletions(-) create mode 100644 internal/iobufpool/iobufpool.go create mode 100644 internal/iobufpool/iobufpool_internal_test.go create mode 100644 internal/iobufpool/iobufpool_test.go diff --git a/internal/iobufpool/iobufpool.go b/internal/iobufpool/iobufpool.go new file mode 100644 index 00000000..52c52f45 --- /dev/null +++ b/internal/iobufpool/iobufpool.go @@ -0,0 +1,46 @@ +// Package iobufpool implements a global segregated-fit pool of buffers for IO. +package iobufpool + +import "sync" + +const minPoolExpOf2 = 8 + +var pools [18]*sync.Pool + +func init() { + for i := range pools { + bufLen := 1 << (minPoolExpOf2 + i) + pools[i] = &sync.Pool{New: func() any { return make([]byte, bufLen) }} + } +} + +// Get gets a []byte with len >= size and len <= size*2. +func Get(size int) []byte { + i := poolIdx(size) + if i >= len(pools) { + return make([]byte, size) + } + return pools[i].Get().([]byte) +} + +// Put returns buf to the pool. +func Put(buf []byte) { + i := poolIdx(len(buf)) + if i >= len(pools) { + return + } + + pools[i].Put(buf) +} + +func poolIdx(size int) int { + size-- + size >>= minPoolExpOf2 + i := 0 + for size > 0 { + size >>= 1 + i++ + } + + return i +} diff --git a/internal/iobufpool/iobufpool_internal_test.go b/internal/iobufpool/iobufpool_internal_test.go new file mode 100644 index 00000000..38b499f9 --- /dev/null +++ b/internal/iobufpool/iobufpool_internal_test.go @@ -0,0 +1,36 @@ +package iobufpool + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPoolIdx(t *testing.T) { + tests := []struct { + size int + expected int + }{ + {size: 0, expected: 0}, + {size: 1, expected: 0}, + {size: 255, expected: 0}, + {size: 256, expected: 0}, + {size: 257, expected: 1}, + {size: 511, expected: 1}, + {size: 512, expected: 1}, + {size: 513, expected: 2}, + {size: 1023, expected: 2}, + {size: 1024, expected: 2}, + {size: 1025, expected: 3}, + {size: 2047, expected: 3}, + {size: 2048, expected: 3}, + {size: 2049, expected: 4}, + {size: 8388607, expected: 15}, + {size: 8388608, expected: 15}, + {size: 8388609, expected: 16}, + } + for _, tt := range tests { + idx := poolIdx(tt.size) + assert.Equalf(t, tt.expected, idx, "size: %d", tt.size) + } +} diff --git a/internal/iobufpool/iobufpool_test.go b/internal/iobufpool/iobufpool_test.go new file mode 100644 index 00000000..9ad7417d --- /dev/null +++ b/internal/iobufpool/iobufpool_test.go @@ -0,0 +1,35 @@ +package iobufpool_test + +import ( + "testing" + + "github.com/jackc/pgx/v5/internal/iobufpool" + "github.com/stretchr/testify/assert" +) + +func TestGet(t *testing.T) { + tests := []struct { + requestedLen int + expectedLen int + }{ + {requestedLen: 0, expectedLen: 256}, + {requestedLen: 128, expectedLen: 256}, + {requestedLen: 255, expectedLen: 256}, + {requestedLen: 256, expectedLen: 256}, + {requestedLen: 257, expectedLen: 512}, + {requestedLen: 511, expectedLen: 512}, + {requestedLen: 512, expectedLen: 512}, + {requestedLen: 513, expectedLen: 1024}, + {requestedLen: 1023, expectedLen: 1024}, + {requestedLen: 1024, expectedLen: 1024}, + {requestedLen: 33554431, expectedLen: 33554432}, + {requestedLen: 33554432, expectedLen: 33554432}, + + // Above 32 MiB skip the pool and allocate exactly the requested size. + {requestedLen: 33554433, expectedLen: 33554433}, + } + for _, tt := range tests { + buf := iobufpool.Get(tt.requestedLen) + assert.Equalf(t, tt.expectedLen, len(buf), "requestedLen: %d", tt.requestedLen) + } +} diff --git a/pgproto3/chunkreader.go b/pgproto3/chunkreader.go index 8834f521..3c35d0b1 100644 --- a/pgproto3/chunkreader.go +++ b/pgproto3/chunkreader.go @@ -2,48 +2,10 @@ package pgproto3 import ( "io" - "sync" + + "github.com/jackc/pgx/v5/internal/iobufpool" ) -type bigBufPool struct { - pool sync.Pool - byteSize int -} - -var bigBufPools []*bigBufPool - -func init() { - KiB := 1024 - bigBufSizes := []int{64 * KiB, 256 * KiB, 1024 * KiB, 4096 * KiB} - bigBufPools = make([]*bigBufPool, len(bigBufSizes)) - - for i := range bigBufPools { - byteSize := bigBufSizes[i] - bigBufPools[i] = &bigBufPool{ - pool: sync.Pool{New: func() any { return make([]byte, byteSize) }}, - byteSize: byteSize, - } - } -} - -func getBigBuf(size int) []byte { - for _, bigBufPool := range bigBufPools { - if size < bigBufPool.byteSize { - return bigBufPool.pool.Get().([]byte) - } - } - return make([]byte, size) -} - -func releaseBigBuf(buf []byte) { - for _, bigBufPool := range bigBufPools { - if len(buf) == bigBufPool.byteSize { - bigBufPool.pool.Put(buf) - return - } - } -} - // chunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and // will read as much as will fit in the current buffer in a single call regardless of how large a read is actually // requested. The memory returned via Next is only valid until the next call to Next. @@ -55,28 +17,26 @@ type chunkReader struct { buf []byte rp, wp int // buf read position and write position - ownBuf []byte // buf owned by chunkReader + minBufSize int } -// newChunkReader creates and returns a new chunkReader for r with default configuration with bufSize internal buffer. -// If bufSize is <= 0 it uses a default value. -func newChunkReader(r io.Reader, bufSize int) *chunkReader { - if bufSize <= 0 { +// newChunkReader creates and returns a new chunkReader for r with default configuration. If minBufSize is <= 0 it uses +// a default value. +func newChunkReader(r io.Reader, minBufSize int) *chunkReader { + if minBufSize <= 0 { // By historical reasons Postgres currently has 8KB send buffer inside, // so here we want to have at least the same size buffer. // @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134 // @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru // // In addition, testing has found no benefit of any larger buffer. - bufSize = 8192 + minBufSize = 8192 } - buf := make([]byte, bufSize) - return &chunkReader{ - r: r, - buf: buf, - ownBuf: buf, + r: r, + minBufSize: minBufSize, + buf: iobufpool.Get(minBufSize), } } @@ -85,9 +45,9 @@ func newChunkReader(r io.Reader, bufSize int) *chunkReader { func (r *chunkReader) Next(n int) (buf []byte, err error) { // Reset the buffer if it is empty if r.rp == r.wp { - if len(r.buf) != len(r.ownBuf) { - releaseBigBuf(r.buf) - r.buf = r.ownBuf + if len(r.buf) != r.minBufSize { + iobufpool.Put(r.buf) + r.buf = iobufpool.Get(r.minBufSize) } r.rp = 0 r.wp = 0 @@ -102,9 +62,10 @@ func (r *chunkReader) Next(n int) (buf []byte, err error) { // buf is smaller than requested number of bytes if len(r.buf) < n { - bigBuf := getBigBuf(n) + bigBuf := iobufpool.Get(n) r.wp = copy(bigBuf, r.buf[r.rp:r.wp]) r.rp = 0 + iobufpool.Put(r.buf) r.buf = bigBuf } diff --git a/pgproto3/chunkreader_test.go b/pgproto3/chunkreader_test.go index 7d7bac7f..41c8ce65 100644 --- a/pgproto3/chunkreader_test.go +++ b/pgproto3/chunkreader_test.go @@ -29,7 +29,7 @@ func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) { t.Fatalf("Expected read bytes to be %v, but they were %v", src[2:4], n2) } - if bytes.Compare(r.buf, src) != 0 { + if bytes.Compare(r.buf[:len(src)], src) != 0 { t.Fatalf("Expected r.buf to be %v, but it was %v", src, r.buf) } @@ -46,53 +46,6 @@ func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) { } } -func TestChunkReaderNextGetsBiggerBufAsNeededFromBigBufPools(t *testing.T) { - server := &bytes.Buffer{} - r := newChunkReader(server, 4) - - src := []byte{1, 2, 3, 4, 5, 6, 7, 8} - server.Write(src) - - n1, err := r.Next(5) - if err != nil { - t.Fatal(err) - } - if bytes.Compare(n1, src[0:5]) != 0 { - t.Fatalf("Expected read bytes to be %v, but they were %v", src[0:5], n1) - } - if len(r.buf) != bigBufPools[0].byteSize { - t.Fatalf("Expected len(r.buf) to be %v, but it was %v", bigBufPools[0].byteSize, len(r.buf)) - } -} - -func TestChunkReaderReusesBuf(t *testing.T) { - server := &bytes.Buffer{} - r := newChunkReader(server, 4) - - src := []byte{1, 2, 3, 4, 5, 6, 7, 8} - server.Write(src) - - n1, err := r.Next(4) - if err != nil { - t.Fatal(err) - } - if bytes.Compare(n1, src[0:4]) != 0 { - t.Fatalf("Expected read bytes to be %v, but they were %v", src[0:4], n1) - } - - n2, err := r.Next(4) - if err != nil { - t.Fatal(err) - } - if bytes.Compare(n2, src[4:8]) != 0 { - t.Fatalf("Expected read bytes to be %v, but they were %v", src[4:8], n2) - } - - if bytes.Compare(n1, src[4:8]) != 0 { - t.Fatalf("Expected slice to be reused, expected %v but it was %v", src[4:8], n1) - } -} - type randomReader struct { rnd *rand.Rand }