Skip to content

Commit 68ce84d

Browse files
pongadgarrettjonesgoogle
authored andcommitted
pubsub: start with polling by default (#1625)
* pubsub: start with polling by default Previously, Subscribers start with streaming pull and fall back to polling pull if streaming is unavailable. To prevent fallback or streaming from causing any problem in production, this commit starts Subscribers with polling pull directly. Streaming pull will be re-enabled once the endpoint is working. Since the ultimate fate of fallback is up in the air, I chose to comment out the start-with-streaming code instead of deleting. This change also smoked out a logic bug that causes a race condition. A message can be "nacked" either by calling AckReplyConsumer::accept with an explicit NACK or a throwable signally an error. The explicit NACK case works properly. In case of a throwable, MessageDispatcher did not set the "acked" flag. (Acks and nacks share most of the same code path; they might as well use the same flag.) This causes two things to happen concurrently. 1. Since the message is being nacked, it is added to the nack list to be reported to the pubsub service. 2. Pubsub client automatically extends deadlines of the messages the client's user is processing. Since acked flag is not set, the client also tries to extend the message's deadline. If (1) happens first, the test code sees that the message is being nacked and the test passes even though the client will later incorrectly extend the message's deadline. If (2) happens first, the test code sees the incorrect deadline extension and fails. The fix is simple: set the acked flag. * pubsub: fix testModifyAckDeadline flaking, maybe Previously MessageDispatcher starts off with deadline duration of 0. Since we need to extend deadline before the time is up, the client extends deadline a little before the deadline. This translates to scheduling deadling extension "in the past". In turn, this causes tasks to be run -- and new tasks scheduled -- without explicit calls to advance the time on the fake clock. So, when the test code actually advance the fake clock, it runs more tasks than it wants to. This PR conservatively sets the initial deadline duration to 10 seconds. This is already the default in streaming version.
1 parent 093631b commit 68ce84d

5 files changed

Lines changed: 15 additions & 10 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ public void onFailure(Throwable t) {
169169
Level.WARNING,
170170
"MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
171171
t);
172+
acked.getAndSet(true);
172173
synchronized (pendingNacks) {
173174
pendingNacks.add(ackId);
174175
}
@@ -494,7 +495,6 @@ private void processOutstandingAckOperations(
494495
modifyAckDeadlinesToSend.add(nacksToSend);
495496
}
496497
}
497-
498498
ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
499499
}
500500
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public PollingSubscriberConnection(
9090
flowController,
9191
executor,
9292
clock);
93+
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
9394
}
9495

9596
@Override

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public class Subscriber {
113113
20 * 1024 * 1024; // 20MB API maximum message size.
114114
private static final int INITIAL_ACK_DEADLINE_SECONDS = 10;
115115
private static final int MAX_ACK_DEADLINE_SECONDS = 600;
116-
private static final int MIN_ACK_DEADLINE_SECONDS = 10;
116+
static final int MIN_ACK_DEADLINE_SECONDS = 10;
117117
private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.standardMinutes(1);
118118
private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
119119

@@ -333,7 +333,9 @@ public void close() throws IOException {
333333
@Override
334334
protected void doStart() {
335335
logger.log(Level.INFO, "Starting subscriber group.");
336-
startStreamingConnections();
336+
// Streaming pull is not enabled on the service yet.
337+
// startStreamingConnections();
338+
startPollingConnections();
337339
notifyStarted();
338340
}
339341

@@ -422,7 +424,9 @@ public void run() {
422424

423425
private void stopAllStreamingConnections() {
424426
stopConnections(streamingSubscriberConnections);
425-
ackDeadlineUpdater.cancel(true);
427+
if (ackDeadlineUpdater != null) {
428+
ackDeadlineUpdater.cancel(true);
429+
}
426430
}
427431

428432
private void startPollingConnections() {

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeSubscriberServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
class FakeSubscriberServiceImpl extends SubscriberImplBase {
5050
private final AtomicBoolean subscriptionInitialized = new AtomicBoolean(false);
5151
private String subscription = "";
52-
private final AtomicInteger messageAckDeadline = new AtomicInteger();
52+
private final AtomicInteger messageAckDeadline = new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
5353
private final List<Stream> openedStreams = new ArrayList<>();
5454
private final List<Stream> closedStreams = new ArrayList<>();
5555
private final List<String> acks = new ArrayList<>();

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class SubscriberImplTest {
6868

6969
@Parameters
7070
public static Collection<Object[]> data() {
71-
return Arrays.asList(new Object[][] {{true}, {false}});
71+
return Arrays.asList(new Object[][] {{false}});
7272
}
7373

7474
private final boolean isStreamingTest;
@@ -436,10 +436,10 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
436436
private Subscriber startSubscriber(Builder testSubscriberBuilder) throws Exception {
437437
Subscriber subscriber = testSubscriberBuilder.build();
438438
subscriber.startAsync().awaitRunning();
439-
if (!isStreamingTest) {
440-
// Shutdown streaming
441-
fakeSubscriberServiceImpl.sendError(new StatusException(Status.UNIMPLEMENTED));
442-
}
439+
// if (!isStreamingTest) {
440+
// // Shutdown streaming
441+
// fakeSubscriberServiceImpl.sendError(new StatusException(Status.UNIMPLEMENTED));
442+
// }
443443
return subscriber;
444444
}
445445

0 commit comments

Comments
 (0)