2424import com .google .api .core .ApiFuture ;
2525import com .google .api .core .ApiFutureCallback ;
2626import com .google .api .core .ApiFutures ;
27+ import com .google .api .gax .batching .BatchingSettings ;
28+ import com .google .api .gax .batching .FlowControlSettings ;
29+ import com .google .api .gax .batching .FlowController .LimitExceededBehavior ;
30+ import com .google .api .gax .core .CredentialsProvider ;
31+ import com .google .api .gax .core .FixedCredentialsProvider ;
32+ import com .google .api .gax .grpc .ChannelProvider ;
33+ import com .google .api .gax .grpc .ExecutorProvider ;
34+ import com .google .api .gax .grpc .InstantiatingExecutorProvider ;
35+ import com .google .api .gax .retrying .RetrySettings ;
36+ import com .google .auth .oauth2 .ServiceAccountCredentials ;
2737import com .google .cloud .pubsub .spi .v1 .Publisher ;
38+ import com .google .cloud .pubsub .spi .v1 .TopicAdminSettings ;
2839import com .google .protobuf .ByteString ;
2940import com .google .pubsub .v1 .PubsubMessage ;
3041import com .google .pubsub .v1 .TopicName ;
42+ import org .threeten .bp .Duration ;
43+
44+ import java .io .FileInputStream ;
3145
3246/** This class contains snippets for the {@link Publisher} interface. */
3347public class PublisherSnippets {
@@ -41,7 +55,6 @@ public PublisherSnippets(Publisher publisher) {
4155 // [TARGET publish(PubsubMessage)]
4256 // [VARIABLE "my_message"]
4357 public ApiFuture <String > publish (String message ) {
44- // [START publish]
4558 ByteString data = ByteString .copyFromUtf8 (message );
4659 PubsubMessage pubsubMessage = PubsubMessage .newBuilder ().setData (data ).build ();
4760 ApiFuture <String > messageIdFuture = publisher .publish (pubsubMessage );
@@ -54,24 +67,112 @@ public void onFailure(Throwable t) {
5467 System .out .println ("failed to publish: " + t );
5568 }
5669 });
57- // [END publish]
5870 return messageIdFuture ;
5971 }
6072
6173 /** Example of creating a {@code Publisher}. */
6274 // [TARGET newBuilder(TopicName)]
6375 // [VARIABLE "my_project"]
6476 // [VARIABLE "my_topic"]
65- public static void newBuilder (String projectName , String topicName ) throws Exception {
66- // [START newBuilder]
67- TopicName topic = TopicName .create (projectName , topicName );
77+ public static void newBuilder (String projectId , String topicId ) throws Exception {
78+ TopicName topic = TopicName .create (projectId , topicId );
6879 Publisher publisher = Publisher .defaultBuilder (topic ).build ();
6980 try {
7081 // ...
7182 } finally {
7283 // When finished with the publisher, make sure to shutdown to free up resources.
7384 publisher .shutdown ();
7485 }
75- // [END newBuilder]
86+ }
87+
88+ public Publisher getPublisherWithCustomBatchSettings (TopicName topicName ) throws Exception {
89+ // [START publisherBatchSettings]
90+ // Batch settings control how the publisher batches messages
91+ long requestBytesThreshold = 5000L ; // default : 1kb
92+ long messageCountBatchSize = 10L ; // default : 100
93+
94+ Duration publishDelayThreshold = Duration .ofMillis (100 ); // default : 1 ms
95+
96+ // Publish request get triggered based on request size, messages count & time since last publish
97+ BatchingSettings batchingSettings = BatchingSettings .newBuilder ()
98+ .setElementCountThreshold (messageCountBatchSize )
99+ .setRequestByteThreshold (requestBytesThreshold )
100+ .setDelayThreshold (publishDelayThreshold )
101+ .build ();
102+
103+ Publisher publisher = Publisher .defaultBuilder (topicName )
104+ .setBatchingSettings (batchingSettings ).build ();
105+ // [END publisherBatchSettings]
106+ return publisher ;
107+ }
108+
109+ public Publisher getPublisherWithCustomRetrySettings (TopicName topicName ) throws Exception {
110+ // [START publisherRetrySettings]
111+ // Retry settings control how the publisher handles retryable failures
112+ Duration retryDelay = Duration .ofMillis (100 ); // default : 1 ms
113+ double retryDelayMultiplier = 2.0 ; // back off for repeated failures
114+ Duration maxRetryDelay = Duration .ofSeconds (5 ); // default : 10 seconds
115+
116+ RetrySettings retrySettings = RetrySettings .newBuilder ()
117+ .setInitialRetryDelay (retryDelay )
118+ .setRetryDelayMultiplier (retryDelayMultiplier )
119+ .setMaxRetryDelay (maxRetryDelay )
120+ .build ();
121+
122+ Publisher publisher = Publisher .defaultBuilder (topicName )
123+ .setRetrySettings (retrySettings ).build ();
124+ // [END publisherRetrySettings]
125+ return publisher ;
126+ }
127+
128+ public Publisher getPublisherWithCustomFlowControlSettings (TopicName topicName ) throws Exception {
129+ // [START publisherFlowControlSettings]
130+
131+ // Flow control settings restrict the number of outstanding publish requests
132+ int maxOutstandingBatches = 20 ;
133+ int maxOutstandingRequestBytes = 500000 ;
134+
135+ // override behavior on limits exceeded if needed, default behavior is to block
136+ LimitExceededBehavior limitExceededBehavior = LimitExceededBehavior .ThrowException ;
137+
138+ FlowControlSettings flowControlSettings = FlowControlSettings .newBuilder ()
139+ .setMaxOutstandingElementCount (maxOutstandingBatches )
140+ .setMaxOutstandingRequestBytes (maxOutstandingRequestBytes )
141+ .setLimitExceededBehavior (limitExceededBehavior )
142+ .build ();
143+
144+ Publisher publisher = Publisher .defaultBuilder (topicName )
145+ .setFlowControlSettings (flowControlSettings ).build ();
146+ // [END publisherFlowControlSettings]
147+ return publisher ;
148+ }
149+
150+ public Publisher getSingleThreadedPublisher (TopicName topicName ) throws Exception {
151+ // [START singleThreadedPublisher]
152+ // create a publisher with a single threaded executor
153+ ExecutorProvider executorProvider = InstantiatingExecutorProvider .newBuilder ()
154+ .setExecutorThreadCount (1 ).build ();
155+ Publisher publisher = Publisher .defaultBuilder (topicName )
156+ .setExecutorProvider (executorProvider ).build ();
157+ // [END singleThreadedPublisher]
158+ return publisher ;
159+ }
160+
161+ private Publisher createPublisherWithCustomCredentials (TopicName topicName ) throws Exception {
162+ // [START publisherWithCustomCredentials]
163+ // read service account credentials from file
164+ CredentialsProvider credentialsProvider =
165+ FixedCredentialsProvider
166+ .create (ServiceAccountCredentials .fromStream (
167+ new FileInputStream ("credentials.json" )));
168+ ChannelProvider channelProvider =
169+ TopicAdminSettings .defaultChannelProviderBuilder ()
170+ .setCredentialsProvider (credentialsProvider ).build ();
171+
172+ Publisher publisher = Publisher .defaultBuilder (topicName )
173+ .setChannelProvider (channelProvider )
174+ .build ();
175+ // [START publisherWithCustomCredentials]
176+ return publisher ;
76177 }
77178}
0 commit comments