Fix Windows non-blocking I/O for CopyFrom
Created based on discussion here: https://github.com/jackc/pgx/pull/1525#pullrequestreview-1344511991 Fixes https://github.com/jackc/pgx/issues/1552
This commit is contained in:
committed by
Jack Christensen
parent
9ae852eb58
commit
8b5e8d9d89
@@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/jackc/pgx/v5/internal/nbconn"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/internal/pgio"
|
"github.com/jackc/pgx/v5/internal/pgio"
|
||||||
@@ -134,6 +135,17 @@ func (ct *copyFrom) run(ctx context.Context) (int64, error) {
|
|||||||
r, w := io.Pipe()
|
r, w := io.Pipe()
|
||||||
doneChan := make(chan struct{})
|
doneChan := make(chan struct{})
|
||||||
|
|
||||||
|
if realNbConn, ok := ct.conn.pgConn.Conn().(*nbconn.NetConn); ok {
|
||||||
|
if err := realNbConn.SetBlockingMode(false); err != nil {
|
||||||
|
return 0, fmt.Errorf("cannot set socket non-blocking mode: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// TODO: Deal with it
|
||||||
|
_ = realNbConn.SetBlockingMode(true)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(doneChan)
|
defer close(doneChan)
|
||||||
|
|
||||||
|
|||||||
@@ -96,6 +96,9 @@ type NetConn struct {
|
|||||||
|
|
||||||
writeDeadlineLock sync.Mutex
|
writeDeadlineLock sync.Mutex
|
||||||
writeDeadline time.Time
|
writeDeadline time.Time
|
||||||
|
|
||||||
|
// Indicates that underlying socket connection mode set to be non-blocking
|
||||||
|
isNonBlocking bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNetConn(conn net.Conn, fakeNonBlockingIO bool) *NetConn {
|
func NewNetConn(conn net.Conn, fakeNonBlockingIO bool) *NetConn {
|
||||||
|
|||||||
@@ -79,3 +79,8 @@ func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
|
|||||||
|
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *NetConn) SetBlockingMode(blocking bool) error {
|
||||||
|
// for UNIX do nothing
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -43,10 +43,12 @@ func setSockMode(fd uintptr, mode sockMode) error {
|
|||||||
func (c *NetConn) realNonblockingWrite(b []byte) (n int, err error) {
|
func (c *NetConn) realNonblockingWrite(b []byte) (n int, err error) {
|
||||||
if c.nonblockWriteFunc == nil {
|
if c.nonblockWriteFunc == nil {
|
||||||
c.nonblockWriteFunc = func(fd uintptr) (done bool) {
|
c.nonblockWriteFunc = func(fd uintptr) (done bool) {
|
||||||
// Make sock non-blocking
|
if !c.isNonBlocking {
|
||||||
if err := setSockMode(fd, sockModeNonBlocking); err != nil {
|
// Make sock non-blocking
|
||||||
c.nonblockWriteErr = err
|
if err := setSockMode(fd, sockModeNonBlocking); err != nil {
|
||||||
return true
|
c.nonblockWriteErr = err
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var written uint32
|
var written uint32
|
||||||
@@ -56,10 +58,12 @@ func (c *NetConn) realNonblockingWrite(b []byte) (n int, err error) {
|
|||||||
c.nonblockWriteErr = syscall.WSASend(syscall.Handle(fd), &buf, 1, &written, 0, nil, nil)
|
c.nonblockWriteErr = syscall.WSASend(syscall.Handle(fd), &buf, 1, &written, 0, nil, nil)
|
||||||
c.nonblockWriteN = int(written)
|
c.nonblockWriteN = int(written)
|
||||||
|
|
||||||
// Make sock blocking again
|
if !c.isNonBlocking {
|
||||||
if err := setSockMode(fd, sockModeBlocking); err != nil {
|
// Make sock blocking again
|
||||||
c.nonblockWriteErr = err
|
if err := setSockMode(fd, sockModeBlocking); err != nil {
|
||||||
return true
|
c.nonblockWriteErr = err
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
@@ -94,10 +98,12 @@ func (c *NetConn) realNonblockingWrite(b []byte) (n int, err error) {
|
|||||||
func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
|
func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
|
||||||
if c.nonblockReadFunc == nil {
|
if c.nonblockReadFunc == nil {
|
||||||
c.nonblockReadFunc = func(fd uintptr) (done bool) {
|
c.nonblockReadFunc = func(fd uintptr) (done bool) {
|
||||||
// Make sock non-blocking
|
if !c.isNonBlocking {
|
||||||
if err := setSockMode(fd, sockModeNonBlocking); err != nil {
|
// Make sock non-blocking
|
||||||
c.nonblockReadErr = err
|
if err := setSockMode(fd, sockModeNonBlocking); err != nil {
|
||||||
return true
|
c.nonblockReadErr = err
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var read uint32
|
var read uint32
|
||||||
@@ -108,10 +114,12 @@ func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
|
|||||||
c.nonblockReadErr = syscall.WSARecv(syscall.Handle(fd), &buf, 1, &read, &flags, nil, nil)
|
c.nonblockReadErr = syscall.WSARecv(syscall.Handle(fd), &buf, 1, &read, &flags, nil, nil)
|
||||||
c.nonblockReadN = int(read)
|
c.nonblockReadN = int(read)
|
||||||
|
|
||||||
// Make sock blocking again
|
if !c.isNonBlocking {
|
||||||
if err := setSockMode(fd, sockModeBlocking); err != nil {
|
// Make sock blocking again
|
||||||
c.nonblockReadErr = err
|
if err := setSockMode(fd, sockModeBlocking); err != nil {
|
||||||
return true
|
c.nonblockReadErr = err
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
@@ -147,3 +155,22 @@ func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
|
|||||||
|
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *NetConn) SetBlockingMode(blocking bool) error {
|
||||||
|
mode := sockModeNonBlocking
|
||||||
|
if blocking {
|
||||||
|
mode = sockModeBlocking
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
c.rawConn.Control(func(fd uintptr) {
|
||||||
|
err = setSockMode(fd, mode)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
c.isNonBlocking = !blocking
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user