Skip to content

Commit 89b7c3c

Browse files
authored
---
yaml --- r: 9231 b: refs/heads/spanner-gapic-migration c: 7f88505 h: refs/heads/master i: 9229: f6c22fb 9227: ef1c6f8 9223: 4b8c938 9215: 241f8f3
1 parent 8a2f545 commit 89b7c3c

4 files changed

Lines changed: 79 additions & 236 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ refs/tags/v0.35.0: c28951c5f4cc97a1be07900d19df6984115a4bd6
8181
refs/tags/v0.36.0: 6b75c61f73e6827b3ca379bd54f88f750290162f
8282
refs/tags/v0.37.0: db2e142f92601709fdd48db159776f905742e30f
8383
refs/heads/mrschmidt-sizefix: 627a3bfa30bb6f4f76af47b228c38b208dd921e0
84-
refs/heads/spanner-gapic-migration: d70feecf65afebb90a7a01eb603ad5cb7896b75b
84+
refs/heads/spanner-gapic-migration: 7f88505725a528ec14d45829d37a8f4741b32b36
8585
refs/tags/v0.38.0: c235ee4df5e1248e1769dae3f86a0d7ab7fd8301
8686
refs/tags/v0.39.0: ab231c9d22475242a43d6d9554aa4a3f736dab01
8787
refs/tags/v0.40.0: a1d5b05206cce7734365f1b910396a2c9d6605ec

branches/spanner-gapic-migration/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 47 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -17,56 +17,49 @@
1717
package com.google.cloud.pubsub.v1;
1818

1919
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutureCallback;
21+
import com.google.api.core.ApiFutures;
2022
import com.google.api.core.BetaApi;
21-
import com.google.api.core.InternalApi;
2223
import com.google.api.core.SettableApiFuture;
2324
import com.google.api.gax.batching.BatchingSettings;
2425
import com.google.api.gax.core.CredentialsProvider;
2526
import com.google.api.gax.core.ExecutorAsBackgroundResource;
2627
import com.google.api.gax.core.ExecutorProvider;
28+
import com.google.api.gax.core.FixedExecutorProvider;
2729
import com.google.api.gax.core.InstantiatingExecutorProvider;
2830
import com.google.api.gax.grpc.GrpcStatusCode;
29-
import com.google.api.gax.grpc.GrpcTransportChannel;
3031
import com.google.api.gax.retrying.RetrySettings;
3132
import com.google.api.gax.rpc.ApiException;
3233
import com.google.api.gax.rpc.ApiExceptionFactory;
3334
import com.google.api.gax.rpc.HeaderProvider;
3435
import com.google.api.gax.rpc.NoHeaderProvider;
36+
import com.google.api.gax.rpc.StatusCode;
3537
import com.google.api.gax.rpc.TransportChannelProvider;
36-
import com.google.auth.Credentials;
3738
import com.google.auth.oauth2.GoogleCredentials;
39+
import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
40+
import com.google.cloud.pubsub.v1.stub.PublisherStub;
41+
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
3842
import com.google.common.base.Preconditions;
3943
import com.google.common.collect.ImmutableList;
40-
import com.google.common.collect.ImmutableMap;
41-
import com.google.common.util.concurrent.FutureCallback;
42-
import com.google.common.util.concurrent.Futures;
4344
import com.google.pubsub.v1.PublishRequest;
4445
import com.google.pubsub.v1.PublishResponse;
45-
import com.google.pubsub.v1.PublisherGrpc;
46-
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
4746
import com.google.pubsub.v1.PubsubMessage;
4847
import com.google.pubsub.v1.TopicName;
4948
import com.google.pubsub.v1.TopicNames;
50-
import io.grpc.CallCredentials;
51-
import io.grpc.Channel;
5249
import io.grpc.Status;
53-
import io.grpc.auth.MoreCallCredentials;
5450
import java.io.IOException;
55-
import java.util.ArrayList;
51+
import java.util.Collections;
5652
import java.util.Iterator;
5753
import java.util.LinkedList;
5854
import java.util.List;
59-
import java.util.Map;
6055
import java.util.concurrent.ScheduledExecutorService;
6156
import java.util.concurrent.ScheduledFuture;
62-
import java.util.concurrent.ThreadLocalRandom;
6357
import java.util.concurrent.TimeUnit;
6458
import java.util.concurrent.atomic.AtomicBoolean;
6559
import java.util.concurrent.locks.Lock;
6660
import java.util.concurrent.locks.ReentrantLock;
6761
import java.util.logging.Level;
6862
import java.util.logging.Logger;
69-
import javax.annotation.Nullable;
7063
import org.threeten.bp.Duration;
7164

