@@ -2209,8 +2209,11 @@ void Session::StreamDataBlocked(stream_id id) {
22092209 IncrementStat (&SessionStats::block_count);
22102210
22112211 BaseObjectPtr<Stream> stream = FindStream (id);
2212- if (stream)
2212+ if (stream) {
22132213 stream->OnBlocked ();
2214+ } else {
2215+ Debug (this , " Stream %" PRId64 " not found to block" , id);
2216+ }
22142217}
22152218
22162219void Session::IncrementConnectionCloseAttempts () {
@@ -3268,26 +3271,34 @@ bool Session::Application::SendPendingData() {
32683271 uint8_t * pos = nullptr ;
32693272 size_t packets_sent = 0 ;
32703273 int err;
3274+ std::vector<stream_id> blocking;
3275+ bool ret = false ;
3276+ BaseObjectPtr<Stream> stream;
32713277
32723278 Debug (session (), " Start sending pending data" );
32733279 for (;;) {
3274- ssize_t ndatalen;
32753280 StreamData stream_data;
3281+ ssize_t ndatalen = 0 ;
32763282 err = GetStreamData (&stream_data);
32773283 if (err < 0 ) {
32783284 session ()->set_last_error (kQuicInternalError );
3279- return false ;
3285+ goto end ;
32803286 }
32813287
32823288 // If the packet was sent previously, then packet will have been reset.
32833289 if (!pos) {
32843290 packet = CreateStreamDataPacket ();
3291+ if (!packet) {
3292+ Debug (session (), " Failed to create packet for stream data" );
3293+ session ()->set_last_error (kQuicInternalError );
3294+ goto end;
3295+ }
32853296 pos = packet->data ();
32863297 }
32873298
32883299 // If stream_data.id is -1, then we're not serializing any data for any
32893300 // specific stream. We still need to process QUIC session packets tho.
3290- if (stream_data.id > - 1 ) {
3301+ if (stream_data.id >= 0 ) {
32913302 Debug (session (), " Serializing packets for stream id %" PRId64,
32923303 stream_data.id );
32933304 packet->AddRetained (stream_data.stream ->GetOutboundSource ());
@@ -3322,10 +3333,11 @@ bool Session::Application::SendPendingData() {
33223333 // CONNECTION_CLOSE since even those require a
33233334 // packet number.
33243335 session ()->Close (Session::SessionCloseFlags::SILENT);
3325- return false ;
3336+ goto end ;
33263337 case NGTCP2_ERR_STREAM_DATA_BLOCKED:
3327- Debug (session (), " Stream %lld blocked" , stream_data.id );
3338+ Debug (session (), " Stream %lld blocked session data left %lld " , stream_data.id , session ()-> max_data_left () );
33283339 session ()->StreamDataBlocked (stream_data.id );
3340+ blocking.push_back (stream_data.id );
33293341 if (session ()->max_data_left () == 0 ) {
33303342 if (stream_data.id >= 0 ) {
33313343 Debug (session (), " Resuming %llu after block" , stream_data.id );
@@ -3344,6 +3356,7 @@ bool Session::Application::SendPendingData() {
33443356 CHECK_LE (ndatalen, 0 );
33453357 continue ;
33463358 case NGTCP2_ERR_STREAM_NOT_FOUND:
3359+ Debug (session (), " Stream %lld no found" , stream_data.id );
33473360 continue ;
33483361 case NGTCP2_ERR_WRITE_MORE:
33493362 CHECK_GT (ndatalen, 0 );
@@ -3354,7 +3367,7 @@ bool Session::Application::SendPendingData() {
33543367 if (nwrite != 0 ){ // -ve response i.e error
33553368 packet.reset ();
33563369 session ()->set_last_error (kQuicInternalError );
3357- return false ;
3370+ goto end ;
33583371 }
33593372
33603373 // 0 bytes in this sending operation
@@ -3368,9 +3381,10 @@ bool Session::Application::SendPendingData() {
33683381 Debug (session (), " Congestion limited, but %" PRIu64 " bytes pending" ,
33693382 packet->length ());
33703383 if (!session ()->SendPacket (std::move (packet), path))
3371- return false ;
3384+ goto end ;
33723385 }
3373- return true ;
3386+ ret = true ;
3387+ goto end;
33743388 }
33753389
33763390 pos += nwrite;
@@ -3384,16 +3398,27 @@ bool Session::Application::SendPendingData() {
33843398 Debug (session (), " Sending %" PRIu64 " bytes in serialized packet" , nwrite);
33853399 if (!session ()->SendPacket (std::move (packet), path)) {
33863400 Debug (session (), " -- Failed to send packet" );
3387- return false ;
3401+ goto end ;
33883402 }
33893403 pos = nullptr ;
33903404 if (++packets_sent == kMaxPackets ) {
33913405 Debug (session (), " -- Max packets sent" );
33923406 break ;
33933407 }
33943408 Debug (session (), " -- Looping" );
3409+ } // end for
3410+
3411+
3412+ ret = true ;
3413+ end:
3414+ for (stream_id id : blocking) {
3415+ stream = session ()->FindStream (id);
3416+ if (stream) {
3417+ stream->Unblock ();
3418+ }
33953419 }
3396- return true ;
3420+
3421+ return ret;
33973422}
33983423
33993424void Session::Application::StreamClose (
@@ -3576,13 +3601,24 @@ bool DefaultApplication::ReceiveStreamData(
35763601}
35773602
35783603int DefaultApplication::GetStreamData (StreamData* stream_data) {
3604+ if (stream_queue_.IsEmpty ()) {
3605+ stream_data->id = -1 ;
3606+ return 0 ;
3607+ }
3608+
35793609 Stream* stream = stream_queue_.PopFront ();
3580- stream_data->stream .reset (stream);
35813610 if (stream == nullptr ) {
35823611 stream_data->id = -1 ;
35833612 return 0 ;
35843613 }
35853614 CHECK (!stream->is_destroyed ());
3615+ stream_data->stream .reset (stream);
3616+
3617+ if (stream->IsBlocked ()){
3618+ stream_data->id = -1 ;
3619+ return 0 ;
3620+ }
3621+
35863622 stream_data->id = stream->id ();
35873623 auto next = [&](
35883624 int status,
@@ -3600,7 +3636,6 @@ int DefaultApplication::GetStreamData(StreamData* stream_data) {
36003636
36013637 stream_data->count = count;
36023638 if (count > 0 ) {
3603-
36043639 stream->Schedule (&stream_queue_);
36053640 stream_data->remaining = get_length (data, count);
36063641 } else {
0 commit comments