Skip to content

Commit a21357d

Browse files
---
yaml --- r: 2297 b: refs/heads/pubsub-alpha c: 95a1f90 h: refs/heads/master i: 2295: 5d81b0a
1 parent 05e85d7 commit a21357d

11 files changed

Lines changed: 254 additions & 58 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ refs/heads/master: 689bbb466df4b2d5d2483d6edb8ac5c7c7f7c6fa
33
refs/heads/travis: e21ee7b88a5edc3f3d8c71f90c3fc32abf7e8dd6
44
refs/heads/gh-pages: 4e0561bb4504bf647db669a14417b2b2c87ba45d
55
refs/heads/bigquery: 762fa5830e6c398c0396177e3e7fd243bd62cfc3
6-
refs/heads/pubsub-alpha: e32afa64efce44a03e14c27f1fc7d57de0e4c041
6+
refs/heads/pubsub-alpha: 95a1f9070aa33cd0354591733588ac380f2cdf6d
77
refs/heads/resource-manager: ebf4adc5ee835cd2086c4ac5b4e78d01a5a005a7
88
refs/heads/update-datastore: 482954f2c5055231e5b3122ea91d2ba00ce8187c
99
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

branches/pubsub-alpha/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@
3333

3434
package com.google.gcloud.pubsub.spi.v1;
3535

36-
import com.google.api.gax.grpc.ApiCallSettings;
3736
import com.google.api.gax.grpc.ApiCallable;
37+
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo;
38+
import com.google.api.gax.grpc.BundlerFactory;
3839
import com.google.api.gax.protobuf.PathTemplate;
3940
import com.google.protobuf.Empty;
4041
import com.google.pubsub.v1.DeleteTopicRequest;
@@ -207,7 +208,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException {
207208
this.channel = settings.getChannel();
208209

209210
this.createTopicCallable = settings.createTopicMethod().build(settings);
210-
this.publishCallable = settings.publishMethod().build(settings);
211+
BundlableApiCallableInfo<PublishRequest, PublishResponse> bundlablePublish =
212+
settings.publishMethod().buildBundlable(settings);
213+
this.publishCallable = bundlablePublish.getApiCallable();
214+
BundlerFactory<PublishRequest, PublishResponse> publishBundlerFactory =
215+
bundlablePublish.getBundlerFactory();
216+
if (publishBundlerFactory != null) {
217+
this.closeables.add(publishBundlerFactory);
218+
}
211219
this.getTopicCallable = settings.getTopicMethod().build(settings);
212220
this.listTopicsCallable = settings.listTopicsMethod().build(settings);
213221
this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings);

branches/pubsub-alpha/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java

Lines changed: 84 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@
3838
import com.google.api.gax.core.RetryParams;
3939
import com.google.api.gax.grpc.ApiCallSettings;
4040
import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder;
41+
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableBuilder;
4142
import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder;
42-
import com.google.api.gax.grpc.PageDescriptor;
43+
import com.google.api.gax.grpc.BundlingDescriptor;
44+
import com.google.api.gax.grpc.PageStreamingDescriptor;
45+
import com.google.api.gax.grpc.RequestIssuer;
4346
import com.google.api.gax.grpc.ServiceApiSettings;
4447
import com.google.common.collect.ImmutableList;
4548
import com.google.common.collect.ImmutableMap;
@@ -56,8 +59,12 @@
5659
import com.google.pubsub.v1.PublishRequest;
5760
import com.google.pubsub.v1.PublishResponse;
5861
import com.google.pubsub.v1.PublisherGrpc;
62+
import com.google.pubsub.v1.PubsubMessage;
5963
import com.google.pubsub.v1.Topic;
6064
import io.grpc.Status;
65+
import java.util.ArrayList;
66+
import java.util.Collection;
67+
import java.util.List;
6168

6269
// Manually-added imports: add custom (non-generated) imports after this point.
6370

