mirror of
https://github.com/tenrok/axios.git
synced 2026-06-17 19:21:29 +03:00
fix: added fix for memory leak in sockets (#10788)
This commit is contained in:
+25
-12
@@ -48,6 +48,11 @@ const { http: httpFollow, https: httpsFollow } = followRedirects;
|
||||
|
||||
const isHttps = /https:?/;
|
||||
|
||||
// Symbols used to bind a single 'error' listener to a pooled socket and track
|
||||
// the request currently owning that socket across keep-alive reuse (issue #10780).
|
||||
const kAxiosSocketListener = Symbol('axios.http.socketListener');
|
||||
const kAxiosCurrentReq = Symbol('axios.http.currentReq');
|
||||
|
||||
const supportedProtocols = platform.protocols.map((protocol) => {
|
||||
return protocol + ':';
|
||||
});
|
||||
@@ -943,20 +948,28 @@ export default isHttpAdapterSupported &&
|
||||
// default interval of sending ack packet is 1 minute
|
||||
socket.setKeepAlive(true, 1000 * 60);
|
||||
|
||||
const removeSocketErrorListener = () => {
|
||||
socket.removeListener('error', handleRequestSocketError);
|
||||
};
|
||||
|
||||
function handleRequestSocketError(err) {
|
||||
removeSocketErrorListener();
|
||||
|
||||
if (!req.destroyed) {
|
||||
req.destroy(err);
|
||||
}
|
||||
// Install a single 'error' listener per socket (not per request) to avoid
|
||||
// accumulating listeners on pooled keep-alive sockets that get reassigned
|
||||
// to new requests before the previous request's 'close' fires (issue #10780).
|
||||
// The listener is bound to the socket's currently-active request via a
|
||||
// symbol, which is swapped as the socket is reassigned.
|
||||
if (!socket[kAxiosSocketListener]) {
|
||||
socket.on('error', function handleSocketError(err) {
|
||||
const current = socket[kAxiosCurrentReq];
|
||||
if (current && !current.destroyed) {
|
||||
current.destroy(err);
|
||||
}
|
||||
});
|
||||
socket[kAxiosSocketListener] = true;
|
||||
}
|
||||
|
||||
socket.on('error', handleRequestSocketError);
|
||||
req.once('close', removeSocketErrorListener);
|
||||
socket[kAxiosCurrentReq] = req;
|
||||
|
||||
req.once('close', function clearCurrentReq() {
|
||||
if (socket[kAxiosCurrentReq] === req) {
|
||||
socket[kAxiosCurrentReq] = null;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Handle request timeout
|
||||
|
||||
@@ -4079,6 +4079,120 @@ describe('supports http with nodejs', () => {
|
||||
});
|
||||
|
||||
describe('keep-alive', () => {
|
||||
it('should not emit MaxListenersExceededWarning under concurrent requests through a pooled keep-alive agent (regression #10780)', async () => {
|
||||
const server = await startHTTPServer(
|
||||
(req, res) => {
|
||||
// Small delay forces concurrent requests to queue on the single pooled socket.
|
||||
setTimeout(() => {
|
||||
res.writeHead(200, { 'Content-Type': 'text/plain' });
|
||||
res.end('ok');
|
||||
}, 5);
|
||||
},
|
||||
{ port: SERVER_PORT }
|
||||
);
|
||||
|
||||
const warnings = [];
|
||||
const warningHandler = (warning) => {
|
||||
if (warning && warning.name === 'MaxListenersExceededWarning') {
|
||||
warnings.push(warning);
|
||||
}
|
||||
};
|
||||
process.on('warning', warningHandler);
|
||||
|
||||
const agent = new http.Agent({ keepAlive: true, maxSockets: 1 });
|
||||
|
||||
try {
|
||||
const baseURL = `http://localhost:${server.address().port}`;
|
||||
const CONCURRENCY = 30;
|
||||
|
||||
const results = await Promise.all(
|
||||
Array.from({ length: CONCURRENCY }, (_, i) =>
|
||||
axios.get(`/req-${i}`, { baseURL, httpAgent: agent })
|
||||
)
|
||||
);
|
||||
|
||||
assert.strictEqual(results.length, CONCURRENCY);
|
||||
for (const r of results) {
|
||||
assert.strictEqual(r.status, 200);
|
||||
assert.strictEqual(r.data, 'ok');
|
||||
}
|
||||
|
||||
// Allow any deferred process 'warning' emissions to flush.
|
||||
await setTimeoutAsync(50);
|
||||
|
||||
assert.strictEqual(
|
||||
warnings.length,
|
||||
0,
|
||||
`expected no MaxListenersExceededWarning, got ${warnings.length}: ${warnings.map((w) => w.message).join('; ')}`
|
||||
);
|
||||
|
||||
// Inspect live sockets on the agent: none should have more than one
|
||||
// axios-installed error listener, regardless of how many requests ran.
|
||||
const allSockets = []
|
||||
.concat(...Object.values(agent.sockets || {}))
|
||||
.concat(...Object.values(agent.freeSockets || {}));
|
||||
for (const sock of allSockets) {
|
||||
assert.ok(
|
||||
sock.listenerCount('error') <= 2,
|
||||
`socket should have at most a couple of error listeners (agent + axios), got ${sock.listenerCount('error')}`
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
process.removeListener('warning', warningHandler);
|
||||
agent.destroy();
|
||||
await stopHTTPServer(server);
|
||||
}
|
||||
}, 30000);
|
||||
|
||||
it('should not leak memory via retained request closures under a long burst of keep-alive requests (regression #10780)', async () => {
|
||||
// This guards against stage88's report of OOM at ~480k sequential requests:
|
||||
// if the per-request closure leaked, heap would grow linearly. We simulate
|
||||
// a shorter burst and verify retained closures are released (via WeakRef
|
||||
// reachability check after GC, if exposed).
|
||||
if (typeof global.gc !== 'function') {
|
||||
// Skip when GC is not exposed (run with `node --expose-gc`).
|
||||
return;
|
||||
}
|
||||
|
||||
const server = await startHTTPServer(
|
||||
(req, res) => {
|
||||
res.writeHead(200);
|
||||
res.end('ok');
|
||||
},
|
||||
{ port: SERVER_PORT }
|
||||
);
|
||||
|
||||
const agent = new http.Agent({ keepAlive: true, maxSockets: 4 });
|
||||
|
||||
try {
|
||||
const baseURL = `http://localhost:${server.address().port}`;
|
||||
|
||||
const refs = [];
|
||||
for (let i = 0; i < 200; i += 1) {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const response = await axios.get('/', { baseURL, httpAgent: agent });
|
||||
refs.push(new WeakRef(response.request));
|
||||
}
|
||||
|
||||
// Drop strong refs and force GC.
|
||||
global.gc();
|
||||
await setTimeoutAsync(10);
|
||||
global.gc();
|
||||
|
||||
const retained = refs.filter((r) => r.deref() !== undefined).length;
|
||||
// Some trailing requests may still be referenced in internal buffers.
|
||||
// The fix's correctness: retained count scales with agent socket count,
|
||||
// NOT with request count. A pre-fix leak would keep >>socket count.
|
||||
assert.ok(
|
||||
retained <= 20,
|
||||
`expected most request objects to be collectible after GC; ${retained}/200 retained suggests a closure leak`
|
||||
);
|
||||
} finally {
|
||||
agent.destroy();
|
||||
await stopHTTPServer(server);
|
||||
}
|
||||
}, 30000);
|
||||
|
||||
it('should not fail with "socket hang up" when using timeouts', async () => {
|
||||
const server = await startHTTPServer(
|
||||
async (req, res) => {
|
||||
@@ -4100,7 +4214,7 @@ describe('supports http with nodejs', () => {
|
||||
}
|
||||
}, 15000);
|
||||
|
||||
it('should remove request socket error listeners after keep-alive requests close', async () => {
|
||||
it('should install at most one socket error listener across reused keep-alive sockets', async () => {
|
||||
const noop = () => {};
|
||||
const socket = new EventEmitter();
|
||||
socket.setKeepAlive = noop;
|
||||
@@ -4146,19 +4260,210 @@ describe('supports http with nodejs', () => {
|
||||
},
|
||||
};
|
||||
|
||||
// First request: axios installs its single per-socket listener.
|
||||
await axios.get('http://example.com/first', {
|
||||
transport,
|
||||
maxRedirects: 0,
|
||||
});
|
||||
await setTimeoutAsync(0);
|
||||
assert.strictEqual(socket.listenerCount('error'), baseErrorListenerCount);
|
||||
assert.strictEqual(
|
||||
socket.listenerCount('error'),
|
||||
baseErrorListenerCount + 1,
|
||||
'axios should install exactly one socket error listener'
|
||||
);
|
||||
|
||||
await axios.get('http://example.com/second', {
|
||||
transport,
|
||||
maxRedirects: 0,
|
||||
});
|
||||
// Many subsequent requests reusing the same socket must not add more listeners.
|
||||
for (let i = 0; i < 20; i += 1) {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await axios.get(`http://example.com/next-${i}`, {
|
||||
transport,
|
||||
maxRedirects: 0,
|
||||
});
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await setTimeoutAsync(0);
|
||||
assert.strictEqual(
|
||||
socket.listenerCount('error'),
|
||||
baseErrorListenerCount + 1,
|
||||
'listener count must stay constant across keep-alive reuse'
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
it('should not accumulate socket error listeners when a pooled socket is reassigned before the previous request closes (regression #10780)', async () => {
|
||||
const noop = () => {};
|
||||
const socket = new EventEmitter();
|
||||
socket.setKeepAlive = noop;
|
||||
socket.on('error', noop);
|
||||
|
||||
const baseErrorListenerCount = socket.listenerCount('error');
|
||||
|
||||
// Each request defers its 'close' emission so that the socket is
|
||||
// reassigned to the next request before the previous one closes.
|
||||
// This reproduces the race condition described in #10780.
|
||||
const pendingRequests = [];
|
||||
|
||||
const transport = {
|
||||
request(_, cb) {
|
||||
const req = new (class MockRequest extends EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
this.destroyed = false;
|
||||
}
|
||||
|
||||
setTimeout() {}
|
||||
write() {}
|
||||
|
||||
end() {
|
||||
// Share the single pooled socket across every request.
|
||||
this.emit('socket', socket);
|
||||
|
||||
setImmediate(() => {
|
||||
const response = stream.Readable.from(['ok']);
|
||||
response.statusCode = 200;
|
||||
response.headers = {};
|
||||
cb(response);
|
||||
// Intentionally do NOT emit 'close' yet. Collect the req
|
||||
// so close can be emitted later, after other reqs have
|
||||
// already claimed the socket.
|
||||
pendingRequests.push(this);
|
||||
});
|
||||
}
|
||||
|
||||
destroy(err) {
|
||||
if (this.destroyed) return;
|
||||
this.destroyed = true;
|
||||
err && this.emit('error', err);
|
||||
this.emit('close');
|
||||
}
|
||||
})();
|
||||
|
||||
return req;
|
||||
},
|
||||
};
|
||||
|
||||
const results = await Promise.all(
|
||||
Array.from({ length: 20 }, (_, i) =>
|
||||
axios.get(`http://example.com/concurrent-${i}`, {
|
||||
transport,
|
||||
maxRedirects: 0,
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
assert.strictEqual(results.length, 20);
|
||||
|
||||
// Critical assertion: despite 20 concurrent requests all claiming the
|
||||
// same pooled socket before any emitted 'close', only ONE axios listener
|
||||
// must be attached. This is the difference between the pre-fix
|
||||
// behaviour (20 listeners, MaxListenersExceededWarning) and the fix.
|
||||
assert.strictEqual(
|
||||
socket.listenerCount('error'),
|
||||
baseErrorListenerCount + 1,
|
||||
`expected a single axios socket error listener under concurrent reuse, got ${socket.listenerCount('error') - baseErrorListenerCount}`
|
||||
);
|
||||
|
||||
// Now drain the queued close events. Listener count must still be 1.
|
||||
for (const req of pendingRequests) {
|
||||
req.emit('close');
|
||||
}
|
||||
await setTimeoutAsync(0);
|
||||
assert.strictEqual(socket.listenerCount('error'), baseErrorListenerCount);
|
||||
|
||||
assert.strictEqual(
|
||||
socket.listenerCount('error'),
|
||||
baseErrorListenerCount + 1,
|
||||
'listener must persist on the socket after requests close (cleanup is per-request ownership, not per-listener removal)'
|
||||
);
|
||||
});
|
||||
|
||||
it('should route a socket error to the currently-active request after the socket has been reassigned', async () => {
|
||||
const noop = () => {};
|
||||
const socket = new EventEmitter();
|
||||
socket.setKeepAlive = noop;
|
||||
socket.on('error', noop);
|
||||
|
||||
const createdReqs = [];
|
||||
|
||||
// First transport: completes cleanly (emits response then close).
|
||||
const cleanTransport = {
|
||||
request(_, cb) {
|
||||
const emitter = new (class MockRequest extends EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
this.destroyed = false;
|
||||
createdReqs.push(this);
|
||||
}
|
||||
setTimeout() {}
|
||||
write() {}
|
||||
end() {
|
||||
this.emit('socket', socket);
|
||||
setImmediate(() => {
|
||||
const response = stream.Readable.from(['ok']);
|
||||
response.statusCode = 200;
|
||||
response.headers = {};
|
||||
cb(response);
|
||||
this.emit('close');
|
||||
});
|
||||
}
|
||||
destroy(err) {
|
||||
if (this.destroyed) return;
|
||||
this.destroyed = true;
|
||||
err && this.emit('error', err);
|
||||
this.emit('close');
|
||||
}
|
||||
})();
|
||||
return emitter;
|
||||
},
|
||||
};
|
||||
|
||||
// Second transport: emits socket error instead of a response.
|
||||
const errorTransport = {
|
||||
request() {
|
||||
const emitter = new (class MockRequest extends EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
this.destroyed = false;
|
||||
createdReqs.push(this);
|
||||
}
|
||||
setTimeout() {}
|
||||
write() {}
|
||||
end() {
|
||||
this.emit('socket', socket);
|
||||
setImmediate(() => {
|
||||
socket.emit('error', Object.assign(new Error('boom'), { code: 'EPIPE' }));
|
||||
});
|
||||
}
|
||||
destroy(err) {
|
||||
if (this.destroyed) return;
|
||||
this.destroyed = true;
|
||||
err && this.emit('error', err);
|
||||
this.emit('close');
|
||||
}
|
||||
})();
|
||||
return emitter;
|
||||
},
|
||||
};
|
||||
|
||||
// First request completes successfully; socket is released.
|
||||
await axios.get('http://example.com/first', { transport: cleanTransport, maxRedirects: 0 });
|
||||
await setTimeoutAsync(0);
|
||||
|
||||
const firstReq = createdReqs[0];
|
||||
assert.ok(firstReq && firstReq.destroyed === false, 'first request must not have been destroyed by a socket error');
|
||||
|
||||
// Stray socket error after first req has closed: must not destroy firstReq.
|
||||
socket.emit('error', new Error('stray error after close'));
|
||||
assert.strictEqual(firstReq.destroyed, false, 'socket error after close must not destroy the old request');
|
||||
|
||||
// Second request claims the socket, then its socket errors. It should reject.
|
||||
const err = await axios
|
||||
.get('http://example.com/second', { transport: errorTransport, maxRedirects: 0 })
|
||||
.catch((e) => e);
|
||||
|
||||
assert.ok(err instanceof AxiosError, 'second request should reject with an AxiosError');
|
||||
assert.strictEqual(err.code, 'EPIPE');
|
||||
|
||||
const secondReq = createdReqs[1];
|
||||
assert.strictEqual(secondReq.destroyed, true, 'second request should be destroyed by its own active socket error');
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user