Skip to content

Commit 1a11675

Browse files
authored
feat(pubsub): add support for cloud storage ingestion topics (#10959)
* feat(pubsub): add support for cloud storage ingestion topics * use getters for go protos * make interface private, add export strings * fix comment on pubsub avro format
1 parent 3377a3c commit 1a11675

File tree

2 files changed

+215
-0
lines changed

2 files changed

+215
-0
lines changed

pubsub/topic.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"google.golang.org/protobuf/proto"
4545
"google.golang.org/protobuf/types/known/durationpb"
4646
fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
47+
"google.golang.org/protobuf/types/known/timestamppb"
4748
)
4849

4950
const (
@@ -350,6 +351,9 @@ type TopicConfigToUpdate struct {
350351
// IngestionDataSourceSettings are settings for ingestion from a
351352
// data source into this topic.
352353
//
354+
// When changing this value, the entire data source settings object must be applied,
355+
// rather than just the differences.
356+
//
353357
// Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
354358
IngestionDataSourceSettings *IngestionDataSourceSettings
355359
}
@@ -495,6 +499,97 @@ func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool {
495499
return true
496500
}
497501

502+
// CloudStorageIngestionState denotes the possible states for ingestion from Cloud Storage.
503+
type CloudStorageIngestionState int
504+
505+
const (
506+
// CloudStorageIngestionStateUnspecified is the default value. This value is unused.
507+
CloudStorageIngestionStateUnspecified = iota
508+
509+
// CloudStorageIngestionStateActive means ingestion is active.
510+
CloudStorageIngestionStateActive
511+
512+
// CloudStorageIngestionPermissionDenied means encountering an error while calling the Cloud Storage API.
513+
// This can happen if the Pub/Sub SA has not been granted the
514+
// [appropriate permissions](https://cloud.google.com/storage/docs/access-control/iam-permissions):
515+
// - storage.objects.list: to list the objects in a bucket.
516+
// - storage.objects.get: to read the objects in a bucket.
517+
// - storage.buckets.get: to verify the bucket exists.
518+
CloudStorageIngestionPermissionDenied
519+
520+
// CloudStorageIngestionPublishPermissionDenied means encountering an error when publishing to the topic.
521+
// This can happen if the Pub/Sub SA has not been granted the [appropriate publish
522+
// permissions](https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher)
523+
CloudStorageIngestionPublishPermissionDenied
524+
525+
// CloudStorageIngestionBucketNotFound means the provided bucket doesn't exist.
526+
CloudStorageIngestionBucketNotFound
527+
528+
// CloudStorageIngestionTooManyObjects means the bucket has too many objects, ingestion will be paused.
529+
CloudStorageIngestionTooManyObjects
530+
)
531+
532+
// IngestionDataSourceCloudStorage are ingestion settings for Cloud Storage.
533+
type IngestionDataSourceCloudStorage struct {
534+
// State is an output-only field indicating the state of the Cloud storage ingestion source.
535+
State CloudStorageIngestionState
536+
537+
// Bucket is the Cloud Storage bucket. The bucket name must be without any
538+
// prefix like "gs://". See the bucket naming requirements (https://cloud.google.com/storage/docs/buckets#naming).
539+
Bucket string
540+
541+
// InputFormat is the format of objects in Cloud Storage.
542+
// Defaults to TextFormat.
543+
InputFormat ingestionDataSourceCloudStorageInputFormat
544+
545+
// MinimumObjectCreateTime means objects with a larger or equal creation timestamp will be
546+
// ingested.
547+
MinimumObjectCreateTime time.Time
548+
549+
// MatchGlob is the pattern used to match objects that will be ingested. If
550+
// empty, all objects will be ingested. See the [supported
551+
// patterns](https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-objects-and-prefixes-using-glob).
552+
MatchGlob string
553+
}
554+
555+
var _ IngestionDataSource = (*IngestionDataSourceCloudStorage)(nil)
556+
557+
func (i *IngestionDataSourceCloudStorage) isIngestionDataSource() bool {
558+
return true
559+
}
560+
561+
type ingestionDataSourceCloudStorageInputFormat interface {
562+
isCloudStorageIngestionInputFormat() bool
563+
}
564+
565+
var _ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStorageTextFormat)(nil)
566+
var _ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStorageAvroFormat)(nil)
567+
var _ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStoragePubSubAvroFormat)(nil)
568+
569+
// IngestionDataSourceCloudStorageTextFormat means Cloud Storage data will be interpreted as text.
570+
type IngestionDataSourceCloudStorageTextFormat struct {
571+
Delimiter string
572+
}
573+
574+
func (i *IngestionDataSourceCloudStorageTextFormat) isCloudStorageIngestionInputFormat() bool {
575+
return true
576+
}
577+
578+
// IngestionDataSourceCloudStorageAvroFormat means Cloud Storage data will be interpreted in Avro format.
579+
type IngestionDataSourceCloudStorageAvroFormat struct{}
580+
581+
func (i *IngestionDataSourceCloudStorageAvroFormat) isCloudStorageIngestionInputFormat() bool {
582+
return true
583+
}
584+
585+
// IngestionDataSourceCloudStoragePubSubAvroFormat is used assuming the data was written using Cloud
586+
// Storage subscriptions https://cloud.google.com/pubsub/docs/cloudstorage.
587+
type IngestionDataSourceCloudStoragePubSubAvroFormat struct{}
588+
589+
func (i *IngestionDataSourceCloudStoragePubSubAvroFormat) isCloudStorageIngestionInputFormat() bool {
590+
return true
591+
}
592+
498593
func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
499594
if pbs == nil {
500595
return nil
@@ -509,6 +604,25 @@ func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *In
509604
AWSRoleARN: k.GetAwsRoleArn(),
510605
GCPServiceAccount: k.GetGcpServiceAccount(),
511606
}
607+
} else if cs := pbs.GetCloudStorage(); cs != nil {
608+
var format ingestionDataSourceCloudStorageInputFormat
609+
switch t := cs.InputFormat.(type) {
610+
case *pb.IngestionDataSourceSettings_CloudStorage_TextFormat_:
611+
format = &IngestionDataSourceCloudStorageTextFormat{
612+
Delimiter: *t.TextFormat.Delimiter,
613+
}
614+
case *pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_:
615+
format = &IngestionDataSourceCloudStorageAvroFormat{}
616+
case *pb.IngestionDataSourceSettings_CloudStorage_PubsubAvroFormat:
617+
format = &IngestionDataSourceCloudStoragePubSubAvroFormat{}
618+
}
619+
s.Source = &IngestionDataSourceCloudStorage{
620+
State: CloudStorageIngestionState(cs.GetState()),
621+
Bucket: cs.GetBucket(),
622+
InputFormat: format,
623+
MinimumObjectCreateTime: cs.GetMinimumObjectCreateTime().AsTime(),
624+
MatchGlob: cs.GetMatchGlob(),
625+
}
512626
}
513627
return s
514628
}
@@ -534,6 +648,48 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
534648
},
535649
}
536650
}
651+
if cs, ok := out.(*IngestionDataSourceCloudStorage); ok {
652+
switch format := cs.InputFormat.(type) {
653+
case *IngestionDataSourceCloudStorageTextFormat:
654+
pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{
655+
CloudStorage: &pb.IngestionDataSourceSettings_CloudStorage{
656+
State: pb.IngestionDataSourceSettings_CloudStorage_State(cs.State),
657+
Bucket: cs.Bucket,
658+
InputFormat: &pb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
659+
TextFormat: &pb.IngestionDataSourceSettings_CloudStorage_TextFormat{
660+
Delimiter: &format.Delimiter,
661+
},
662+
},
663+
MinimumObjectCreateTime: timestamppb.New(cs.MinimumObjectCreateTime),
664+
MatchGlob: cs.MatchGlob,
665+
},
666+
}
667+
case *IngestionDataSourceCloudStorageAvroFormat:
668+
pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{
669+
CloudStorage: &pb.IngestionDataSourceSettings_CloudStorage{
670+
State: pb.IngestionDataSourceSettings_CloudStorage_State(cs.State),
671+
Bucket: cs.Bucket,
672+
InputFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_{
673+
AvroFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat{},
674+
},
675+
MinimumObjectCreateTime: timestamppb.New(cs.MinimumObjectCreateTime),
676+
MatchGlob: cs.MatchGlob,
677+
},
678+
}
679+
case *IngestionDataSourceCloudStoragePubSubAvroFormat:
680+
pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{
681+
CloudStorage: &pb.IngestionDataSourceSettings_CloudStorage{
682+
State: pb.IngestionDataSourceSettings_CloudStorage_State(cs.State),
683+
Bucket: cs.Bucket,
684+
InputFormat: &pb.IngestionDataSourceSettings_CloudStorage_PubsubAvroFormat{
685+
PubsubAvroFormat: &pb.IngestionDataSourceSettings_CloudStorage_PubSubAvroFormat{},
686+
},
687+
MinimumObjectCreateTime: timestamppb.New(cs.MinimumObjectCreateTime),
688+
MatchGlob: cs.MatchGlob,
689+
},
690+
}
691+
}
692+
}
537693
}
538694
return pbs
539695
}

