2525import com .google .api .core .BetaApi ;
2626import com .google .api .core .SettableApiFuture ;
2727import com .google .api .gax .batching .BatchingSettings ;
28+ import com .google .api .gax .core .BackgroundResource ;
29+ import com .google .api .gax .core .BackgroundResourceAggregation ;
2830import com .google .api .gax .core .CredentialsProvider ;
2931import com .google .api .gax .core .ExecutorAsBackgroundResource ;
3032import com .google .api .gax .core .ExecutorProvider ;
4749import com .google .pubsub .v1 .TopicNames ;
4850import java .io .IOException ;
4951import java .util .ArrayList ;
50- import java .util .Collections ;
5152import java .util .Iterator ;
5253import java .util .LinkedList ;
5354import java .util .List ;
@@ -94,7 +95,7 @@ public class Publisher {
9495
9596 private final ScheduledExecutorService executor ;
9697 private final AtomicBoolean shutdown ;
97- private final List < AutoCloseable > closeables ;
98+ private final BackgroundResource backgroundResources ;
9899 private final MessageWaiter messagesWaiter ;
99100 private ScheduledFuture <?> currentAlarmFuture ;
100101 private final ApiFunction <PubsubMessage , PubsubMessage > messageTransform ;
@@ -119,11 +120,9 @@ private Publisher(Builder builder) throws IOException {
119120 messagesBatchLock = new ReentrantLock ();
120121 activeAlarm = new AtomicBoolean (false );
121122 executor = builder .executorProvider .getExecutor ();
123+ List <BackgroundResource > backgroundResourceList = new ArrayList <>();
122124 if (builder .executorProvider .shouldAutoClose ()) {
123- closeables =
124- Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
125- } else {
126- closeables = Collections .emptyList ();
125+ backgroundResourceList .add (new ExecutorAsBackgroundResource (executor ));
127126 }
128127
129128 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
@@ -151,7 +150,8 @@ private Publisher(Builder builder) throws IOException {
151150 .setRetrySettings (retrySettings )
152151 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
153152 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
154-
153+ backgroundResourceList .add (publisherStub );
154+ backgroundResources = new BackgroundResourceAggregation (backgroundResourceList );
155155 shutdown = new AtomicBoolean (false );
156156 messagesWaiter = new MessageWaiter ();
157157 }
@@ -397,11 +397,7 @@ public void shutdown() throws Exception {
397397 currentAlarmFuture .cancel (false );
398398 }
399399 publishAllOutstanding ();
400- messagesWaiter .waitNoMessages ();
401- for (AutoCloseable closeable : closeables ) {
402- closeable .close ();
403- }
404- publisherStub .shutdown ();
400+ backgroundResources .shutdown ();
405401 }
406402
407403 /**
@@ -411,7 +407,7 @@ public void shutdown() throws Exception {
411407 * <p>Call this method to make sure all resources are freed properly.
412408 */
413409 public boolean awaitTermination (long duration , TimeUnit unit ) throws InterruptedException {
414- return publisherStub .awaitTermination (duration , unit );
410+ return backgroundResources .awaitTermination (duration , unit );
415411 }
416412
417413 private boolean hasBatchingBytes () {
0 commit comments