Skip to content

Commit b3a1757

Browse files
authored
---
yaml --- r: 8351 b: refs/heads/snehashah-snippets c: 91ad4f4 h: refs/heads/master i: 8349: 2c1a949 8347: 044ce7f 8343: daffa25 8335: cd781de 8319: 934293b
1 parent ac85bc0 commit b3a1757

6 files changed

Lines changed: 160 additions & 54 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ refs/tags/v0.18.0: 9d193c4c4b9d1c6f21515dd8e50836b9194ec9bb
5656
refs/tags/v0.19.0: e67b56e4d8dad5f9a7b38c9b2107c23c828f2ed5
5757
refs/tags/v0.20.0: 839f7fb7156535146aa1cb2c5aadd8d375d854e8
5858
refs/tags/v0.20.1: 370471f437f1f4f68a11e068df5cd6bf39edb1fa
59-
refs/heads/snehashah-snippets: a9099c57595ac5af1ebd6630502b73443ad8800d
59+
refs/heads/snehashah-snippets: 91ad4f465ed5cbd2f7853290a33555b00e5c9aed
6060
refs/tags/v0.20.2: 5a53aa06f268b74dc192204f4f83e1a04d40f65d
6161
refs/tags/v0.20.3: 269025fdc14af0b68df214a4518be5379b2fe569
6262
refs/tags/v0.21.0: f88b200e00e41ba6262ee88a92abba38b1e2417e

branches/snehashah-snippets/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import com.google.api.client.http.HttpRequest;
2020
import com.google.api.client.http.HttpRequestInitializer;
2121
import com.google.api.client.http.HttpTransport;
22-
import com.google.cloud.http.HttpTransportOptions;
2322
import com.google.cloud.datastore.DatastoreException;
2423
import com.google.cloud.datastore.DatastoreOptions;
24+
import com.google.cloud.http.HttpTransportOptions;
2525
import com.google.datastore.v1.AllocateIdsRequest;
2626
import com.google.datastore.v1.AllocateIdsResponse;
2727
import com.google.datastore.v1.BeginTransactionRequest;

branches/snehashah-snippets/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,13 @@ private class AckHandler implements FutureCallback<AckReply> {
173173
private final String ackId;
174174
private final int outstandingBytes;
175175
private final AtomicBoolean acked;
176-
private final Instant receivedTime;
176+
private final long receivedTimeMillis;
177177

178178
AckHandler(String ackId, int outstandingBytes) {
179179
this.ackId = ackId;
180180
this.outstandingBytes = outstandingBytes;
181181
acked = new AtomicBoolean(false);
182-
receivedTime = Instant.ofEpochMilli(clock.millisTime());
182+
receivedTimeMillis = clock.millisTime();
183183
}
184184

185185
@Override
@@ -207,7 +207,6 @@ public void onSuccess(AckReply reply) {
207207
pendingAcks.add(ackId);
208208
}
209209
// Record the latency rounded to the next closest integer.
210-
long receivedTimeMillis = TimeUnit.NANOSECONDS.toMillis(receivedTime.getNano());
211210
ackLatencyDistribution.record(
212211
Ints.saturatedCast(
213212
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));

branches/snehashah-snippets/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,9 @@
2929
import com.google.common.util.concurrent.SettableFuture;
3030
import com.google.pubsub.v1.StreamingPullRequest;
3131
import com.google.pubsub.v1.StreamingPullResponse;
32-
import com.google.pubsub.v1.SubscriberGrpc;
33-
import io.grpc.CallOptions;
34-
import io.grpc.Channel;
32+
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
3533
import io.grpc.Status;
3634
import io.grpc.stub.ClientCallStreamObserver;
37-
import io.grpc.stub.ClientCalls;
3835
import io.grpc.stub.ClientResponseObserver;
3936
import java.util.ArrayList;
4037
import java.util.List;
@@ -55,7 +52,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
5552

5653
private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
5754

58-
private final Channel channel;
55+
private final SubscriberStub asyncStub;
5956

6057
private final String subscription;
6158
private final ScheduledExecutorService executor;
@@ -69,14 +66,14 @@ public StreamingSubscriberConnection(
6966
Duration maxAckExtensionPeriod,
7067
int streamAckDeadlineSeconds,
7168
Distribution ackLatencyDistribution,
72-
Channel channel,
69+
SubscriberStub asyncStub,
7370
FlowController flowController,
7471
ScheduledExecutorService executor,
7572
@Nullable ScheduledExecutorService alarmsExecutor,
7673
ApiClock clock) {
7774
this.subscription = subscription;
7875
this.executor = executor;
79-
this.channel = channel;
76+
this.asyncStub = asyncStub;
8077
this.messageDispatcher =
8178
new MessageDispatcher(
8279
receiver,
@@ -101,8 +98,8 @@ protected void doStart() {
10198
@Override
10299
protected void doStop() {
103100
messageDispatcher.stop();
104-
notifyStopped();
105101
requestObserver.onError(Status.CANCELLED.asException());
102+
notifyStopped();
106103
}
107104

108105
private class StreamingPullResponseObserver
@@ -137,7 +134,6 @@ public void run() {
137134

138135
@Override
139136
public void onError(Throwable t) {
140-
logger.log(Level.WARNING, "Terminated streaming with exception", t);
141137
errorFuture.setException(t);
142138
}
143139

@@ -154,9 +150,7 @@ private void initialize() {
154150
new StreamingPullResponseObserver(errorFuture);
155151
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
156152
(ClientCallStreamObserver<StreamingPullRequest>)
157-
(ClientCalls.asyncBidiStreamingCall(
158-
channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, CallOptions.DEFAULT),
159-
responseObserver));
153+
(asyncStub.streamingPull(responseObserver));
160154
logger.log(
161155
Level.FINER,
162156
"Initializing stream to subscription {0} with deadline {1}",
@@ -173,6 +167,9 @@ private void initialize() {
173167
new FutureCallback<Void>() {
174168
@Override
175169
public void onSuccess(@Nullable Void result) {
170+
if (!isAlive()) {
171+
return;
172+
}
176173
channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
177174
// The stream was closed. And any case we want to reopen it to continue receiving
178175
// messages.
@@ -186,6 +183,7 @@ public void onFailure(Throwable cause) {
186183
logger.log(Level.FINE, "pull failure after service no longer running", cause);
187184
return;
188185
}
186+
logger.log(Level.WARNING, "Terminated streaming with exception", cause);
189187
if (StatusUtil.isRetryable(cause)) {
190188
long backoffMillis = channelReconnectBackoff.toMillis();
191189
channelReconnectBackoff = channelReconnectBackoff.plusMillis(backoffMillis);

0 commit comments

Comments
 (0)