7265
/**
@@ -93,21 +86,18 @@ public class Publisher {
9386
private final String topicName;
9487

9588
private final BatchingSettings batchingSettings;
96-
private final RetrySettings retrySettings;
97-
private final LongRandom longRandom;
9889

9990
private final Lock messagesBatchLock;
10091
private List<OutstandingPublish> messagesBatch;
10192
private int batchedBytes;
10293

10394
private final AtomicBoolean activeAlarm;
10495

105-
private final Channel channel;
106-
@Nullable private final CallCredentials callCredentials;
96+
private final PublisherStub publisherStub;
10797

10898
private final ScheduledExecutorService executor;
10999
private final AtomicBoolean shutdown;
110-
private final List<AutoCloseable> closeables = new ArrayList<>();
100+
private final List<AutoCloseable> closeables;
111101
private final MessageWaiter messagesWaiter;
112102
private ScheduledFuture<?> currentAlarmFuture;
113103

@@ -125,40 +115,43 @@ private Publisher(Builder builder) throws IOException {
125115
topicName = builder.topicName;
126116

127117
this.batchingSettings = builder.batchingSettings;
128-
this.retrySettings = builder.retrySettings;
129-
this.longRandom = builder.longRandom;
130118

131119
messagesBatch = new LinkedList<>();
132120
messagesBatchLock = new ReentrantLock();
133121
activeAlarm = new AtomicBoolean(false);
134122
executor = builder.executorProvider.getExecutor();
135123
if (builder.executorProvider.shouldAutoClose()) {
136-
closeables.add(new ExecutorAsBackgroundResource(executor));
124+
closeables =
125+
Collections.<AutoCloseable>singletonList(new ExecutorAsBackgroundResource(executor));
126+
} else {
127+
closeables = Collections.emptyList();
137128
}
138-
TransportChannelProvider channelProvider = builder.channelProvider;
139-
if (channelProvider.needsExecutor()) {
140-
channelProvider = channelProvider.withExecutor(executor);
141-
}
142-
if (channelProvider.needsHeaders()) {
143-
Map<String, String> headers =
144-
ImmutableMap.<String, String>builder()
145-
.putAll(builder.headerProvider.getHeaders())
146-
.putAll(builder.internalHeaderProvider.getHeaders())
147-
.build();
148-
channelProvider = channelProvider.withHeaders(headers);
149-
}
150-
if (channelProvider.needsEndpoint()) {
151-
channelProvider = channelProvider.withEndpoint(TopicAdminSettings.getDefaultEndpoint());
152-
}
153-
GrpcTransportChannel transportChannel =
154-
(GrpcTransportChannel) channelProvider.getTransportChannel();
155-
channel = transportChannel.getChannel();
156-
if (channelProvider.shouldAutoClose()) {
157-
closeables.add(transportChannel);
129+
130+
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
131+
// We post-process this here to keep backward-compatibility.
132+
RetrySettings retrySettings = builder.retrySettings;
133+
if (retrySettings.getMaxAttempts() == 0) {
134+
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
158135
}
159136

160-
Credentials credentials = builder.credentialsProvider.getCredentials();
161-
callCredentials = credentials == null ? null : MoreCallCredentials.from(credentials);
137+
PublisherStubSettings.Builder stubSettings =
138+
PublisherStubSettings.newBuilder()
139+
.setCredentialsProvider(builder.credentialsProvider)
140+
.setExecutorProvider(FixedExecutorProvider.create(executor))
141+
.setTransportChannelProvider(builder.channelProvider);
142+
stubSettings
143+
.publishSettings()
144+
.setRetryableCodes(
145+
StatusCode.Code.ABORTED,
146+
StatusCode.Code.CANCELLED,
147+
StatusCode.Code.DEADLINE_EXCEEDED,
148+
StatusCode.Code.INTERNAL,
149+
StatusCode.Code.RESOURCE_EXHAUSTED,
150+
StatusCode.Code.UNKNOWN,
151+
StatusCode.Code.UNAVAILABLE)
152+
.setRetrySettings(retrySettings)
153+
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
154+
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
162155

163156
shutdown = new AtomicBoolean(false);
164157
messagesWaiter = new MessageWaiter();
@@ -320,21 +313,9 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
320313
publishRequest.addMessages(outstandingPublish.message);
321314
}
322315

323-
long rpcTimeoutMs =
324-
Math.round(
325-
retrySettings.getInitialRpcTimeout().toMillis()
326-
* Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBatch.attempt - 1));
327-
rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().toMillis());
328-
329-
PublisherFutureStub stub =
330-
PublisherGrpc.newFutureStub(channel).withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS);
331-
if (callCredentials != null) {
332-
stub = stub.withCallCredentials(callCredentials);
333-
}
334-
335-
Futures.addCallback(
336-
stub.publish(publishRequest.build()),
337-
new FutureCallback<PublishResponse>() {
316+
ApiFutures.addCallback(
317+
publisherStub.publishCallable().futureCall(publishRequest.build()),
318+
new ApiFutureCallback<PublishResponse>() {
338319
@Override
339320
public void onSuccess(PublishResponse result) {
340321
try {
@@ -364,37 +345,13 @@ public void onSuccess(PublishResponse result) {
364345

365346
@Override
366347
public void onFailure(Throwable t) {
367-
long nextBackoffDelay =
368-
computeNextBackoffDelayMs(outstandingBatch, retrySettings, longRandom);
369-
370-
if (!isRetryable(t)
371-
|| retrySettings.getMaxAttempts() > 0
372-
&& outstandingBatch.getAttempt() > retrySettings.getMaxAttempts()
373-
|| System.currentTimeMillis() + nextBackoffDelay
374-
> outstandingBatch.creationTime + retrySettings.getTotalTimeout().toMillis()) {
375-
try {
376-
ApiException gaxException =
377-
ApiExceptionFactory.createException(
378-
t, GrpcStatusCode.of(Status.fromThrowable(t).getCode()), false);
379-
for (OutstandingPublish outstandingPublish :
380-
outstandingBatch.outstandingPublishes) {
381-
outstandingPublish.publishResult.setException(gaxException);
382-
}
383-
} finally {
384-
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
348+
try {
349+
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
350+
outstandingPublish.publishResult.setException(t);
385351
}
386-
return;
352+
} finally {
353+
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
387354
}
388-
389-
executor.schedule(
390-
new Runnable() {
391-
@Override
392-
public void run() {
393-
publishOutstandingBatch(outstandingBatch);
394-
}
395-
},
396-
nextBackoffDelay,
397-
TimeUnit.MILLISECONDS);
398355
}
399356
});
400357
}
@@ -459,43 +416,13 @@ public void shutdown() throws Exception {
459416
for (AutoCloseable closeable : closeables) {
460417
closeable.close();
461418
}
419+
publisherStub.shutdown();
462420
}
463421

464422
private boolean hasBatchingBytes() {
465423
return getMaxBatchBytes() > 0;
466424
}
467425

468-
private static long computeNextBackoffDelayMs(
469-
OutstandingBatch outstandingBatch, RetrySettings retrySettings, LongRandom longRandom) {
470-
long delayMillis =
471-
Math.round(
472-
retrySettings.getInitialRetryDelay().toMillis()
473-
* Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBatch.attempt - 1));
474-
delayMillis = Math.min(retrySettings.getMaxRetryDelay().toMillis(), delayMillis);
475-
outstandingBatch.attempt++;
476-
return longRandom.nextLong(0, delayMillis);
477-
}
478-
479-
private boolean isRetryable(Throwable t) {
480-
Status status = Status.fromThrowable(t);
481-
switch (status.getCode()) {
482-
case ABORTED:
483-
case CANCELLED:
484-
case DEADLINE_EXCEEDED:
485-
case INTERNAL:
486-
case RESOURCE_EXHAUSTED:
487-
case UNKNOWN:
488-
case UNAVAILABLE:
489-
return true;
490-
default:
491-
return false;
492-
}
493-
}
494-
495-
interface LongRandom {
496-
long nextLong(long least, long bound);
497-
}
498-
499426
/**
500427
* Constructs a new {@link Builder} using the given topic.
501428
*
@@ -565,13 +492,6 @@ public static final class Builder {
565492
.setRpcTimeoutMultiplier(2)
566493
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
567494
.build();
568-
static final LongRandom DEFAULT_LONG_RANDOM =
569-
new LongRandom() {
570-
@Override
571-
public long nextLong(long least, long bound) {
572-
return ThreadLocalRandom.current().nextLong(least, bound);
573-
}
574-
};
575495

576496
private static final int THREADS_PER_CPU = 5;
577497
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
@@ -585,7 +505,6 @@ public long nextLong(long least, long bound) {
585505
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;
586506

587507
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
588-
LongRandom longRandom = DEFAULT_LONG_RANDOM;
589508

590509
TransportChannelProvider channelProvider =
591510
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
@@ -673,12 +592,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
673592
return this;
674593
}
675594

676-
@InternalApi
677-
Builder setLongRandom(LongRandom longRandom) {
678-
this.longRandom = Preconditions.checkNotNull(longRandom);
679-
return this;
680-
}
681-
682595
/** Gives the ability to set a custom executor to be used by the library. */
683596
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
684597
this.executorProvider = Preconditions.checkNotNull(executorProvider);

branches/spanner-gapic-migration/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ public void setupScheduleExpectation(Duration delay) {
9393
expectedWorkQueue.add(delay);
9494
}
9595
}
96-
96+
9797
/**
98-
* Blocks the current thread until all the work
99-
* {@link FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been
100-
* scheduled in the executor.
98+
* Blocks the current thread until all the work {@link
99+
* FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been scheduled in
100+
* the executor.
101101
*/
102102
public void waitForExpectedWork() {
103103
synchronized (expectedWorkQueue) {
@@ -224,7 +224,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
224224
}
225225
expectedWorkQueue.notifyAll();
226226
}
227-
227+
228228
return callable.getScheduledFuture();
229229
}
230230

0 commit comments

Comments
 (0)