4848import java .util .LinkedList ;
4949import java .util .List ;
5050import java .util .concurrent .ScheduledExecutorService ;
51- import java .util .concurrent .ScheduledFuture ;
5251import java .util .logging .Level ;
5352import java .util .logging .Logger ;
5453import javax .annotation .Nullable ;
@@ -94,6 +93,7 @@ public class Subscriber extends AbstractApiService {
9493 @ InternalApi static final int MAX_ACK_DEADLINE_SECONDS = 600 ;
9594 @ InternalApi static final int MIN_ACK_DEADLINE_SECONDS = 10 ;
9695 private static final Duration UNARY_TIMEOUT = Duration .ofSeconds (60 );
96+ private static final Duration ACK_EXPIRATION_PADDING = Duration .ofSeconds (5 );
9797
9898 private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR =
9999 InstantiatingExecutorProvider .newBuilder ().setExecutorThreadCount (6 ).build ().getExecutor ();
@@ -102,7 +102,6 @@ public class Subscriber extends AbstractApiService {
102102
103103 private final String subscriptionName ;
104104 private final FlowControlSettings flowControlSettings ;
105- private final Duration ackExpirationPadding ;
106105 private final Duration maxAckExtensionPeriod ;
107106 private final ScheduledExecutorService executor ;
108107 @ Nullable private final ScheduledExecutorService alarmsExecutor ;
@@ -120,20 +119,12 @@ public class Subscriber extends AbstractApiService {
120119 new LinkedList <>();
121120 private final ApiClock clock ;
122121 private final List <AutoCloseable > closeables = new ArrayList <>();
123- private ScheduledFuture <?> ackDeadlineUpdater ;
124122
125123 private Subscriber (Builder builder ) {
126124 receiver = builder .receiver ;
127125 flowControlSettings = builder .flowControlSettings ;
128126 subscriptionName = builder .subscriptionName ;
129127
130- Preconditions .checkArgument (
131- builder .ackExpirationPadding .compareTo (Duration .ZERO ) > 0 , "padding must be positive" );
132- Preconditions .checkArgument (
133- builder .ackExpirationPadding .compareTo (Duration .ofSeconds (MIN_ACK_DEADLINE_SECONDS )) < 0 ,
134- "padding must be less than %s seconds" ,
135- MIN_ACK_DEADLINE_SECONDS );
136- ackExpirationPadding = builder .ackExpirationPadding ;
137128 maxAckExtensionPeriod = builder .maxAckExtensionPeriod ;
138129 clock = builder .clock .isPresent () ? builder .clock .get () : CurrentMillisClock .getDefaultClock ();
139130
@@ -228,12 +219,6 @@ public String getSubscriptionNameString() {
228219 return subscriptionName ;
229220 }
230221
231- /** Acknowledgement expiration padding. See {@link Builder#setAckExpirationPadding}. */
232- @ InternalApi
233- Duration getAckExpirationPadding () {
234- return ackExpirationPadding ;
235- }
236-
237222 /** The flow control settings the Subscriber is configured with. */
238223 public FlowControlSettings getFlowControlSettings () {
239224 return flowControlSettings ;
@@ -331,14 +316,14 @@ public void run() {
331316 .start ();
332317 }
333318
334- private void startStreamingConnections () throws IOException {
319+ private void startStreamingConnections () {
335320 synchronized (streamingSubscriberConnections ) {
336321 for (int i = 0 ; i < numPullers ; i ++) {
337322 streamingSubscriberConnections .add (
338323 new StreamingSubscriberConnection (
339324 subscriptionName ,
340325 receiver ,
341- ackExpirationPadding ,
326+ ACK_EXPIRATION_PADDING ,
342327 maxAckExtensionPeriod ,
343328 ackLatencyDistribution ,
344329 subStub ,
@@ -372,9 +357,6 @@ public void failed(State from, Throwable failure) {
372357
373358 private void stopAllStreamingConnections () {
374359 stopConnections (streamingSubscriberConnections );
375- if (ackDeadlineUpdater != null ) {
376- ackDeadlineUpdater .cancel (true );
377- }
378360 }
379361
380362 private void startConnections (
@@ -410,8 +392,6 @@ private void stopConnections(List<? extends ApiService> connections) {
410392
411393 /** Builder of {@link Subscriber Subscribers}. */
412394 public static final class Builder {
413- private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration .ofMillis (100 );
414- private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration .ofSeconds (5 );
415395 private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration .ofMinutes (60 );
416396
417397 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
@@ -423,7 +403,6 @@ public static final class Builder {
423403 String subscriptionName ;
424404 MessageReceiver receiver ;
425405
426- Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING ;
427406 Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD ;
428407
429408 FlowControlSettings flowControlSettings =
@@ -437,8 +416,6 @@ public static final class Builder {
437416 .setKeepAliveTime (Duration .ofMinutes (5 ))
438417 .build ();
439418 HeaderProvider headerProvider = new NoHeaderProvider ();
440- HeaderProvider internalHeaderProvider =
441- SubscriptionAdminSettings .defaultApiClientHeaderProviderBuilder ().build ();
442419 CredentialsProvider credentialsProvider =
443420 SubscriptionAdminSettings .defaultCredentialsProviderBuilder ().build ();
444421 Optional <ApiClock > clock = Optional .absent ();
@@ -478,21 +455,6 @@ public Builder setHeaderProvider(HeaderProvider headerProvider) {
478455 return this ;
479456 }
480457
481- /**
482- * Sets the static header provider for getting internal (library-defined) headers. The header
483- * provider will be called during client construction only once. The headers returned by the
484- * provider will be cached and supplied as is for each request issued by the constructed client.
485- * Some reserved headers can be overridden (e.g. Content-Type) or merged with the default value
486- * (e.g. User-Agent) by the underlying transport layer.
487- *
488- * @param internalHeaderProvider the internal header provider
489- * @return the builder
490- */
491- Builder setInternalHeaderProvider (HeaderProvider internalHeaderProvider ) {
492- this .internalHeaderProvider = Preconditions .checkNotNull (internalHeaderProvider );
493- return this ;
494- }
495-
496458 /**
497459 * Sets the flow control settings.
498460 *
@@ -523,25 +485,6 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
523485 return this ;
524486 }
525487
526- /**
527- * Set acknowledgement expiration padding.
528- *
529- * <p>This is the time accounted before a message expiration is to happen, so the {@link
530- * Subscriber} is able to send an ack extension beforehand.
531- *
532- * <p>This padding duration is configurable so you can account for network latency. A reasonable
533- * number must be provided so messages don't expire because of network latency between when the
534- * ack extension is required and when it reaches the Pub/Sub service.
535- *
536- * @param ackExpirationPadding must be greater or equal to {@link #MIN_ACK_EXPIRATION_PADDING}
537- */
538- @ InternalApi
539- Builder setAckExpirationPadding (Duration ackExpirationPadding ) {
540- Preconditions .checkArgument (ackExpirationPadding .compareTo (MIN_ACK_EXPIRATION_PADDING ) >= 0 );
541- this .ackExpirationPadding = ackExpirationPadding ;
542- return this ;
543- }
544-
545488 /**
546489 * Set the maximum period a message ack deadline will be extended. Defaults to one hour.
547490 *
0 commit comments