@@ -44,6 +44,7 @@ import (
4444 "google.golang.org/protobuf/proto"
4545 "google.golang.org/protobuf/types/known/durationpb"
4646 fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
47+ "google.golang.org/protobuf/types/known/timestamppb"
4748)
4849
4950const (
@@ -350,6 +351,9 @@ type TopicConfigToUpdate struct {
350351 // IngestionDataSourceSettings are settings for ingestion from a
351352 // data source into this topic.
352353 //
354+ // When changing this value, the entire data source settings object must be applied,
355+ // rather than just the differences.
356+ //
353357 // Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
354358 IngestionDataSourceSettings * IngestionDataSourceSettings
355359}
@@ -495,6 +499,97 @@ func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool {
495499 return true
496500}
497501
502+ // CloudStorageIngestionState denotes the possible states for ingestion from Cloud Storage.
503+ type CloudStorageIngestionState int
504+
505+ const (
506+ // CloudStorageIngestionStateUnspecified is the default value. This value is unused.
507+ CloudStorageIngestionStateUnspecified = iota
508+
509+ // CloudStorageIngestionStateActive means ingestion is active.
510+ CloudStorageIngestionStateActive
511+
512+ // CloudStorageIngestionPermissionDenied means encountering an error while calling the Cloud Storage API.
513+ // This can happen if the Pub/Sub SA has not been granted the
514+ // [appropriate permissions](https://cloud.google.com/storage/docs/access-control/iam-permissions):
515+ // - storage.objects.list: to list the objects in a bucket.
516+ // - storage.objects.get: to read the objects in a bucket.
517+ // - storage.buckets.get: to verify the bucket exists.
518+ CloudStorageIngestionPermissionDenied
519+
520+ // CloudStorageIngestionPublishPermissionDenied means encountering an error when publishing to the topic.
521+ // This can happen if the Pub/Sub SA has not been granted the [appropriate publish
522+ // permissions](https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher)
523+ CloudStorageIngestionPublishPermissionDenied
524+
525+ // CloudStorageIngestionBucketNotFound means the provided bucket doesn't exist.
526+ CloudStorageIngestionBucketNotFound
527+
528+ // CloudStorageIngestionTooManyObjects means the bucket has too many objects, ingestion will be paused.
529+ CloudStorageIngestionTooManyObjects
530+ )
531+
532+ // IngestionDataSourceCloudStorage are ingestion settings for Cloud Storage.
533+ type IngestionDataSourceCloudStorage struct {
534+ // State is an output-only field indicating the state of the Cloud storage ingestion source.
535+ State CloudStorageIngestionState
536+
537+ // Bucket is the Cloud Storage bucket. The bucket name must be without any
538+ // prefix like "gs://". See the bucket naming requirements (https://cloud.google.com/storage/docs/buckets#naming).
539+ Bucket string
540+
541+ // InputFormat is the format of objects in Cloud Storage.
542+ // Defaults to TextFormat.
543+ InputFormat ingestionDataSourceCloudStorageInputFormat
544+
545+ // MinimumObjectCreateTime means objects with a larger or equal creation timestamp will be
546+ // ingested.
547+ MinimumObjectCreateTime time.Time
548+
549+ // MatchGlob is the pattern used to match objects that will be ingested. If
550+ // empty, all objects will be ingested. See the [supported
551+ // patterns](https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-objects-and-prefixes-using-glob).
552+ MatchGlob string
553+ }
554+
555+ var _ IngestionDataSource = (* IngestionDataSourceCloudStorage )(nil )
556+
557+ func (i * IngestionDataSourceCloudStorage ) isIngestionDataSource () bool {
558+ return true
559+ }
560+
561+ type ingestionDataSourceCloudStorageInputFormat interface {
562+ isCloudStorageIngestionInputFormat () bool
563+ }
564+
565+ var _ ingestionDataSourceCloudStorageInputFormat = (* IngestionDataSourceCloudStorageTextFormat )(nil )
566+ var _ ingestionDataSourceCloudStorageInputFormat = (* IngestionDataSourceCloudStorageAvroFormat )(nil )
567+ var _ ingestionDataSourceCloudStorageInputFormat = (* IngestionDataSourceCloudStoragePubSubAvroFormat )(nil )
568+
569+ // IngestionDataSourceCloudStorageTextFormat means Cloud Storage data will be interpreted as text.
570+ type IngestionDataSourceCloudStorageTextFormat struct {
571+ Delimiter string
572+ }
573+
574+ func (i * IngestionDataSourceCloudStorageTextFormat ) isCloudStorageIngestionInputFormat () bool {
575+ return true
576+ }
577+
578+ // IngestionDataSourceCloudStorageAvroFormat means Cloud Storage data will be interpreted in Avro format.
579+ type IngestionDataSourceCloudStorageAvroFormat struct {}
580+
581+ func (i * IngestionDataSourceCloudStorageAvroFormat ) isCloudStorageIngestionInputFormat () bool {
582+ return true
583+ }
584+
585+ // IngestionDataSourceCloudStoragePubSubAvroFormat is used assuming the data was written using Cloud
586+ // Storage subscriptions https://cloud.google.com/pubsub/docs/cloudstorage.
587+ type IngestionDataSourceCloudStoragePubSubAvroFormat struct {}
588+
589+ func (i * IngestionDataSourceCloudStoragePubSubAvroFormat ) isCloudStorageIngestionInputFormat () bool {
590+ return true
591+ }
592+
498593func protoToIngestionDataSourceSettings (pbs * pb.IngestionDataSourceSettings ) * IngestionDataSourceSettings {
499594 if pbs == nil {
500595 return nil
@@ -509,6 +604,25 @@ func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *In
509604 AWSRoleARN : k .GetAwsRoleArn (),
510605 GCPServiceAccount : k .GetGcpServiceAccount (),
511606 }
607+ } else if cs := pbs .GetCloudStorage (); cs != nil {
608+ var format ingestionDataSourceCloudStorageInputFormat
609+ switch t := cs .InputFormat .(type ) {
610+ case * pb.IngestionDataSourceSettings_CloudStorage_TextFormat_ :
611+ format = & IngestionDataSourceCloudStorageTextFormat {
612+ Delimiter : * t .TextFormat .Delimiter ,
613+ }
614+ case * pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_ :
615+ format = & IngestionDataSourceCloudStorageAvroFormat {}
616+ case * pb.IngestionDataSourceSettings_CloudStorage_PubsubAvroFormat :
617+ format = & IngestionDataSourceCloudStoragePubSubAvroFormat {}
618+ }
619+ s .Source = & IngestionDataSourceCloudStorage {
620+ State : CloudStorageIngestionState (cs .GetState ()),
621+ Bucket : cs .GetBucket (),
622+ InputFormat : format ,
623+ MinimumObjectCreateTime : cs .GetMinimumObjectCreateTime ().AsTime (),
624+ MatchGlob : cs .GetMatchGlob (),
625+ }
512626 }
513627 return s
514628}
@@ -534,6 +648,48 @@ func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings
534648 },
535649 }
536650 }
651+ if cs , ok := out .(* IngestionDataSourceCloudStorage ); ok {
652+ switch format := cs .InputFormat .(type ) {
653+ case * IngestionDataSourceCloudStorageTextFormat :
654+ pbs .Source = & pb.IngestionDataSourceSettings_CloudStorage_ {
655+ CloudStorage : & pb.IngestionDataSourceSettings_CloudStorage {
656+ State : pb .IngestionDataSourceSettings_CloudStorage_State (cs .State ),
657+ Bucket : cs .Bucket ,
658+ InputFormat : & pb.IngestionDataSourceSettings_CloudStorage_TextFormat_ {
659+ TextFormat : & pb.IngestionDataSourceSettings_CloudStorage_TextFormat {
660+ Delimiter : & format .Delimiter ,
661+ },
662+ },
663+ MinimumObjectCreateTime : timestamppb .New (cs .MinimumObjectCreateTime ),
664+ MatchGlob : cs .MatchGlob ,
665+ },
666+ }
667+ case * IngestionDataSourceCloudStorageAvroFormat :
668+ pbs .Source = & pb.IngestionDataSourceSettings_CloudStorage_ {
669+ CloudStorage : & pb.IngestionDataSourceSettings_CloudStorage {
670+ State : pb .IngestionDataSourceSettings_CloudStorage_State (cs .State ),
671+ Bucket : cs .Bucket ,
672+ InputFormat : & pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_ {
673+ AvroFormat : & pb.IngestionDataSourceSettings_CloudStorage_AvroFormat {},
674+ },
675+ MinimumObjectCreateTime : timestamppb .New (cs .MinimumObjectCreateTime ),
676+ MatchGlob : cs .MatchGlob ,
677+ },
678+ }
679+ case * IngestionDataSourceCloudStoragePubSubAvroFormat :
680+ pbs .Source = & pb.IngestionDataSourceSettings_CloudStorage_ {
681+ CloudStorage : & pb.IngestionDataSourceSettings_CloudStorage {
682+ State : pb .IngestionDataSourceSettings_CloudStorage_State (cs .State ),
683+ Bucket : cs .Bucket ,
684+ InputFormat : & pb.IngestionDataSourceSettings_CloudStorage_PubsubAvroFormat {
685+ PubsubAvroFormat : & pb.IngestionDataSourceSettings_CloudStorage_PubSubAvroFormat {},
686+ },
687+ MinimumObjectCreateTime : timestamppb .New (cs .MinimumObjectCreateTime ),
688+ MatchGlob : cs .MatchGlob ,
689+ },
690+ }
691+ }
692+ }
537693 }
538694 return pbs
539695}
0 commit comments