22
33const { WebsocketFrameSend } = require ( './frame' )
44const { opcodes, sendHints } = require ( './constants' )
5+ const FixedQueue = require ( '../../dispatcher/fixed-queue' )
56
6- /** @type {Uint8Array } */
7+ /** @type {typeof Uint8Array } */
78const FastBuffer = Buffer [ Symbol . species ]
89
10+ /**
11+ * @typedef {object } SendQueueNode
12+ * @property {Promise<void> | null } promise
13+ * @property {((...args: any[]) => any) } callback
14+ * @property {Buffer | null } frame
15+ */
16+
917class SendQueue {
10- #queued = new Set ( )
11- #size = 0
18+ /**
19+ * @type {FixedQueue }
20+ */
21+ #queue = new FixedQueue ( )
22+
23+ /**
24+ * @type {boolean }
25+ */
26+ #running = false
1227
13- /** @type {import('net').Socket } */
28+ /** @type {import('node: net').Socket } */
1429 #socket
1530
1631 constructor ( socket ) {
@@ -19,66 +34,70 @@ class SendQueue {
1934
2035 add ( item , cb , hint ) {
2136 if ( hint !== sendHints . blob ) {
22- const data = clone ( item , hint )
23-
24- if ( this . #size === 0 ) {
25- this . #dispatch ( data , cb , hint )
37+ const frame = createFrame ( item , hint )
38+ if ( ! this . #running ) {
39+ // fast-path
40+ this . #socket . write ( frame , cb )
2641 } else {
27- this . #queued. add ( [ data , cb , true , hint ] )
28- this . #size++
29-
30- this . #run( )
42+ /** @type {SendQueueNode } */
43+ const node = {
44+ promise : null ,
45+ callback : cb ,
46+ frame
47+ }
48+ this . #queue. push ( node )
3149 }
32-
3350 return
3451 }
3552
36- const promise = item . arrayBuffer ( )
37- const queue = [ null , cb , false , hint ]
38- promise . then ( ( ab ) => {
39- queue [ 0 ] = clone ( ab , hint )
40- queue [ 2 ] = true
53+ /** @type {SendQueueNode } */
54+ const node = {
55+ promise : item . arrayBuffer ( ) . then ( ( ab ) => {
56+ node . promise = null
57+ node . frame = createFrame ( ab , hint )
58+ } ) ,
59+ callback : cb ,
60+ frame : null
61+ }
4162
42- this . #run( )
43- } )
63+ this . #queue. push ( node )
4464
45- this . #queued. add ( queue )
46- this . #size++
65+ if ( ! this . #running) {
66+ this . #run( )
67+ }
4768 }
4869
49- #run ( ) {
50- for ( const queued of this . #queued) {
51- const [ data , cb , done , hint ] = queued
52-
53- if ( ! done ) return
54-
55- this . #queued. delete ( queued )
56- this . #size--
57-
58- this . #dispatch( data , cb , hint )
70+ async #run ( ) {
71+ this . #running = true
72+ const queue = this . #queue
73+ while ( ! queue . isEmpty ( ) ) {
74+ const node = queue . shift ( )
75+ // wait pending promise
76+ if ( node . promise !== null ) {
77+ await node . promise
78+ }
79+ // write
80+ this . #socket. write ( node . frame , node . callback )
81+ // cleanup
82+ node . callback = node . frame = null
5983 }
84+ this . #running = false
6085 }
86+ }
6187
62- #dispatch ( data , cb , hint ) {
63- const frame = new WebsocketFrameSend ( )
64- const opcode = hint === sendHints . string ? opcodes . TEXT : opcodes . BINARY
65-
66- frame . frameData = data
67- const buffer = frame . createFrame ( opcode )
68-
69- this . #socket. write ( buffer , cb )
70- }
88+ function createFrame ( data , hint ) {
89+ return new WebsocketFrameSend ( toBuffer ( data , hint ) ) . createFrame ( hint === sendHints . string ? opcodes . TEXT : opcodes . BINARY )
7190}
7291
73- function clone ( data , hint ) {
92+ function toBuffer ( data , hint ) {
7493 switch ( hint ) {
7594 case sendHints . string :
7695 return Buffer . from ( data )
7796 case sendHints . arrayBuffer :
7897 case sendHints . blob :
7998 return new FastBuffer ( data )
8099 case sendHints . typedArray :
81- return Buffer . copyBytesFrom ( data )
100+ return new FastBuffer ( data . buffer , data . byteOffset , data . byteLength )
82101 }
83102}
84103
0 commit comments