From e3c76fc9bdd03aa4d98afaf211df943e2031453f Mon Sep 17 00:00:00 2001 From: Dmitriy Mozgovoy Date: Thu, 1 Aug 2024 16:59:58 +0300 Subject: [PATCH] fix(adapter): fix progress event emitting; (#6518) --- lib/adapters/fetch.js | 56 +++++++++++++++-------------- lib/adapters/http.js | 40 +++++++++++++-------- lib/adapters/xhr.js | 33 ++++++++++------- lib/helpers/AxiosTransformStream.js | 55 ++-------------------------- lib/helpers/progressEventReducer.js | 20 ++++++++--- lib/helpers/throttle.js | 49 ++++++++++++++----------- lib/helpers/trackStream.js | 38 +++++++++++++------- lib/utils.js | 34 +++++++++++++++++- test/manual/progress.html | 25 +++++++++---- test/unit/adapters/http.js | 7 +++- 10 files changed, 205 insertions(+), 152 deletions(-) diff --git a/lib/adapters/fetch.js b/lib/adapters/fetch.js index 06dece9..f997f1d 100644 --- a/lib/adapters/fetch.js +++ b/lib/adapters/fetch.js @@ -4,19 +4,10 @@ import AxiosError from "../core/AxiosError.js"; import composeSignals from "../helpers/composeSignals.js"; import {trackStream} from "../helpers/trackStream.js"; import AxiosHeaders from "../core/AxiosHeaders.js"; -import progressEventReducer from "../helpers/progressEventReducer.js"; +import {progressEventReducer, progressEventDecorator, asyncDecorator} from "../helpers/progressEventReducer.js"; import resolveConfig from "../helpers/resolveConfig.js"; import settle from "../core/settle.js"; -const fetchProgressDecorator = (total, fn) => { - const lengthComputable = total != null; - return (loaded) => setTimeout(() => fn({ - lengthComputable, - total, - loaded - })); -} - const isFetchSupported = typeof fetch === 'function' && typeof Request === 'function' && typeof Response === 'function'; const isReadableStreamSupported = isFetchSupported && typeof ReadableStream === 'function'; @@ -26,7 +17,15 @@ const encodeText = isFetchSupported && (typeof TextEncoder === 'function' ? async (str) => new Uint8Array(await new Response(str).arrayBuffer()) ); -const supportsRequestStream = isReadableStreamSupported && (() => { +const test = (fn, ...args) => { + try { + return !!fn(...args); + } catch (e) { + return false + } +} + +const supportsRequestStream = isReadableStreamSupported && test(() => { let duplexAccessed = false; const hasContentType = new Request(platform.origin, { @@ -39,17 +38,13 @@ const supportsRequestStream = isReadableStreamSupported && (() => { }).headers.has('Content-Type'); return duplexAccessed && !hasContentType; -})(); +}); const DEFAULT_CHUNK_SIZE = 64 * 1024; -const supportsResponseStream = isReadableStreamSupported && !!(()=> { - try { - return utils.isReadableStream(new Response('').body); - } catch(err) { - // return undefined - } -})(); +const supportsResponseStream = isReadableStreamSupported && + test(() => utils.isReadableStream(new Response('').body)); + const resolvers = { stream: supportsResponseStream && ((res) => res.body) @@ -77,7 +72,7 @@ const getBodyLength = async (body) => { return (await new Request(body).arrayBuffer()).byteLength; } - if(utils.isArrayBufferView(body)) { + if(utils.isArrayBufferView(body) || utils.isArrayBuffer(body)) { return body.byteLength; } @@ -147,10 +142,12 @@ export default isFetchSupported && (async (config) => { } if (_request.body) { - data = trackStream(_request.body, DEFAULT_CHUNK_SIZE, fetchProgressDecorator( + const [onProgress, flush] = progressEventDecorator( requestContentLength, - progressEventReducer(onUploadProgress) - ), null, encodeText); + progressEventReducer(asyncDecorator(onUploadProgress)) + ); + + data = trackStream(_request.body, DEFAULT_CHUNK_SIZE, onProgress, flush, encodeText); } } @@ -181,11 +178,16 @@ export default isFetchSupported && (async (config) => { const responseContentLength = utils.toFiniteNumber(response.headers.get('content-length')); + const [onProgress, flush] = onDownloadProgress && progressEventDecorator( + responseContentLength, + progressEventReducer(asyncDecorator(onDownloadProgress), true) + ) || []; + response = new Response( - trackStream(response.body, DEFAULT_CHUNK_SIZE, onDownloadProgress && fetchProgressDecorator( - responseContentLength, - progressEventReducer(onDownloadProgress, true) - ), isStreamResponse && onFinish, encodeText), + trackStream(response.body, DEFAULT_CHUNK_SIZE, onProgress, () => { + flush && flush(); + isStreamResponse && onFinish(); + }, encodeText), options ); } diff --git a/lib/adapters/http.js b/lib/adapters/http.js index 768e4a5..7c982fa 100755 --- a/lib/adapters/http.js +++ b/lib/adapters/http.js @@ -24,6 +24,7 @@ import formDataToStream from "../helpers/formDataToStream.js"; import readBlob from "../helpers/readBlob.js"; import ZlibHeaderTransformStream from '../helpers/ZlibHeaderTransformStream.js'; import callbackify from "../helpers/callbackify.js"; +import {progressEventReducer, progressEventDecorator, asyncDecorator} from "../helpers/progressEventReducer.js"; const zlibOptions = { flush: zlib.constants.Z_SYNC_FLUSH, @@ -45,6 +46,14 @@ const supportedProtocols = platform.protocols.map(protocol => { return protocol + ':'; }); +const flushOnFinish = (stream, [throttled, flush]) => { + stream + .on('end', flush) + .on('error', flush); + + return throttled; +} + /** * If the proxy or config beforeRedirects functions are defined, call them with the options * object. @@ -278,8 +287,7 @@ export default isHttpAdapterSupported && function httpAdapter(config) { // Only set header if it hasn't been set in config headers.set('User-Agent', 'axios/' + VERSION, false); - const onDownloadProgress = config.onDownloadProgress; - const onUploadProgress = config.onUploadProgress; + const {onUploadProgress, onDownloadProgress} = config; const maxRate = config.maxRate; let maxUploadRate = undefined; let maxDownloadRate = undefined; @@ -352,15 +360,16 @@ export default isHttpAdapterSupported && function httpAdapter(config) { } data = stream.pipeline([data, new AxiosTransformStream({ - length: contentLength, maxRate: utils.toFiniteNumber(maxUploadRate) })], utils.noop); - onUploadProgress && data.on('progress', progress => { - onUploadProgress(Object.assign(progress, { - upload: true - })); - }); + onUploadProgress && data.on('progress', flushOnFinish( + data, + progressEventDecorator( + contentLength, + progressEventReducer(asyncDecorator(onUploadProgress), false, 3) + ) + )); } // HTTP basic authentication @@ -459,17 +468,18 @@ export default isHttpAdapterSupported && function httpAdapter(config) { const responseLength = +res.headers['content-length']; - if (onDownloadProgress) { + if (onDownloadProgress || maxDownloadRate) { const transformStream = new AxiosTransformStream({ - length: utils.toFiniteNumber(responseLength), maxRate: utils.toFiniteNumber(maxDownloadRate) }); - onDownloadProgress && transformStream.on('progress', progress => { - onDownloadProgress(Object.assign(progress, { - download: true - })); - }); + onDownloadProgress && transformStream.on('progress', flushOnFinish( + transformStream, + progressEventDecorator( + responseLength, + progressEventReducer(asyncDecorator(onDownloadProgress), true, 3) + ) + )); streams.push(transformStream); } diff --git a/lib/adapters/xhr.js b/lib/adapters/xhr.js index c4bf677..a7ee548 100644 --- a/lib/adapters/xhr.js +++ b/lib/adapters/xhr.js @@ -6,7 +6,7 @@ import CanceledError from '../cancel/CanceledError.js'; import parseProtocol from '../helpers/parseProtocol.js'; import platform from '../platform/index.js'; import AxiosHeaders from '../core/AxiosHeaders.js'; -import progressEventReducer from '../helpers/progressEventReducer.js'; +import {progressEventReducer} from '../helpers/progressEventReducer.js'; import resolveConfig from "../helpers/resolveConfig.js"; const isXHRAdapterSupported = typeof XMLHttpRequest !== 'undefined'; @@ -16,16 +16,18 @@ export default isXHRAdapterSupported && function (config) { const _config = resolveConfig(config); let requestData = _config.data; const requestHeaders = AxiosHeaders.from(_config.headers).normalize(); - let {responseType} = _config; + let {responseType, onUploadProgress, onDownloadProgress} = _config; let onCanceled; - function done() { - if (_config.cancelToken) { - _config.cancelToken.unsubscribe(onCanceled); - } + let uploadThrottled, downloadThrottled; + let flushUpload, flushDownload; - if (_config.signal) { - _config.signal.removeEventListener('abort', onCanceled); - } + function done() { + flushUpload && flushUpload(); // flush events + flushDownload && flushDownload(); // flush events + + _config.cancelToken && _config.cancelToken.unsubscribe(onCanceled); + + _config.signal && _config.signal.removeEventListener('abort', onCanceled); } let request = new XMLHttpRequest(); @@ -149,13 +151,18 @@ export default isXHRAdapterSupported && function (config) { } // Handle progress if needed - if (typeof _config.onDownloadProgress === 'function') { - request.addEventListener('progress', progressEventReducer(_config.onDownloadProgress, true)); + if (onDownloadProgress) { + ([downloadThrottled, flushDownload] = progressEventReducer(onDownloadProgress, true)); + request.addEventListener('progress', downloadThrottled); } // Not all browsers support upload events - if (typeof _config.onUploadProgress === 'function' && request.upload) { - request.upload.addEventListener('progress', progressEventReducer(_config.onUploadProgress)); + if (onUploadProgress && request.upload) { + ([uploadThrottled, flushUpload] = progressEventReducer(onUploadProgress)); + + request.upload.addEventListener('progress', uploadThrottled); + + request.upload.addEventListener('loadend', flushUpload); } if (_config.cancelToken || _config.signal) { diff --git a/lib/helpers/AxiosTransformStream.js b/lib/helpers/AxiosTransformStream.js index 92ba883..4140071 100644 --- a/lib/helpers/AxiosTransformStream.js +++ b/lib/helpers/AxiosTransformStream.js @@ -2,8 +2,6 @@ import stream from 'stream'; import utils from '../utils.js'; -import throttle from './throttle.js'; -import speedometer from './speedometer.js'; const kInternals = Symbol('internals'); @@ -24,12 +22,8 @@ class AxiosTransformStream extends stream.Transform{ readableHighWaterMark: options.chunkSize }); - const self = this; - const internals = this[kInternals] = { - length: options.length, timeWindow: options.timeWindow, - ticksRate: options.ticksRate, chunkSize: options.chunkSize, maxRate: options.maxRate, minChunkSize: options.minChunkSize, @@ -41,8 +35,6 @@ class AxiosTransformStream extends stream.Transform{ onReadCallback: null }; - const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow); - this.on('newListener', event => { if (event === 'progress') { if (!internals.isCaptured) { @@ -50,39 +42,6 @@ class AxiosTransformStream extends stream.Transform{ } } }); - - let bytesNotified = 0; - - internals.updateProgress = throttle(function throttledHandler() { - const totalBytes = internals.length; - const bytesTransferred = internals.bytesSeen; - const progressBytes = bytesTransferred - bytesNotified; - if (!progressBytes || self.destroyed) return; - - const rate = _speedometer(progressBytes); - - bytesNotified = bytesTransferred; - - process.nextTick(() => { - self.emit('progress', { - loaded: bytesTransferred, - total: totalBytes, - progress: totalBytes ? (bytesTransferred / totalBytes) : undefined, - bytes: progressBytes, - rate: rate ? rate : undefined, - estimated: rate && totalBytes && bytesTransferred <= totalBytes ? - (totalBytes - bytesTransferred) / rate : undefined, - lengthComputable: totalBytes != null - }); - }); - }, internals.ticksRate); - - const onFinish = () => { - internals.updateProgress.call(true); - }; - - this.once('end', onFinish); - this.once('error', onFinish); } _read(size) { @@ -96,7 +55,6 @@ class AxiosTransformStream extends stream.Transform{ } _transform(chunk, encoding, callback) { - const self = this; const internals = this[kInternals]; const maxRate = internals.maxRate; @@ -108,16 +66,14 @@ class AxiosTransformStream extends stream.Transform{ const bytesThreshold = (maxRate / divider); const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0; - function pushChunk(_chunk, _callback) { + const pushChunk = (_chunk, _callback) => { const bytes = Buffer.byteLength(_chunk); internals.bytesSeen += bytes; internals.bytes += bytes; - if (internals.isCaptured) { - internals.updateProgress(); - } + internals.isCaptured && this.emit('progress', internals.bytesSeen); - if (self.push(_chunk)) { + if (this.push(_chunk)) { process.nextTick(_callback); } else { internals.onReadCallback = () => { @@ -182,11 +138,6 @@ class AxiosTransformStream extends stream.Transform{ } }); } - - setLength(length) { - this[kInternals].length = +length; - return this; - } } export default AxiosTransformStream; diff --git a/lib/helpers/progressEventReducer.js b/lib/helpers/progressEventReducer.js index 7829be3..ff601cc 100644 --- a/lib/helpers/progressEventReducer.js +++ b/lib/helpers/progressEventReducer.js @@ -1,7 +1,8 @@ import speedometer from "./speedometer.js"; import throttle from "./throttle.js"; +import utils from "../utils.js"; -export default (listener, isDownloadStream, freq = 3) => { +export const progressEventReducer = (listener, isDownloadStream, freq = 3) => { let bytesNotified = 0; const _speedometer = speedometer(50, 250); @@ -22,11 +23,22 @@ export default (listener, isDownloadStream, freq = 3) => { rate: rate ? rate : undefined, estimated: rate && total && inRange ? (total - loaded) / rate : undefined, event: e, - lengthComputable: total != null + lengthComputable: total != null, + [isDownloadStream ? 'download' : 'upload']: true }; - data[isDownloadStream ? 'download' : 'upload'] = true; - listener(data); }, freq); } + +export const progressEventDecorator = (total, throttled) => { + const lengthComputable = total != null; + + return [(loaded) => throttled[0]({ + lengthComputable, + total, + loaded + }), throttled[1]]; +} + +export const asyncDecorator = (fn) => (...args) => utils.asap(() => fn(...args)); diff --git a/lib/helpers/throttle.js b/lib/helpers/throttle.js index a3204fd..e256272 100644 --- a/lib/helpers/throttle.js +++ b/lib/helpers/throttle.js @@ -1,5 +1,3 @@ -'use strict'; - /** * Throttle decorator * @param {Function} fn @@ -8,28 +6,39 @@ */ function throttle(fn, freq) { let timestamp = 0; - const threshold = 1000 / freq; - let timer = null; - return function throttled() { - const force = this === true; + let threshold = 1000 / freq; + let lastArgs; + let timer; + const invoke = (args, now = Date.now()) => { + timestamp = now; + lastArgs = null; + if (timer) { + clearTimeout(timer); + timer = null; + } + fn.apply(null, args); + } + + const throttled = (...args) => { const now = Date.now(); - if (force || now - timestamp > threshold) { - if (timer) { - clearTimeout(timer); - timer = null; + const passed = now - timestamp; + if ( passed >= threshold) { + invoke(args, now); + } else { + lastArgs = args; + if (!timer) { + timer = setTimeout(() => { + timer = null; + invoke(lastArgs) + }, threshold - passed); } - timestamp = now; - return fn.apply(null, arguments); } - if (!timer) { - timer = setTimeout(() => { - timer = null; - timestamp = Date.now(); - return fn.apply(null, arguments); - }, threshold - (now - timestamp)); - } - }; + } + + const flush = () => lastArgs && invoke(lastArgs); + + return [throttled, flush]; } export default throttle; diff --git a/lib/helpers/trackStream.js b/lib/helpers/trackStream.js index 6241d06..0ae39ab 100644 --- a/lib/helpers/trackStream.js +++ b/lib/helpers/trackStream.js @@ -1,5 +1,4 @@ - export const streamChunk = function* (chunk, chunkSize) { let len = chunk.byteLength; @@ -28,25 +27,38 @@ export const trackStream = (stream, chunkSize, onProgress, onFinish, encode) => const iterator = readBytes(stream, chunkSize, encode); let bytes = 0; + let done; + let _onFinish = (e) => { + if (!done) { + done = true; + onFinish && onFinish(e); + } + } return new ReadableStream({ - type: 'bytes', - async pull(controller) { - const {done, value} = await iterator.next(); + try { + const {done, value} = await iterator.next(); - if (done) { - controller.close(); - onFinish(); - return; + if (done) { + _onFinish(); + controller.close(); + return; + } + + let len = value.byteLength; + if (onProgress) { + let loadedBytes = bytes += len; + onProgress(loadedBytes); + } + controller.enqueue(new Uint8Array(value)); + } catch (err) { + _onFinish(err); + throw err; } - - let len = value.byteLength; - onProgress && onProgress(bytes += len); - controller.enqueue(new Uint8Array(value)); }, cancel(reason) { - onFinish(reason); + _onFinish(reason); return iterator.return(); } }, { diff --git a/lib/utils.js b/lib/utils.js index 72b9f1d..32679da 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -669,6 +669,36 @@ const isAsyncFn = kindOfTest('AsyncFunction'); const isThenable = (thing) => thing && (isObject(thing) || isFunction(thing)) && isFunction(thing.then) && isFunction(thing.catch); +// original code +// https://github.com/DigitalBrainJS/AxiosPromise/blob/16deab13710ec09779922131f3fa5954320f83ab/lib/utils.js#L11-L34 + +const _setImmediate = ((setImmediateSupported, postMessageSupported) => { + if (setImmediateSupported) { + return setImmediate; + } + + return postMessageSupported ? ((token, callbacks) => { + _global.addEventListener("message", ({source, data}) => { + if (source === _global && data === token) { + callbacks.length && callbacks.shift()(); + } + }, false); + + return (cb) => { + callbacks.push(cb); + _global.postMessage(token, "*"); + } + })(`axios@${Math.random()}`, []) : (cb) => setTimeout(cb); +})( + typeof setImmediate === 'function', + isFunction(_global.postMessage) +); + +const asap = typeof queueMicrotask !== 'undefined' ? + queueMicrotask.bind(_global) : ( typeof process !== 'undefined' && process.nextTick || _setImmediate); + +// ********************* + export default { isArray, isArrayBuffer, @@ -724,5 +754,7 @@ export default { isSpecCompliantForm, toJSONObject, isAsyncFn, - isThenable + isThenable, + setImmediate: _setImmediate, + asap }; diff --git a/test/manual/progress.html b/test/manual/progress.html index 628b794..e651b12 100644 --- a/test/manual/progress.html +++ b/test/manual/progress.html @@ -11,12 +11,25 @@ See your console data.fill(123); - axios.post('http://httpbin.org/post', data, { - onUploadProgress: console.log, - onDownloadProgress: console.log, - }).then(data=> { - console.log(`Done: `, data); - }).catch(console.warn); + console.log('Starting...'); + + (async() => { + await axios.post('http://httpbin.org/post', data, { + adapter: 'xhr', + onUploadProgress: (e) => console.log('xhr upload', e), + onDownloadProgress: (e) => console.log('xhr download', e), + }).then(data=> { + console.log(`Done: `, data); + }).catch(e => console.warn('xhr', e)); + + await axios.post('https://httpbin.org/post', data, { + adapter: 'fetch', + onUploadProgress: (e) => console.log('fetch upload', e), + onDownloadProgress: (e) => console.log('fetch download', e) + }).then(data=> { + console.log(`Done: `, data); + }).catch(e => console.warn('fetch', e)); + })(); diff --git a/test/unit/adapters/http.js b/test/unit/adapters/http.js index 4e27eb2..d5f70ff 100644 --- a/test/unit/adapters/http.js +++ b/test/unit/adapters/http.js @@ -1919,6 +1919,7 @@ describe('supports http with nodejs', function () { describe('progress', function () { describe('upload', function () { it('should support upload progress capturing', async function () { + this.timeout(15000); server = await startHTTPServer({ rate: 100 * 1024 }); @@ -1943,6 +1944,7 @@ describe('supports http with nodejs', function () { const {data} = await axios.post(LOCAL_SERVER_URL, readable, { onUploadProgress: ({loaded, total, progress, bytes, upload}) => { + console.log('onUploadProgress', loaded, '/', total); samples.push({ loaded, total, @@ -1975,6 +1977,8 @@ describe('supports http with nodejs', function () { describe('download', function () { it('should support download progress capturing', async function () { + this.timeout(15000); + server = await startHTTPServer({ rate: 100 * 1024 }); @@ -1999,6 +2003,7 @@ describe('supports http with nodejs', function () { const {data} = await axios.post(LOCAL_SERVER_URL, readable, { onDownloadProgress: ({loaded, total, progress, bytes, download}) => { + console.log('onDownloadProgress', loaded, '/', total); samples.push({ loaded, total, @@ -2114,7 +2119,7 @@ describe('supports http with nodejs', function () { }]` ); - const progressTicksRate = 2; + const progressTicksRate = 3; const expectedProgress = ((i + skip) / secs) / progressTicksRate; assert.ok(