@@ -134,7 +141,7 @@ public class PublisherSettings extends ServiceApiSettings {
134141

135142
private static class MethodBuilders {
136143
private final ApiCallableBuilder<Topic, Topic> createTopicMethod;
137-
private final ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
144+
private final BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
138145
private final ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod;
139146
private final PageStreamingApiCallableBuilder<ListTopicsRequest, ListTopicsResponse, Topic>
140147
listTopicsMethod;
@@ -149,7 +156,8 @@ public MethodBuilders() {
149156
createTopicMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent"));
150157
createTopicMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));
151158

152-
publishMethod = new ApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH);
159+
publishMethod =
160+
new BundlableApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH, PUBLISH_BUNDLING_DESC);
153161
publishMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"));
154162
publishMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));
155163

@@ -223,7 +231,7 @@ protected PublisherSettings(MethodBuilders methods) {
223231
}
224232

225233
/**
226-
* Returns the ApiCallableBuilder for the API method createTopic.
234+
* Returns the builder for the API method createTopic.
227235
*
228236
* <!-- manual edit -->
229237
* <!-- end manual edit -->
@@ -233,17 +241,17 @@ public ApiCallableBuilder<Topic, Topic> createTopicMethod() {
233241
}
234242

235243
/**
236-
* Returns the ApiCallableBuilder for the API method publish.
244+
* Returns the builder for the API method publish.
237245
*
238246
* <!-- manual edit -->
239247
* <!-- end manual edit -->
240248
*/
241-
public ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
249+
public BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
242250
return methods.publishMethod;
243251
}
244252

