22
33const { WebsocketFrameSend } = require ( './frame' )
44const { opcodes, sendHints } = require ( './constants' )
5+ const FixedQueue = require ( '../../dispatcher/fixed-queue' )
56
67/** @type {typeof Uint8Array } */
78const FastBuffer = Buffer [ Symbol . species ]
89
910/**
10- * @typedef {object } SendQueueNode
11- * @property {SendQueueNode | null } next
11+ * @typedef {object } QueueNode
1212 * @property {Promise<void> | null } promise
1313 * @property {((...args: any[]) => any) } callback
1414 * @property {Buffer | null } frame
1515 */
1616
1717class SendQueue {
1818 /**
19- * @type {SendQueueNode | null }
19+ * @type {FixedQueue | null }
2020 */
21- #head = null
22- /**
23- * @type {SendQueueNode | null }
24- */
25- #tail = null
21+ #queue = null
2622
2723 /**
2824 * @type {boolean }
@@ -43,24 +39,19 @@ class SendQueue {
4339 // fast-path
4440 this . #socket. write ( frame , cb )
4541 } else {
46- /** @type {SendQueueNode } */
42+ /** @type {QueueNode } */
4743 const node = {
48- next : null ,
4944 promise : null ,
5045 callback : cb ,
5146 frame
5247 }
53- if ( this . #tail !== null ) {
54- this . #tail. next = node
55- }
56- this . #tail = node
48+ ( this . #queue ??= new FixedQueue ( ) ) . push ( node )
5749 }
5850 return
5951 }
6052
61- /** @type {SendQueueNode } */
53+ /** @type {QueueNode } */
6254 const node = {
63- next : null ,
6455 promise : item . arrayBuffer ( ) . then ( ( ab ) => {
6556 node . promise = null
6657 node . frame = createFrame ( ab , hint )
@@ -69,13 +60,7 @@ class SendQueue {
6960 frame : null
7061 }
7162
72- if ( this . #tail === null ) {
73- this . #tail = node
74- }
75-
76- if ( this . #head === null ) {
77- this . #head = node
78- }
63+ ( this . #queue ??= new FixedQueue ( ) ) . push ( node )
7964
8065 if ( ! this . #running) {
8166 this . #run( )
@@ -84,9 +69,10 @@ class SendQueue {
8469
8570 async #run ( ) {
8671 this . #running = true
87- /** @type {SendQueueNode | null } */
88- let node = this . #head
89- while ( node !== null ) {
72+ /** @type {FixedQueue } */
73+ const queue = this . #queue
74+ while ( ! queue . isEmpty ( ) ) {
75+ const node = queue . shift ( )
9076 // wait pending promise
9177 if ( node . promise !== null ) {
9278 await node . promise
@@ -95,11 +81,7 @@ class SendQueue {
9581 this . #socket. write ( node . frame , node . callback )
9682 // cleanup
9783 node . callback = node . frame = null
98- // set next
99- node = node . next
10084 }
101- this . #head = null
102- this . #tail = null
10385 this . #running = false
10486 }
10587}
0 commit comments