Merge pull request #10 from furdarius/configurable-chunkreader-buf
Configurable chunkreader buffer size
This commit is contained in:
+11
-2
@@ -4,6 +4,9 @@ go:
|
|||||||
- 1.x
|
- 1.x
|
||||||
- tip
|
- tip
|
||||||
|
|
||||||
|
git:
|
||||||
|
depth: 1
|
||||||
|
|
||||||
# Derived from https://github.com/lib/pq/blob/master/.travis.yml
|
# Derived from https://github.com/lib/pq/blob/master/.travis.yml
|
||||||
before_install:
|
before_install:
|
||||||
- ./travis/before_install.bash
|
- ./travis/before_install.bash
|
||||||
@@ -11,6 +14,8 @@ before_install:
|
|||||||
env:
|
env:
|
||||||
global:
|
global:
|
||||||
- GO111MODULE=on
|
- GO111MODULE=on
|
||||||
|
- GOPROXY=https://proxy.golang.org
|
||||||
|
- GOFLAGS=-mod=readonly
|
||||||
- PGX_TEST_CONN_STRING=postgres://pgx_md5:secret@127.0.0.1/pgx_test
|
- PGX_TEST_CONN_STRING=postgres://pgx_md5:secret@127.0.0.1/pgx_test
|
||||||
- PGX_TEST_UNIX_SOCKET_CONN_STRING="host=/var/run/postgresql database=pgx_test"
|
- PGX_TEST_UNIX_SOCKET_CONN_STRING="host=/var/run/postgresql database=pgx_test"
|
||||||
- PGX_TEST_TCP_CONN_STRING=postgres://pgx_md5:secret@127.0.0.1/pgx_test
|
- PGX_TEST_TCP_CONN_STRING=postgres://pgx_md5:secret@127.0.0.1/pgx_test
|
||||||
@@ -25,11 +30,15 @@ env:
|
|||||||
- PGVERSION=9.4
|
- PGVERSION=9.4
|
||||||
- PGVERSION=9.3
|
- PGVERSION=9.3
|
||||||
|
|
||||||
|
cache:
|
||||||
|
directories:
|
||||||
|
- $HOME/.cache/go-build
|
||||||
|
- $HOME/gopath/pkg/mod
|
||||||
|
|
||||||
before_script:
|
before_script:
|
||||||
- ./travis/before_script.bash
|
- ./travis/before_script.bash
|
||||||
|
|
||||||
install:
|
install: go mod download
|
||||||
- ./travis/install.bash
|
|
||||||
|
|
||||||
script:
|
script:
|
||||||
- ./travis/script.bash
|
- ./travis/script.bash
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
@@ -18,6 +19,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgpassfile"
|
"github.com/jackc/pgpassfile"
|
||||||
|
"github.com/jackc/pgproto3/v2"
|
||||||
errors "golang.org/x/xerrors"
|
errors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -26,14 +28,15 @@ type ValidateConnectFunc func(ctx context.Context, pgconn *PgConn) error
|
|||||||
|
|
||||||
// Config is the settings used to establish a connection to a PostgreSQL server.
|
// Config is the settings used to establish a connection to a PostgreSQL server.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Host string // host (e.g. localhost) or path to unix domain socket directory (e.g. /private/tmp)
|
Host string // host (e.g. localhost) or path to unix domain socket directory (e.g. /private/tmp)
|
||||||
Port uint16
|
Port uint16
|
||||||
Database string
|
Database string
|
||||||
User string
|
User string
|
||||||
Password string
|
Password string
|
||||||
TLSConfig *tls.Config // nil disables TLS
|
TLSConfig *tls.Config // nil disables TLS
|
||||||
DialFunc DialFunc // e.g. net.Dialer.DialContext
|
DialFunc DialFunc // e.g. net.Dialer.DialContext
|
||||||
RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name)
|
BuildFrontendFunc BuildFrontendFunc
|
||||||
|
RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name)
|
||||||
|
|
||||||
Fallbacks []*FallbackConfig
|
Fallbacks []*FallbackConfig
|
||||||
|
|
||||||
@@ -473,6 +476,14 @@ func makeDefaultDialer() *net.Dialer {
|
|||||||
return &net.Dialer{KeepAlive: 5 * time.Minute}
|
return &net.Dialer{KeepAlive: 5 * time.Minute}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeDefaultBuildFrontendFunc() BuildFrontendFunc {
|
||||||
|
return func(r io.Reader) Frontend {
|
||||||
|
frontend, _ := pgproto3.NewFrontend(pgproto3.NewChunkReader(r), nil)
|
||||||
|
|
||||||
|
return frontend
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func makeConnectTimeoutDialFunc(s string) (DialFunc, error) {
|
func makeConnectTimeoutDialFunc(s string) (DialFunc, error) {
|
||||||
timeout, err := strconv.ParseInt(s, 10, 64)
|
timeout, err := strconv.ParseInt(s, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -8,8 +8,6 @@ github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
|
|||||||
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
|
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db h1:UpaKn/gYxzH6/zWyRQH1S260zvKqwJJ4h8+Kf09ooh0=
|
|
||||||
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
|
|
||||||
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711 h1:vZp4bYotXUkFx7JUSm7U8KV/7Q0AOdrQxxBBj0ZmZsg=
|
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711 h1:vZp4bYotXUkFx7JUSm7U8KV/7Q0AOdrQxxBBj0ZmZsg=
|
||||||
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
|
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
|
||||||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||||
@@ -25,7 +23,5 @@ golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e h1:nFYrTHrdrAOpShe27kaFHjsqY
|
|||||||
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 h1:PPwnA7z1Pjf7XYaBP9GL1VAMZmcIWyFz7QCMSIIa3Bg=
|
|
||||||
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
|
||||||
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522 h1:bhOzK9QyoD0ogCnFro1m2mz41+Ib0oOhfJnBp5MR4K4=
|
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522 h1:bhOzK9QyoD0ogCnFro1m2mz41+Ib0oOhfJnBp5MR4K4=
|
||||||
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
|||||||
@@ -40,9 +40,12 @@ type Notification struct {
|
|||||||
Payload string
|
Payload string
|
||||||
}
|
}
|
||||||
|
|
||||||
// DialFunc is a function that can be used to connect to a PostgreSQL server
|
// DialFunc is a function that can be used to connect to a PostgreSQL server.
|
||||||
type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
|
type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
|
|
||||||
|
// BuildFrontendFunc is a function that can be used to create Frontend implementation for connection.
|
||||||
|
type BuildFrontendFunc func(r io.Reader) Frontend
|
||||||
|
|
||||||
// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at
|
// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at
|
||||||
// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin
|
// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin
|
||||||
// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY
|
// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY
|
||||||
@@ -55,6 +58,11 @@ type NoticeHandler func(*PgConn, *Notice)
|
|||||||
// notice event.
|
// notice event.
|
||||||
type NotificationHandler func(*PgConn, *Notification)
|
type NotificationHandler func(*PgConn, *Notification)
|
||||||
|
|
||||||
|
// Frontend used to receive messages from backend.
|
||||||
|
type Frontend interface {
|
||||||
|
Receive() (pgproto3.BackendMessage, error)
|
||||||
|
}
|
||||||
|
|
||||||
// PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage.
|
// PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage.
|
||||||
type PgConn struct {
|
type PgConn struct {
|
||||||
conn net.Conn // the underlying TCP or unix domain socket connection
|
conn net.Conn // the underlying TCP or unix domain socket connection
|
||||||
@@ -62,7 +70,7 @@ type PgConn struct {
|
|||||||
secretKey uint32 // key to use to send a cancel query message to the server
|
secretKey uint32 // key to use to send a cancel query message to the server
|
||||||
parameterStatuses map[string]string // parameters that have been reported by the server
|
parameterStatuses map[string]string // parameters that have been reported by the server
|
||||||
TxStatus byte
|
TxStatus byte
|
||||||
Frontend *pgproto3.Frontend
|
frontend Frontend
|
||||||
|
|
||||||
Config *Config
|
Config *Config
|
||||||
|
|
||||||
@@ -105,6 +113,9 @@ func ConnectConfig(ctx context.Context, config *Config) (pgConn *PgConn, err err
|
|||||||
if config.DialFunc == nil {
|
if config.DialFunc == nil {
|
||||||
config.DialFunc = makeDefaultDialer().DialContext
|
config.DialFunc = makeDefaultDialer().DialContext
|
||||||
}
|
}
|
||||||
|
if config.BuildFrontendFunc == nil {
|
||||||
|
config.BuildFrontendFunc = makeDefaultBuildFrontendFunc()
|
||||||
|
}
|
||||||
if config.RuntimeParams == nil {
|
if config.RuntimeParams == nil {
|
||||||
config.RuntimeParams = make(map[string]string)
|
config.RuntimeParams = make(map[string]string)
|
||||||
}
|
}
|
||||||
@@ -170,10 +181,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
|||||||
func() { pgConn.conn.SetDeadline(time.Time{}) },
|
func() { pgConn.conn.SetDeadline(time.Time{}) },
|
||||||
)
|
)
|
||||||
|
|
||||||
pgConn.Frontend, err = pgproto3.NewFrontend(pgproto3.NewChunkReader(pgConn.conn), pgConn.conn)
|
pgConn.frontend = config.BuildFrontendFunc(pgConn.conn)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
startupMsg := pgproto3.StartupMessage{
|
startupMsg := pgproto3.StartupMessage{
|
||||||
ProtocolVersion: pgproto3.ProtocolVersionNumber,
|
ProtocolVersion: pgproto3.ProtocolVersionNumber,
|
||||||
@@ -292,7 +300,7 @@ func (pgConn *PgConn) signalMessage() chan struct{} {
|
|||||||
|
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
pgConn.bufferingReceiveMsg, pgConn.bufferingReceiveErr = pgConn.Frontend.Receive()
|
pgConn.bufferingReceiveMsg, pgConn.bufferingReceiveErr = pgConn.frontend.Receive()
|
||||||
pgConn.bufferingReceiveMux.Unlock()
|
pgConn.bufferingReceiveMux.Unlock()
|
||||||
close(ch)
|
close(ch)
|
||||||
}()
|
}()
|
||||||
@@ -312,10 +320,10 @@ func (pgConn *PgConn) ReceiveMessage() (pgproto3.BackendMessage, error) {
|
|||||||
|
|
||||||
// If a timeout error happened in the background try the read again.
|
// If a timeout error happened in the background try the read again.
|
||||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||||
msg, err = pgConn.Frontend.Receive()
|
msg, err = pgConn.frontend.Receive()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
msg, err = pgConn.Frontend.Receive()
|
msg, err = pgConn.frontend.Receive()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -1,14 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
set -eux
|
|
||||||
|
|
||||||
go get -u github.com/cockroachdb/apd
|
|
||||||
go get -u github.com/shopspring/decimal
|
|
||||||
go get -u gopkg.in/inconshreveable/log15.v2
|
|
||||||
go get -u github.com/jackc/fake
|
|
||||||
go get -u github.com/lib/pq
|
|
||||||
go get -u github.com/hashicorp/go-version
|
|
||||||
go get -u github.com/satori/go.uuid
|
|
||||||
go get -u github.com/sirupsen/logrus
|
|
||||||
go get -u github.com/pkg/errors
|
|
||||||
go get -u go.uber.org/zap
|
|
||||||
go get -u github.com/rs/zerolog
|
|
||||||
Reference in New Issue
Block a user