3535import java .util .concurrent .ExecutorService ;
3636import java .util .concurrent .Executors ;
3737import java .util .concurrent .Future ;
38- import java .util .concurrent .ScheduledExecutorService ;
39- import java .util .concurrent .ScheduledThreadPoolExecutor ;
4038import java .util .concurrent .atomic .AtomicInteger ;
4139
4240/**
@@ -46,17 +44,15 @@ final class MessageConsumerImpl implements MessageConsumer {
4644
4745 private static final int MAX_QUEUED_CALLBACKS = 100 ;
4846 // shared scheduled executor, used to schedule pulls
49- private static final SharedResourceHolder .Resource <ScheduledExecutorService > TIMER =
50- new SharedResourceHolder .Resource <ScheduledExecutorService >() {
47+ private static final SharedResourceHolder .Resource <ExecutorService > CONSUMER_EXECUTOR =
48+ new SharedResourceHolder .Resource <ExecutorService >() {
5149 @ Override
52- public ScheduledExecutorService create () {
53- ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor (1 );
54- timer .setRemoveOnCancelPolicy (true );
55- return timer ;
50+ public ExecutorService create () {
51+ return Executors .newSingleThreadExecutor ();
5652 }
5753
5854 @ Override
59- public void close (ScheduledExecutorService instance ) {
55+ public void close (ExecutorService instance ) {
6056 instance .shutdown ();
6157 }
6258 };
@@ -67,7 +63,7 @@ public void close(ScheduledExecutorService instance) {
6763 private final AckDeadlineRenewer deadlineRenewer ;
6864 private final String subscription ;
6965 private final MessageProcessor messageProcessor ;
70- private final ScheduledExecutorService timer ;
66+ private final ExecutorService consumerExecutor ;
7167 private final ExecutorFactory <ExecutorService > executorFactory ;
7268 private final ExecutorService executor ;
7369 private final AtomicInteger queuedCallbacks ;
@@ -192,7 +188,7 @@ private MessageConsumerImpl(Builder builder) {
192188 this .pubsub = pubsubOptions .service ();
193189 this .deadlineRenewer = builder .deadlineRenewer ;
194190 this .queuedCallbacks = new AtomicInteger ();
195- this .timer = SharedResourceHolder .get (TIMER );
191+ this .consumerExecutor = SharedResourceHolder .get (CONSUMER_EXECUTOR );
196192 this .executorFactory =
197193 builder .executorFactory != null ? builder .executorFactory : new DefaultExecutorFactory ();
198194 this .executor = executorFactory .get ();
@@ -209,7 +205,7 @@ private void pullIfNeeded() {
209205 if (closed || scheduledFuture != null || !pullPolicy .shouldPull (queuedCallbacks .get ())) {
210206 return ;
211207 }
212- scheduledFuture = timer .submit (consumerRunnable );
208+ scheduledFuture = consumerExecutor .submit (consumerRunnable );
213209 }
214210 }
215211
@@ -219,7 +215,7 @@ private void nextPull() {
219215 scheduledFuture = null ;
220216 return ;
221217 }
222- scheduledFuture = timer .submit (consumerRunnable );
218+ scheduledFuture = consumerExecutor .submit (consumerRunnable );
223219 }
224220 }
225221
@@ -237,7 +233,7 @@ public void close() {
237233 pullerFuture .cancel (true );
238234 }
239235 }
240- SharedResourceHolder .release (TIMER , timer );
236+ SharedResourceHolder .release (CONSUMER_EXECUTOR , consumerExecutor );
241237 executorFactory .release (executor );
242238 }
243239
0 commit comments