2
0
mirror of https://github.com/tenrok/axios.git synced 2026-06-17 19:21:29 +03:00

fix(adapter): fix progress event emitting; (#6518)

This commit is contained in:
Dmitriy Mozgovoy
2024-08-01 16:59:58 +03:00
committed by GitHub
parent 85d4d0ea0a
commit e3c76fc9bd
10 changed files with 205 additions and 152 deletions
+3 -52
View File
@@ -2,8 +2,6 @@
import stream from 'stream';
import utils from '../utils.js';
import throttle from './throttle.js';
import speedometer from './speedometer.js';
const kInternals = Symbol('internals');
@@ -24,12 +22,8 @@ class AxiosTransformStream extends stream.Transform{
readableHighWaterMark: options.chunkSize
});
const self = this;
const internals = this[kInternals] = {
length: options.length,
timeWindow: options.timeWindow,
ticksRate: options.ticksRate,
chunkSize: options.chunkSize,
maxRate: options.maxRate,
minChunkSize: options.minChunkSize,
@@ -41,8 +35,6 @@ class AxiosTransformStream extends stream.Transform{
onReadCallback: null
};
const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);
this.on('newListener', event => {
if (event === 'progress') {
if (!internals.isCaptured) {
@@ -50,39 +42,6 @@ class AxiosTransformStream extends stream.Transform{
}
}
});
let bytesNotified = 0;
internals.updateProgress = throttle(function throttledHandler() {
const totalBytes = internals.length;
const bytesTransferred = internals.bytesSeen;
const progressBytes = bytesTransferred - bytesNotified;
if (!progressBytes || self.destroyed) return;
const rate = _speedometer(progressBytes);
bytesNotified = bytesTransferred;
process.nextTick(() => {
self.emit('progress', {
loaded: bytesTransferred,
total: totalBytes,
progress: totalBytes ? (bytesTransferred / totalBytes) : undefined,
bytes: progressBytes,
rate: rate ? rate : undefined,
estimated: rate && totalBytes && bytesTransferred <= totalBytes ?
(totalBytes - bytesTransferred) / rate : undefined,
lengthComputable: totalBytes != null
});
});
}, internals.ticksRate);
const onFinish = () => {
internals.updateProgress.call(true);
};
this.once('end', onFinish);
this.once('error', onFinish);
}
_read(size) {
@@ -96,7 +55,6 @@ class AxiosTransformStream extends stream.Transform{
}
_transform(chunk, encoding, callback) {
const self = this;
const internals = this[kInternals];
const maxRate = internals.maxRate;
@@ -108,16 +66,14 @@ class AxiosTransformStream extends stream.Transform{
const bytesThreshold = (maxRate / divider);
const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;
function pushChunk(_chunk, _callback) {
const pushChunk = (_chunk, _callback) => {
const bytes = Buffer.byteLength(_chunk);
internals.bytesSeen += bytes;
internals.bytes += bytes;
if (internals.isCaptured) {
internals.updateProgress();
}
internals.isCaptured && this.emit('progress', internals.bytesSeen);
if (self.push(_chunk)) {
if (this.push(_chunk)) {
process.nextTick(_callback);
} else {
internals.onReadCallback = () => {
@@ -182,11 +138,6 @@ class AxiosTransformStream extends stream.Transform{
}
});
}
setLength(length) {
this[kInternals].length = +length;
return this;
}
}
export default AxiosTransformStream;
+16 -4
View File
@@ -1,7 +1,8 @@
import speedometer from "./speedometer.js";
import throttle from "./throttle.js";
import utils from "../utils.js";
export default (listener, isDownloadStream, freq = 3) => {
export const progressEventReducer = (listener, isDownloadStream, freq = 3) => {
let bytesNotified = 0;
const _speedometer = speedometer(50, 250);
@@ -22,11 +23,22 @@ export default (listener, isDownloadStream, freq = 3) => {
rate: rate ? rate : undefined,
estimated: rate && total && inRange ? (total - loaded) / rate : undefined,
event: e,
lengthComputable: total != null
lengthComputable: total != null,
[isDownloadStream ? 'download' : 'upload']: true
};
data[isDownloadStream ? 'download' : 'upload'] = true;
listener(data);
}, freq);
}
export const progressEventDecorator = (total, throttled) => {
const lengthComputable = total != null;
return [(loaded) => throttled[0]({
lengthComputable,
total,
loaded
}), throttled[1]];
}
export const asyncDecorator = (fn) => (...args) => utils.asap(() => fn(...args));
+29 -20
View File
@@ -1,5 +1,3 @@
'use strict';
/**
* Throttle decorator
* @param {Function} fn
@@ -8,28 +6,39 @@
*/
function throttle(fn, freq) {
let timestamp = 0;
const threshold = 1000 / freq;
let timer = null;
return function throttled() {
const force = this === true;
let threshold = 1000 / freq;
let lastArgs;
let timer;
const invoke = (args, now = Date.now()) => {
timestamp = now;
lastArgs = null;
if (timer) {
clearTimeout(timer);
timer = null;
}
fn.apply(null, args);
}
const throttled = (...args) => {
const now = Date.now();
if (force || now - timestamp > threshold) {
if (timer) {
clearTimeout(timer);
timer = null;
const passed = now - timestamp;
if ( passed >= threshold) {
invoke(args, now);
} else {
lastArgs = args;
if (!timer) {
timer = setTimeout(() => {
timer = null;
invoke(lastArgs)
}, threshold - passed);
}
timestamp = now;
return fn.apply(null, arguments);
}
if (!timer) {
timer = setTimeout(() => {
timer = null;
timestamp = Date.now();
return fn.apply(null, arguments);
}, threshold - (now - timestamp));
}
};
}
const flush = () => lastArgs && invoke(lastArgs);
return [throttled, flush];
}
export default throttle;
+25 -13
View File
@@ -1,5 +1,4 @@
export const streamChunk = function* (chunk, chunkSize) {
let len = chunk.byteLength;
@@ -28,25 +27,38 @@ export const trackStream = (stream, chunkSize, onProgress, onFinish, encode) =>
const iterator = readBytes(stream, chunkSize, encode);
let bytes = 0;
let done;
let _onFinish = (e) => {
if (!done) {
done = true;
onFinish && onFinish(e);
}
}
return new ReadableStream({
type: 'bytes',
async pull(controller) {
const {done, value} = await iterator.next();
try {
const {done, value} = await iterator.next();
if (done) {
controller.close();
onFinish();
return;
if (done) {
_onFinish();
controller.close();
return;
}
let len = value.byteLength;
if (onProgress) {
let loadedBytes = bytes += len;
onProgress(loadedBytes);
}
controller.enqueue(new Uint8Array(value));
} catch (err) {
_onFinish(err);
throw err;
}
let len = value.byteLength;
onProgress && onProgress(bytes += len);
controller.enqueue(new Uint8Array(value));
},
cancel(reason) {
onFinish(reason);
_onFinish(reason);
return iterator.return();
}
}, {