Skip to content

Commit b5d21e8

Browse files
pongadgarrettjonesgoogle
authored andcommitted
---
yaml --- r: 7837 b: refs/heads/tswast-patch-1 c: 68ce84d h: refs/heads/master i: 7835: f9c213c
1 parent fa18e6d commit b5d21e8

6 files changed

Lines changed: 16 additions & 11 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ refs/tags/v0.18.0: 9d193c4c4b9d1c6f21515dd8e50836b9194ec9bb
5757
refs/tags/v0.19.0: e67b56e4d8dad5f9a7b38c9b2107c23c828f2ed5
5858
refs/tags/v0.20.0: 839f7fb7156535146aa1cb2c5aadd8d375d854e8
5959
refs/tags/v0.20.1: 370471f437f1f4f68a11e068df5cd6bf39edb1fa
60-
refs/heads/tswast-patch-1: 093631b41ba0b6e7702c4d89991e5ac31b291caa
60+
refs/heads/tswast-patch-1: 68ce84d313b7745b1dc37be300522ea1c051063b
6161
refs/heads/pubsub-streaming-pull: 19262b752ee874eb2ca3b950eb2aef44d5a5267b

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ public void onFailure(Throwable t) {
169169
Level.WARNING,
170170
"MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
171171
t);
172+
acked.getAndSet(true);
172173
synchronized (pendingNacks) {
173174
pendingNacks.add(ackId);
174175
}
@@ -494,7 +495,6 @@ private void processOutstandingAckOperations(
494495
modifyAckDeadlinesToSend.add(nacksToSend);
495496
}
496497
}
497-
498498
ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
499499
}
500500
}

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public PollingSubscriberConnection(
9090
flowController,
9191
executor,
9292
clock);
93+
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
9394
}
9495

9596
@Override

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public class Subscriber {
113113
20 * 1024 * 1024; // 20MB API maximum message size.
114114
private static final int INITIAL_ACK_DEADLINE_SECONDS = 10;
115115
private static final int MAX_ACK_DEADLINE_SECONDS = 600;
116-
private static final int MIN_ACK_DEADLINE_SECONDS = 10;
116+
static final int MIN_ACK_DEADLINE_SECONDS = 10;
117117
private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.standardMinutes(1);
118118
private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
119119

@@ -333,7 +333,9 @@ public void close() throws IOException {
333333
@Override
334334
protected void doStart() {
335335
logger.log(Level.INFO, "Starting subscriber group.");
336-
startStreamingConnections();
336+
// Streaming pull is not enabled on the service yet.
337+
// startStreamingConnections();
338+
startPollingConnections();
337339
notifyStarted();
338340
}
339341

@@ -422,7 +424,9 @@ public void run() {
422424

423425
private void stopAllStreamingConnections() {
424426
stopConnections(streamingSubscriberConnections);
425-
ackDeadlineUpdater.cancel(true);
427+
if (ackDeadlineUpdater != null) {
428+
ackDeadlineUpdater.cancel(true);
429+
}
426430
}
427431

428432
private void startPollingConnections() {

branches/tswast-patch-1/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeSubscriberServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
class FakeSubscriberServiceImpl extends SubscriberImplBase {
5050
private final AtomicBoolean subscriptionInitialized = new AtomicBoolean(false);
5151
private String subscription = "";
52-
private final AtomicInteger messageAckDeadline = new AtomicInteger();
52+
private final AtomicInteger messageAckDeadline = new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
5353
private final List<Stream> openedStreams = new ArrayList<>();
5454
private final List<Stream> closedStreams = new ArrayList<>();
5555
private final List<String> acks = new ArrayList<>();

branches/tswast-patch-1/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class SubscriberImplTest {
6868

6969
@Parameters
7070
public static Collection<Object[]> data() {
71-
return Arrays.asList(new Object[][] {{true}, {false}});
71+
return Arrays.asList(new Object[][] {{false}});
7272
}
7373

7474
private final boolean isStreamingTest;
@@ -436,10 +436,10 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
436436
private Subscriber startSubscriber(Builder testSubscriberBuilder) throws Exception {
437437
Subscriber subscriber = testSubscriberBuilder.build();
438438
subscriber.startAsync().awaitRunning();
439-
if (!isStreamingTest) {
440-
// Shutdown streaming
441-
fakeSubscriberServiceImpl.sendError(new StatusException(Status.UNIMPLEMENTED));
442-
}
439+
// if (!isStreamingTest) {
440+
// // Shutdown streaming
441+
// fakeSubscriberServiceImpl.sendError(new StatusException(Status.UNIMPLEMENTED));
442+
// }
443443
return subscriber;
444444
}
445445

0 commit comments

Comments
 (0)