Skip to content

Commit f321666

Browse files
authored
refactor(pubsub): update proto library dependency for linter (#3793)
1 parent 653082d commit f321666

15 files changed

Lines changed: 62 additions & 99 deletions

pubsub/apiv1/mock_test.go

Lines changed: 3 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pubsub/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ require (
1414
google.golang.org/api v0.41.0
1515
google.golang.org/genproto v0.0.0-20210311153111-e2979279ddde
1616
google.golang.org/grpc v1.36.0
17+
google.golang.org/protobuf v1.25.0
1718
)

pubsub/integration_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"cloud.google.com/go/internal/version"
3333
kms "cloud.google.com/go/kms/apiv1"
3434
testutil2 "cloud.google.com/go/pubsub/internal/testutil"
35-
"github.com/golang/protobuf/proto"
3635
gax "github.com/googleapis/gax-go/v2"
3736
"golang.org/x/oauth2/google"
3837
"google.golang.org/api/iterator"
@@ -43,6 +42,8 @@ import (
4342
"google.golang.org/grpc/codes"
4443
"google.golang.org/grpc/metadata"
4544
"google.golang.org/grpc/status"
45+
"google.golang.org/protobuf/encoding/protowire"
46+
"google.golang.org/protobuf/proto"
4647
)
4748

4849
var (
@@ -292,7 +293,7 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou
292293
})
293294
if err != nil {
294295
if c := status.Convert(err); c.Code() == codes.Canceled {
295-
if time.Now().Sub(now) >= time.Minute {
296+
if time.Since(now) >= time.Minute {
296297
t.Fatal("pullN took too long")
297298
}
298299
} else {
@@ -390,8 +391,8 @@ func TestIntegration_LargePublishSize(t *testing.T) {
390391
length := MaxPublishRequestBytes - calcFieldSizeString(topic.String())
391392
// Next, account for the overhead from encoding an individual PubsubMessage,
392393
// and the inner PubsubMessage.Data field.
393-
pbMsgOverhead := 1 + proto.SizeVarint(uint64(length))
394-
dataOverhead := 1 + proto.SizeVarint(uint64(length-pbMsgOverhead))
394+
pbMsgOverhead := 1 + protowire.SizeVarint(uint64(length))
395+
dataOverhead := 1 + protowire.SizeVarint(uint64(length-pbMsgOverhead))
395396
maxLengthSingleMessage := length - pbMsgOverhead - dataOverhead
396397

397398
publishReq := &pb.PublishRequest{

pubsub/iterator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ import (
2323

2424
vkit "cloud.google.com/go/pubsub/apiv1"
2525
"cloud.google.com/go/pubsub/internal/distribution"
26-
"github.com/golang/protobuf/proto"
2726
gax "github.com/googleapis/gax-go/v2"
2827
pb "google.golang.org/genproto/googleapis/pubsub/v1"
2928
"google.golang.org/grpc"
3029
"google.golang.org/grpc/codes"
3130
"google.golang.org/grpc/status"
31+
"google.golang.org/protobuf/encoding/protowire"
3232
)
3333

3434
// Between message receipt and ack (that is, the time spent processing a message) we want to extend the message
@@ -536,7 +536,7 @@ func (it *messageIterator) pingStream() {
536536
func calcFieldSizeString(fields ...string) int {
537537
overhead := 0
538538
for _, field := range fields {
539-
overhead += 1 + len(field) + proto.SizeVarint(uint64(len(field)))
539+
overhead += 1 + len(field) + protowire.SizeVarint(uint64(len(field)))
540540
}
541541
return overhead
542542
}
@@ -546,7 +546,7 @@ func calcFieldSizeString(fields ...string) int {
546546
func calcFieldSizeInt(fields ...int) int {
547547
overhead := 0
548548
for _, field := range fields {
549-
overhead += 1 + proto.SizeVarint(uint64(field))
549+
overhead += 1 + protowire.SizeVarint(uint64(field))
550550
}
551551
return overhead
552552
}

pubsub/loadtest/loadtest.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131

3232
"cloud.google.com/go/pubsub"
3333
pb "cloud.google.com/go/pubsub/loadtest/pb"
34-
"github.com/golang/protobuf/ptypes"
3534
"golang.org/x/time/rate"
3635
)
3736

@@ -56,10 +55,7 @@ func (l *PubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartR
5655
if err != nil {
5756
return nil, err
5857
}
59-
dur, err := ptypes.Duration(req.PublishBatchDuration)
60-
if err != nil {
61-
return nil, err
62-
}
58+
dur := req.PublishBatchDuration.AsDuration()
6359
l.init(c, req.Topic, req.MessageSize, req.PublishBatchSize, dur)
6460
log.Println("started")
6561
return &pb.StartResponse{}, nil

pubsub/message.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"time"
2020

2121
ipubsub "cloud.google.com/go/internal/pubsub"
22-
"github.com/golang/protobuf/ptypes"
2322
pb "google.golang.org/genproto/googleapis/pubsub/v1"
2423
)
2524

@@ -72,10 +71,7 @@ func toMessage(resp *pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDon
7271
return msg, nil
7372
}
7473

75-
pubTime, err := ptypes.Timestamp(resp.Message.PublishTime)
76-
if err != nil {
77-
return nil, err
78-
}
74+
pubTime := resp.Message.PublishTime.AsTime()
7975

8076
var deliveryAttempt *int
8177
if resp.DeliveryAttempt > 0 {

pubsub/mock_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323
"time"
2424

2525
"cloud.google.com/go/internal/testutil"
26-
emptypb "github.com/golang/protobuf/ptypes/empty"
2726
pb "google.golang.org/genproto/googleapis/pubsub/v1"
27+
"google.golang.org/protobuf/types/known/emptypb"
2828
)
2929

3030
type mockServer struct {

pubsub/pstest/fake.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ import (
3434
"time"
3535

3636
"cloud.google.com/go/internal/testutil"
37-
"github.com/golang/protobuf/ptypes"
38-
durpb "github.com/golang/protobuf/ptypes/duration"
39-
emptypb "github.com/golang/protobuf/ptypes/empty"
4037
pb "google.golang.org/genproto/googleapis/pubsub/v1"
4138
"google.golang.org/grpc/codes"
4239
"google.golang.org/grpc/status"
40+
durpb "google.golang.org/protobuf/types/known/durationpb"
41+
"google.golang.org/protobuf/types/known/emptypb"
42+
"google.golang.org/protobuf/types/known/timestamppb"
4343
)
4444

4545
// ReactorOptions is a map that Server uses to look up reactors.
@@ -455,11 +455,11 @@ const (
455455
maxMessageRetentionDuration = 168 * time.Hour
456456
)
457457

458-
var defaultMessageRetentionDuration = ptypes.DurationProto(maxMessageRetentionDuration)
458+
var defaultMessageRetentionDuration = durpb.New(maxMessageRetentionDuration)
459459

460460
func checkMRD(pmrd *durpb.Duration) error {
461-
mrd, err := ptypes.Duration(pmrd)
462-
if err != nil || mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration {
461+
mrd := pmrd.AsDuration()
462+
if mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration {
463463
return status.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd)
464464
}
465465
return nil
@@ -619,10 +619,7 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis
619619
s.nextID++
620620
pm.MessageId = id
621621
pubTime := s.timeNowFunc()
622-
tsPubTime, err := ptypes.TimestampProto(pubTime)
623-
if err != nil {
624-
return nil, status.Errorf(codes.Internal, err.Error())
625-
}
622+
tsPubTime := timestamppb.New(pubTime)
626623
pm.PublishTime = tsPubTime
627624
m := &Message{
628625
ID: id,
@@ -833,11 +830,7 @@ func (s *GServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekRespon
833830
case nil:
834831
return nil, status.Errorf(codes.InvalidArgument, "missing Seek target type")
835832
case *pb.SeekRequest_Time:
836-
var err error
837-
target, err = ptypes.Timestamp(v.Time)
838-
if err != nil {
839-
return nil, status.Errorf(codes.InvalidArgument, "bad Time target: %v", err)
840-
}
833+
target = v.Time.AsTime()
841834
default:
842835
return nil, status.Errorf(codes.Unimplemented, "unhandled Seek target type %T", v)
843836
}
@@ -985,10 +978,7 @@ func (s *subscription) maintainMessages(now time.Time) {
985978
if m.outstanding() && now.After(m.ackDeadline) {
986979
m.makeAvailable()
987980
}
988-
pubTime, err := ptypes.Timestamp(m.proto.Message.PublishTime)
989-
if err != nil {
990-
panic(err)
991-
}
981+
pubTime := m.proto.Message.PublishTime.AsTime()
992982
// Remove messages that have been undelivered for a long time.
993983
if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
994984
delete(s.msgs, id)

pubsub/pstest/fake_test.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ import (
2525
"time"
2626

2727
"cloud.google.com/go/internal/testutil"
28-
"github.com/golang/protobuf/ptypes"
2928
pb "google.golang.org/genproto/googleapis/pubsub/v1"
3029
"google.golang.org/genproto/protobuf/field_mask"
3130
"google.golang.org/grpc"
3231
"google.golang.org/grpc/codes"
3332
"google.golang.org/grpc/status"
33+
"google.golang.org/protobuf/types/known/durationpb"
34+
"google.golang.org/protobuf/types/known/timestamppb"
3435
)
3536

3637
func TestTopics(t *testing.T) {
@@ -185,7 +186,7 @@ func TestSubscriptionErrors(t *testing.T) {
185186
checkCode(err, codes.NotFound)
186187
_, err = sclient.Seek(ctx, &pb.SeekRequest{})
187188
checkCode(err, codes.InvalidArgument)
188-
srt := &pb.SeekRequest_Time{Time: ptypes.TimestampNow()}
189+
srt := &pb.SeekRequest_Time{Time: timestamppb.Now()}
189190
_, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt})
190191
checkCode(err, codes.InvalidArgument)
191192
_, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt, Subscription: "s"})
@@ -278,10 +279,7 @@ func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages
278279
if err != nil {
279280
t.Fatal(err)
280281
}
281-
tsPubTime, err := ptypes.TimestampProto(pubTime)
282-
if err != nil {
283-
t.Fatal(err)
284-
}
282+
tsPubTime := timestamppb.New(pubTime)
285283
want := map[string]*pb.PubsubMessage{}
286284
for i, id := range res.MessageIds {
287285
want[id] = &pb.PubsubMessage{
@@ -639,7 +637,7 @@ func TestSeek(t *testing.T) {
639637
Topic: top.Name,
640638
AckDeadlineSeconds: 10,
641639
})
642-
ts := ptypes.TimestampNow()
640+
ts := timestamppb.Now()
643641
_, err := sclient.Seek(context.Background(), &pb.SeekRequest{
644642
Subscription: sub.Name,
645643
Target: &pb.SeekRequest_Time{Time: ts},
@@ -700,7 +698,7 @@ func TestTimeNowFunc(t *testing.T) {
700698

701699
m := s.Message(id)
702700
if m == nil {
703-
t.Error("got nil, want a message")
701+
t.Fatalf("got nil, want a message")
704702
}
705703
if got, want := m.PublishTime, timeFunc(); got != want {
706704
t.Fatalf("got %v, want %v", got, want)
@@ -797,8 +795,8 @@ func TestUpdateRetryPolicy(t *testing.T) {
797795
Name: "projects/P/subscriptions/S",
798796
Topic: top.Name,
799797
RetryPolicy: &pb.RetryPolicy{
800-
MinimumBackoff: ptypes.DurationProto(10 * time.Second),
801-
MaximumBackoff: ptypes.DurationProto(60 * time.Second),
798+
MinimumBackoff: durationpb.New(10 * time.Second),
799+
MaximumBackoff: durationpb.New(60 * time.Second),
802800
},
803801
})
804802

@@ -807,8 +805,8 @@ func TestUpdateRetryPolicy(t *testing.T) {
807805
Name: sub.Name,
808806
Topic: top.Name,
809807
RetryPolicy: &pb.RetryPolicy{
810-
MinimumBackoff: ptypes.DurationProto(20 * time.Second),
811-
MaximumBackoff: ptypes.DurationProto(100 * time.Second),
808+
MinimumBackoff: durationpb.New(20 * time.Second),
809+
MaximumBackoff: durationpb.New(100 * time.Second),
812810
},
813811
}
814812

@@ -1007,7 +1005,7 @@ func TestErrorInjection(t *testing.T) {
10071005
},
10081006
{
10091007
funcName: "Seek",
1010-
param: &pb.SeekRequest{Target: &pb.SeekRequest_Time{Time: ptypes.TimestampNow()}},
1008+
param: &pb.SeekRequest{Target: &pb.SeekRequest_Time{Time: timestamppb.Now()}},
10111009
},
10121010
}
10131011

pubsub/pullstream.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ import (
2828
// A pullStream supports the methods of a StreamingPullClient, but re-opens
2929
// the stream on a retryable error.
3030
type pullStream struct {
31-
ctx context.Context
32-
open func() (pb.Subscriber_StreamingPullClient, error)
31+
ctx context.Context
32+
open func() (pb.Subscriber_StreamingPullClient, error)
33+
cancel context.CancelFunc
3334

3435
mu sync.Mutex
3536
spc *pb.Subscriber_StreamingPullClient
@@ -41,8 +42,10 @@ type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_S
4142

4243
func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
4344
ctx = withSubscriptionKey(ctx, subName)
45+
ctx, cancel := context.WithCancel(ctx)
4446
return &pullStream{
45-
ctx: ctx,
47+
ctx: ctx,
48+
cancel: cancel,
4649
open: func() (pb.Subscriber_StreamingPullClient, error) {
4750
spc, err := streamingPull(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
4851
if err == nil {

0 commit comments

Comments
 (0)