Skip to content

Commit 3ba49e9

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 20191 b: refs/heads/autosynth-redis c: 77fb0ca h: refs/heads/master i: 20189: 14b694a 20187: da432bb 20183: 08e9828 20175: e5fc5f6 20159: 928c7b7
1 parent f26d04d commit 3ba49e9

2 files changed

Lines changed: 42 additions & 40 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ refs/heads/autosynth-iot: f03bdd338a9056ca3b7ea6d9ca901649ba9aab78
136136
refs/heads/autosynth-kms: 2828edfe3d2c53dd6e71912eae8a53c87bf40c87
137137
refs/heads/autosynth-language: c3d990dd34d81e7e935041e7147fb9dd27f8a557
138138
refs/heads/autosynth-os-login: 092fdbed6d5317948f92b708e9f50dedd89fc666
139-
refs/heads/autosynth-redis: aa5f27188cc55ebb5773b6597c6c49c965a40add
139+
refs/heads/autosynth-redis: 77fb0ca8e507c38b4b559836b5c55576ef50e0c9
140140
refs/heads/autosynth-scheduler: d97f8743ba965c7d5e492c8dc1f51d023104e260
141141
refs/heads/autosynth-spanner: 9bff86d057df31e04c76d72865e8e073ac5794fb
142142
refs/heads/autosynth-speech: 75d6c62a9d07d3a3642980502a25d07fbde0f232

branches/autosynth-redis/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -402,51 +402,53 @@ public void processOutstandingBatches() {
402402
batchCallback = nextBatch.doneCallback;
403403
}
404404
}
405-
406-
final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
407-
final AckHandler ackHandler = outstandingMessage.ackHandler();
408-
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
409-
final AckReplyConsumer consumer =
410-
new AckReplyConsumer() {
411-
@Override
412-
public void ack() {
413-
response.set(AckReply.ACK);
414-
}
415-
416-
@Override
417-
public void nack() {
418-
response.set(AckReply.NACK);
419-
}
420-
};
421-
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
422-
executor.execute(
423-
new Runnable() {
424-
@Override
425-
public void run() {
426-
try {
427-
if (ackHandler
428-
.totalExpiration
429-
.plusSeconds(messageDeadlineSeconds.get())
430-
.isBefore(now())) {
431-
// Message expired while waiting. We don't extend these messages anymore,
432-
// so it was probably sent to someone else. Don't work on it.
433-
// Don't nack it either, because we'd be nacking someone else's message.
434-
ackHandler.forget();
435-
return;
436-
}
437-
438-
receiver.receiveMessage(message, consumer);
439-
} catch (Exception e) {
440-
response.setException(e);
441-
}
442-
}
443-
});
405+
processOutstandingMessage(
406+
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
444407
if (batchDone) {
445408
batchCallback.run();
446409
}
447410
}
448411
}
449412

413+
private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
414+
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
415+
final AckReplyConsumer consumer =
416+
new AckReplyConsumer() {
417+
@Override
418+
public void ack() {
419+
response.set(AckReply.ACK);
420+
}
421+
422+
@Override
423+
public void nack() {
424+
response.set(AckReply.NACK);
425+
}
426+
};
427+
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
428+
executor.execute(
429+
new Runnable() {
430+
@Override
431+
public void run() {
432+
try {
433+
if (ackHandler
434+
.totalExpiration
435+
.plusSeconds(messageDeadlineSeconds.get())
436+
.isBefore(now())) {
437+
// Message expired while waiting. We don't extend these messages anymore,
438+
// so it was probably sent to someone else. Don't work on it.
439+
// Don't nack it either, because we'd be nacking someone else's message.
440+
ackHandler.forget();
441+
return;
442+
}
443+
444+
receiver.receiveMessage(message, consumer);
445+
} catch (Exception e) {
446+
response.setException(e);
447+
}
448+
}
449+
});
450+
}
451+
450452
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */
451453
@InternalApi
452454
int computeDeadlineSeconds() {

0 commit comments

Comments
 (0)