diff --git a/lib/adapters/http.js b/lib/adapters/http.js index b9981159..943bc429 100755 --- a/lib/adapters/http.js +++ b/lib/adapters/http.js @@ -875,6 +875,21 @@ export default isHttpAdapterSupported && req.on('socket', function handleRequestSocket(socket) { // default interval of sending ack packet is 1 minute socket.setKeepAlive(true, 1000 * 60); + + const removeSocketErrorListener = () => { + socket.removeListener('error', handleRequestSocketError); + }; + + function handleRequestSocketError(err) { + removeSocketErrorListener(); + + if (!req.destroyed) { + req.destroy(err); + } + } + + socket.on('error', handleRequestSocketError); + req.once('close', removeSocketErrorListener); }); // Handle request timeout diff --git a/tests/unit/adapters/http.test.js b/tests/unit/adapters/http.test.js index c35c3ec4..96e4ea7e 100644 --- a/tests/unit/adapters/http.test.js +++ b/tests/unit/adapters/http.test.js @@ -29,6 +29,7 @@ import getStream from 'get-stream'; import bodyParser from 'body-parser'; import { AbortController } from 'abortcontroller-polyfill/dist/cjs-ponyfill.js'; import { lookup } from 'dns'; +import { EventEmitter } from 'events'; const OPEN_WEB_PORT = 80; const SERVER_PORT = 8020; @@ -3817,6 +3818,60 @@ describe('supports http with nodejs', () => { } }); + it('should reject when only the request socket emits an error', async () => { + const noop = () => {}; + const socket = new EventEmitter(); + socket.setKeepAlive = noop; + socket.on('error', noop); + + const transport = { + request() { + return new (class MockRequest extends EventEmitter { + constructor() { + super(); + this.destroyed = false; + } + + setTimeout() {} + + write() {} + + end() { + this.emit('socket', socket); + + setImmediate(() => { + socket.emit('error', Object.assign(new Error('write EPIPE'), { code: 'EPIPE' })); + }); + } + + destroy(err) { + if (this.destroyed) { + return; + } + + this.destroyed = true; + err && this.emit('error', err); + this.emit('close'); + } + })(); + }, + }; + + const error = await Promise.race([ + axios.post('http://example.com/', 'test', { + transport, + maxRedirects: 0, + }), + setTimeoutAsync(200).then(() => { + throw new Error('socket error did not reject the request'); + }), + ]).catch((err) => err); + + assert.ok(error instanceof AxiosError); + assert.strictEqual(error.code, 'EPIPE'); + assert.strictEqual(error.message, 'write EPIPE'); + }); + describe('keep-alive', () => { it('should not fail with "socket hang up" when using timeouts', async () => { const server = await startHTTPServer( @@ -3838,5 +3893,66 @@ describe('supports http with nodejs', () => { await stopHTTPServer(server); } }, 15000); + + it('should remove request socket error listeners after keep-alive requests close', async () => { + const noop = () => {}; + const socket = new EventEmitter(); + socket.setKeepAlive = noop; + socket.on('error', noop); + + const baseErrorListenerCount = socket.listenerCount('error'); + + const transport = { + request(_, cb) { + return new (class MockRequest extends EventEmitter { + constructor() { + super(); + this.destroyed = false; + } + + setTimeout() {} + + write() {} + + end() { + this.emit('socket', socket); + + setImmediate(() => { + const response = stream.Readable.from(['ok']); + response.statusCode = 200; + response.headers = {}; + + cb(response); + this.emit('close'); + }); + } + + destroy(err) { + if (this.destroyed) { + return; + } + + this.destroyed = true; + err && this.emit('error', err); + this.emit('close'); + } + })(); + }, + }; + + await axios.get('http://example.com/first', { + transport, + maxRedirects: 0, + }); + await setTimeoutAsync(0); + assert.strictEqual(socket.listenerCount('error'), baseErrorListenerCount); + + await axios.get('http://example.com/second', { + transport, + maxRedirects: 0, + }); + await setTimeoutAsync(0); + assert.strictEqual(socket.listenerCount('error'), baseErrorListenerCount); + }); }); });