Skip to content

Commit ca31a82

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents d2c24a9 + 6c76a82 commit ca31a82

12 files changed

Lines changed: 240 additions & 235 deletions

File tree

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public static void main(String... args) throws Exception {
4646
@Override
4747
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
4848
System.out.println("got message: " + message.getData().toStringUtf8());
49-
consumer.accept(AckReply.ACK, null);
49+
consumer.accept(AckReply.ACK);
5050
}
5151
};
5252
Subscriber subscriber = null;

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public MessageReceiver messageReceiver() {
5151
MessageReceiver receiver = new MessageReceiver() {
5252
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
5353
if (blockingQueue.offer(message)) {
54-
consumer.accept(AckReply.ACK, null);
54+
consumer.accept(AckReply.ACK);
5555
} else {
56-
consumer.accept(AckReply.NACK, null);
56+
consumer.accept(AckReply.NACK);
5757
}
5858
}
5959
};

google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingSettings.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
import static com.google.cloud.logging.spi.v2.PagedResponseWrappers.ListMonitoredResourceDescriptorsPagedResponse;
2121

2222
import com.google.api.MonitoredResourceDescriptor;
23-
import com.google.api.gax.bundling.BundlingSettings;
24-
import com.google.api.gax.bundling.RequestBuilder;
23+
import com.google.api.gax.batching.BatchingSettings;
24+
import com.google.api.gax.batching.RequestBuilder;
2525
import com.google.api.gax.core.FlowControlSettings;
2626
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
2727
import com.google.api.gax.core.GoogleCredentialsProvider;
2828
import com.google.api.gax.core.RetrySettings;
29-
import com.google.api.gax.grpc.BundledRequestIssuer;
30-
import com.google.api.gax.grpc.BundlingCallSettings;
31-
import com.google.api.gax.grpc.BundlingDescriptor;
29+
import com.google.api.gax.grpc.BatchedRequestIssuer;
30+
import com.google.api.gax.grpc.BatchingCallSettings;
31+
import com.google.api.gax.grpc.BatchingDescriptor;
3232
import com.google.api.gax.grpc.CallContext;
3333
import com.google.api.gax.grpc.ChannelProvider;
3434
import com.google.api.gax.grpc.ClientSettings;
@@ -114,7 +114,7 @@ public class LoggingSettings extends ClientSettings {
114114
private static final String DEFAULT_GAPIC_VERSION = "";
115115

116116
private final SimpleCallSettings<DeleteLogRequest, Empty> deleteLogSettings;
117-
private final BundlingCallSettings<WriteLogEntriesRequest, WriteLogEntriesResponse>
117+
private final BatchingCallSettings<WriteLogEntriesRequest, WriteLogEntriesResponse>
118118
writeLogEntriesSettings;
119119
private final PagedCallSettings<
120120
ListLogEntriesRequest, ListLogEntriesResponse, ListLogEntriesPagedResponse>
@@ -132,7 +132,7 @@ public SimpleCallSettings<DeleteLogRequest, Empty> deleteLogSettings() {
132132
}
133133

134134
/** Returns the object with the settings used for calls to writeLogEntries. */
135-
public BundlingCallSettings<WriteLogEntriesRequest, WriteLogEntriesResponse>
135+
public BatchingCallSettings<WriteLogEntriesRequest, WriteLogEntriesResponse>
136136
writeLogEntriesSettings() {
137137
return writeLogEntriesSettings;
138138
}
@@ -385,11 +385,11 @@ public ListLogsPagedResponse createPagedListResponse(
385385
}
386386
};
387387

