Skip to content

Commit 90d421e

Browse files
authored
Merge pull request #157 from mxpv/empty_payload
Fix streaming with empty payloads
2 parents baadfd8 + 44ca009 commit 90d421e

5 files changed

Lines changed: 120 additions & 15 deletions

File tree

integration/streaming/test.pb.go

Lines changed: 22 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration/streaming/test.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ service Streaming {
3434
rpc DivideStream(Sum) returns (stream Part);
3535
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
3636
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
37+
rpc EmptyPayloadStream(google.protobuf.Empty) returns (stream EchoPayload);
3738
}
3839

3940
message EchoPayload {

integration/streaming/test_ttrpc.pb.go

Lines changed: 55 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration/streaming_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/containerd/ttrpc"
3232
"github.com/containerd/ttrpc/integration/streaming"
3333
"github.com/golang/protobuf/ptypes/empty"
34+
"google.golang.org/protobuf/types/known/emptypb"
3435
)
3536

3637
func runService(ctx context.Context, t testing.TB, service streaming.TTRPCStreamingService) (streaming.TTRPCStreamingClient, func()) {
@@ -190,6 +191,14 @@ func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming.
190191
return sendErr
191192
}
192193

194+
func (tss *testStreamingService) EmptyPayloadStream(_ context.Context, _ *emptypb.Empty, streamer streaming.TTRPCStreaming_EmptyPayloadStreamServer) error {
195+
if err := streamer.Send(&streaming.EchoPayload{Seq: 1}); err != nil {
196+
return err
197+
}
198+
199+
return streamer.Send(&streaming.EchoPayload{Seq: 2})
200+
}
201+
193202
func TestStreamingService(t *testing.T) {
194203
ctx, cancel := context.WithCancel(context.Background())
195204
defer cancel()
@@ -203,6 +212,7 @@ func TestStreamingService(t *testing.T) {
203212
t.Run("DivideStream", divideStreamTest(ctx, client))
204213
t.Run("EchoNull", echoNullTest(ctx, client))
205214
t.Run("EchoNullStream", echoNullStreamTest(ctx, client))
215+
t.Run("EmptyPayloadStream", emptyPayloadStream(ctx, client))
206216
}
207217

208218
func echoTest(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
@@ -385,6 +395,33 @@ func echoNullStreamTest(ctx context.Context, client streaming.TTRPCStreamingClie
385395
}
386396
}
387397

398+
func emptyPayloadStream(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
399+
return func(t *testing.T) {
400+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
401+
defer cancel()
402+
403+
stream, err := client.EmptyPayloadStream(ctx, nil)
404+
if err != nil {
405+
t.Fatal(err)
406+
}
407+
408+
for i := uint32(1); i < 3; i++ {
409+
first, err := stream.Recv()
410+
if err != nil {
411+
t.Fatal(err)
412+
}
413+
414+
if first.Seq != i {
415+
t.Fatalf("unexpected seq: %d != %d", first.Seq, i)
416+
}
417+
}
418+
419+
if _, err := stream.Recv(); err != io.EOF {
420+
t.Fatalf("Expected io.EOF, got %v", err)
421+
}
422+
}
423+
}
424+
388425
func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) {
389426
t.Helper()
390427
if a.Msg != b.Msg {

services.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
140140
respond(st, p, stream.StreamingServer, true)
141141
}()
142142

143-
if req.Payload != nil {
143+
// Empty proto messages serialized to 0 payloads,
144+
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
145+
// don't get invoked here, which causes hang on client side.
146+
// See https://github.com/containerd/ttrpc/issues/126
147+
if req.Payload != nil || !info.StreamingClient {
144148
unmarshal := func(obj interface{}) error {
145149
return protoUnmarshal(req.Payload, obj)
146150
}

0 commit comments

Comments
 (0)