mirror of
https://github.com/tenrok/axios.git
synced 2026-06-17 19:21:29 +03:00
refactor(http): extract Http2Sessions into its own module (#10861)
* refactor(http): extract Http2Sessions to lib/helpers for direct unit tests * fix(http2): clear pending idle timer when session closes * fix(http2): address review — strict mode, Node-only note, observable timer test --------- Co-authored-by: Jay <jasonsaayman@gmail.com>
This commit is contained in:
committed by
GitHub
parent
ea5dab691e
commit
315ec44489
+1
-103
@@ -24,6 +24,7 @@ import { EventEmitter } from 'events';
|
||||
import formDataToStream from '../helpers/formDataToStream.js';
|
||||
import readBlob from '../helpers/readBlob.js';
|
||||
import ZlibHeaderTransformStream from '../helpers/ZlibHeaderTransformStream.js';
|
||||
import Http2Sessions from '../helpers/Http2Sessions.js';
|
||||
import callbackify from '../helpers/callbackify.js';
|
||||
import shouldBypassProxy from '../helpers/shouldBypassProxy.js';
|
||||
import { toByteStringHeaderObject } from '../helpers/sanitizeHeaderValue.js';
|
||||
@@ -142,109 +143,6 @@ const flushOnFinish = (stream, [throttled, flush]) => {
|
||||
return throttled;
|
||||
};
|
||||
|
||||
class Http2Sessions {
|
||||
constructor() {
|
||||
this.sessions = Object.create(null);
|
||||
}
|
||||
|
||||
getSession(authority, options) {
|
||||
options = Object.assign(
|
||||
{
|
||||
sessionTimeout: 1000,
|
||||
},
|
||||
options
|
||||
);
|
||||
|
||||
let authoritySessions = this.sessions[authority];
|
||||
|
||||
if (authoritySessions) {
|
||||
let len = authoritySessions.length;
|
||||
|
||||
for (let i = 0; i < len; i++) {
|
||||
const [sessionHandle, sessionOptions] = authoritySessions[i];
|
||||
if (
|
||||
!sessionHandle.destroyed &&
|
||||
!sessionHandle.closed &&
|
||||
util.isDeepStrictEqual(sessionOptions, options)
|
||||
) {
|
||||
return sessionHandle;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const session = http2.connect(authority, options);
|
||||
|
||||
let removed;
|
||||
|
||||
const removeSession = () => {
|
||||
if (removed) {
|
||||
return;
|
||||
}
|
||||
|
||||
removed = true;
|
||||
|
||||
let entries = authoritySessions,
|
||||
len = entries.length,
|
||||
i = len;
|
||||
|
||||
while (i--) {
|
||||
if (entries[i][0] === session) {
|
||||
if (len === 1) {
|
||||
delete this.sessions[authority];
|
||||
} else {
|
||||
entries.splice(i, 1);
|
||||
}
|
||||
if (!session.closed) {
|
||||
session.close();
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const originalRequestFn = session.request;
|
||||
|
||||
const { sessionTimeout } = options;
|
||||
|
||||
if (sessionTimeout != null) {
|
||||
let timer;
|
||||
let streamsCount = 0;
|
||||
|
||||
session.request = function () {
|
||||
const stream = originalRequestFn.apply(this, arguments);
|
||||
|
||||
streamsCount++;
|
||||
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
timer = null;
|
||||
}
|
||||
|
||||
stream.once('close', () => {
|
||||
if (!--streamsCount) {
|
||||
timer = setTimeout(() => {
|
||||
timer = null;
|
||||
removeSession();
|
||||
}, sessionTimeout);
|
||||
}
|
||||
});
|
||||
|
||||
return stream;
|
||||
};
|
||||
}
|
||||
|
||||
session.once('close', removeSession);
|
||||
|
||||
let entry = [session, options];
|
||||
|
||||
authoritySessions
|
||||
? authoritySessions.push(entry)
|
||||
: (authoritySessions = this.sessions[authority] = [entry]);
|
||||
|
||||
return session;
|
||||
}
|
||||
}
|
||||
|
||||
const http2Sessions = new Http2Sessions();
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,119 @@
|
||||
'use strict';
|
||||
|
||||
// Node-only: relies on the built-in `http2` module. Browser/react-native
|
||||
// builds replace `lib/adapters/http.js` (the sole importer) with `lib/helpers/null.js`
|
||||
// via the `browser` package.json field, so this module is never reached in
|
||||
// those environments. Do not import it from any browser-reachable code path.
|
||||
|
||||
import http2 from 'http2';
|
||||
import util from 'util';
|
||||
|
||||
class Http2Sessions {
|
||||
constructor() {
|
||||
this.sessions = Object.create(null);
|
||||
}
|
||||
|
||||
getSession(authority, options) {
|
||||
options = Object.assign(
|
||||
{
|
||||
sessionTimeout: 1000,
|
||||
},
|
||||
options
|
||||
);
|
||||
|
||||
let authoritySessions = this.sessions[authority];
|
||||
|
||||
if (authoritySessions) {
|
||||
let len = authoritySessions.length;
|
||||
|
||||
for (let i = 0; i < len; i++) {
|
||||
const [sessionHandle, sessionOptions] = authoritySessions[i];
|
||||
if (
|
||||
!sessionHandle.destroyed &&
|
||||
!sessionHandle.closed &&
|
||||
util.isDeepStrictEqual(sessionOptions, options)
|
||||
) {
|
||||
return sessionHandle;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const session = http2.connect(authority, options);
|
||||
|
||||
let removed;
|
||||
let timer;
|
||||
|
||||
const removeSession = () => {
|
||||
if (removed) {
|
||||
return;
|
||||
}
|
||||
|
||||
removed = true;
|
||||
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
timer = null;
|
||||
}
|
||||
|
||||
let entries = authoritySessions,
|
||||
len = entries.length,
|
||||
i = len;
|
||||
|
||||
while (i--) {
|
||||
if (entries[i][0] === session) {
|
||||
if (len === 1) {
|
||||
delete this.sessions[authority];
|
||||
} else {
|
||||
entries.splice(i, 1);
|
||||
}
|
||||
if (!session.closed) {
|
||||
session.close();
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const originalRequestFn = session.request;
|
||||
|
||||
const { sessionTimeout } = options;
|
||||
|
||||
if (sessionTimeout != null) {
|
||||
let streamsCount = 0;
|
||||
|
||||
session.request = function () {
|
||||
const stream = originalRequestFn.apply(this, arguments);
|
||||
|
||||
streamsCount++;
|
||||
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
timer = null;
|
||||
}
|
||||
|
||||
stream.once('close', () => {
|
||||
if (!--streamsCount) {
|
||||
timer = setTimeout(() => {
|
||||
timer = null;
|
||||
removeSession();
|
||||
}, sessionTimeout);
|
||||
}
|
||||
});
|
||||
|
||||
return stream;
|
||||
};
|
||||
}
|
||||
|
||||
session.once('close', removeSession);
|
||||
|
||||
let entry = [session, options];
|
||||
|
||||
authoritySessions
|
||||
? authoritySessions.push(entry)
|
||||
: (authoritySessions = this.sessions[authority] = [entry]);
|
||||
|
||||
return session;
|
||||
}
|
||||
}
|
||||
|
||||
export default Http2Sessions;
|
||||
@@ -0,0 +1,193 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { EventEmitter } from 'events';
|
||||
import http2 from 'http2';
|
||||
import Http2Sessions from '../../../lib/helpers/Http2Sessions.js';
|
||||
|
||||
function createFakeSession() {
|
||||
const session = new EventEmitter();
|
||||
session.destroyed = false;
|
||||
session.closed = false;
|
||||
session.close = vi.fn(() => {
|
||||
session.closed = true;
|
||||
session.emit('close');
|
||||
});
|
||||
const originalRequest = vi.fn(() => {
|
||||
const stream = new EventEmitter();
|
||||
stream.endStream = vi.fn();
|
||||
return stream;
|
||||
});
|
||||
session.request = originalRequest;
|
||||
session._originalRequest = originalRequest;
|
||||
return session;
|
||||
}
|
||||
|
||||
describe('helpers::Http2Sessions', () => {
|
||||
let connectSpy;
|
||||
let pool;
|
||||
|
||||
beforeEach(() => {
|
||||
connectSpy = vi.spyOn(http2, 'connect').mockImplementation(() => createFakeSession());
|
||||
pool = new Http2Sessions();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
connectSpy.mockRestore();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it('reuses a cached session for the same authority and options', () => {
|
||||
const a = pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
const b = pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
|
||||
expect(a).toBe(b);
|
||||
expect(connectSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('creates a separate session for a different authority', () => {
|
||||
pool.getSession('https://example.test');
|
||||
pool.getSession('https://other.test');
|
||||
|
||||
expect(connectSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('creates a new session when options differ for the same authority', () => {
|
||||
pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
pool.getSession('https://example.test', { sessionTimeout: 5000 });
|
||||
|
||||
expect(connectSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('does not reuse a destroyed session', () => {
|
||||
const first = pool.getSession('https://example.test');
|
||||
first.destroyed = true;
|
||||
|
||||
const second = pool.getSession('https://example.test');
|
||||
|
||||
expect(second).not.toBe(first);
|
||||
expect(connectSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('does not reuse a closed session', () => {
|
||||
const first = pool.getSession('https://example.test');
|
||||
first.closed = true;
|
||||
|
||||
const second = pool.getSession('https://example.test');
|
||||
|
||||
expect(second).not.toBe(first);
|
||||
expect(connectSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('drops a session from the pool when its close event fires', () => {
|
||||
const first = pool.getSession('https://example.test');
|
||||
first.emit('close');
|
||||
|
||||
const second = pool.getSession('https://example.test');
|
||||
|
||||
expect(second).not.toBe(first);
|
||||
expect(connectSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('keeps unrelated authorities cached when one session closes', () => {
|
||||
const first = pool.getSession('https://example.test');
|
||||
const other = pool.getSession('https://other.test');
|
||||
first.emit('close');
|
||||
|
||||
const otherAgain = pool.getSession('https://other.test');
|
||||
|
||||
expect(otherAgain).toBe(other);
|
||||
expect(connectSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('closes and removes the session after sessionTimeout once the last stream ends', () => {
|
||||
vi.useFakeTimers();
|
||||
const session = pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
|
||||
const stream = session.request();
|
||||
stream.emit('close');
|
||||
|
||||
expect(session.closed).toBe(false);
|
||||
|
||||
vi.advanceTimersByTime(1000);
|
||||
|
||||
expect(session.close).toHaveBeenCalledTimes(1);
|
||||
|
||||
const next = pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
expect(next).not.toBe(session);
|
||||
expect(connectSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('cancels the pending sessionTimeout when a new stream opens', () => {
|
||||
vi.useFakeTimers();
|
||||
const session = pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
|
||||
const first = session.request();
|
||||
first.emit('close');
|
||||
|
||||
session.request();
|
||||
vi.advanceTimersByTime(5000);
|
||||
|
||||
expect(session.close).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('keeps the session alive while streams are still open', () => {
|
||||
vi.useFakeTimers();
|
||||
const session = pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
|
||||
const a = session.request();
|
||||
const b = session.request();
|
||||
a.emit('close');
|
||||
|
||||
vi.advanceTimersByTime(5000);
|
||||
|
||||
expect(session.close).not.toHaveBeenCalled();
|
||||
|
||||
b.emit('close');
|
||||
vi.advanceTimersByTime(1000);
|
||||
|
||||
expect(session.close).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('defaults sessionTimeout to 1000ms when no options are passed', () => {
|
||||
vi.useFakeTimers();
|
||||
const session = pool.getSession('https://example.test');
|
||||
|
||||
const stream = session.request();
|
||||
stream.emit('close');
|
||||
|
||||
vi.advanceTimersByTime(999);
|
||||
expect(session.close).not.toHaveBeenCalled();
|
||||
|
||||
vi.advanceTimersByTime(1);
|
||||
expect(session.close).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('installs a request wrapper when sessionTimeout is set', () => {
|
||||
const session = pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
|
||||
expect(session.request).not.toBe(session._originalRequest);
|
||||
});
|
||||
|
||||
it('does not install the request wrapper when sessionTimeout is null', () => {
|
||||
const session = pool.getSession('https://example.test', { sessionTimeout: null });
|
||||
|
||||
expect(session.request).toBe(session._originalRequest);
|
||||
});
|
||||
|
||||
it('cancels the pending idle timer when the session itself closes', () => {
|
||||
vi.useFakeTimers();
|
||||
const clearTimeoutSpy = vi.spyOn(globalThis, 'clearTimeout');
|
||||
|
||||
const session = pool.getSession('https://example.test', { sessionTimeout: 1000 });
|
||||
|
||||
const stream = session.request();
|
||||
stream.emit('close');
|
||||
// An idle-removal timer is now pending.
|
||||
|
||||
clearTimeoutSpy.mockClear();
|
||||
session.emit('close');
|
||||
|
||||
// Closing the session must cancel the pending idle timer; otherwise it
|
||||
// keeps Node's event loop refed for up to sessionTimeout ms past close.
|
||||
expect(clearTimeoutSpy).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user