@@ -31,6 +31,7 @@ import (
3131 "github.com/containerd/ttrpc"
3232 "github.com/containerd/ttrpc/integration/streaming"
3333 "github.com/golang/protobuf/ptypes/empty"
34+ "google.golang.org/protobuf/types/known/emptypb"
3435)
3536
3637func runService (ctx context.Context , t testing.TB , service streaming.TTRPCStreamingService ) (streaming.TTRPCStreamingClient , func ()) {
@@ -190,6 +191,18 @@ func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming.
190191 return sendErr
191192}
192193
194+ func (tss * testStreamingService ) EmptyPayloadStream (_ context.Context , _ * emptypb.Empty , streamer streaming.TTRPCStreaming_EmptyPayloadStreamServer ) error {
195+ if err := streamer .Send (& streaming.EchoPayload {Seq : 1 }); err != nil {
196+ return err
197+ }
198+
199+ if err := streamer .Send (& streaming.EchoPayload {Seq : 2 }); err != nil {
200+ return err
201+ }
202+
203+ return nil
204+ }
205+
193206func TestStreamingService (t * testing.T ) {
194207 ctx , cancel := context .WithCancel (context .Background ())
195208 defer cancel ()
@@ -203,6 +216,7 @@ func TestStreamingService(t *testing.T) {
203216 t .Run ("DivideStream" , divideStreamTest (ctx , client ))
204217 t .Run ("EchoNull" , echoNullTest (ctx , client ))
205218 t .Run ("EchoNullStream" , echoNullStreamTest (ctx , client ))
219+ t .Run ("EmptyPayloadStream" , emptyPayloadStream (ctx , client ))
206220}
207221
208222func echoTest (ctx context.Context , client streaming.TTRPCStreamingClient ) func (t * testing.T ) {
@@ -385,6 +399,36 @@ func echoNullStreamTest(ctx context.Context, client streaming.TTRPCStreamingClie
385399 }
386400}
387401
402+ func emptyPayloadStream (ctx context.Context , client streaming.TTRPCStreamingClient ) func (t * testing.T ) {
403+ return func (t * testing.T ) {
404+ ctx , cancel := context .WithTimeout (ctx , 5 * time .Second )
405+ defer cancel ()
406+
407+ stream , err := client .EmptyPayloadStream (ctx , nil )
408+ if err != nil {
409+ t .Fatal (err )
410+ }
411+
412+ for i := uint32 (1 ); i < 3 ; i ++ {
413+ first , err := stream .Recv ()
414+ if err != nil {
415+ t .Fatal (err )
416+ }
417+
418+ if first .Seq != i {
419+ t .Fatalf ("unexpected seq: %d != %d" , first .Seq , i )
420+ }
421+ }
422+
423+ if err := stream .CloseSend (); err != nil {
424+ t .Fatal (err )
425+ }
426+ if _ , err := stream .Recv (); err != io .EOF {
427+ t .Fatalf ("Expected io.EOF, got %v" , err )
428+ }
429+ }
430+ }
431+
388432func assertNextEcho (t testing.TB , a , b * streaming.EchoPayload ) {
389433 t .Helper ()
390434 if a .Msg != b .Msg {
0 commit comments