Skip to content

Commit e94436c

Browse files
authored
fix(pubsub/v2): return AckWithResult after NackImmediately shutdown mode (#13458)
This matches the [Java client behavior](https://github.com/michaelpri10/java-pubsub/blob/cac469e6aa7135e7b5806401eb141082822d205f/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java#L176) of returning AckWithResult and NackWithResult faster after the library nacks messages at shutdown (if exactly once delivery is enabled). The shared `sendAckWithFunc`, used by both `sendAck` and `sendModack` functions, handles both cases and thus only needs to be added in one place. However, at the time of writing, the function logic and length is getting rather unwieldy and hard to parse, so I plan on separating out these logics in the future for clarity.
1 parent a413732 commit e94436c

File tree

1 file changed

+16
-3
lines changed

1 file changed

+16
-3
lines changed

pubsub/v2/iterator.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"log"
2222
"strings"
2323
"sync"
24+
"sync/atomic"
2425
"time"
2526

2627
ipubsub "cloud.google.com/go/internal/pubsub"
@@ -118,6 +119,8 @@ type messageIterator struct {
118119
// Active ackIDs in this map should also exist 1:1 with ids in keepAliveDeadlines.
119120
// Elements are removed when messages are acked, nacked, or expired in iterator.handleKeepAlives()
120121
activeSpans sync.Map
122+
123+
nackImmediatelyShutdownInProgress atomic.Bool
121124
}
122125

123126
// newMessageIterator starts and returns a new messageIterator.
@@ -337,7 +340,7 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
337340
),
338341
)
339342
_, span := startSpan(ctx, subscribeSpanName, it.subID, opts...)
340-
// Always store the subscribe span, even if sampling isn't enabled.
343+
// Store the subscribe span even if sampling isn't enabled.
341344
// This is useful since we need to propagate the sampling flag
342345
// to the callback in Receive, so traces have an unbroken sampling decision.
343346
it.activeSpans.Store(ackID, span)
@@ -572,6 +575,13 @@ func (it *messageIterator) sendAckWithFunc(ctx context.Context, m map[string]*Ac
572575
batches := makeBatches(ackIDs, ackIDBatchSize)
573576
wg := sync.WaitGroup{}
574577

578+
if exactlyOnceDelivery && it.nackImmediatelyShutdownInProgress.Load() {
579+
for _, ar := range m {
580+
ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("shutdown initiated, already nacked"))
581+
}
582+
return
583+
}
584+
575585
for _, batch := range batches {
576586
wg.Add(1)
577587
go func(toSend []string) {
@@ -658,7 +668,6 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
658668
}, it.retryAcks, func(ctx context.Context, toSend []string) {
659669
recordStat(it.ctx, AckCount, int64(len(toSend)))
660670
addAcks(toSend)
661-
662671
})
663672
}
664673

@@ -1028,8 +1037,12 @@ func (it *messageIterator) nackInventory(ctx context.Context) {
10281037

10291038
toNack := make(map[string]*ipubsub.AckResult)
10301039
for ackID := range it.keepAliveDeadlines {
1031-
// Use a dummy AckResult since we don't propagate nacks back to the user.
1040+
// Use a dummy AckResult here since this isn't being propagated to the user.
10321041
toNack[ackID] = newSuccessAckResult()
10331042
}
10341043
it.sendModAck(ctx, toNack, 0, false, false)
1044+
// Only mark this true after the entire inventory has been nacked.
1045+
// Otherwise, the above sendModAck function will be shortcircuited and
1046+
// no messages will be nacked.
1047+
it.nackImmediatelyShutdownInProgress.Store(true)
10351048
}

0 commit comments

Comments
 (0)