@@ -278,7 +278,7 @@ public void run() {
278278 @ Override
279279 protected void doStop () {
280280 stopAllStreamingConnections ();
281- stopAllPollingConnections ();
281+ // stopAllPollingConnections();
282282 try {
283283 for (AutoCloseable closeable : closeables ) {
284284 closeable .close ();
@@ -339,37 +339,33 @@ public void failed(State from, Throwable failure) {
339339 new Runnable () {
340340 @ Override
341341 public void run () {
342- // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
343- long ackLatency =
344- ackLatencyDistribution .getNthPercentile (PERCENTILE_FOR_ACK_DEADLINE_UPDATES );
345- if (ackLatency > 0 ) {
346- long ackExpirationPaddingMillis = ackExpirationPadding .toMillis ();
347- int possibleStreamAckDeadlineSeconds =
348- Math .max (
349- MIN_ACK_DEADLINE_SECONDS ,
350- Ints .saturatedCast (
351- Math .max (
352- ackLatency ,
353- TimeUnit .MILLISECONDS .toSeconds (ackExpirationPaddingMillis ))));
354- if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds ) {
355- streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds ;
356- logger .log (
357- Level .FINER ,
358- "Updating stream deadline to {0} seconds." ,
359- streamAckDeadlineSeconds );
360- for (StreamingSubscriberConnection subscriberConnection :
361- streamingSubscriberConnections ) {
362- subscriberConnection .updateStreamAckDeadline (streamAckDeadlineSeconds );
363- }
364- }
365- }
342+ updateAckDeadline ();
366343 }
367344 },
368345 ACK_DEADLINE_UPDATE_PERIOD .toMillis (),
369346 ACK_DEADLINE_UPDATE_PERIOD .toMillis (),
370347 TimeUnit .MILLISECONDS );
371348 }
372349
350+ private void updateAckDeadline () {
351+ // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
352+ long ackLatency = ackLatencyDistribution .getNthPercentile (PERCENTILE_FOR_ACK_DEADLINE_UPDATES );
353+ if (ackLatency > 0 ) {
354+ int possibleStreamAckDeadlineSeconds =
355+ Math .max (
356+ MIN_ACK_DEADLINE_SECONDS ,
357+ Ints .saturatedCast (Math .max (ackLatency , ackExpirationPadding .getSeconds ())));
358+ if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds ) {
359+ streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds ;
360+ logger .log (
361+ Level .FINER , "Updating stream deadline to {0} seconds." , streamAckDeadlineSeconds );
362+ for (StreamingSubscriberConnection subscriberConnection : streamingSubscriberConnections ) {
363+ subscriberConnection .updateStreamAckDeadline (streamAckDeadlineSeconds );
364+ }
365+ }
366+ }
367+ }
368+
373369 private void stopAllStreamingConnections () {
374370 stopConnections (streamingSubscriberConnections );
375371 if (ackDeadlineUpdater != null ) {
0 commit comments