Skip to content

Commit 4959182

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 19281 b: refs/heads/autosynth-bigquerystorage c: 77fb0ca h: refs/heads/master i: 19279: 89dc801
1 parent ae8ad2f commit 4959182

2 files changed

Lines changed: 42 additions & 40 deletions

File tree

  • branches/autosynth-bigquerystorage/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
124124
refs/heads/autosynth-asset: cd8251de8c40e239ad24dcf9ed93ea2708a3eed5
125125
refs/heads/autosynth-automl: 2a8b018cf05811fd472e5d1053e67a12ceec0b64
126126
refs/heads/autosynth-bigquerydatatransfer: 564833a85642d4194adc025f021e10e723154246
127-
refs/heads/autosynth-bigquerystorage: aa5f27188cc55ebb5773b6597c6c49c965a40add
127+
refs/heads/autosynth-bigquerystorage: 77fb0ca8e507c38b4b559836b5c55576ef50e0c9
128128
refs/heads/autosynth-bigtable: 2fbcb15847e0e89e79d6dc07420e28d7dfcea894
129129
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca
130130
refs/heads/autosynth-containeranalysis: 039ca5b8db725c76c16a965ff26b2774322b8ef8

branches/autosynth-bigquerystorage/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -402,51 +402,53 @@ public void processOutstandingBatches() {
402402
batchCallback = nextBatch.doneCallback;
403403
}
404404
}
405-
406-
final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
407-
final AckHandler ackHandler = outstandingMessage.ackHandler();
408-
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
409-
final AckReplyConsumer consumer =
410-
new AckReplyConsumer() {
411-
@Override
412-
public void ack() {
413-
response.set(AckReply.ACK);
414-
}
415-
416-
@Override
417-
public void nack() {
418-
response.set(AckReply.NACK);
419-
}
420-
};
421-
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
422-
executor.execute(
423-
new Runnable() {
424-
@Override
425-
public void run() {
426-
try {
427-
if (ackHandler
428-
.totalExpiration
429-
.plusSeconds(messageDeadlineSeconds.get())
430-
.isBefore(now())) {
431-
// Message expired while waiting. We don't extend these messages anymore,
432-
// so it was probably sent to someone else. Don't work on it.
433-
// Don't nack it either, because we'd be nacking someone else's message.
434-
ackHandler.forget();
435-
return;
436-
}
437-
438-
receiver.receiveMessage(message, consumer);
439-
} catch (Exception e) {
440-
response.setException(e);
441-
}
442-
}
443-
});
405+
processOutstandingMessage(
406+
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
444407
if (batchDone) {
445408
batchCallback.run();
446409
}
447410
}
448411
}
449412

413+
private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
414+
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
415+
final AckReplyConsumer consumer =
416+
new AckReplyConsumer() {
417+
@Override
418+
public void ack() {
419+
response.set(AckReply.ACK);
420+
}
421+
422+
@Override
423+
public void nack() {
424+
response.set(AckReply.NACK);
425+
}
426+
};
427+
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
428+
executor.execute(
429+
new Runnable() {
430+
@Override
431+
public void run() {
432+
try {
433+
if (ackHandler
434+
.totalExpiration
435+
.plusSeconds(messageDeadlineSeconds.get())
436+
.isBefore(now())) {
437+
// Message expired while waiting. We don't extend these messages anymore,
438+
// so it was probably sent to someone else. Don't work on it.
439+
// Don't nack it either, because we'd be nacking someone else's message.
440+
ackHandler.forget();
441+
return;
442+
}
443+
444+
receiver.receiveMessage(message, consumer);
445+
} catch (Exception e) {
446+
response.setException(e);
447+
}
448+
}
449+
});
450+
}
451+
450452
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */
451453
@InternalApi
452454
int computeDeadlineSeconds() {

0 commit comments

Comments
 (0)