Skip to content

Commit a14a023

Browse files
Praful Makanisduskis
authored andcommitted
---
yaml --- r: 35859 b: refs/heads/autosynth-dataproc c: 59c609f h: refs/heads/master i: 35857: 66d9a50 35855: 7be8ce8
1 parent eb91eaa commit a14a023

3 files changed

Lines changed: 45 additions & 14 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ refs/tags/v0.78.0: 62d4bd30605ab3578f9a08d84487fb0b33ac2ff5
165165
refs/tags/v0.79.0: 82287b570708748c411d05c40f3932cff9606feb
166166
refs/tags/v0.80.0: f745e744d38e4fe636f34d0e04795ba3d014287d
167167
refs/tags/v0.81.0: ed3a0c85339ea6b73560b9a570abfbb76b93a263
168-
refs/heads/autosynth-dataproc: a6be0c14e89e0dadbde8bec8652afbf5e5607c42
168+
refs/heads/autosynth-dataproc: 59c609fc9dd9d865a1466969410e280b7bdf62dd
169169
refs/heads/autosynth-securitycenter: b24087060036e623e57d2454ba5dabeaf1e530c5
170170
refs/heads/autosynth-talent: 4ca901879f86aab61091cea52e8a9b653639df24
171171
refs/tags/v0.82.0: 7b9807d5d0a400c757b8905fee768be4c85eba25

branches/autosynth-dataproc/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.google.api.core.BetaApi;
2626
import com.google.api.core.SettableApiFuture;
2727
import com.google.api.gax.batching.BatchingSettings;
28+
import com.google.api.gax.core.BackgroundResource;
29+
import com.google.api.gax.core.BackgroundResourceAggregation;
2830
import com.google.api.gax.core.CredentialsProvider;
2931
import com.google.api.gax.core.ExecutorAsBackgroundResource;
3032
import com.google.api.gax.core.ExecutorProvider;
@@ -47,7 +49,6 @@
4749
import com.google.pubsub.v1.TopicNames;
4850
import java.io.IOException;
4951
import java.util.ArrayList;
50-
import java.util.Collections;
5152
import java.util.Iterator;
5253
import java.util.LinkedList;
5354
import java.util.List;
@@ -94,7 +95,7 @@ public class Publisher {
9495

9596
private final ScheduledExecutorService executor;
9697
private final AtomicBoolean shutdown;
97-
private final List<AutoCloseable> closeables;
98+
private final BackgroundResource backgroundResources;
9899
private final MessageWaiter messagesWaiter;
99100
private ScheduledFuture<?> currentAlarmFuture;
100101
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;
@@ -119,11 +120,9 @@ private Publisher(Builder builder) throws IOException {
119120
messagesBatchLock = new ReentrantLock();
120121
activeAlarm = new AtomicBoolean(false);
121122
executor = builder.executorProvider.getExecutor();
123+
List<BackgroundResource> backgroundResourceList = new ArrayList<>();
122124
if (builder.executorProvider.shouldAutoClose()) {
123-
closeables =
124-
Collections.<AutoCloseable>singletonList(new ExecutorAsBackgroundResource(executor));
125-
} else {
126-
closeables = Collections.emptyList();
125+
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
127126
}
128127

129128
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
@@ -151,7 +150,8 @@ private Publisher(Builder builder) throws IOException {
151150
.setRetrySettings(retrySettings)
152151
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
153152
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
154-
153+
backgroundResourceList.add(publisherStub);
154+
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
155155
shutdown = new AtomicBoolean(false);
156156
messagesWaiter = new MessageWaiter();
157157
}
@@ -397,11 +397,7 @@ public void shutdown() throws Exception {
397397
currentAlarmFuture.cancel(false);
398398
}
399399
publishAllOutstanding();
400-
messagesWaiter.waitNoMessages();
401-
for (AutoCloseable closeable : closeables) {
402-
closeable.close();
403-
}
404-
publisherStub.shutdown();
400+
backgroundResources.shutdown();
405401
}
406402

407403
/**
@@ -411,7 +407,7 @@ public void shutdown() throws Exception {
411407
* <p>Call this method to make sure all resources are freed properly.
412408
*/
413409
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
414-
return publisherStub.awaitTermination(duration, unit);
410+
return backgroundResources.awaitTermination(duration, unit);
415411
}
416412

417413
private boolean hasBatchingBytes() {

branches/autosynth-dataproc/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.grpc.inprocess.InProcessServerBuilder;
4444
import java.util.concurrent.ExecutionException;
4545
import java.util.concurrent.TimeUnit;
46+
import org.easymock.EasyMock;
4647
import org.junit.After;
4748
import org.junit.Before;
4849
import org.junit.Test;
@@ -570,6 +571,40 @@ public void testBuilderInvalidArguments() {
570571
}
571572
}
572573

574+
@Test
575+
public void testAwaitTermination() throws Exception {
576+
Publisher publisher =
577+
getTestPublisherBuilder()
578+
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
579+
.setRetrySettings(
580+
Publisher.Builder.DEFAULT_RETRY_SETTINGS
581+
.toBuilder()
582+
.setTotalTimeout(Duration.ofSeconds(10))
583+
.setMaxAttempts(0)
584+
.build())
585+
.build();
586+
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
587+
publisher.shutdown();
588+
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
589+
}
590+
591+
@Test
592+
public void testShutDown() throws Exception {
593+
ApiFuture apiFuture = EasyMock.mock(ApiFuture.class);
594+
Publisher publisher = EasyMock.mock(Publisher.class);
595+
EasyMock.expect(
596+
publisher.publish(
597+
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("A")).build()))
598+
.andReturn(apiFuture);
599+
EasyMock.expect(publisher.awaitTermination(1, TimeUnit.MINUTES)).andReturn(true);
600+
publisher.shutdown();
601+
EasyMock.expectLastCall().once();
602+
EasyMock.replay(publisher);
603+
sendTestMessage(publisher, "A");
604+
publisher.shutdown();
605+
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
606+
}
607+
573608
private Builder getTestPublisherBuilder() {
574609
return Publisher.newBuilder(TEST_TOPIC)
575610
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))

0 commit comments

Comments
 (0)