Skip to content

Commit 1432b0e

Browse files
authored
---
yaml --- r: 28883 b: refs/heads/autosynth-dlp c: a560bb5 h: refs/heads/master i: 28881: 793eeb7 28879: 64fb085
1 parent dd5193a commit 1432b0e

3 files changed

Lines changed: 52 additions & 7 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ refs/tags/v0.60.0: 4cd518d0612329f8a8e53484eef4cd1651e32855
103103
refs/tags/v0.61.0: e4b526656bb1bf5eefd0ee578b7405147821225e
104104
refs/tags/v0.62.0: bbede7385d48ba08f487bdd29ec10668ace96396
105105
refs/heads/0.60.0-alpha: 10939381ffe0b8da32db4fe3087c86e3aa7f3e55
106-
refs/heads/autosynth-dlp: 288cc4b2df9737de805ed224cd08e723b735e2eb
106+
refs/heads/autosynth-dlp: a560bb578f1f75af30281421403aee7f35799fe5
107107
refs/heads/autosynth-logging: eca54b98c8cf82050bbdfc5c19139673dff9e5b8
108108
refs/heads/dupes: 3478c5d81fd242d0e985656645a679420a2060c2
109109
refs/tags/v0.63.0: 94f19b71d40f46b36120e7b9d78a1a3d41bfcbd6

branches/autosynth-dlp/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class MessageDispatcher {
8686
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
8787
private final Lock jobLock;
8888
private ScheduledFuture<?> backgroundJob;
89+
private ScheduledFuture<?> setExtendedDeadlineFuture;
8990

9091
// To keep track of number of seconds the receiver takes to process messages.
9192
private final Distribution ackLatencyDistribution;
@@ -239,11 +240,15 @@ public void run() {
239240
int newDeadlineSec = computeDeadlineSeconds();
240241
messageDeadlineSeconds.set(newDeadlineSec);
241242
extendDeadlines();
242-
// Don't bother cancelling this when we stop. It'd just set an atomic boolean.
243-
systemExecutor.schedule(
244-
setExtendDeadline,
245-
newDeadlineSec - ackExpirationPadding.getSeconds(),
246-
TimeUnit.SECONDS);
243+
if (setExtendedDeadlineFuture != null && !backgroundJob.isDone()) {
244+
setExtendedDeadlineFuture.cancel(true);
245+
}
246+
247+
setExtendedDeadlineFuture =
248+
systemExecutor.schedule(
249+
setExtendDeadline,
250+
newDeadlineSec - ackExpirationPadding.getSeconds(),
251+
TimeUnit.SECONDS);
247252
}
248253
processOutstandingAckOperations();
249254
} catch (Throwable t) {
@@ -266,8 +271,12 @@ void stop() {
266271
try {
267272
if (backgroundJob != null) {
268273
backgroundJob.cancel(false);
269-
backgroundJob = null;
270274
}
275+
if (setExtendedDeadlineFuture != null) {
276+
setExtendedDeadlineFuture.cancel(true);
277+
}
278+
backgroundJob = null;
279+
setExtendedDeadlineFuture = null;
271280
} finally {
272281
jobLock.unlock();
273282
}

branches/autosynth-dlp/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertTrue;
2121

22+
import com.google.api.gax.core.ExecutorProvider;
2223
import com.google.api.gax.core.FixedExecutorProvider;
2324
import com.google.api.gax.core.InstantiatingExecutorProvider;
2425
import com.google.api.gax.core.NoCredentialsProvider;
@@ -36,6 +37,9 @@
3637
import io.grpc.StatusException;
3738
import io.grpc.inprocess.InProcessChannelBuilder;
3839
import io.grpc.inprocess.InProcessServerBuilder;
40+
import java.util.concurrent.Executors;
41+
import java.util.concurrent.ScheduledExecutorService;
42+
import java.util.concurrent.TimeUnit;
3943
import org.junit.After;
4044
import org.junit.Before;
4145
import org.junit.Rule;
@@ -112,6 +116,38 @@ public void testFailedChannel_recoverableError_channelReopened() throws Exceptio
112116
subscriber.stopAsync().awaitTerminated();
113117
}
114118

119+
@Test
120+
public void testFailedChannel_testTerminated() throws Exception {
121+
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
122+
ExecutorProvider provider =
123+
new ExecutorProvider() {
124+
@Override
125+
public boolean shouldAutoClose() {
126+
return true;
127+
}
128+
129+
@Override
130+
public ScheduledExecutorService getExecutor() {
131+
return scheduledExecutorService;
132+
}
133+
};
134+
135+
try {
136+
Subscriber subscriber =
137+
startSubscriber(
138+
getTestSubscriberBuilder(testReceiver).setSystemExecutorProvider(provider));
139+
140+
// wait long enough for the MessageDispatcher to set up, which at one point
141+
// caused shutdown problems.
142+
Thread.sleep(100);
143+
subscriber.stopAsync().awaitTerminated();
144+
145+
assertTrue(scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS));
146+
} finally {
147+
scheduledExecutorService.shutdownNow();
148+
}
149+
}
150+
115151
@Test(expected = IllegalStateException.class)
116152
public void testFailedChannel_fatalError_subscriberFails() throws Exception {
117153
Subscriber subscriber =

0 commit comments

Comments
 (0)