Skip to content

Commit 3475c5e

Browse files
committed
pr comment
1 parent bb73a70 commit 3475c5e

File tree

1 file changed

+21
-25
lines changed
  • google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

1 file changed

+21
-25
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public void run() {
278278
@Override
279279
protected void doStop() {
280280
stopAllStreamingConnections();
281-
stopAllPollingConnections();
281+
// stopAllPollingConnections();
282282
try {
283283
for (AutoCloseable closeable : closeables) {
284284
closeable.close();
@@ -339,37 +339,33 @@ public void failed(State from, Throwable failure) {
339339
new Runnable() {
340340
@Override
341341
public void run() {
342-
// It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
343-
long ackLatency =
344-
ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
345-
if (ackLatency > 0) {
346-
long ackExpirationPaddingMillis = ackExpirationPadding.toMillis();
347-
int possibleStreamAckDeadlineSeconds =
348-
Math.max(
349-
MIN_ACK_DEADLINE_SECONDS,
350-
Ints.saturatedCast(
351-
Math.max(
352-
ackLatency,
353-
TimeUnit.MILLISECONDS.toSeconds(ackExpirationPaddingMillis))));
354-
if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
355-
streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
356-
logger.log(
357-
Level.FINER,
358-
"Updating stream deadline to {0} seconds.",
359-
streamAckDeadlineSeconds);
360-
for (StreamingSubscriberConnection subscriberConnection :
361-
streamingSubscriberConnections) {
362-
subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
363-
}
364-
}
365-
}
342+
updateAckDeadline();
366343
}
367344
},
368345
ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
369346
ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
370347
TimeUnit.MILLISECONDS);
371348
}
372349

350+
private void updateAckDeadline() {
351+
// It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
352+
long ackLatency = ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
353+
if (ackLatency > 0) {
354+
int possibleStreamAckDeadlineSeconds =
355+
Math.max(
356+
MIN_ACK_DEADLINE_SECONDS,
357+
Ints.saturatedCast(Math.max(ackLatency, ackExpirationPadding.getSeconds())));
358+
if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
359+
streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
360+
logger.log(
361+
Level.FINER, "Updating stream deadline to {0} seconds.", streamAckDeadlineSeconds);
362+
for (StreamingSubscriberConnection subscriberConnection : streamingSubscriberConnections) {
363+
subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
364+
}
365+
}
366+
}
367+
}
368+
373369
private void stopAllStreamingConnections() {
374370
stopConnections(streamingSubscriberConnections);
375371
if (ackDeadlineUpdater != null) {

0 commit comments

Comments
 (0)