Skip to content

Commit 1e47334

Browse files
authored
Decouple transport flow control from application read. (#1265)
* Decouple transport flow control from application read. * post-review update * Added comment in http2_server as well. * Added another test * Fixed typos in comments.
1 parent a113590 commit 1e47334

4 files changed

Lines changed: 209 additions & 81 deletions

File tree

transport/control.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,3 @@ func (f *inFlow) onRead(n uint32) uint32 {
240240
}
241241
return 0
242242
}
243-
244-
func (f *inFlow) resetPendingData() uint32 {
245-
f.mu.Lock()
246-
defer f.mu.Unlock()
247-
n := f.pendingData
248-
f.pendingData = 0
249-
return n
250-
}

transport/http2_client.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -595,11 +595,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
595595
s.mu.Lock()
596596
rstStream = s.rstStream
597597
rstError = s.rstError
598-
if q := s.fc.resetPendingData(); q > 0 {
599-
if n := t.fc.onRead(q); n > 0 {
600-
t.controlBuf.put(&windowUpdate{0, n})
601-
}
602-
}
603598
if s.state == streamDone {
604599
s.mu.Unlock()
605600
return
@@ -831,9 +826,6 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
831826
if s.state == streamDone {
832827
return
833828
}
834-
if w := t.fc.onRead(n); w > 0 {
835-
t.controlBuf.put(&windowUpdate{0, w})
836-
}
837829
if w := s.fc.onRead(n); w > 0 {
838830
t.controlBuf.put(&windowUpdate{s.id, w})
839831
}
@@ -845,22 +837,26 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
845837
t.notifyError(connectionErrorf(true, err, "%v", err))
846838
return
847839
}
840+
// Decouple connection's flow control from application's read.
841+
// An update on connection's flow control should not depend on
842+
// whether user application has read the data or not. Such a
843+
// restriction is already imposed on the stream's flow control,
844+
// and therefore the sender will be blocked anyways.
845+
// Decoupling the connection flow control will prevent other
846+
// active(fast) streams from starving in presence of slow or
847+
// inactive streams.
848+
if w := t.fc.onRead(uint32(size)); w > 0 {
849+
t.controlBuf.put(&windowUpdate{0, w})
850+
}
848851
// Select the right stream to dispatch.
849852
s, ok := t.getStream(f)
850853
if !ok {
851-
if w := t.fc.onRead(uint32(size)); w > 0 {
852-
t.controlBuf.put(&windowUpdate{0, w})
853-
}
854854
return
855855
}
856856
if size > 0 {
857857
s.mu.Lock()
858858
if s.state == streamDone {
859859
s.mu.Unlock()
860-
// The stream has been closed. Release the corresponding quota.
861-
if w := t.fc.onRead(uint32(size)); w > 0 {
862-
t.controlBuf.put(&windowUpdate{0, w})
863-
}
864860
return
865861
}
866862
if err := s.fc.onData(uint32(size)); err != nil {
@@ -872,9 +868,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
872868
return
873869
}
874870
if f.Header().Flags.Has(http2.FlagDataPadded) {
875-
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
876-
t.controlBuf.put(&windowUpdate{0, w})
877-
}
878871
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
879872
t.controlBuf.put(&windowUpdate{s.id, w})
880873
}

transport/http2_server.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -462,9 +462,6 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
462462
if s.state == streamDone {
463463
return
464464
}
465-
if w := t.fc.onRead(n); w > 0 {
466-
t.controlBuf.put(&windowUpdate{0, w})
467-
}
468465
if w := s.fc.onRead(n); w > 0 {
469466
t.controlBuf.put(&windowUpdate{s.id, w})
470467
}
@@ -477,22 +474,26 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
477474
t.Close()
478475
return
479476
}
477+
// Decouple connection's flow control from application's read.
478+
// An update on connection's flow control should not depend on
479+
// whether user application has read the data or not. Such a
480+
// restriction is already imposed on the stream's flow control,
481+
// and therefore the sender will be blocked anyways.
482+
// Decoupling the connection flow control will prevent other
483+
// active(fast) streams from starving in presence of slow or
484+
// inactive streams.
485+
if w := t.fc.onRead(uint32(size)); w > 0 {
486+
t.controlBuf.put(&windowUpdate{0, w})
487+
}
480488
// Select the right stream to dispatch.
481489
s, ok := t.getStream(f)
482490
if !ok {
483-
if w := t.fc.onRead(uint32(size)); w > 0 {
484-
t.controlBuf.put(&windowUpdate{0, w})
485-
}
486491
return
487492
}
488493
if size > 0 {
489494
s.mu.Lock()
490495
if s.state == streamDone {
491496
s.mu.Unlock()
492-
// The stream has been closed. Release the corresponding quota.
493-
if w := t.fc.onRead(uint32(size)); w > 0 {
494-
t.controlBuf.put(&windowUpdate{0, w})
495-
}
496497
return
497498
}
498499
if err := s.fc.onData(uint32(size)); err != nil {
@@ -502,9 +503,6 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
502503
return
503504
}
504505
if f.Header().Flags.Has(http2.FlagDataPadded) {
505-
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
506-
t.controlBuf.put(&windowUpdate{0, w})
507-
}
508506
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
509507
t.controlBuf.put(&windowUpdate{s.id, w})
510508
}
@@ -1066,11 +1064,6 @@ func (t *http2Server) closeStream(s *Stream) {
10661064
// called to interrupt the potential blocking on other goroutines.
10671065
s.cancel()
10681066
s.mu.Lock()
1069-
if q := s.fc.resetPendingData(); q > 0 {
1070-
if w := t.fc.onRead(q); w > 0 {
1071-
t.controlBuf.put(&windowUpdate{0, w})
1072-
}
1073-
}
10741067
if s.state == streamDone {
10751068
s.mu.Unlock()
10761069
return

0 commit comments

Comments
 (0)