@@ -386,25 +386,44 @@ func (c *Client) receiveLoop() error {
386386// createStream creates a new stream and registers it with the client
387387// Introduce stream types for multiple or single response
388388func (c * Client ) createStream (flags uint8 , b []byte ) (* stream , error ) {
389- c .streamLock .Lock ()
389+ // sendLock must be held across both allocation of the stream ID and sending it across the wire.
390+ // This ensures that new stream IDs sent on the wire are always increasing, which is a
391+ // requirement of the TTRPC protocol.
392+ // This use of sendLock could be split into another mutex that covers stream creation + first send,
393+ // and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
394+ c .sendLock .Lock ()
395+ defer c .sendLock .Unlock ()
390396
391397 // Check if closed since lock acquired to prevent adding
392398 // anything after cleanup completes
393399 select {
394400 case <- c .ctx .Done ():
395- c .streamLock .Unlock ()
396401 return nil , ErrClosed
397402 default :
398403 }
399404
400- // Stream ID should be allocated at same time
401- s := newStream (c .nextStreamID , c )
402- c .streams [s .id ] = s
403- c .nextStreamID = c .nextStreamID + 2
405+ var s * stream
406+ if err := func () error {
407+ // In the future this could be replaced with a sync.Map instead of streamLock+map.
408+ c .streamLock .Lock ()
409+ defer c .streamLock .Unlock ()
404410
405- c .sendLock .Lock ()
406- defer c .sendLock .Unlock ()
407- c .streamLock .Unlock ()
411+ // Check if closed since lock acquired to prevent adding
412+ // anything after cleanup completes
413+ select {
414+ case <- c .ctx .Done ():
415+ return ErrClosed
416+ default :
417+ }
418+
419+ s = newStream (c .nextStreamID , c )
420+ c .streams [s .id ] = s
421+ c .nextStreamID = c .nextStreamID + 2
422+
423+ return nil
424+ }(); err != nil {
425+ return nil , err
426+ }
408427
409428 if err := c .channel .send (uint32 (s .id ), messageTypeRequest , flags , b ); err != nil {
410429 return s , filterCloseErr (err )
0 commit comments