1717package com .google .cloud .pubsub .spi .v1 ;
1818
1919import com .google .api .core .ApiClock ;
20- import com .google .api .gax .core .Distribution ;
2120import com .google .api .gax .batching .FlowController ;
2221import com .google .api .gax .batching .FlowController .FlowControlException ;
23- import com .google .api .gax .grpc . InstantiatingExecutorProvider ;
22+ import com .google .api .gax .core . Distribution ;
2423import com .google .cloud .pubsub .spi .v1 .MessageDispatcher .OutstandingMessagesBatch .OutstandingMessage ;
2524import com .google .common .annotations .VisibleForTesting ;
2625import com .google .common .collect .Lists ;
4645import java .util .concurrent .locks .ReentrantLock ;
4746import java .util .logging .Level ;
4847import java .util .logging .Logger ;
49- import javax .annotation .Nullable ;
5048import org .threeten .bp .Duration ;
5149import org .threeten .bp .Instant ;
5250
@@ -61,11 +59,8 @@ class MessageDispatcher {
6159 @ VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration .ofMillis (100 );
6260 private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60 ; // 10m
6361
64- private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR =
65- InstantiatingExecutorProvider .newBuilder ().setExecutorThreadCount (2 ).build ().getExecutor ();
66-
6762 private final ScheduledExecutorService executor ;
68- private final ScheduledExecutorService alarmsExecutor ;
63+ private final ScheduledExecutorService systemExecutor ;
6964 private final ApiClock clock ;
7065
7166 private final Duration ackExpirationPadding ;
@@ -245,10 +240,10 @@ void sendAckOperations(
245240 Distribution ackLatencyDistribution ,
246241 FlowController flowController ,
247242 ScheduledExecutorService executor ,
248- @ Nullable ScheduledExecutorService alarmsExecutor ,
243+ ScheduledExecutorService systemExecutor ,
249244 ApiClock clock ) {
250245 this .executor = executor ;
251- this .alarmsExecutor = alarmsExecutor == null ? SHARED_ALARMS_EXECUTOR : alarmsExecutor ;
246+ this .systemExecutor = systemExecutor ;
252247 this .ackExpirationPadding = ackExpirationPadding ;
253248 this .maxAckExtensionPeriod = maxAckExtensionPeriod ;
254249 this .receiver = receiver ;
@@ -428,7 +423,7 @@ private void setupPendingAcksAlarm() {
428423 try {
429424 if (pendingAcksAlarm == null ) {
430425 pendingAcksAlarm =
431- alarmsExecutor .schedule (
426+ systemExecutor .schedule (
432427 new Runnable () {
433428 @ Override
434429 public void run () {
@@ -557,7 +552,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
557552 nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime ;
558553
559554 ackDeadlineExtensionAlarm =
560- alarmsExecutor .schedule (
555+ systemExecutor .schedule (
561556 new AckDeadlineAlarm (),
562557 nextAckDeadlineExtensionAlarmTime .toEpochMilli () - clock .millisTime (),
563558 TimeUnit .MILLISECONDS );
0 commit comments