388-
private static final BundlingDescriptor<WriteLogEntriesRequest, WriteLogEntriesResponse>
389-
WRITE_LOG_ENTRIES_BUNDLING_DESC =
390-
new BundlingDescriptor<WriteLogEntriesRequest, WriteLogEntriesResponse>() {
388+
private static final BatchingDescriptor<WriteLogEntriesRequest, WriteLogEntriesResponse>
389+
WRITE_LOG_ENTRIES_BATCHING_DESC =
390+
new BatchingDescriptor<WriteLogEntriesRequest, WriteLogEntriesResponse>() {
391391
@Override
392-
public String getBundlePartitionKey(WriteLogEntriesRequest request) {
392+
public String getBatchPartitionKey(WriteLogEntriesRequest request) {
393393
return request.getLogName()
394394
+ "|"
395395
+ request.getResource()
@@ -421,10 +421,10 @@ public WriteLogEntriesRequest build() {
421421

422422
@Override
423423
public void splitResponse(
424-
WriteLogEntriesResponse bundleResponse,
425-
Collection<? extends BundledRequestIssuer<WriteLogEntriesResponse>> bundle) {
426-
int bundleMessageIndex = 0;
427-
for (BundledRequestIssuer<WriteLogEntriesResponse> responder : bundle) {
424+
WriteLogEntriesResponse batchResponse,
425+
Collection<? extends BatchedRequestIssuer<WriteLogEntriesResponse>> batch) {
426+
int batchMessageIndex = 0;
427+
for (BatchedRequestIssuer<WriteLogEntriesResponse> responder : batch) {
428428
WriteLogEntriesResponse response = WriteLogEntriesResponse.newBuilder().build();
429429
responder.setResponse(response);
430430
}
@@ -433,8 +433,8 @@ public void splitResponse(
433433
@Override
434434
public void splitException(
435435
Throwable throwable,
436-
Collection<? extends BundledRequestIssuer<WriteLogEntriesResponse>> bundle) {
437-
for (BundledRequestIssuer<WriteLogEntriesResponse> responder : bundle) {
436+
Collection<? extends BatchedRequestIssuer<WriteLogEntriesResponse>> batch) {
437+
for (BatchedRequestIssuer<WriteLogEntriesResponse> responder : batch) {
438438
responder.setException(throwable);
439439
}
440440
}
@@ -455,7 +455,7 @@ public static class Builder extends ClientSettings.Builder {
455455
private final ImmutableList<UnaryCallSettings.Builder> unaryMethodSettingsBuilders;
456456

457457
private final SimpleCallSettings.Builder<DeleteLogRequest, Empty> deleteLogSettings;
458-
private final BundlingCallSettings.Builder<WriteLogEntriesRequest, WriteLogEntriesResponse>
458+
private final BatchingCallSettings.Builder<WriteLogEntriesRequest, WriteLogEntriesResponse>
459459
writeLogEntriesSettings;
460460
private final PagedCallSettings.Builder<
461461
ListLogEntriesRequest, ListLogEntriesResponse, ListLogEntriesPagedResponse>
@@ -517,9 +517,9 @@ private Builder() {
517517
deleteLogSettings = SimpleCallSettings.newBuilder(LoggingServiceV2Grpc.METHOD_DELETE_LOG);
518518

519519
writeLogEntriesSettings =
520-
BundlingCallSettings.newBuilder(
521-
LoggingServiceV2Grpc.METHOD_WRITE_LOG_ENTRIES, WRITE_LOG_ENTRIES_BUNDLING_DESC)
522-
.setBundlingSettingsBuilder(BundlingSettings.newBuilder());
520+
BatchingCallSettings.newBuilder(
521+
LoggingServiceV2Grpc.METHOD_WRITE_LOG_ENTRIES, WRITE_LOG_ENTRIES_BATCHING_DESC)
522+
.setBatchingSettingsBuilder(BatchingSettings.newBuilder());
523523

524524
listLogEntriesSettings =
525525
PagedCallSettings.newBuilder(
@@ -553,7 +553,7 @@ private static Builder createDefault() {
553553

554554
builder
555555
.writeLogEntriesSettings()
556-
.getBundlingSettingsBuilder()
556+
.getBatchingSettingsBuilder()
557557
.setElementCountThreshold(1000)
558558
.setRequestByteThreshold(1048576)
559559
.setDelayThreshold(Duration.millis(50))
@@ -635,7 +635,7 @@ public SimpleCallSettings.Builder<DeleteLogRequest, Empty> deleteLogSettings() {
635635
}
636636

637637
/** Returns the builder for the settings used for calls to writeLogEntries. */
638-
public BundlingCallSettings.Builder<WriteLogEntriesRequest, WriteLogEntriesResponse>
638+
public BatchingCallSettings.Builder<WriteLogEntriesRequest, WriteLogEntriesResponse>
639639
writeLogEntriesSettings() {
640640
return writeLogEntriesSettings;
641641
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
/**
2020
* Accepts a reply, sending it to the service.
2121
*
22-
* <p>Both the interface and its method is named after the Java 8's {@code BiConsumer} interface
22+
* <p>Both the interface and its method is named after the Java 8's {@code Consumer} interface
2323
* to make migration to Java 8 and adopting its patterns easier.
2424
*/
2525
public interface AckReplyConsumer {
26-
void accept(AckReply ackReply, Throwable t);
26+
void accept(AckReply ackReply);
2727
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -285,20 +285,20 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
285285
final AckReplyConsumer consumer =
286286
new AckReplyConsumer() {
287287
@Override
288-
public void accept(AckReply reply, Throwable t) {
289-
if (reply != null) {
290-
response.set(reply);
291-
} else {
292-
response.setException(t);
293-
}
288+
public void accept(AckReply reply) {
289+
response.set(reply);
294290
}
295291
};
296292
Futures.addCallback(response, ackHandler);
297293
executor.submit(
298294
new Runnable() {
299295
@Override
300296
public void run() {
301-
receiver.receiveMessage(message, consumer);
297+
try {
298+
receiver.receiveMessage(message, consumer);
299+
} catch (Exception e) {
300+
response.setException(e);
301+
}
302302
}
303303
});
304304
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private boolean isAlive() {
199199
@Override
200200
public void sendAckOperations(
201201
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
202-
// Send the modify ack deadlines in bundles as not to exceed the max request
202+
// Send the modify ack deadlines in batches as not to exceed the max request
203203
// size.
204204
for (PendingModifyAckDeadline modifyAckDeadline : ackDeadlineExtensions) {
205205
for (List<String> ackIdChunk :

0 commit comments

Comments
 (0)