pubsub/topic_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,65 @@ func TestTopic_IngestionKinesis(t *testing.T) {
165165
}
166166
}
167167

168+
func TestTopic_IngestionCloudStorage(t *testing.T) {
169+
c, srv := newFake(t)
170+
defer c.Close()
171+
defer srv.Close()
172+
173+
id := "test-topic-storage-ingestion"
174+
want := TopicConfig{
175+
IngestionDataSourceSettings: &IngestionDataSourceSettings{
176+
Source: &IngestionDataSourceCloudStorage{
177+
Bucket: "fake-bucket",
178+
InputFormat: &IngestionDataSourceCloudStorageTextFormat{
179+
Delimiter: ",",
180+
},
181+
MinimumObjectCreateTime: time.Now().Add(-time.Hour),
182+
MatchGlob: "**.txt",
183+
},
184+
},
185+
}
186+
187+
topic := mustCreateTopicWithConfig(t, c, id, &want)
188+
got, err := topic.Config(context.Background())
189+
if err != nil {
190+
t.Fatalf("error getting topic config: %v", err)
191+
}
192+
want.State = TopicStateActive
193+
opt := cmpopts.IgnoreUnexported(TopicConfig{})
194+
if !testutil.Equal(got, want, opt) {
195+
t.Errorf("got %v, want %v", got, want)
196+
}
197+
198+
// Update ingestion settings.
199+
ctx := context.Background()
200+
settings := &IngestionDataSourceSettings{
201+
Source: &IngestionDataSourceCloudStorage{
202+
Bucket: "fake-bucket-2",
203+
InputFormat: &IngestionDataSourceCloudStoragePubSubAvroFormat{},
204+
MinimumObjectCreateTime: time.Now().Add(-2 * time.Hour),
205+
MatchGlob: "**.txt",
206+
},
207+
}
208+
config2, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
209+
if err != nil {
210+
t.Fatal(err)
211+
}
212+
if !testutil.Equal(config2.IngestionDataSourceSettings, settings, opt) {
213+
t.Errorf("\ngot %+v\nwant %+v", config2.IngestionDataSourceSettings, settings)
214+
}
215+
216+
// Clear ingestion settings.
217+
settings = &IngestionDataSourceSettings{}
218+
config3, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
219+
if err != nil {
220+
t.Fatal(err)
221+
}
222+
if config3.IngestionDataSourceSettings != nil {
223+
t.Errorf("got: %+v, want nil", config3.IngestionDataSourceSettings)
224+
}
225+
}
226+
168227
func TestListTopics(t *testing.T) {
169228
ctx := context.Background()
170229
c, srv := newFake(t)

0 commit comments

Comments
 (0)