Skip to content

Commit eeae378

Browse files
authored
websocket: use FixedQueue instead of Set (#3283)
1 parent fcfa4db commit eeae378

1 file changed

Lines changed: 62 additions & 43 deletions

File tree

lib/web/websocket/sender.js

Lines changed: 62 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,30 @@
22

33
const { WebsocketFrameSend } = require('./frame')
44
const { opcodes, sendHints } = require('./constants')
5+
const FixedQueue = require('../../dispatcher/fixed-queue')
56

6-
/** @type {Uint8Array} */
7+
/** @type {typeof Uint8Array} */
78
const FastBuffer = Buffer[Symbol.species]
89

10+
/**
11+
* @typedef {object} SendQueueNode
12+
* @property {Promise<void> | null} promise
13+
* @property {((...args: any[]) => any)} callback
14+
* @property {Buffer | null} frame
15+
*/
16+
917
class SendQueue {
10-
#queued = new Set()
11-
#size = 0
18+
/**
19+
* @type {FixedQueue}
20+
*/
21+
#queue = new FixedQueue()
22+
23+
/**
24+
* @type {boolean}
25+
*/
26+
#running = false
1227

13-
/** @type {import('net').Socket} */
28+
/** @type {import('node:net').Socket} */
1429
#socket
1530

1631
constructor (socket) {
@@ -19,66 +34,70 @@ class SendQueue {
1934

2035
add (item, cb, hint) {
2136
if (hint !== sendHints.blob) {
22-
const data = clone(item, hint)
23-
24-
if (this.#size === 0) {
25-
this.#dispatch(data, cb, hint)
37+
const frame = createFrame(item, hint)
38+
if (!this.#running) {
39+
// fast-path
40+
this.#socket.write(frame, cb)
2641
} else {
27-
this.#queued.add([data, cb, true, hint])
28-
this.#size++
29-
30-
this.#run()
42+
/** @type {SendQueueNode} */
43+
const node = {
44+
promise: null,
45+
callback: cb,
46+
frame
47+
}
48+
this.#queue.push(node)
3149
}
32-
3350
return
3451
}
3552

36-
const promise = item.arrayBuffer()
37-
const queue = [null, cb, false, hint]
38-
promise.then((ab) => {
39-
queue[0] = clone(ab, hint)
40-
queue[2] = true
53+
/** @type {SendQueueNode} */
54+
const node = {
55+
promise: item.arrayBuffer().then((ab) => {
56+
node.promise = null
57+
node.frame = createFrame(ab, hint)
58+
}),
59+
callback: cb,
60+
frame: null
61+
}
4162

42-
this.#run()
43-
})
63+
this.#queue.push(node)
4464

45-
this.#queued.add(queue)
46-
this.#size++
65+
if (!this.#running) {
66+
this.#run()
67+
}
4768
}
4869

49-
#run () {
50-
for (const queued of this.#queued) {
51-
const [data, cb, done, hint] = queued
52-
53-
if (!done) return
54-
55-
this.#queued.delete(queued)
56-
this.#size--
57-
58-
this.#dispatch(data, cb, hint)
70+
async #run () {
71+
this.#running = true
72+
const queue = this.#queue
73+
while (!queue.isEmpty()) {
74+
const node = queue.shift()
75+
// wait pending promise
76+
if (node.promise !== null) {
77+
await node.promise
78+
}
79+
// write
80+
this.#socket.write(node.frame, node.callback)
81+
// cleanup
82+
node.callback = node.frame = null
5983
}
84+
this.#running = false
6085
}
86+
}
6187

62-
#dispatch (data, cb, hint) {
63-
const frame = new WebsocketFrameSend()
64-
const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY
65-
66-
frame.frameData = data
67-
const buffer = frame.createFrame(opcode)
68-
69-
this.#socket.write(buffer, cb)
70-
}
88+
function createFrame (data, hint) {
89+
return new WebsocketFrameSend(toBuffer(data, hint)).createFrame(hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY)
7190
}
7291

73-
function clone (data, hint) {
92+
function toBuffer (data, hint) {
7493
switch (hint) {
7594
case sendHints.string:
7695
return Buffer.from(data)
7796
case sendHints.arrayBuffer:
7897
case sendHints.blob:
7998
return new FastBuffer(data)
8099
case sendHints.typedArray:
81-
return Buffer.copyBytesFrom(data)
100+
return new FastBuffer(data.buffer, data.byteOffset, data.byteLength)
82101
}
83102
}
84103

0 commit comments

Comments
 (0)