diff --git a/lib/adapters/http.js b/lib/adapters/http.js index 58ebfa4a..cf3d164f 100755 --- a/lib/adapters/http.js +++ b/lib/adapters/http.js @@ -48,6 +48,11 @@ const { http: httpFollow, https: httpsFollow } = followRedirects; const isHttps = /https:?/; +// Symbols used to bind a single 'error' listener to a pooled socket and track +// the request currently owning that socket across keep-alive reuse (issue #10780). +const kAxiosSocketListener = Symbol('axios.http.socketListener'); +const kAxiosCurrentReq = Symbol('axios.http.currentReq'); + const supportedProtocols = platform.protocols.map((protocol) => { return protocol + ':'; }); @@ -943,20 +948,28 @@ export default isHttpAdapterSupported && // default interval of sending ack packet is 1 minute socket.setKeepAlive(true, 1000 * 60); - const removeSocketErrorListener = () => { - socket.removeListener('error', handleRequestSocketError); - }; - - function handleRequestSocketError(err) { - removeSocketErrorListener(); - - if (!req.destroyed) { - req.destroy(err); - } + // Install a single 'error' listener per socket (not per request) to avoid + // accumulating listeners on pooled keep-alive sockets that get reassigned + // to new requests before the previous request's 'close' fires (issue #10780). + // The listener is bound to the socket's currently-active request via a + // symbol, which is swapped as the socket is reassigned. + if (!socket[kAxiosSocketListener]) { + socket.on('error', function handleSocketError(err) { + const current = socket[kAxiosCurrentReq]; + if (current && !current.destroyed) { + current.destroy(err); + } + }); + socket[kAxiosSocketListener] = true; } - socket.on('error', handleRequestSocketError); - req.once('close', removeSocketErrorListener); + socket[kAxiosCurrentReq] = req; + + req.once('close', function clearCurrentReq() { + if (socket[kAxiosCurrentReq] === req) { + socket[kAxiosCurrentReq] = null; + } + }); }); // Handle request timeout diff --git a/tests/unit/adapters/http.test.js b/tests/unit/adapters/http.test.js index f34ce6f2..69069147 100644 --- a/tests/unit/adapters/http.test.js +++ b/tests/unit/adapters/http.test.js @@ -4079,6 +4079,120 @@ describe('supports http with nodejs', () => { }); describe('keep-alive', () => { + it('should not emit MaxListenersExceededWarning under concurrent requests through a pooled keep-alive agent (regression #10780)', async () => { + const server = await startHTTPServer( + (req, res) => { + // Small delay forces concurrent requests to queue on the single pooled socket. + setTimeout(() => { + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.end('ok'); + }, 5); + }, + { port: SERVER_PORT } + ); + + const warnings = []; + const warningHandler = (warning) => { + if (warning && warning.name === 'MaxListenersExceededWarning') { + warnings.push(warning); + } + }; + process.on('warning', warningHandler); + + const agent = new http.Agent({ keepAlive: true, maxSockets: 1 }); + + try { + const baseURL = `http://localhost:${server.address().port}`; + const CONCURRENCY = 30; + + const results = await Promise.all( + Array.from({ length: CONCURRENCY }, (_, i) => + axios.get(`/req-${i}`, { baseURL, httpAgent: agent }) + ) + ); + + assert.strictEqual(results.length, CONCURRENCY); + for (const r of results) { + assert.strictEqual(r.status, 200); + assert.strictEqual(r.data, 'ok'); + } + + // Allow any deferred process 'warning' emissions to flush. + await setTimeoutAsync(50); + + assert.strictEqual( + warnings.length, + 0, + `expected no MaxListenersExceededWarning, got ${warnings.length}: ${warnings.map((w) => w.message).join('; ')}` + ); + + // Inspect live sockets on the agent: none should have more than one + // axios-installed error listener, regardless of how many requests ran. + const allSockets = [] + .concat(...Object.values(agent.sockets || {})) + .concat(...Object.values(agent.freeSockets || {})); + for (const sock of allSockets) { + assert.ok( + sock.listenerCount('error') <= 2, + `socket should have at most a couple of error listeners (agent + axios), got ${sock.listenerCount('error')}` + ); + } + } finally { + process.removeListener('warning', warningHandler); + agent.destroy(); + await stopHTTPServer(server); + } + }, 30000); + + it('should not leak memory via retained request closures under a long burst of keep-alive requests (regression #10780)', async () => { + // This guards against stage88's report of OOM at ~480k sequential requests: + // if the per-request closure leaked, heap would grow linearly. We simulate + // a shorter burst and verify retained closures are released (via WeakRef + // reachability check after GC, if exposed). + if (typeof global.gc !== 'function') { + // Skip when GC is not exposed (run with `node --expose-gc`). + return; + } + + const server = await startHTTPServer( + (req, res) => { + res.writeHead(200); + res.end('ok'); + }, + { port: SERVER_PORT } + ); + + const agent = new http.Agent({ keepAlive: true, maxSockets: 4 }); + + try { + const baseURL = `http://localhost:${server.address().port}`; + + const refs = []; + for (let i = 0; i < 200; i += 1) { + // eslint-disable-next-line no-await-in-loop + const response = await axios.get('/', { baseURL, httpAgent: agent }); + refs.push(new WeakRef(response.request)); + } + + // Drop strong refs and force GC. + global.gc(); + await setTimeoutAsync(10); + global.gc(); + + const retained = refs.filter((r) => r.deref() !== undefined).length; + // Some trailing requests may still be referenced in internal buffers. + // The fix's correctness: retained count scales with agent socket count, + // NOT with request count. A pre-fix leak would keep >>socket count. + assert.ok( + retained <= 20, + `expected most request objects to be collectible after GC; ${retained}/200 retained suggests a closure leak` + ); + } finally { + agent.destroy(); + await stopHTTPServer(server); + } + }, 30000); + it('should not fail with "socket hang up" when using timeouts', async () => { const server = await startHTTPServer( async (req, res) => { @@ -4100,7 +4214,7 @@ describe('supports http with nodejs', () => { } }, 15000); - it('should remove request socket error listeners after keep-alive requests close', async () => { + it('should install at most one socket error listener across reused keep-alive sockets', async () => { const noop = () => {}; const socket = new EventEmitter(); socket.setKeepAlive = noop; @@ -4146,19 +4260,210 @@ describe('supports http with nodejs', () => { }, }; + // First request: axios installs its single per-socket listener. await axios.get('http://example.com/first', { transport, maxRedirects: 0, }); await setTimeoutAsync(0); - assert.strictEqual(socket.listenerCount('error'), baseErrorListenerCount); + assert.strictEqual( + socket.listenerCount('error'), + baseErrorListenerCount + 1, + 'axios should install exactly one socket error listener' + ); - await axios.get('http://example.com/second', { - transport, - maxRedirects: 0, - }); + // Many subsequent requests reusing the same socket must not add more listeners. + for (let i = 0; i < 20; i += 1) { + // eslint-disable-next-line no-await-in-loop + await axios.get(`http://example.com/next-${i}`, { + transport, + maxRedirects: 0, + }); + // eslint-disable-next-line no-await-in-loop + await setTimeoutAsync(0); + assert.strictEqual( + socket.listenerCount('error'), + baseErrorListenerCount + 1, + 'listener count must stay constant across keep-alive reuse' + ); + } + }); + + it('should not accumulate socket error listeners when a pooled socket is reassigned before the previous request closes (regression #10780)', async () => { + const noop = () => {}; + const socket = new EventEmitter(); + socket.setKeepAlive = noop; + socket.on('error', noop); + + const baseErrorListenerCount = socket.listenerCount('error'); + + // Each request defers its 'close' emission so that the socket is + // reassigned to the next request before the previous one closes. + // This reproduces the race condition described in #10780. + const pendingRequests = []; + + const transport = { + request(_, cb) { + const req = new (class MockRequest extends EventEmitter { + constructor() { + super(); + this.destroyed = false; + } + + setTimeout() {} + write() {} + + end() { + // Share the single pooled socket across every request. + this.emit('socket', socket); + + setImmediate(() => { + const response = stream.Readable.from(['ok']); + response.statusCode = 200; + response.headers = {}; + cb(response); + // Intentionally do NOT emit 'close' yet. Collect the req + // so close can be emitted later, after other reqs have + // already claimed the socket. + pendingRequests.push(this); + }); + } + + destroy(err) { + if (this.destroyed) return; + this.destroyed = true; + err && this.emit('error', err); + this.emit('close'); + } + })(); + + return req; + }, + }; + + const results = await Promise.all( + Array.from({ length: 20 }, (_, i) => + axios.get(`http://example.com/concurrent-${i}`, { + transport, + maxRedirects: 0, + }) + ) + ); + + assert.strictEqual(results.length, 20); + + // Critical assertion: despite 20 concurrent requests all claiming the + // same pooled socket before any emitted 'close', only ONE axios listener + // must be attached. This is the difference between the pre-fix + // behaviour (20 listeners, MaxListenersExceededWarning) and the fix. + assert.strictEqual( + socket.listenerCount('error'), + baseErrorListenerCount + 1, + `expected a single axios socket error listener under concurrent reuse, got ${socket.listenerCount('error') - baseErrorListenerCount}` + ); + + // Now drain the queued close events. Listener count must still be 1. + for (const req of pendingRequests) { + req.emit('close'); + } await setTimeoutAsync(0); - assert.strictEqual(socket.listenerCount('error'), baseErrorListenerCount); + + assert.strictEqual( + socket.listenerCount('error'), + baseErrorListenerCount + 1, + 'listener must persist on the socket after requests close (cleanup is per-request ownership, not per-listener removal)' + ); + }); + + it('should route a socket error to the currently-active request after the socket has been reassigned', async () => { + const noop = () => {}; + const socket = new EventEmitter(); + socket.setKeepAlive = noop; + socket.on('error', noop); + + const createdReqs = []; + + // First transport: completes cleanly (emits response then close). + const cleanTransport = { + request(_, cb) { + const emitter = new (class MockRequest extends EventEmitter { + constructor() { + super(); + this.destroyed = false; + createdReqs.push(this); + } + setTimeout() {} + write() {} + end() { + this.emit('socket', socket); + setImmediate(() => { + const response = stream.Readable.from(['ok']); + response.statusCode = 200; + response.headers = {}; + cb(response); + this.emit('close'); + }); + } + destroy(err) { + if (this.destroyed) return; + this.destroyed = true; + err && this.emit('error', err); + this.emit('close'); + } + })(); + return emitter; + }, + }; + + // Second transport: emits socket error instead of a response. + const errorTransport = { + request() { + const emitter = new (class MockRequest extends EventEmitter { + constructor() { + super(); + this.destroyed = false; + createdReqs.push(this); + } + setTimeout() {} + write() {} + end() { + this.emit('socket', socket); + setImmediate(() => { + socket.emit('error', Object.assign(new Error('boom'), { code: 'EPIPE' })); + }); + } + destroy(err) { + if (this.destroyed) return; + this.destroyed = true; + err && this.emit('error', err); + this.emit('close'); + } + })(); + return emitter; + }, + }; + + // First request completes successfully; socket is released. + await axios.get('http://example.com/first', { transport: cleanTransport, maxRedirects: 0 }); + await setTimeoutAsync(0); + + const firstReq = createdReqs[0]; + assert.ok(firstReq && firstReq.destroyed === false, 'first request must not have been destroyed by a socket error'); + + // Stray socket error after first req has closed: must not destroy firstReq. + socket.emit('error', new Error('stray error after close')); + assert.strictEqual(firstReq.destroyed, false, 'socket error after close must not destroy the old request'); + + // Second request claims the socket, then its socket errors. It should reject. + const err = await axios + .get('http://example.com/second', { transport: errorTransport, maxRedirects: 0 }) + .catch((e) => e); + + assert.ok(err instanceof AxiosError, 'second request should reject with an AxiosError'); + assert.strictEqual(err.code, 'EPIPE'); + + const secondReq = createdReqs[1]; + assert.strictEqual(secondReq.destroyed, true, 'second request should be destroyed by its own active socket error'); }); });