@@ -60,9 +60,10 @@ QuicStream::QuicStream(
6060 StreamBase(session->env ()),
6161 session_(session),
6262 stream_id_(stream_id),
63- flags_(QUIC_STREAM_FLAG_NONE ),
63+ flags_(QUICSTREAM_FLAG_INITIAL ),
6464 available_outbound_length_(0 ) {
6565 CHECK_NOT_NULL (session);
66+ SetInitialFlags ();
6667 session->AddStream (this );
6768 StreamBase::AttachToObject (GetObject ());
6869 PushStreamListener (&stream_listener_);
@@ -74,14 +75,43 @@ QuicStream::~QuicStream() {
7475 CHECK_EQ (0 , streambuf_.Length ());
7576}
7677
78+ inline void QuicStream::SetInitialFlags () {
79+ if (GetDirection () == QUIC_STREAM_UNIDIRECTIONAL) {
80+ if (session_->IsServer ()) {
81+ switch (GetOrigin ()) {
82+ case QUIC_STREAM_SERVER:
83+ SetReadClose ();
84+ break ;
85+ case QUIC_STREAM_CLIENT:
86+ SetWriteClose ();
87+ break ;
88+ default :
89+ UNREACHABLE ();
90+ }
91+ } else {
92+ switch (GetOrigin ()) {
93+ case QUIC_STREAM_SERVER:
94+ SetWriteClose ();
95+ break ;
96+ case QUIC_STREAM_CLIENT:
97+ SetReadClose ();
98+ break ;
99+ default :
100+ UNREACHABLE ();
101+ }
102+ }
103+ }
104+ }
105+
77106// QuicStream::Close() is called by the QuicSession when ngtcp2 detects that
78107// a stream has been closed. This, in turn, calls out to the JavaScript to
79108// start the process of tearing down and destroying the QuicStream instance.
80109void QuicStream::Close (uint16_t app_error_code) {
81110 Debug (this , " Stream %llu closed with code %d" , GetID (), app_error_code);
111+ SetReadClose ();
112+ SetWriteClose ();
82113 HandleScope scope (env ()->isolate ());
83114 Context::Scope context_context (env ()->context ());
84- flags_ |= QUIC_STREAM_FLAG_CLOSED;
85115 Local<Value> arg = Number::New (env ()->isolate (), app_error_code);
86116 MakeCallback (env ()->quic_on_stream_close_function (), 1 , &arg);
87117}
@@ -108,15 +138,23 @@ void QuicStream::Reset(uint64_t final_size, uint16_t app_error_code) {
108138}
109139
110140void QuicStream::Destroy () {
141+ SetReadClose ();
142+ SetWriteClose ();
111143 streambuf_.Cancel ();
112144 session_->RemoveStream (stream_id_);
113145 session_ = nullptr ;
114146}
115147
148+ // Do shutdown is called when the JS stream writable side is closed.
149+ // We want to mark the writable side closed and send pending data.
116150int QuicStream::DoShutdown (ShutdownWrap* req_wrap) {
117151 if (IsDestroyed ())
118152 return UV_EPIPE;
119- flags_ |= QUIC_STREAM_FLAG_SHUT;
153+ // Do nothing if the stream was already shutdown. Specifically,
154+ // we should not attempt to send anything on the QuicSession
155+ if (!IsWritable ())
156+ return 1 ;
157+ SetWriteClose ();
120158 session_->SendStreamData (this );
121159 return 1 ;
122160}
@@ -128,7 +166,9 @@ int QuicStream::DoWrite(
128166 uv_stream_t * send_handle) {
129167 CHECK_NULL (send_handle);
130168
131- if (IsDestroyed ()) {
169+ // A write should not have happened if we've been destroyed or
170+ // the QuicStream is no longer writable.
171+ if (IsDestroyed () || !IsWritable ()) {
132172 req_wrap->Done (UV_EOF);
133173 return 0 ;
134174 }
@@ -174,23 +214,17 @@ int QuicStream::DoWrite(
174214 // to be careful not to allow the internal buffer to grow
175215 // too large, or we'll run into several other problems.
176216
177- uint64_t len = streambuf_.Copy (bufs, nbufs);
217+ streambuf_.Copy (bufs, nbufs);
178218 req_wrap->Done (0 );
219+ session_->SendStreamData (this );
179220
180221 // IncrementAvailableOutboundLength(len);
181- session_->SendStreamData (this );
182222 return 0 ;
183223}
184224
185- uint64_t QuicStream::GetID () const {
186- return stream_id_;
187- }
188-
189- QuicSession* QuicStream::Session () {
190- return session_;
191- }
192-
193225void QuicStream::AckedDataOffset (uint64_t offset, size_t datalen) {
226+ if (IsDestroyed ())
227+ return ;
194228 streambuf_.Consume (datalen);
195229}
196230
@@ -212,40 +246,34 @@ inline void QuicStream::DecrementAvailableOutboundLength(size_t amount) {
212246 available_outbound_length_ -= amount;
213247}
214248
215- QuicStream* QuicStream::New (
216- QuicSession* session,
217- uint64_t stream_id) {
218- Local<Object> obj;
219- if (!session->env ()
220- ->quicserverstream_constructor_template ()
221- ->NewInstance (session->env ()->context ()).ToLocal (&obj)) {
222- return nullptr ;
223- }
224- return new QuicStream (session, obj, stream_id);
225- }
226-
227249int QuicStream::ReadStart () {
228250 CHECK (!this ->IsDestroyed ());
229- Debug ( this , " Reading started. " );
230- flags_ |= QUIC_STREAM_FLAG_READ_START ;
231- flags_ &= ~QUIC_STREAM_FLAG_READ_PAUSED ;
251+ CHECK ( IsReadable () );
252+ SetReadStart () ;
253+ SetReadResume () ;
232254 return 0 ;
233255}
234256
235257int QuicStream::ReadStop () {
236258 CHECK (!this ->IsDestroyed ());
237- if (!IsReading ())
238- return 0 ;
239- Debug (this , " Reading stopped" );
240- flags_ |= QUIC_STREAM_FLAG_READ_PAUSED;
259+ CHECK (IsReadable ());
260+ SetReadPause ();
241261 return 0 ;
242262}
243263
244264// Passes chunks of data on to the JavaScript side as soon as they are
245- // received. The caller of this must have a HandleScope.
265+ // received but only if we're still readable. The caller of this must have a
266+ // HandleScope.
267+ // TODO(@jasnell): There's currently no flow control here. The data is pushed
268+ // up to the JavaScript side regardless of whether the JS stream is flowing and
269+ // the connected peer is told to keep sending. We need to implement back
270+ // pressure.
246271void QuicStream::ReceiveData (int fin, const uint8_t * data, size_t datalen) {
247- Debug (this , " Receiving %d bytes of data. Final? %s" ,
248- datalen, fin ? " yes" : " no" );
272+ Debug (this , " Receiving %d bytes of data. Final? %s. Readable? %s" ,
273+ datalen, fin ? " yes" : " no" , IsReadable () ? " yes" : " no" );
274+
275+ if (!IsReadable ())
276+ return ;
249277
250278 while (datalen > 0 ) {
251279 uv_buf_t buf = EmitAlloc (datalen);
@@ -265,8 +293,24 @@ void QuicStream::ReceiveData(int fin, const uint8_t* data, size_t datalen) {
265293 EmitRead (avail, buf);
266294 };
267295
268- if (fin)
296+ // When fin != 0, we've received that last chunk of data for this
297+ // stream, indicating that the stream is no longer readable.
298+ if (fin) {
299+ SetReadClose ();
269300 EmitRead (UV_EOF);
301+ }
302+ }
303+
304+ QuicStream* QuicStream::New (
305+ QuicSession* session,
306+ uint64_t stream_id) {
307+ Local<Object> obj;
308+ if (!session->env ()
309+ ->quicserverstream_constructor_template ()
310+ ->NewInstance (session->env ()->context ()).ToLocal (&obj)) {
311+ return nullptr ;
312+ }
313+ return new QuicStream (session, obj, stream_id);
270314}
271315
272316// JavaScript API
0 commit comments