Skip to content

Commit 3bc0675

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 20647 b: refs/heads/autosynth-websecurityscanner c: 77fb0ca h: refs/heads/master i: 20645: fae09a8 20643: 199f230 20639: 76dda9d
1 parent af88cd4 commit 3bc0675

2 files changed

Lines changed: 42 additions & 40 deletions

File tree

  • branches/autosynth-websecurityscanner/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ refs/heads/autosynth-speech: 75d6c62a9d07d3a3642980502a25d07fbde0f232
143143
refs/heads/autosynth-tasks: 5d6a4f686ffef2080506d3b613fc7563a8bba1cb
144144
refs/heads/autosynth-texttospeech: 2dcc5dc22be0f456caa1b6a8a4bcdace2641239c
145145
refs/heads/autosynth-trace: 8804c46bfe147702ee9c95669f17f42d3790cf23
146-
refs/heads/autosynth-websecurityscanner: aa5f27188cc55ebb5773b6597c6c49c965a40add
146+
refs/heads/autosynth-websecurityscanner: 77fb0ca8e507c38b4b559836b5c55576ef50e0c9
147147
refs/heads/bigquerystorage: 06db74d123d7f8a3ef48755c2fcabed09faf8e64
148148
refs/heads/elharo-patch-1: ce159ef828d3c545991ff78e7b6e0d912a9453e9
149149
refs/heads/mrschmidt-numericadd: 96509abadf6190b7886d57fdd9b090da55f5171c

branches/autosynth-websecurityscanner/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -402,51 +402,53 @@ public void processOutstandingBatches() {
402402
batchCallback = nextBatch.doneCallback;
403403
}
404404
}
405-
406-
final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
407-
final AckHandler ackHandler = outstandingMessage.ackHandler();
408-
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
409-
final AckReplyConsumer consumer =
410-
new AckReplyConsumer() {
411-
@Override
412-
public void ack() {
413-
response.set(AckReply.ACK);
414-
}
415-
416-
@Override
417-
public void nack() {
418-
response.set(AckReply.NACK);
419-
}
420-
};
421-
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
422-
executor.execute(
423-
new Runnable() {
424-
@Override
425-
public void run() {
426-
try {
427-
if (ackHandler
428-
.totalExpiration
429-
.plusSeconds(messageDeadlineSeconds.get())
430-
.isBefore(now())) {
431-
// Message expired while waiting. We don't extend these messages anymore,
432-
// so it was probably sent to someone else. Don't work on it.
433-
// Don't nack it either, because we'd be nacking someone else's message.
434-
ackHandler.forget();
435-
return;
436-
}
437-
438-
receiver.receiveMessage(message, consumer);
439-
} catch (Exception e) {
440-
response.setException(e);
441-
}
442-
}
443-
});
405+
processOutstandingMessage(
406+
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
444407
if (batchDone) {
445408
batchCallback.run();
446409
}
447410
}
448411
}
449412

413+
private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
414+
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
415+
final AckReplyConsumer consumer =
416+
new AckReplyConsumer() {
417+
@Override
418+
public void ack() {
419+
response.set(AckReply.ACK);
420+
}
421+
422+
@Override
423+
public void nack() {
424+
response.set(AckReply.NACK);
425+
}
426+
};
427+
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
428+
executor.execute(
429+
new Runnable() {
430+
@Override
431+
public void run() {
432+
try {
433+
if (ackHandler
434+
.totalExpiration
435+
.plusSeconds(messageDeadlineSeconds.get())
436+
.isBefore(now())) {
437+
// Message expired while waiting. We don't extend these messages anymore,
438+
// so it was probably sent to someone else. Don't work on it.
439+
// Don't nack it either, because we'd be nacking someone else's message.
440+
ackHandler.forget();
441+
return;
442+
}
443+
444+
receiver.receiveMessage(message, consumer);
445+
} catch (Exception e) {
446+
response.setException(e);
447+
}
448+
}
449+
});
450+
}
451+
450452
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */
451453
@InternalApi
452454
int computeDeadlineSeconds() {

0 commit comments

Comments
 (0)