Skip to content

Commit 4c2cd10

Browse files
authored
feat(pubsub): batch receipt modacks (#10234)
* feat(pubsub): batch receipt modacks * add comment on why we use success ack result * update streaming pull retry test nack logic * remove extra gofunc wrapper
1 parent e4b2737 commit 4c2cd10

File tree

2 files changed

+47
-22
lines changed

2 files changed

+47
-22
lines changed

pubsub/iterator.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -62,20 +62,22 @@ var (
6262
)
6363

6464
type messageIterator struct {
65-
ctx context.Context
66-
cancel func() // the function that will cancel ctx; called in stop
67-
po *pullOptions
68-
ps *pullStream
69-
subc *vkit.SubscriberClient
70-
subName string
71-
kaTick <-chan time.Time // keep-alive (deadline extensions)
72-
ackTicker *time.Ticker // message acks
73-
nackTicker *time.Ticker // message nacks
74-
pingTicker *time.Ticker // sends to the stream to keep it open
75-
failed chan struct{} // closed on stream error
76-
drained chan struct{} // closed when stopped && no more pending messages
77-
wg sync.WaitGroup
78-
65+
ctx context.Context
66+
cancel func() // the function that will cancel ctx; called in stop
67+
po *pullOptions
68+
ps *pullStream
69+
subc *vkit.SubscriberClient
70+
subName string
71+
kaTick <-chan time.Time // keep-alive (deadline extensions)
72+
ackTicker *time.Ticker // message acks
73+
nackTicker *time.Ticker // message nacks
74+
pingTicker *time.Ticker // sends to the stream to keep it open
75+
receiptTicker *time.Ticker // sends receipt modacks
76+
failed chan struct{} // closed on stream error
77+
drained chan struct{} // closed when stopped && no more pending messages
78+
wg sync.WaitGroup
79+
80+
// This mutex guards the structs related to lease extension.
7981
mu sync.Mutex
8082
ackTimeDist *distribution.D // dist uses seconds
8183

@@ -91,7 +93,9 @@ type messageIterator struct {
9193
// ack IDs whose ack deadline is to be modified
9294
// ModAcks don't have AckResults but allows reuse of the SendModAck function.
9395
pendingModAcks map[string]*AckResult
94-
err error // error from stream failure
96+
// ack IDs whose receipt need to be acknowledged with a modack.
97+
pendingReceipts map[string]*AckResult
98+
err error // error from stream failure
9599

96100
eoMu sync.RWMutex
97101
enableExactlyOnceDelivery bool
@@ -127,6 +131,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
127131
ackTicker := time.NewTicker(100 * time.Millisecond)
128132
nackTicker := time.NewTicker(100 * time.Millisecond)
129133
pingTicker := time.NewTicker(30 * time.Second)
134+
receiptTicker := time.NewTicker(100 * time.Millisecond)
130135
cctx, cancel := context.WithCancel(context.Background())
131136
cctx = withSubscriptionKey(cctx, subName)
132137
it := &messageIterator{
@@ -140,13 +145,15 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
140145
ackTicker: ackTicker,
141146
nackTicker: nackTicker,
142147
pingTicker: pingTicker,
148+
receiptTicker: receiptTicker,
143149
failed: make(chan struct{}),
144150
drained: make(chan struct{}),
145151
ackTimeDist: distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1),
146152
keepAliveDeadlines: map[string]time.Time{},
147153
pendingAcks: map[string]*AckResult{},
148154
pendingNacks: map[string]*AckResult{},
149155
pendingModAcks: map[string]*AckResult{},
156+
pendingReceipts: map[string]*AckResult{},
150157
}
151158
it.wg.Add(1)
152159
go it.sender()
@@ -307,11 +314,15 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
307314
it.mu.Unlock()
308315

309316
if len(ackIDs) > 0 {
310-
// When exactly once delivery is not enabled, modacks are fire and forget.
311317
if !exactlyOnceDelivery {
312-
go func() {
313-
it.sendModAck(ackIDs, deadline, false)
314-
}()
318+
// When exactly once delivery is not enabled, modacks are fire and forget.
319+
// Add pending receipt modacks to queue to batch with other modacks.
320+
it.mu.Lock()
321+
for id := range ackIDs {
322+
// Use a SuccessAckResult (dummy) since we don't propagate modacks back to the user.
323+
it.pendingReceipts[id] = newSuccessAckResult()
324+
}
325+
it.mu.Unlock()
315326
return msgs, nil
316327
}
317328

@@ -402,6 +413,7 @@ func (it *messageIterator) sender() {
402413
defer it.ackTicker.Stop()
403414
defer it.nackTicker.Stop()
404415
defer it.pingTicker.Stop()
416+
defer it.receiptTicker.Stop()
405417
defer func() {
406418
if it.ps != nil {
407419
it.ps.CloseSend()
@@ -414,6 +426,7 @@ func (it *messageIterator) sender() {
414426
sendNacks := false
415427
sendModAcks := false
416428
sendPing := false
429+
sendReceipt := false
417430

418431
dl := it.ackDeadline()
419432

@@ -456,9 +469,12 @@ func (it *messageIterator) sender() {
456469
it.mu.Lock()
457470
// Ping only if we are processing messages via streaming.
458471
sendPing = !it.po.synchronous
472+
case <-it.receiptTicker.C:
473+
it.mu.Lock()
474+
sendReceipt = (len(it.pendingReceipts) > 0)
459475
}
460476
// Lock is held here.
461-
var acks, nacks, modAcks map[string]*AckResult
477+
var acks, nacks, modAcks, receipts map[string]*AckResult
462478
if sendAcks {
463479
acks = it.pendingAcks
464480
it.pendingAcks = map[string]*AckResult{}
@@ -471,6 +487,10 @@ func (it *messageIterator) sender() {
471487
modAcks = it.pendingModAcks
472488
it.pendingModAcks = map[string]*AckResult{}
473489
}
490+
if sendReceipt {
491+
receipts = it.pendingReceipts
492+
it.pendingReceipts = map[string]*AckResult{}
493+
}
474494
it.mu.Unlock()
475495
// Make Ack and ModAck RPCs.
476496
if sendAcks {
@@ -486,6 +506,9 @@ func (it *messageIterator) sender() {
486506
if sendPing {
487507
it.pingStream()
488508
}
509+
if sendReceipt {
510+
it.sendModAck(receipts, dl, true)
511+
}
489512
}
490513
}
491514

pubsub/streaming_pull_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,15 +245,17 @@ func TestStreamingPullRetry(t *testing.T) {
245245
server.wait()
246246
for i := 0; i < len(testMessages); i++ {
247247
id := testMessages[i].AckId
248+
server.mu.Lock()
248249
if i%2 == 0 {
249250
if !server.Acked[id] {
250251
t.Errorf("msg %q should have been acked but wasn't", id)
251252
}
252253
} else {
253-
if dl, ok := server.Deadlines[id]; !ok || dl != 0 {
254-
t.Errorf("msg %q should have been nacked but wasn't", id)
254+
if server.Acked[id] {
255+
t.Errorf("msg %q should have not been acked", id)
255256
}
256257
}
258+
server.mu.Unlock()
257259
}
258260
}
259261

0 commit comments

Comments
 (0)