Skip to content

Commit ef57342

Browse files
authored
Merge pull request #168 from kevpar/deadlock
client: Fix deadlock when writing to pipe blocks
2 parents aa5f2d4 + 1b4f6f8 commit ef57342

1 file changed

Lines changed: 28 additions & 9 deletions

File tree

client.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -386,25 +386,44 @@ func (c *Client) receiveLoop() error {
386386
// createStream creates a new stream and registers it with the client
387387
// Introduce stream types for multiple or single response
388388
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
389-
c.streamLock.Lock()
389+
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
390+
// This ensures that new stream IDs sent on the wire are always increasing, which is a
391+
// requirement of the TTRPC protocol.
392+
// This use of sendLock could be split into another mutex that covers stream creation + first send,
393+
// and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
394+
c.sendLock.Lock()
395+
defer c.sendLock.Unlock()
390396

391397
// Check if closed since lock acquired to prevent adding
392398
// anything after cleanup completes
393399
select {
394400
case <-c.ctx.Done():
395-
c.streamLock.Unlock()
396401
return nil, ErrClosed
397402
default:
398403
}
399404

400-
// Stream ID should be allocated at same time
401-
s := newStream(c.nextStreamID, c)
402-
c.streams[s.id] = s
403-
c.nextStreamID = c.nextStreamID + 2
405+
var s *stream
406+
if err := func() error {
407+
// In the future this could be replaced with a sync.Map instead of streamLock+map.
408+
c.streamLock.Lock()
409+
defer c.streamLock.Unlock()
404410

405-
c.sendLock.Lock()
406-
defer c.sendLock.Unlock()
407-
c.streamLock.Unlock()
411+
// Check if closed since lock acquired to prevent adding
412+
// anything after cleanup completes
413+
select {
414+
case <-c.ctx.Done():
415+
return ErrClosed
416+
default:
417+
}
418+
419+
s = newStream(c.nextStreamID, c)
420+
c.streams[s.id] = s
421+
c.nextStreamID = c.nextStreamID + 2
422+
423+
return nil
424+
}(); err != nil {
425+
return nil, err
426+
}
408427

409428
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
410429
return s, filterCloseErr(err)

0 commit comments

Comments
 (0)