245253
/**
246-
* Returns the ApiCallableBuilder for the API method getTopic.
254+
* Returns the builder for the API method getTopic.
247255
*
248256
* <!-- manual edit -->
249257
* <!-- end manual edit -->
@@ -253,7 +261,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
253261
}
254262

255263
/**
256-
* Returns the PageStreamingApiCallableBuilder for the API method listTopics.
264+
* Returns the builder for the API method listTopics.
257265
*
258266
* <!-- manual edit -->
259267
* <!-- end manual edit -->
@@ -264,7 +272,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
264272
}
265273

266274
/**
267-
* Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions.
275+
* Returns the builder for the API method listTopicSubscriptions.
268276
*
269277
* <!-- manual edit -->
270278
* <!-- end manual edit -->
@@ -276,7 +284,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
276284
}
277285

278286
/**
279-
* Returns the ApiCallableBuilder for the API method deleteTopic.
287+
* Returns the builder for the API method deleteTopic.
280288
*
281289
* <!-- manual edit -->
282290
* <!-- end manual edit -->
@@ -285,9 +293,9 @@ public ApiCallableBuilder<DeleteTopicRequest, Empty> deleteTopicMethod() {
285293
return methods.deleteTopicMethod;
286294
}
287295

288-
private static PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
296+
private static PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
289297
LIST_TOPICS_PAGE_STR_DESC =
290-
new PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
298+
new PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
291299
@Override
292300
public Object emptyToken() {
293301
return "";
@@ -309,10 +317,10 @@ public Iterable<Topic> extractResources(ListTopicsResponse payload) {
309317
}
310318
};
311319

312-
private static PageDescriptor<
320+
private static PageStreamingDescriptor<
313321
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>
314322
LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC =
315-
new PageDescriptor<
323+
new PageStreamingDescriptor<
316324
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>() {
317325
@Override
318326
public Object emptyToken() {
@@ -337,4 +345,66 @@ public Iterable<String> extractResources(ListTopicSubscriptionsResponse payload)
337345
return payload.getSubscriptionsList();
338346
}
339347
};
348+
349+
private static BundlingDescriptor<PublishRequest, PublishResponse> PUBLISH_BUNDLING_DESC =
350+
new BundlingDescriptor<PublishRequest, PublishResponse>() {
351+
@Override
352+
public String getBundlePartitionKey(PublishRequest request) {
353+
return request.getTopic();
354+
}
355+
356+
@Override
357+
public PublishRequest mergeRequests(Collection<PublishRequest> requests) {
358+
PublishRequest firstRequest = requests.iterator().next();
359+
360+
List<PubsubMessage> elements = new ArrayList<>();
361+
for (PublishRequest request : requests) {
362+
elements.addAll(request.getMessagesList());
363+
}
364+
365+
PublishRequest bundleRequest =
366+
PublishRequest.newBuilder()
367+
.setTopic(firstRequest.getTopic())
368+
.addAllMessages(elements)
369+
.build();
370+
return bundleRequest;
371+
}
372+
373+
@Override
374+
public void splitResponse(
375+
PublishResponse bundleResponse,
376+
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
377+
int bundleMessageIndex = 0;
378+
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
379+
List<String> subresponseElements = new ArrayList<>();
380+
int subresponseCount = responder.getRequest().getMessagesCount();
381+
for (int i = 0; i < subresponseCount; i++) {
382+
subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex));
383+
bundleMessageIndex += 1;
384+
}
385+
PublishResponse response =
386+
PublishResponse.newBuilder().addAllMessageIds(subresponseElements).build();
387+
responder.setResponse(response);
388+
}
389+
}
390+
391+
@Override
392+
public void splitException(
393+
Throwable throwable,
394+
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
395+
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
396+
responder.setException(throwable);
397+
}
398+
}
399+
400+
@Override
401+
public long countElements(PublishRequest request) {
402+
return request.getMessagesCount();
403+
}
404+
405+
@Override
406+
public long countBytes(PublishRequest request) {
407+
return request.getSerializedSize();
408+
}
409+
};
340410
}

branches/pubsub-alpha/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberApi.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
package com.google.gcloud.pubsub.spi.v1;
3535

36-
import com.google.api.gax.grpc.ApiCallSettings;
3736
import com.google.api.gax.grpc.ApiCallable;
3837
import com.google.api.gax.protobuf.PathTemplate;
3938
import com.google.protobuf.Empty;

branches/pubsub-alpha/gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/SubscriberSettings.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import com.google.api.gax.grpc.ApiCallSettings;
4040
import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder;
4141
import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder;
42-
import com.google.api.gax.grpc.PageDescriptor;
42+
import com.google.api.gax.grpc.PageStreamingDescriptor;
4343
import com.google.api.gax.grpc.ServiceApiSettings;
4444
import com.google.common.collect.ImmutableList;
4545
import com.google.common.collect.ImmutableMap;
@@ -234,7 +234,7 @@ protected SubscriberSettings(MethodBuilders methods) {
234234
}
235235

236236
/**
237-
* Returns the ApiCallableBuilder for the API method createSubscription.
237+
* Returns the builder for the API method createSubscription.
238238
*
239239
* <!-- manual edit -->
240240
* <!-- end manual edit -->
@@ -244,7 +244,7 @@ public ApiCallableBuilder<Subscription, Subscription> createSubscriptionMethod()
244244
}
245245

246246
/**
247-
* Returns the ApiCallableBuilder for the API method getSubscription.
247+
* Returns the builder for the API method getSubscription.
248248
*
249249
* <!-- manual edit -->
250250
* <!-- end manual edit -->
@@ -254,7 +254,7 @@ public ApiCallableBuilder<GetSubscriptionRequest, Subscription> getSubscriptionM
254254
}
255255

256256
/**
257-
* Returns the PageStreamingApiCallableBuilder for the API method listSubscriptions.
257+
* Returns the builder for the API method listSubscriptions.
258258
*
259259
* <!-- manual edit -->
260260
* <!-- end manual edit -->
@@ -266,7 +266,7 @@ public ApiCallableBuilder<GetSubscriptionRequest, Subscription> getSubscriptionM
266266
}
267267

268268
/**
269-
* Returns the ApiCallableBuilder for the API method deleteSubscription.
269+
* Returns the builder for the API method deleteSubscription.
270270
*
271271
* <!-- manual edit -->
272272
* <!-- end manual edit -->
@@ -276,7 +276,7 @@ public ApiCallableBuilder<DeleteSubscriptionRequest, Empty> deleteSubscriptionMe
276276
}
277277

278278
/**
279-
* Returns the ApiCallableBuilder for the API method modifyAckDeadline.
279+
* Returns the builder for the API method modifyAckDeadline.
280280
*
281281
* <!-- manual edit -->
282282
* <!-- end manual edit -->
@@ -286,7 +286,7 @@ public ApiCallableBuilder<ModifyAckDeadlineRequest, Empty> modifyAckDeadlineMeth
286286
}
287287

288288
/**
289-
* Returns the ApiCallableBuilder for the API method acknowledge.
289+
* Returns the builder for the API method acknowledge.
290290
*
291291
* <!-- manual edit -->
292292
* <!-- end manual edit -->
@@ -296,7 +296,7 @@ public ApiCallableBuilder<AcknowledgeRequest, Empty> acknowledgeMethod() {
296296
}
297297

298298
/**
299-
* Returns the ApiCallableBuilder for the API method pull.
299+
* Returns the builder for the API method pull.
300300
*
301301
* <!-- manual edit -->
302302
* <!-- end manual edit -->
@@ -306,7 +306,7 @@ public ApiCallableBuilder<PullRequest, PullResponse> pullMethod() {
306306
}
307307

308308
/**
309-
* Returns the ApiCallableBuilder for the API method modifyPushConfig.
309+
* Returns the builder for the API method modifyPushConfig.
310310
*
311311
* <!-- manual edit -->
312312
* <!-- end manual edit -->
@@ -315,9 +315,11 @@ public ApiCallableBuilder<ModifyPushConfigRequest, Empty> modifyPushConfigMethod
315315
return methods.modifyPushConfigMethod;
316316
}
317317

318-
private static PageDescriptor<ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>
318+
private static PageStreamingDescriptor<
319+
ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>
319320
LIST_SUBSCRIPTIONS_PAGE_STR_DESC =
320-
new PageDescriptor<ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>() {
321+
new PageStreamingDescriptor<
322+
ListSubscriptionsRequest, ListSubscriptionsResponse, Subscription>() {
321323
@Override
322324
public Object emptyToken() {
323325
return "";

branches/pubsub-alpha/gcloud-java-pubsub/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
<dependency>
2020
<groupId>com.google.api</groupId>
2121
<artifactId>gax</artifactId>
22-
<version>0.0.4</version>
22+
<version>0.0.5-SNAPSHOT</version>
2323
</dependency>
2424
<dependency>
2525
<groupId>com.google.api.grpc</groupId>

branches/pubsub-alpha/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@
3333

3434
package com.google.gcloud.pubsub.spi.v1;
3535

36-
import com.google.api.gax.grpc.ApiCallSettings;
3736
import com.google.api.gax.grpc.ApiCallable;
37+
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo;
38+
import com.google.api.gax.grpc.BundlerFactory;
3839
import com.google.api.gax.protobuf.PathTemplate;
3940
import com.google.protobuf.Empty;
4041
import com.google.pubsub.v1.DeleteTopicRequest;
@@ -207,7 +208,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException {
207208
this.channel = settings.getChannel();
208209

209210
this.createTopicCallable = settings.createTopicMethod().build(settings);
210-
this.publishCallable = settings.publishMethod().build(settings);
211+
BundlableApiCallableInfo<PublishRequest, PublishResponse> bundlablePublish =
212+
settings.publishMethod().buildBundlable(settings);
213+
this.publishCallable = bundlablePublish.getApiCallable();
214+
BundlerFactory<PublishRequest, PublishResponse> publishBundlerFactory =
215+
bundlablePublish.getBundlerFactory();
216+
if (publishBundlerFactory != null) {
217+
this.closeables.add(publishBundlerFactory);
218+
}
211219
this.getTopicCallable = settings.getTopicMethod().build(settings);
212220
this.listTopicsCallable = settings.listTopicsMethod().build(settings);
213221
this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings);

0 commit comments

Comments
 (0)