@@ -282,85 +282,6 @@ protected void doStop() {
282282 }
283283 }
284284
285- // private void startStreamingConnections() {
286- // synchronized (streamingSubscriberConnections) {
287- // for (int i = 0; i < numChannels; i++) {
288- // streamingSubscriberConnections.add(
289- // new StreamingSubscriberConnection(
290- // cachedSubscriptionNameString,
291- // receiver,
292- // ackExpirationPadding,
293- // maxAckExtensionPeriod,
294- // streamAckDeadlineSeconds,
295- // ackLatencyDistribution,
296- // channels.get(i),
297- // flowController,
298- // executor,
299- // alarmsExecutor,
300- // clock));
301- // }
302- // startConnections(
303- // streamingSubscriberConnections,
304- // new Listener() {
305- // @Override
306- // public void failed(State from, Throwable failure) {
307- // // If a connection failed is because of a fatal error, we should fail the
308- // // whole subscriber.
309- // stopAllStreamingConnections();
310- // if (failure instanceof StatusRuntimeException
311- // && ((StatusRuntimeException) failure).getStatus().getCode()
312- // == Status.Code.UNIMPLEMENTED) {
313- // logger.info("Unable to open streaming connections, falling back to polling.");
314- // startPollingConnections();
315- // return;
316- // }
317- // notifyFailed(failure);
318- // }
319- // });
320- // }
321- //
322- // ackDeadlineUpdater =
323- // executor.scheduleAtFixedRate(
324- // new Runnable() {
325- // @Override
326- // public void run() {
327- // // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
328- // long ackLatency =
329- // ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
330- // if (ackLatency > 0) {
331- // long ackExpirationPaddingMillis = ackExpirationPadding.toMillis();
332- // int possibleStreamAckDeadlineSeconds =
333- // Math.max(
334- // MIN_ACK_DEADLINE_SECONDS,
335- // Ints.saturatedCast(
336- // Math.max(ackLatency,
337- // TimeUnit.MILLISECONDS.toSeconds(ackExpirationPaddingMillis))));
338- // if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
339- // streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
340- // logger.log(
341- // Level.FINER,
342- // "Updating stream deadline to {0} seconds.",
343- // streamAckDeadlineSeconds);
344- // for (StreamingSubscriberConnection subscriberConnection :
345- // streamingSubscriberConnections) {
346- // subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
347- // }
348- // }
349- // }
350- // }
351- // },
352- // ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
353- // ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
354- // TimeUnit.MILLISECONDS);
355- // }
356- //
357- // private void stopAllStreamingConnections() {
358- // stopConnections(streamingSubscriberConnections);
359- // if (ackDeadlineUpdater != null) {
360- // ackDeadlineUpdater.cancel(true);
361- // }
362- // }
363-
364285 private void startPollingConnections () throws IOException {
365286 synchronized (pollingSubscriberConnections ) {
366287 Credentials credentials = credentialsProvider .getCredentials ();
0 commit comments