Skip to content

Commit c51165f

Browse files
committed
First process the pending messages in recv channel
Signed-off-by: Iceber Gu <[email protected]>
1 parent 0ca69a9 commit c51165f

1 file changed

Lines changed: 17 additions & 10 deletions

File tree

client.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -483,23 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
483483
}
484484
defer c.deleteStream(s)
485485

486+
var msg *streamMessage
486487
select {
487488
case <-ctx.Done():
488489
return ctx.Err()
489490
case <-c.ctx.Done():
490491
return ErrClosed
491492
case <-s.recvClose:
492-
return s.recvErr
493-
case msg := <-s.recv:
494-
if msg.header.Type == messageTypeResponse {
495-
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
496-
} else {
497-
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
493+
// If recv has a pending message, process that first
494+
select {
495+
case msg = <-s.recv:
496+
default:
497+
return s.recvErr
498498
}
499+
case msg = <-s.recv:
500+
}
499501

500-
// return the payload buffer for reuse
501-
c.channel.putmbuf(msg.payload)
502-
503-
return err
502+
if msg.header.Type == messageTypeResponse {
503+
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
504+
} else {
505+
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
504506
}
507+
508+
// return the payload buffer for reuse
509+
c.channel.putmbuf(msg.payload)
510+
511+
return err
505512
}

0 commit comments

Comments
 (0)