Skip to content

Commit a72daef

Browse files
authored
---
yaml --- r: 33121 b: refs/heads/autosynth-kms c: a560bb5 h: refs/heads/master i: 33119: 8eb715b
1 parent 9708706 commit a72daef

3 files changed

Lines changed: 52 additions & 7 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ refs/heads/autosynth-dialogflow: 73974cc32e5212aec0126472e0bc1442886fedaf
132132
refs/heads/autosynth-errorreporting: effe8001d110ad584187b30aafc473db0dd4a15f
133133
refs/heads/autosynth-firestore: e79eeb26930dfae4439424ad2fda5874eeca54c8
134134
refs/heads/autosynth-iot: 044be280805a59e06d09658688c9ee474a9815ad
135-
refs/heads/autosynth-kms: 288cc4b2df9737de805ed224cd08e723b735e2eb
135+
refs/heads/autosynth-kms: a560bb578f1f75af30281421403aee7f35799fe5
136136
refs/heads/autosynth-language: e73905aa7672afa47240e65b25c087207f4594f9
137137
refs/heads/autosynth-os-login: 123ba209c5769d0ee067e0ce5848bec13b42a4f4
138138
refs/heads/autosynth-redis: 6bedce4d7c7c6ca6a22e83ad1780e08fdc565a9e

branches/autosynth-kms/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class MessageDispatcher {
8686
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
8787
private final Lock jobLock;
8888
private ScheduledFuture<?> backgroundJob;
89+
private ScheduledFuture<?> setExtendedDeadlineFuture;
8990

9091
// To keep track of number of seconds the receiver takes to process messages.
9192
private final Distribution ackLatencyDistribution;
@@ -239,11 +240,15 @@ public void run() {
239240
int newDeadlineSec = computeDeadlineSeconds();
240241
messageDeadlineSeconds.set(newDeadlineSec);
241242
extendDeadlines();
242-
// Don't bother cancelling this when we stop. It'd just set an atomic boolean.
243-
systemExecutor.schedule(
244-
setExtendDeadline,
245-
newDeadlineSec - ackExpirationPadding.getSeconds(),
246-
TimeUnit.SECONDS);
243+
if (setExtendedDeadlineFuture != null && !backgroundJob.isDone()) {
244+
setExtendedDeadlineFuture.cancel(true);
245+
}
246+
247+
setExtendedDeadlineFuture =
248+
systemExecutor.schedule(
249+
setExtendDeadline,
250+
newDeadlineSec - ackExpirationPadding.getSeconds(),
251+
TimeUnit.SECONDS);
247252
}
248253
processOutstandingAckOperations();
249254
} catch (Throwable t) {
@@ -266,8 +271,12 @@ void stop() {
266271
try {
267272
if (backgroundJob != null) {
268273
backgroundJob.cancel(false);
269-
backgroundJob = null;
270274
}
275+
if (setExtendedDeadlineFuture != null) {
276+
setExtendedDeadlineFuture.cancel(true);
277+
}
278+
backgroundJob = null;
279+
setExtendedDeadlineFuture = null;
271280
} finally {
272281
jobLock.unlock();
273282
}

branches/autosynth-kms/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertTrue;
2121

22+
import com.google.api.gax.core.ExecutorProvider;
2223
import com.google.api.gax.core.FixedExecutorProvider;
2324
import com.google.api.gax.core.InstantiatingExecutorProvider;
2425
import com.google.api.gax.core.NoCredentialsProvider;
@@ -36,6 +37,9 @@
3637
import io.grpc.StatusException;
3738
import io.grpc.inprocess.InProcessChannelBuilder;
3839
import io.grpc.inprocess.InProcessServerBuilder;
40+
import java.util.concurrent.Executors;
41+
import java.util.concurrent.ScheduledExecutorService;
42+
import java.util.concurrent.TimeUnit;
3943
import org.junit.After;
4044
import org.junit.Before;
4145
import org.junit.Rule;
@@ -112,6 +116,38 @@ public void testFailedChannel_recoverableError_channelReopened() throws Exceptio
112116
subscriber.stopAsync().awaitTerminated();
113117
}
114118

119+
@Test
120+
public void testFailedChannel_testTerminated() throws Exception {
121+
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
122+
ExecutorProvider provider =
123+
new ExecutorProvider() {
124+
@Override
125+
public boolean shouldAutoClose() {
126+
return true;
127+
}
128+
129+
@Override
130+
public ScheduledExecutorService getExecutor() {
131+
return scheduledExecutorService;
132+
}
133+
};
134+
135+
try {
136+
Subscriber subscriber =
137+
startSubscriber(
138+
getTestSubscriberBuilder(testReceiver).setSystemExecutorProvider(provider));
139+
140+
// wait long enough for the MessageDispatcher to set up, which at one point
141+
// caused shutdown problems.
142+
Thread.sleep(100);
143+
subscriber.stopAsync().awaitTerminated();
144+
145+
assertTrue(scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS));
146+
} finally {
147+
scheduledExecutorService.shutdownNow();
148+
}
149+
}
150+
115151
@Test(expected = IllegalStateException.class)
116152
public void testFailedChannel_fatalError_subscriberFails() throws Exception {
117153
Subscriber subscriber =

0 commit comments

Comments
 (0)