Skip to content

Commit 48da731

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 18399 b: refs/heads/autosynth-logging c: 77fb0ca h: refs/heads/master i: 18397: 1a0eb3c 18395: 8d65c01 18391: 7f7f3c0 18383: a96acd3 18367: 048db38
1 parent 2dfbb68 commit 48da731

2 files changed

Lines changed: 42 additions & 40 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ refs/tags/v0.61.0: e4b526656bb1bf5eefd0ee578b7405147821225e
105105
refs/tags/v0.62.0: bbede7385d48ba08f487bdd29ec10668ace96396
106106
refs/heads/0.60.0-alpha: 10939381ffe0b8da32db4fe3087c86e3aa7f3e55
107107
refs/heads/autosynth-dlp: 18c18252802d2b3c5c5ff95f81c5eceb07234772
108-
refs/heads/autosynth-logging: aa5f27188cc55ebb5773b6597c6c49c965a40add
108+
refs/heads/autosynth-logging: 77fb0ca8e507c38b4b559836b5c55576ef50e0c9
109109
refs/heads/dupes: 3478c5d81fd242d0e985656645a679420a2060c2
110110
refs/tags/v0.63.0: 94f19b71d40f46b36120e7b9d78a1a3d41bfcbd6
111111
refs/tags/v0.64.0: 456e8fbd129deced3ca025f239a2d8a82bde1d0a

branches/autosynth-logging/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -402,51 +402,53 @@ public void processOutstandingBatches() {
402402
batchCallback = nextBatch.doneCallback;
403403
}
404404
}
405-
406-
final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
407-
final AckHandler ackHandler = outstandingMessage.ackHandler();
408-
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
409-
final AckReplyConsumer consumer =
410-
new AckReplyConsumer() {
411-
@Override
412-
public void ack() {
413-
response.set(AckReply.ACK);
414-
}
415-
416-
@Override
417-
public void nack() {
418-
response.set(AckReply.NACK);
419-
}
420-
};
421-
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
422-
executor.execute(
423-
new Runnable() {
424-
@Override
425-
public void run() {
426-
try {
427-
if (ackHandler
428-
.totalExpiration
429-
.plusSeconds(messageDeadlineSeconds.get())
430-
.isBefore(now())) {
431-
// Message expired while waiting. We don't extend these messages anymore,
432-
// so it was probably sent to someone else. Don't work on it.
433-
// Don't nack it either, because we'd be nacking someone else's message.
434-
ackHandler.forget();
435-
return;
436-
}
437-
438-
receiver.receiveMessage(message, consumer);
439-
} catch (Exception e) {
440-
response.setException(e);
441-
}
442-
}
443-
});
405+
processOutstandingMessage(
406+
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
444407
if (batchDone) {
445408
batchCallback.run();
446409
}
447410
}
448411
}
449412

413+
private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
414+
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
415+
final AckReplyConsumer consumer =
416+
new AckReplyConsumer() {
417+
@Override
418+
public void ack() {
419+
response.set(AckReply.ACK);
420+
}
421+
422+
@Override
423+
public void nack() {
424+
response.set(AckReply.NACK);
425+
}
426+
};
427+
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
428+
executor.execute(
429+
new Runnable() {
430+
@Override
431+
public void run() {
432+
try {
433+
if (ackHandler
434+
.totalExpiration
435+
.plusSeconds(messageDeadlineSeconds.get())
436+
.isBefore(now())) {
437+
// Message expired while waiting. We don't extend these messages anymore,
438+
// so it was probably sent to someone else. Don't work on it.
439+
// Don't nack it either, because we'd be nacking someone else's message.
440+
ackHandler.forget();
441+
return;
442+
}
443+
444+
receiver.receiveMessage(message, consumer);
445+
} catch (Exception e) {
446+
response.setException(e);
447+
}
448+
}
449+
});
450+
}
451+
450452
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */
451453
@InternalApi
452454
int computeDeadlineSeconds() {

0 commit comments

Comments
 (0)