Skip to content

Commit

Permalink
[fix] Emit at most one event per event loop iteration
Browse files Browse the repository at this point in the history
Fixes #2216
  • Loading branch information
lpinca committed Apr 17, 2024
1 parent b119b41 commit 1d7d65d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 59 deletions.
49 changes: 4 additions & 45 deletions lib/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ const { concat, toArrayBuffer, unmask } = require('./buffer-util');
const { isValidStatusCode, isValidUTF8 } = require('./validation');

const FastBuffer = Buffer[Symbol.species];
const promise = Promise.resolve();

//
// `queueMicrotask()` is not available in Node.js < 11.
//
const queueTask =
typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim;

const GET_INFO = 0;
const GET_PAYLOAD_LENGTH_16 = 1;
Expand Down Expand Up @@ -567,17 +560,12 @@ class Receiver extends Writable {
data = fragments;
}

//
// If the state is `INFLATING`, it means that the frame data was
// decompressed asynchronously, so there is no need to defer the event
// as it will be emitted asynchronously anyway.
//
if (this._state === INFLATING || this._allowSynchronousEvents) {
if (this._allowSynchronousEvents) {
this.emit('message', data, true);
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit('message', data, true);
this._state = GET_INFO;
this.startLoop(cb);
Expand All @@ -604,7 +592,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit('message', buf, false);
this._state = GET_INFO;
this.startLoop(cb);
Expand Down Expand Up @@ -675,7 +663,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
this._state = GET_INFO;
this.startLoop(cb);
Expand Down Expand Up @@ -711,32 +699,3 @@ class Receiver extends Writable {
}

module.exports = Receiver;

/**
* A shim for `queueMicrotask()`.
*
* @param {Function} cb Callback
*/
function queueMicrotaskShim(cb) {
promise.then(cb).catch(throwErrorNextTick);
}

/**
* Throws an error.
*
* @param {Error} err The error to throw
* @private
*/
function throwError(err) {
throw err;
}

/**
* Throws an error in the next tick.
*
* @param {Error} err The error to throw
* @private
*/
function throwErrorNextTick(err) {
process.nextTick(throwError, err);
}
28 changes: 18 additions & 10 deletions test/receiver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1085,17 +1085,21 @@ describe('Receiver', () => {
receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8]));
});

it('emits at most one event per microtask', (done) => {
it('emits at most one event per event loop iteration', (done) => {
const actual = [];
const expected = [
'1',
'microtask 1',
'- 1',
'-- 1',
'2',
'microtask 2',
'- 2',
'-- 2',
'3',
'microtask 3',
'- 3',
'-- 3',
'4',
'microtask 4'
'- 4',
'-- 4'
];

function listener(data) {
Expand All @@ -1104,12 +1108,16 @@ describe('Receiver', () => {

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
actual.push(`microtask ${message}`);
actual.push(`- ${message}`);

if (actual.length === 8) {
assert.deepStrictEqual(actual, expected);
done();
}
Promise.resolve().then(() => {
actual.push(`-- ${message}`);

if (actual.length === 12) {
assert.deepStrictEqual(actual, expected);
done();
}
});
});
}

Expand Down
6 changes: 2 additions & 4 deletions test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4205,8 +4205,7 @@ describe('WebSocket', () => {

if (messages.push(message.toString()) > 1) return;

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
setImmediate(() => {
process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.close(1000);
Expand Down Expand Up @@ -4456,8 +4455,7 @@ describe('WebSocket', () => {

if (messages.push(message.toString()) > 1) return;

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
setImmediate(() => {
process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.terminate();
Expand Down

0 comments on commit 1d7d65d

Please sign in to comment.