11'use strict' ;
22
3+ const assert = require ( 'assert' ) ;
34const util = require ( 'util' ) ;
45const Socket = require ( 'net' ) . Socket ;
56const JSStream = process . binding ( 'js_stream' ) . JSStream ;
67const uv = process . binding ( 'uv' ) ;
8+ const debug = util . debuglog ( 'stream_wrap' ) ;
79
810function StreamWrap ( stream ) {
9- var handle = new JSStream ( ) ;
11+ const handle = new JSStream ( ) ;
1012
1113 this . stream = stream ;
1214
13- var self = this ;
15+ this . _list = null ;
16+
17+ const self = this ;
1418 handle . close = function ( cb ) {
15- cb ( ) ;
19+ debug ( 'close' ) ;
20+ self . doClose ( cb ) ;
1621 } ;
1722 handle . isAlive = function ( ) {
1823 return self . isAlive ( ) ;
@@ -27,21 +32,25 @@ function StreamWrap(stream) {
2732 return self . readStop ( ) ;
2833 } ;
2934 handle . onshutdown = function ( req ) {
30- return self . shutdown ( req ) ;
35+ return self . doShutdown ( req ) ;
3136 } ;
3237 handle . onwrite = function ( req , bufs ) {
33- return self . write ( req , bufs ) ;
38+ return self . doWrite ( req , bufs ) ;
3439 } ;
3540
3641 this . stream . pause ( ) ;
42+ this . stream . on ( 'error' , function ( err ) {
43+ self . emit ( 'error' , err ) ;
44+ } ) ;
3745 this . stream . on ( 'data' , function ( chunk ) {
38- self . _handle . readBuffer ( chunk ) ;
46+ debug ( 'data' , chunk . length ) ;
47+ if ( self . _handle )
48+ self . _handle . readBuffer ( chunk ) ;
3949 } ) ;
4050 this . stream . once ( 'end' , function ( ) {
41- self . _handle . emitEOF ( ) ;
42- } ) ;
43- this . stream . on ( 'error' , function ( err ) {
44- self . emit ( 'error' , err ) ;
51+ debug ( 'end' ) ;
52+ if ( self . _handle )
53+ self . _handle . emitEOF ( ) ;
4554 } ) ;
4655
4756 Socket . call ( this , {
@@ -55,11 +64,11 @@ module.exports = StreamWrap;
5564StreamWrap . StreamWrap = StreamWrap ;
5665
5766StreamWrap . prototype . isAlive = function isAlive ( ) {
58- return this . readable && this . writable ;
67+ return true ;
5968} ;
6069
6170StreamWrap . prototype . isClosing = function isClosing ( ) {
62- return ! this . isAlive ( ) ;
71+ return ! this . readable || ! this . writable ;
6372} ;
6473
6574StreamWrap . prototype . readStart = function readStart ( ) {
@@ -72,21 +81,31 @@ StreamWrap.prototype.readStop = function readStop() {
7281 return 0 ;
7382} ;
7483
75- StreamWrap . prototype . shutdown = function shutdown ( req ) {
76- var self = this ;
84+ StreamWrap . prototype . doShutdown = function doShutdown ( req ) {
85+ const self = this ;
86+ const handle = this . _handle ;
87+ const item = this . _enqueue ( 'shutdown' , req ) ;
7788
7889 this . stream . end ( function ( ) {
7990 // Ensure that write was dispatched
8091 setImmediate ( function ( ) {
81- self . _handle . finishShutdown ( req , 0 ) ;
92+ if ( ! self . _dequeue ( item ) )
93+ return ;
94+
95+ handle . finishShutdown ( req , 0 ) ;
8296 } ) ;
8397 } ) ;
8498 return 0 ;
8599} ;
86100
87- StreamWrap . prototype . write = function write ( req , bufs ) {
101+ StreamWrap . prototype . doWrite = function doWrite ( req , bufs ) {
102+ const self = this ;
103+ const handle = self . _handle ;
104+
88105 var pending = bufs . length ;
89- var self = this ;
106+
107+ // Queue the request to be able to cancel it
108+ const item = self . _enqueue ( 'write' , req ) ;
90109
91110 self . stream . cork ( ) ;
92111 bufs . forEach ( function ( buf ) {
@@ -103,6 +122,10 @@ StreamWrap.prototype.write = function write(req, bufs) {
103122
104123 // Ensure that write was dispatched
105124 setImmediate ( function ( ) {
125+ // Do not invoke callback twice
126+ if ( ! self . _dequeue ( item ) )
127+ return ;
128+
106129 var errCode = 0 ;
107130 if ( err ) {
108131 if ( err . code && uv [ 'UV_' + err . code ] )
@@ -111,10 +134,83 @@ StreamWrap.prototype.write = function write(req, bufs) {
111134 errCode = uv . UV_EPIPE ;
112135 }
113136
114- self . _handle . doAfterWrite ( req ) ;
115- self . _handle . finishWrite ( req , errCode ) ;
137+ handle . doAfterWrite ( req ) ;
138+ handle . finishWrite ( req , errCode ) ;
116139 } ) ;
117140 }
118141
119142 return 0 ;
120143} ;
144+
145+ function QueueItem ( type , req ) {
146+ this . type = type ;
147+ this . req = req ;
148+ this . prev = this ;
149+ this . next = this ;
150+ }
151+
152+ StreamWrap . prototype . _enqueue = function enqueue ( type , req ) {
153+ const item = new QueueItem ( type , req ) ;
154+ if ( this . _list === null ) {
155+ this . _list = item ;
156+ return item ;
157+ }
158+
159+ item . next = this . _list . next ;
160+ item . prev = this . _list ;
161+ item . next . prev = item ;
162+ item . prev . next = item ;
163+
164+ return item ;
165+ } ;
166+
167+ StreamWrap . prototype . _dequeue = function dequeue ( item ) {
168+ assert ( item instanceof QueueItem ) ;
169+
170+ var next = item . next ;
171+ var prev = item . prev ;
172+
173+ if ( next === null && prev === null )
174+ return false ;
175+
176+ item . next = null ;
177+ item . prev = null ;
178+
179+ if ( next === item ) {
180+ prev = null ;
181+ next = null ;
182+ } else {
183+ prev . next = next ;
184+ next . prev = prev ;
185+ }
186+
187+ if ( this . _list === item )
188+ this . _list = next ;
189+
190+ return true ;
191+ } ;
192+
193+ StreamWrap . prototype . doClose = function doClose ( cb ) {
194+ const self = this ;
195+ const handle = self . _handle ;
196+
197+ setImmediate ( function ( ) {
198+ while ( self . _list !== null ) {
199+ const item = self . _list ;
200+ const req = item . req ;
201+ self . _dequeue ( item ) ;
202+
203+ const errCode = uv . UV_ECANCELED ;
204+ if ( item . type === 'write' ) {
205+ handle . doAfterWrite ( req ) ;
206+ handle . finishWrite ( req , errCode ) ;
207+ } else if ( item . type === 'shutdown' ) {
208+ handle . finishShutdown ( req , errCode ) ;
209+ }
210+ }
211+
212+ // Should be already set by net.js
213+ assert ( self . _handle === null ) ;
214+ cb ( ) ;
215+ } ) ;
216+ } ;
0 commit comments