Extract iobufpool
This commit is contained in:
@@ -0,0 +1,46 @@
|
||||
// Package iobufpool implements a global segregated-fit pool of buffers for IO.
|
||||
package iobufpool
|
||||
|
||||
import "sync"
|
||||
|
||||
const minPoolExpOf2 = 8
|
||||
|
||||
var pools [18]*sync.Pool
|
||||
|
||||
func init() {
|
||||
for i := range pools {
|
||||
bufLen := 1 << (minPoolExpOf2 + i)
|
||||
pools[i] = &sync.Pool{New: func() any { return make([]byte, bufLen) }}
|
||||
}
|
||||
}
|
||||
|
||||
// Get gets a []byte with len >= size and len <= size*2.
|
||||
func Get(size int) []byte {
|
||||
i := poolIdx(size)
|
||||
if i >= len(pools) {
|
||||
return make([]byte, size)
|
||||
}
|
||||
return pools[i].Get().([]byte)
|
||||
}
|
||||
|
||||
// Put returns buf to the pool.
|
||||
func Put(buf []byte) {
|
||||
i := poolIdx(len(buf))
|
||||
if i >= len(pools) {
|
||||
return
|
||||
}
|
||||
|
||||
pools[i].Put(buf)
|
||||
}
|
||||
|
||||
func poolIdx(size int) int {
|
||||
size--
|
||||
size >>= minPoolExpOf2
|
||||
i := 0
|
||||
for size > 0 {
|
||||
size >>= 1
|
||||
i++
|
||||
}
|
||||
|
||||
return i
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package iobufpool
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestPoolIdx(t *testing.T) {
|
||||
tests := []struct {
|
||||
size int
|
||||
expected int
|
||||
}{
|
||||
{size: 0, expected: 0},
|
||||
{size: 1, expected: 0},
|
||||
{size: 255, expected: 0},
|
||||
{size: 256, expected: 0},
|
||||
{size: 257, expected: 1},
|
||||
{size: 511, expected: 1},
|
||||
{size: 512, expected: 1},
|
||||
{size: 513, expected: 2},
|
||||
{size: 1023, expected: 2},
|
||||
{size: 1024, expected: 2},
|
||||
{size: 1025, expected: 3},
|
||||
{size: 2047, expected: 3},
|
||||
{size: 2048, expected: 3},
|
||||
{size: 2049, expected: 4},
|
||||
{size: 8388607, expected: 15},
|
||||
{size: 8388608, expected: 15},
|
||||
{size: 8388609, expected: 16},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
idx := poolIdx(tt.size)
|
||||
assert.Equalf(t, tt.expected, idx, "size: %d", tt.size)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package iobufpool_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/jackc/pgx/v5/internal/iobufpool"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
tests := []struct {
|
||||
requestedLen int
|
||||
expectedLen int
|
||||
}{
|
||||
{requestedLen: 0, expectedLen: 256},
|
||||
{requestedLen: 128, expectedLen: 256},
|
||||
{requestedLen: 255, expectedLen: 256},
|
||||
{requestedLen: 256, expectedLen: 256},
|
||||
{requestedLen: 257, expectedLen: 512},
|
||||
{requestedLen: 511, expectedLen: 512},
|
||||
{requestedLen: 512, expectedLen: 512},
|
||||
{requestedLen: 513, expectedLen: 1024},
|
||||
{requestedLen: 1023, expectedLen: 1024},
|
||||
{requestedLen: 1024, expectedLen: 1024},
|
||||
{requestedLen: 33554431, expectedLen: 33554432},
|
||||
{requestedLen: 33554432, expectedLen: 33554432},
|
||||
|
||||
// Above 32 MiB skip the pool and allocate exactly the requested size.
|
||||
{requestedLen: 33554433, expectedLen: 33554433},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
buf := iobufpool.Get(tt.requestedLen)
|
||||
assert.Equalf(t, tt.expectedLen, len(buf), "requestedLen: %d", tt.requestedLen)
|
||||
}
|
||||
}
|
||||
+16
-55
@@ -2,48 +2,10 @@ package pgproto3
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/jackc/pgx/v5/internal/iobufpool"
|
||||
)
|
||||
|
||||
type bigBufPool struct {
|
||||
pool sync.Pool
|
||||
byteSize int
|
||||
}
|
||||
|
||||
var bigBufPools []*bigBufPool
|
||||
|
||||
func init() {
|
||||
KiB := 1024
|
||||
bigBufSizes := []int{64 * KiB, 256 * KiB, 1024 * KiB, 4096 * KiB}
|
||||
bigBufPools = make([]*bigBufPool, len(bigBufSizes))
|
||||
|
||||
for i := range bigBufPools {
|
||||
byteSize := bigBufSizes[i]
|
||||
bigBufPools[i] = &bigBufPool{
|
||||
pool: sync.Pool{New: func() any { return make([]byte, byteSize) }},
|
||||
byteSize: byteSize,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getBigBuf(size int) []byte {
|
||||
for _, bigBufPool := range bigBufPools {
|
||||
if size < bigBufPool.byteSize {
|
||||
return bigBufPool.pool.Get().([]byte)
|
||||
}
|
||||
}
|
||||
return make([]byte, size)
|
||||
}
|
||||
|
||||
func releaseBigBuf(buf []byte) {
|
||||
for _, bigBufPool := range bigBufPools {
|
||||
if len(buf) == bigBufPool.byteSize {
|
||||
bigBufPool.pool.Put(buf)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// chunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and
|
||||
// will read as much as will fit in the current buffer in a single call regardless of how large a read is actually
|
||||
// requested. The memory returned via Next is only valid until the next call to Next.
|
||||
@@ -55,28 +17,26 @@ type chunkReader struct {
|
||||
buf []byte
|
||||
rp, wp int // buf read position and write position
|
||||
|
||||
ownBuf []byte // buf owned by chunkReader
|
||||
minBufSize int
|
||||
}
|
||||
|
||||
// newChunkReader creates and returns a new chunkReader for r with default configuration with bufSize internal buffer.
|
||||
// If bufSize is <= 0 it uses a default value.
|
||||
func newChunkReader(r io.Reader, bufSize int) *chunkReader {
|
||||
if bufSize <= 0 {
|
||||
// newChunkReader creates and returns a new chunkReader for r with default configuration. If minBufSize is <= 0 it uses
|
||||
// a default value.
|
||||
func newChunkReader(r io.Reader, minBufSize int) *chunkReader {
|
||||
if minBufSize <= 0 {
|
||||
// By historical reasons Postgres currently has 8KB send buffer inside,
|
||||
// so here we want to have at least the same size buffer.
|
||||
// @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134
|
||||
// @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru
|
||||
//
|
||||
// In addition, testing has found no benefit of any larger buffer.
|
||||
bufSize = 8192
|
||||
minBufSize = 8192
|
||||
}
|
||||
|
||||
buf := make([]byte, bufSize)
|
||||
|
||||
return &chunkReader{
|
||||
r: r,
|
||||
buf: buf,
|
||||
ownBuf: buf,
|
||||
r: r,
|
||||
minBufSize: minBufSize,
|
||||
buf: iobufpool.Get(minBufSize),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,9 +45,9 @@ func newChunkReader(r io.Reader, bufSize int) *chunkReader {
|
||||
func (r *chunkReader) Next(n int) (buf []byte, err error) {
|
||||
// Reset the buffer if it is empty
|
||||
if r.rp == r.wp {
|
||||
if len(r.buf) != len(r.ownBuf) {
|
||||
releaseBigBuf(r.buf)
|
||||
r.buf = r.ownBuf
|
||||
if len(r.buf) != r.minBufSize {
|
||||
iobufpool.Put(r.buf)
|
||||
r.buf = iobufpool.Get(r.minBufSize)
|
||||
}
|
||||
r.rp = 0
|
||||
r.wp = 0
|
||||
@@ -102,9 +62,10 @@ func (r *chunkReader) Next(n int) (buf []byte, err error) {
|
||||
|
||||
// buf is smaller than requested number of bytes
|
||||
if len(r.buf) < n {
|
||||
bigBuf := getBigBuf(n)
|
||||
bigBuf := iobufpool.Get(n)
|
||||
r.wp = copy(bigBuf, r.buf[r.rp:r.wp])
|
||||
r.rp = 0
|
||||
iobufpool.Put(r.buf)
|
||||
r.buf = bigBuf
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) {
|
||||
t.Fatalf("Expected read bytes to be %v, but they were %v", src[2:4], n2)
|
||||
}
|
||||
|
||||
if bytes.Compare(r.buf, src) != 0 {
|
||||
if bytes.Compare(r.buf[:len(src)], src) != 0 {
|
||||
t.Fatalf("Expected r.buf to be %v, but it was %v", src, r.buf)
|
||||
}
|
||||
|
||||
@@ -46,53 +46,6 @@ func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestChunkReaderNextGetsBiggerBufAsNeededFromBigBufPools(t *testing.T) {
|
||||
server := &bytes.Buffer{}
|
||||
r := newChunkReader(server, 4)
|
||||
|
||||
src := []byte{1, 2, 3, 4, 5, 6, 7, 8}
|
||||
server.Write(src)
|
||||
|
||||
n1, err := r.Next(5)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if bytes.Compare(n1, src[0:5]) != 0 {
|
||||
t.Fatalf("Expected read bytes to be %v, but they were %v", src[0:5], n1)
|
||||
}
|
||||
if len(r.buf) != bigBufPools[0].byteSize {
|
||||
t.Fatalf("Expected len(r.buf) to be %v, but it was %v", bigBufPools[0].byteSize, len(r.buf))
|
||||
}
|
||||
}
|
||||
|
||||
func TestChunkReaderReusesBuf(t *testing.T) {
|
||||
server := &bytes.Buffer{}
|
||||
r := newChunkReader(server, 4)
|
||||
|
||||
src := []byte{1, 2, 3, 4, 5, 6, 7, 8}
|
||||
server.Write(src)
|
||||
|
||||
n1, err := r.Next(4)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if bytes.Compare(n1, src[0:4]) != 0 {
|
||||
t.Fatalf("Expected read bytes to be %v, but they were %v", src[0:4], n1)
|
||||
}
|
||||
|
||||
n2, err := r.Next(4)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if bytes.Compare(n2, src[4:8]) != 0 {
|
||||
t.Fatalf("Expected read bytes to be %v, but they were %v", src[4:8], n2)
|
||||
}
|
||||
|
||||
if bytes.Compare(n1, src[4:8]) != 0 {
|
||||
t.Fatalf("Expected slice to be reused, expected %v but it was %v", src[4:8], n1)
|
||||
}
|
||||
}
|
||||
|
||||
type randomReader struct {
|
||||
rnd *rand.Rand
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user