3838import com .google .api .gax .core .RetryParams ;
3939import com .google .api .gax .grpc .ApiCallSettings ;
4040import com .google .api .gax .grpc .ApiCallable .ApiCallableBuilder ;
41+ import com .google .api .gax .grpc .ApiCallable .BundlableApiCallableBuilder ;
4142import com .google .api .gax .grpc .ApiCallable .PageStreamingApiCallableBuilder ;
42- import com .google .api .gax .grpc .PageDescriptor ;
43+ import com .google .api .gax .grpc .BundlingDescriptor ;
44+ import com .google .api .gax .grpc .PageStreamingDescriptor ;
45+ import com .google .api .gax .grpc .RequestIssuer ;
4346import com .google .api .gax .grpc .ServiceApiSettings ;
4447import com .google .common .collect .ImmutableList ;
4548import com .google .common .collect .ImmutableMap ;
5659import com .google .pubsub .v1 .PublishRequest ;
5760import com .google .pubsub .v1 .PublishResponse ;
5861import com .google .pubsub .v1 .PublisherGrpc ;
62+ import com .google .pubsub .v1 .PubsubMessage ;
5963import com .google .pubsub .v1 .Topic ;
6064import io .grpc .Status ;
65+ import java .util .ArrayList ;
66+ import java .util .Collection ;
67+ import java .util .List ;
6168
6269// Manually-added imports: add custom (non-generated) imports after this point.
6370
@@ -134,7 +141,7 @@ public class PublisherSettings extends ServiceApiSettings {
134141
135142 private static class MethodBuilders {
136143 private final ApiCallableBuilder <Topic , Topic > createTopicMethod ;
137- private final ApiCallableBuilder <PublishRequest , PublishResponse > publishMethod ;
144+ private final BundlableApiCallableBuilder <PublishRequest , PublishResponse > publishMethod ;
138145 private final ApiCallableBuilder <GetTopicRequest , Topic > getTopicMethod ;
139146 private final PageStreamingApiCallableBuilder <ListTopicsRequest , ListTopicsResponse , Topic >
140147 listTopicsMethod ;
@@ -149,7 +156,8 @@ public MethodBuilders() {
149156 createTopicMethod .setRetryableCodes (RETRYABLE_CODE_DEFINITIONS .get ("idempotent" ));
150157 createTopicMethod .setRetryParams (RETRY_PARAM_DEFINITIONS .get ("default" ));
151158
152- publishMethod = new ApiCallableBuilder <>(PublisherGrpc .METHOD_PUBLISH );
159+ publishMethod =
160+ new BundlableApiCallableBuilder <>(PublisherGrpc .METHOD_PUBLISH , PUBLISH_BUNDLING_DESC );
153161 publishMethod .setRetryableCodes (RETRYABLE_CODE_DEFINITIONS .get ("non_idempotent" ));
154162 publishMethod .setRetryParams (RETRY_PARAM_DEFINITIONS .get ("default" ));
155163
@@ -223,7 +231,7 @@ protected PublisherSettings(MethodBuilders methods) {
223231 }
224232
225233 /**
226- * Returns the ApiCallableBuilder for the API method createTopic.
234+ * Returns the builder for the API method createTopic.
227235 *
228236 * <!-- manual edit -->
229237 * <!-- end manual edit -->
@@ -233,17 +241,17 @@ public ApiCallableBuilder<Topic, Topic> createTopicMethod() {
233241 }
234242
235243 /**
236- * Returns the ApiCallableBuilder for the API method publish.
244+ * Returns the builder for the API method publish.
237245 *
238246 * <!-- manual edit -->
239247 * <!-- end manual edit -->
240248 */
241- public ApiCallableBuilder <PublishRequest , PublishResponse > publishMethod () {
249+ public BundlableApiCallableBuilder <PublishRequest , PublishResponse > publishMethod () {
242250 return methods .publishMethod ;
243251 }
244252
245253 /**
246- * Returns the ApiCallableBuilder for the API method getTopic.
254+ * Returns the builder for the API method getTopic.
247255 *
248256 * <!-- manual edit -->
249257 * <!-- end manual edit -->
@@ -253,7 +261,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
253261 }
254262
255263 /**
256- * Returns the PageStreamingApiCallableBuilder for the API method listTopics.
264+ * Returns the builder for the API method listTopics.
257265 *
258266 * <!-- manual edit -->
259267 * <!-- end manual edit -->
@@ -264,7 +272,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
264272 }
265273
266274 /**
267- * Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions.
275+ * Returns the builder for the API method listTopicSubscriptions.
268276 *
269277 * <!-- manual edit -->
270278 * <!-- end manual edit -->
@@ -276,7 +284,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
276284 }
277285
278286 /**
279- * Returns the ApiCallableBuilder for the API method deleteTopic.
287+ * Returns the builder for the API method deleteTopic.
280288 *
281289 * <!-- manual edit -->
282290 * <!-- end manual edit -->
@@ -285,9 +293,9 @@ public ApiCallableBuilder<DeleteTopicRequest, Empty> deleteTopicMethod() {
285293 return methods .deleteTopicMethod ;
286294 }
287295
288- private static PageDescriptor <ListTopicsRequest , ListTopicsResponse , Topic >
296+ private static PageStreamingDescriptor <ListTopicsRequest , ListTopicsResponse , Topic >
289297 LIST_TOPICS_PAGE_STR_DESC =
290- new PageDescriptor <ListTopicsRequest , ListTopicsResponse , Topic >() {
298+ new PageStreamingDescriptor <ListTopicsRequest , ListTopicsResponse , Topic >() {
291299 @ Override
292300 public Object emptyToken () {
293301 return "" ;
@@ -309,10 +317,10 @@ public Iterable<Topic> extractResources(ListTopicsResponse payload) {
309317 }
310318 };
311319
312- private static PageDescriptor <
320+ private static PageStreamingDescriptor <
313321 ListTopicSubscriptionsRequest , ListTopicSubscriptionsResponse , String >
314322 LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC =
315- new PageDescriptor <
323+ new PageStreamingDescriptor <
316324 ListTopicSubscriptionsRequest , ListTopicSubscriptionsResponse , String >() {
317325 @ Override
318326 public Object emptyToken () {
@@ -337,4 +345,66 @@ public Iterable<String> extractResources(ListTopicSubscriptionsResponse payload)
337345 return payload .getSubscriptionsList ();
338346 }
339347 };
348+
349+ private static BundlingDescriptor <PublishRequest , PublishResponse > PUBLISH_BUNDLING_DESC =
350+ new BundlingDescriptor <PublishRequest , PublishResponse >() {
351+ @ Override
352+ public String getBundlePartitionKey (PublishRequest request ) {
353+ return request .getTopic ();
354+ }
355+
356+ @ Override
357+ public PublishRequest mergeRequests (Collection <PublishRequest > requests ) {
358+ PublishRequest firstRequest = requests .iterator ().next ();
359+
360+ List <PubsubMessage > elements = new ArrayList <>();
361+ for (PublishRequest request : requests ) {
362+ elements .addAll (request .getMessagesList ());
363+ }
364+
365+ PublishRequest bundleRequest =
366+ PublishRequest .newBuilder ()
367+ .setTopic (firstRequest .getTopic ())
368+ .addAllMessages (elements )
369+ .build ();
370+ return bundleRequest ;
371+ }
372+
373+ @ Override
374+ public void splitResponse (
375+ PublishResponse bundleResponse ,
376+ Collection <? extends RequestIssuer <PublishRequest , PublishResponse >> bundle ) {
377+ int bundleMessageIndex = 0 ;
378+ for (RequestIssuer <PublishRequest , PublishResponse > responder : bundle ) {
379+ List <String > subresponseElements = new ArrayList <>();
380+ int subresponseCount = responder .getRequest ().getMessagesCount ();
381+ for (int i = 0 ; i < subresponseCount ; i ++) {
382+ subresponseElements .add (bundleResponse .getMessageIds (bundleMessageIndex ));
383+ bundleMessageIndex += 1 ;
384+ }
385+ PublishResponse response =
386+ PublishResponse .newBuilder ().addAllMessageIds (subresponseElements ).build ();
387+ responder .setResponse (response );
388+ }
389+ }
390+
391+ @ Override
392+ public void splitException (
393+ Throwable throwable ,
394+ Collection <? extends RequestIssuer <PublishRequest , PublishResponse >> bundle ) {
395+ for (RequestIssuer <PublishRequest , PublishResponse > responder : bundle ) {
396+ responder .setException (throwable );
397+ }
398+ }
399+
400+ @ Override
401+ public long countElements (PublishRequest request ) {
402+ return request .getMessagesCount ();
403+ }
404+
405+ @ Override
406+ public long countBytes (PublishRequest request ) {
407+ return request .getSerializedSize ();
408+ }
409+ };
340410}
0 commit comments