Remove nbconn
The non-blocking IO system was designed to solve three problems: 1. Deadlock that can occur when both sides of a connection are blocked writing because all buffers between are full. 2. The inability to use a write deadline with a TLS.Conn without killing the connection. 3. Efficiently check if a connection has been closed before writing. This reduces the cases where the application doesn't know if a query that does a INSERT/UPDATE/DELETE was actually sent to the server or not. However, the nbconn package is extraordinarily complex, has been a source of very tricky bugs, and has OS specific code paths. It also does not work at all with underlying net.Conn implementations that do not have platform specific non-blocking IO syscall support and do not properly implement deadlines. In particular, this is the case with golang.org/x/crypto/ssh. I believe the deadlock problem can be solved with a combination of a goroutine for CopyFrom like v4 used and a watchdog for regular queries that uses time.AfterFunc. The write deadline problem actually should be ignorable. We check for context cancellation before sending a query and the actual Write should be almost instant as long as the underlying connection is not blocked. (We should only have to wait until it is accepted by the OS, not until it is fully sent.) Efficiently checking if a connection has been closed is probably the hardest to solve without non-blocking reads. However, the existing code only solves part of the problem. It can detect a closed or broken connection the OS knows about, but it won't actually detect other types of broken connections such as a network interruption. This is currently implemented in CheckConn and called automatically when checking a connection out of the pool that has been idle for over one second. I think that changing CheckConn to a very short deadline read and changing the pool to do an actual Ping would be an acceptable solution. Remove nbconn and non-blocking code. This does not leave the system in an entirely working state. In particular, CopyFrom is broken, deadlocks can occur for extremely large queries or batches, and PgConn.CheckConn is now a `select 1` ping. These will be resolved in subsequent commits.
This commit is contained in:
committed by
Jack Christensen
parent
9cfdd21f1c
commit
4410fc0a65
+27
-34
@@ -16,7 +16,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/internal/iobufpool"
|
||||
"github.com/jackc/pgx/v5/internal/nbconn"
|
||||
"github.com/jackc/pgx/v5/internal/pgio"
|
||||
"github.com/jackc/pgx/v5/pgconn/internal/ctxwatch"
|
||||
"github.com/jackc/pgx/v5/pgproto3"
|
||||
@@ -65,7 +64,7 @@ type NotificationHandler func(*PgConn, *Notification)
|
||||
|
||||
// PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage.
|
||||
type PgConn struct {
|
||||
conn nbconn.Conn // the non-blocking wrapper for the underlying TCP or unix domain socket connection
|
||||
conn net.Conn
|
||||
pid uint32 // backend pid
|
||||
secretKey uint32 // key to use to send a cancel query message to the server
|
||||
parameterStatuses map[string]string // parameters that have been reported by the server
|
||||
@@ -266,14 +265,13 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||
if err != nil {
|
||||
return nil, &connectError{config: config, msg: "dial error", err: normalizeTimeoutError(ctx, err)}
|
||||
}
|
||||
nbNetConn := nbconn.NewNetConn(netConn, false)
|
||||
|
||||
pgConn.conn = nbNetConn
|
||||
pgConn.contextWatcher = newContextWatcher(nbNetConn)
|
||||
pgConn.conn = netConn
|
||||
pgConn.contextWatcher = newContextWatcher(netConn)
|
||||
pgConn.contextWatcher.Watch(ctx)
|
||||
|
||||
if fallbackConfig.TLSConfig != nil {
|
||||
nbTLSConn, err := startTLS(nbNetConn, fallbackConfig.TLSConfig)
|
||||
nbTLSConn, err := startTLS(netConn, fallbackConfig.TLSConfig)
|
||||
pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS.
|
||||
if err != nil {
|
||||
netConn.Close()
|
||||
@@ -392,7 +390,7 @@ func newContextWatcher(conn net.Conn) *ctxwatch.ContextWatcher {
|
||||
)
|
||||
}
|
||||
|
||||
func startTLS(conn *nbconn.NetConn, tlsConfig *tls.Config) (*nbconn.TLSConn, error) {
|
||||
func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) {
|
||||
err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -407,12 +405,7 @@ func startTLS(conn *nbconn.NetConn, tlsConfig *tls.Config) (*nbconn.TLSConn, err
|
||||
return nil, errors.New("server refused TLS connection")
|
||||
}
|
||||
|
||||
tlsConn, err := nbconn.TLSClient(conn, tlsConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tlsConn, nil
|
||||
return tls.Client(conn, tlsConfig), nil
|
||||
}
|
||||
|
||||
func (pgConn *PgConn) txPasswordMessage(password string) (err error) {
|
||||
@@ -468,10 +461,6 @@ func (pgConn *PgConn) peekMessage() (pgproto3.BackendMessage, error) {
|
||||
msg, err := pgConn.frontend.Receive()
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, nbconn.ErrWouldBlock) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Close on anything other than timeout error - everything else is fatal
|
||||
var netErr net.Error
|
||||
isNetErr := errors.As(err, &netErr)
|
||||
@@ -1174,11 +1163,11 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
|
||||
return CommandTag{}, err
|
||||
}
|
||||
|
||||
err = pgConn.conn.SetReadDeadline(nbconn.NonBlockingDeadline)
|
||||
if err != nil {
|
||||
pgConn.asyncClose()
|
||||
return CommandTag{}, err
|
||||
}
|
||||
// err = pgConn.conn.SetReadDeadline(nbconn.NonBlockingDeadline)
|
||||
// if err != nil {
|
||||
// pgConn.asyncClose()
|
||||
// return CommandTag{}, err
|
||||
// }
|
||||
nonblocking := true
|
||||
defer func() {
|
||||
if nonblocking {
|
||||
@@ -1217,9 +1206,9 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
|
||||
for pgErr == nil {
|
||||
msg, err := pgConn.receiveMessage()
|
||||
if err != nil {
|
||||
if errors.Is(err, nbconn.ErrWouldBlock) {
|
||||
break
|
||||
}
|
||||
// if errors.Is(err, nbconn.ErrWouldBlock) {
|
||||
// break
|
||||
// }
|
||||
pgConn.asyncClose()
|
||||
return CommandTag{}, normalizeTimeoutError(ctx, err)
|
||||
}
|
||||
@@ -1638,15 +1627,19 @@ func (pgConn *PgConn) EscapeString(s string) (string, error) {
|
||||
// connection. If this is done immediately before sending a query it reduces the chances a query will be sent that fails
|
||||
// without the client knowing whether the server received it or not.
|
||||
func (pgConn *PgConn) CheckConn() error {
|
||||
if err := pgConn.lock(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer pgConn.unlock()
|
||||
// if err := pgConn.lock(); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// defer pgConn.unlock()
|
||||
|
||||
err := pgConn.conn.BufferReadUntilBlock()
|
||||
if err != nil && !errors.Is(err, nbconn.ErrWouldBlock) {
|
||||
return err
|
||||
}
|
||||
rr := pgConn.ExecParams(context.Background(), "select 1", nil, nil, nil, nil)
|
||||
_, err := rr.Close()
|
||||
return err
|
||||
|
||||
// err := pgConn.conn.BufferReadUntilBlock()
|
||||
// if err != nil && !errors.Is(err, nbconn.ErrWouldBlock) {
|
||||
// return err
|
||||
// }
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1660,7 +1653,7 @@ func (pgConn *PgConn) makeCommandTag(buf []byte) CommandTag {
|
||||
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
|
||||
// compatibility.
|
||||
type HijackedConn struct {
|
||||
Conn nbconn.Conn // the non-blocking wrapper of the underlying TCP or unix domain socket connection
|
||||
Conn net.Conn
|
||||
PID uint32 // backend pid
|
||||
SecretKey uint32 // key to use to send a cancel query message to the server
|
||||
ParameterStatuses map[string]string // parameters that have been reported by the server
|
||||
|
||||
Reference in New Issue
Block a user