Skip to content

Commit 49bed38

Browse files
authored
fix: Flush messages in serial executor before handling admin seek (#1079)
Messages queued in the serial executor should not be delivered post-seek.
1 parent c293c24 commit 49bed38

5 files changed

Lines changed: 62 additions & 37 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
3232
If you are using Gradle without BOM, add this to your dependencies
3333

3434
```Groovy
35-
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.12'
35+
implementation 'com.google.cloud:google-cloud-pubsublite:1.5.0'
3636
```
3737

3838
If you are using SBT, add this to your dependencies
3939

4040
```Scala
41-
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.12"
41+
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.5.0"
4242
```
4343

4444
## Authentication

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,44 @@
1616

1717
package com.google.cloud.pubsublite.internal;
1818

19+
import com.google.common.util.concurrent.Monitor.Guard;
1920
import java.util.ArrayDeque;
2021
import java.util.Queue;
2122
import java.util.concurrent.Executor;
22-
import java.util.concurrent.atomic.AtomicBoolean;
2323
import javax.annotation.concurrent.GuardedBy;
2424

2525
/** An executor that runs tasks sequentially. */
2626
public final class SerialExecutor implements AutoCloseable, Executor {
2727
private final Executor executor;
28-
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
2928

30-
@GuardedBy("this")
29+
private final CloseableMonitor monitor = new CloseableMonitor();
30+
private final Guard isInactive =
31+
new Guard(monitor.monitor) {
32+
@Override
33+
public boolean isSatisfied() {
34+
return !isTaskActive;
35+
}
36+
};
37+
38+
@GuardedBy("monitor.monitor")
3139
private final Queue<Runnable> tasks;
3240

33-
@GuardedBy("this")
41+
@GuardedBy("monitor.monitor")
3442
private boolean isTaskActive;
3543

44+
@GuardedBy("monitor.monitor")
45+
private boolean isShutdown;
46+
3647
public SerialExecutor(Executor executor) {
3748
this.executor = executor;
3849
this.tasks = new ArrayDeque<>();
3950
this.isTaskActive = false;
51+
this.isShutdown = false;
52+
}
53+
54+
/** Waits until there are no active tasks. */
55+
public void waitUntilInactive() {
56+
try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isInactive)) {}
4057
}
4158

4259
/**
@@ -45,34 +62,45 @@ public SerialExecutor(Executor executor) {
4562
*/
4663
@Override
4764
public void close() {
48-
isShutdown.set(true);
65+
try (CloseableMonitor.Hold h = monitor.enter()) {
66+
isShutdown = true;
67+
}
4968
}
5069

5170
@Override
52-
public synchronized void execute(Runnable r) {
53-
if (isShutdown.get()) {
54-
return;
71+
public void execute(Runnable r) {
72+
try (CloseableMonitor.Hold h = monitor.enter()) {
73+
if (isShutdown) {
74+
return;
75+
}
76+
tasks.add(
77+
() -> {
78+
try {
79+
if (shouldExecuteTask()) {
80+
r.run();
81+
}
82+
} finally {
83+
scheduleNextTask();
84+
}
85+
});
86+
if (!isTaskActive) {
87+
scheduleNextTask();
88+
}
5589
}
56-
tasks.add(
57-
() -> {
58-
if (isShutdown.get()) {
59-
return;
60-
}
61-
try {
62-
r.run();
63-
} finally {
64-
scheduleNextTask();
65-
}
66-
});
67-
if (!isTaskActive) {
68-
scheduleNextTask();
90+
}
91+
92+
private boolean shouldExecuteTask() {
93+
try (CloseableMonitor.Hold h = monitor.enter()) {
94+
return !isShutdown;
6995
}
7096
}
7197

72-
private synchronized void scheduleNextTask() {
73-
isTaskActive = !tasks.isEmpty();
74-
if (isTaskActive) {
75-
executor.execute(tasks.poll());
98+
private void scheduleNextTask() {
99+
try (CloseableMonitor.Hold h = monitor.enter()) {
100+
isTaskActive = !tasks.isEmpty() && !isShutdown;
101+
if (isTaskActive) {
102+
executor.execute(tasks.poll());
103+
}
76104
}
77105
}
78106
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,10 @@ public void triggerReinitialize(CheckedApiException streamError) {
191191
}
192192
if (ResetSignal.isResetSignal(streamError)) {
193193
try {
194+
// Flush pre-seek messages.
195+
messageDeliveryExecutor.waitUntilInactive();
194196
if (resetHandler.handleReset()) {
197+
// Wait for cursor commit.
195198
reset();
196199
}
197200
} catch (CheckedApiException e) {

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/SerialExecutorTest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717
package com.google.cloud.pubsublite.internal;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20-
import static java.util.concurrent.TimeUnit.SECONDS;
2120

2221
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
2322
import java.util.ArrayList;
2423
import java.util.List;
25-
import java.util.concurrent.CountDownLatch;
2624
import org.junit.Test;
2725
import org.junit.runner.RunWith;
2826
import org.junit.runners.JUnit4;
@@ -35,18 +33,16 @@ public final class SerialExecutorTest {
3533
public void serializesTasks() throws Exception {
3634
final int numTasks = 100;
3735
List<Integer> receivedSequences = new ArrayList<>();
38-
CountDownLatch tasksDone = new CountDownLatch(numTasks);
3936
for (int i = 0; i < numTasks; i++) {
4037
int sequence = i;
4138
executor.execute(
4239
() -> {
4340
synchronized (receivedSequences) {
4441
receivedSequences.add(sequence);
4542
}
46-
tasksDone.countDown();
4743
});
4844
}
49-
assertThat(tasksDone.await(30, SECONDS)).isTrue();
45+
executor.waitUntilInactive();
5046

5147
for (int i = 0; i < receivedSequences.size(); i++) {
5248
assertThat(receivedSequences.get(i)).isEqualTo(i);
@@ -56,19 +52,17 @@ public void serializesTasks() throws Exception {
5652
@Test
5753
public void closeDiscardsTasks() throws Exception {
5854
List<Integer> receivedSequences = new ArrayList<>();
59-
CountDownLatch tasksDone = new CountDownLatch(1);
6055
for (int i = 0; i < 10; i++) {
6156
int sequence = i;
6257
executor.execute(
6358
() -> {
6459
synchronized (receivedSequences) {
6560
receivedSequences.add(sequence);
6661
}
67-
tasksDone.countDown();
6862
executor.close();
6963
});
7064
}
71-
assertThat(tasksDone.await(10, SECONDS)).isTrue();
65+
executor.waitUntilInactive();
7266

7367
assertThat(receivedSequences).containsExactly(0);
7468
}

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,8 +354,6 @@ public void reinitialize_handlesSuccessfulReset() throws Exception {
354354
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
355355
CountDownLatch messagesReceived = countdownMessageBatches(1);
356356
leakedResponseObserver.onResponse(messages);
357-
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
358-
verify(mockMessageConsumer).accept(messages);
359357

360358
doAnswer(
361359
args -> {
@@ -369,6 +367,8 @@ public void reinitialize_handlesSuccessfulReset() throws Exception {
369367
// from the committed cursor upon reconnect.
370368
when(mockResetHandler.handleReset()).thenReturn(true);
371369
subscriber.triggerReinitialize(TestResetSignal.newCheckedException());
370+
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
371+
verify(mockMessageConsumer).accept(messages); // Pre-seek messages always received.
372372
verify(mockSubscriberFactory, times(2)).New(any(), any(), eq(initialRequest()));
373373
verify(mockConnectedSubscriber2)
374374
.allowFlow(

0 commit comments

Comments
 (0)