diff --git a/CHANGELOG.md b/CHANGELOG.md index a6668fb0..8b988590 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# Unreleased + +* Add PgConn.CleanupChan so connection pools can determine when async close is complete + # 1.6.4 (July 29, 2020) * Fix deadlock on error after CommandComplete but before ReadyForQuery diff --git a/helper_test.go b/helper_test.go index 1a3ca75e..abb04905 100644 --- a/helper_test.go +++ b/helper_test.go @@ -15,6 +15,11 @@ func closeConn(t testing.TB, conn *pgconn.PgConn) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() require.NoError(t, conn.Close(ctx)) + select { + case <-conn.CleanupChan(): + case <-time.After(5 * time.Second): + t.Fatal("Connection cleanup exceeded maximum time") + } } // Do a simple query to ensure the connection is still usable diff --git a/pgconn.go b/pgconn.go index 50607095..c132b26b 100644 --- a/pgconn.go +++ b/pgconn.go @@ -89,6 +89,8 @@ type PgConn struct { resultReader ResultReader multiResultReader MultiResultReader contextWatcher *ctxwatch.ContextWatcher + + cleanupChan chan struct{} } // Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format) @@ -201,6 +203,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig pgConn := new(PgConn) pgConn.config = config pgConn.wbuf = make([]byte, 0, wbufLen) + pgConn.cleanupChan = make(chan struct{}) var err error network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port) @@ -504,6 +507,7 @@ func (pgConn *PgConn) Close(ctx context.Context) error { } pgConn.status = connStatusClosed + defer close(pgConn.cleanupChan) defer pgConn.conn.Close() if ctx != context.Background() { @@ -538,6 +542,7 @@ func (pgConn *PgConn) asyncClose() { pgConn.status = connStatusClosed go func() { + defer close(pgConn.cleanupChan) defer pgConn.conn.Close() deadline := time.Now().Add(time.Second * 15) @@ -554,7 +559,21 @@ func (pgConn *PgConn) asyncClose() { }() } +// CleanupChan returns a channel that will be closed after all underlying resources have been cleaned up. A closed +// connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing +// yet. This is because certain errors such as a context cancellation require that the interrupted function call return +// immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are +// closed asynchronously. +// +// This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while +// an old connection is still being cleaned up and thereby exceeding the maximum pool size. +func (pgConn *PgConn) CleanupChan() chan (struct{}) { + return pgConn.cleanupChan +} + // IsClosed reports if the connection has been closed. +// +// CleanupChan() can be used to determine if all cleanup has been completed. func (pgConn *PgConn) IsClosed() bool { return pgConn.status < connStatusIdle } @@ -1585,7 +1604,8 @@ func Construct(hc *HijackedConn) (*PgConn, error) { status: connStatusIdle, - wbuf: make([]byte, 0, wbufLen), + wbuf: make([]byte, 0, wbufLen), + cleanupChan: make(chan struct{}), } pgConn.contextWatcher = ctxwatch.NewContextWatcher( diff --git a/pgconn_test.go b/pgconn_test.go index 379aa266..56afc1c2 100644 --- a/pgconn_test.go +++ b/pgconn_test.go @@ -547,6 +547,11 @@ func TestConnExecContextCanceled(t *testing.T) { err = multiResult.Close() assert.True(t, pgconn.Timeout(err)) assert.True(t, pgConn.IsClosed()) + select { + case <-pgConn.CleanupChan(): + case <-time.After(5 * time.Second): + t.Fatal("Connection cleanup exceeded maximum time") + } } func TestConnExecContextPrecanceled(t *testing.T) { @@ -680,6 +685,11 @@ func TestConnExecParamsCanceled(t *testing.T) { assert.True(t, pgconn.Timeout(err)) assert.True(t, pgConn.IsClosed()) + select { + case <-pgConn.CleanupChan(): + case <-time.After(5 * time.Second): + t.Fatal("Connection cleanup exceeded maximum time") + } } func TestConnExecParamsPrecanceled(t *testing.T) { @@ -824,6 +834,11 @@ func TestConnExecPreparedCanceled(t *testing.T) { assert.Equal(t, pgconn.CommandTag(nil), commandTag) assert.True(t, pgconn.Timeout(err)) assert.True(t, pgConn.IsClosed()) + select { + case <-pgConn.CleanupChan(): + case <-time.After(5 * time.Second): + t.Fatal("Connection cleanup exceeded maximum time") + } } func TestConnExecPreparedPrecanceled(t *testing.T) { @@ -1306,6 +1321,11 @@ func TestConnCopyToCanceled(t *testing.T) { assert.Equal(t, pgconn.CommandTag(nil), res) assert.True(t, pgConn.IsClosed()) + select { + case <-pgConn.CleanupChan(): + case <-time.After(5 * time.Second): + t.Fatal("Connection cleanup exceeded maximum time") + } } func TestConnCopyToPrecanceled(t *testing.T) { @@ -1397,6 +1417,11 @@ func TestConnCopyFromCanceled(t *testing.T) { assert.Error(t, err) assert.True(t, pgConn.IsClosed()) + select { + case <-pgConn.CleanupChan(): + case <-time.After(5 * time.Second): + t.Fatal("Connection cleanup exceeded maximum time") + } } func TestConnCopyFromPrecanceled(t *testing.T) { @@ -1647,6 +1672,11 @@ func TestConnContextCanceledCancelsRunningQueryOnServer(t *testing.T) { err = multiResult.Close() assert.True(t, pgconn.Timeout(err)) assert.True(t, pgConn.IsClosed()) + select { + case <-pgConn.CleanupChan(): + case <-time.After(5 * time.Second): + t.Fatal("Connection cleanup exceeded maximum time") + } otherConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING")) require.NoError(t, err) @@ -1750,6 +1780,11 @@ func TestConnCloseWhileCancellableQueryInProgress(t *testing.T) { closeCtx, _ := context.WithCancel(context.Background()) pgConn.Close(closeCtx) + select { + case <-pgConn.CleanupChan(): + case <-time.After(5 * time.Second): + t.Fatal("Connection cleanup exceeded maximum time") + } } // https://github.com/jackc/pgx/issues/800