@@ -62,20 +62,22 @@ var (
6262)
6363
6464type messageIterator struct {
65- ctx context.Context
66- cancel func () // the function that will cancel ctx; called in stop
67- po * pullOptions
68- ps * pullStream
69- subc * vkit.SubscriberClient
70- subName string
71- kaTick <- chan time.Time // keep-alive (deadline extensions)
72- ackTicker * time.Ticker // message acks
73- nackTicker * time.Ticker // message nacks
74- pingTicker * time.Ticker // sends to the stream to keep it open
75- failed chan struct {} // closed on stream error
76- drained chan struct {} // closed when stopped && no more pending messages
77- wg sync.WaitGroup
78-
65+ ctx context.Context
66+ cancel func () // the function that will cancel ctx; called in stop
67+ po * pullOptions
68+ ps * pullStream
69+ subc * vkit.SubscriberClient
70+ subName string
71+ kaTick <- chan time.Time // keep-alive (deadline extensions)
72+ ackTicker * time.Ticker // message acks
73+ nackTicker * time.Ticker // message nacks
74+ pingTicker * time.Ticker // sends to the stream to keep it open
75+ receiptTicker * time.Ticker // sends receipt modacks
76+ failed chan struct {} // closed on stream error
77+ drained chan struct {} // closed when stopped && no more pending messages
78+ wg sync.WaitGroup
79+
80+ // This mutex guards the structs related to lease extension.
7981 mu sync.Mutex
8082 ackTimeDist * distribution.D // dist uses seconds
8183
@@ -91,7 +93,9 @@ type messageIterator struct {
9193 // ack IDs whose ack deadline is to be modified
9294 // ModAcks don't have AckResults but allows reuse of the SendModAck function.
9395 pendingModAcks map [string ]* AckResult
94- err error // error from stream failure
96+ // ack IDs whose receipt need to be acknowledged with a modack.
97+ pendingReceipts map [string ]* AckResult
98+ err error // error from stream failure
9599
96100 eoMu sync.RWMutex
97101 enableExactlyOnceDelivery bool
@@ -127,6 +131,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
127131 ackTicker := time .NewTicker (100 * time .Millisecond )
128132 nackTicker := time .NewTicker (100 * time .Millisecond )
129133 pingTicker := time .NewTicker (30 * time .Second )
134+ receiptTicker := time .NewTicker (100 * time .Millisecond )
130135 cctx , cancel := context .WithCancel (context .Background ())
131136 cctx = withSubscriptionKey (cctx , subName )
132137 it := & messageIterator {
@@ -140,13 +145,15 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
140145 ackTicker : ackTicker ,
141146 nackTicker : nackTicker ,
142147 pingTicker : pingTicker ,
148+ receiptTicker : receiptTicker ,
143149 failed : make (chan struct {}),
144150 drained : make (chan struct {}),
145151 ackTimeDist : distribution .New (int (maxDurationPerLeaseExtension / time .Second ) + 1 ),
146152 keepAliveDeadlines : map [string ]time.Time {},
147153 pendingAcks : map [string ]* AckResult {},
148154 pendingNacks : map [string ]* AckResult {},
149155 pendingModAcks : map [string ]* AckResult {},
156+ pendingReceipts : map [string ]* AckResult {},
150157 }
151158 it .wg .Add (1 )
152159 go it .sender ()
@@ -307,11 +314,15 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
307314 it .mu .Unlock ()
308315
309316 if len (ackIDs ) > 0 {
310- // When exactly once delivery is not enabled, modacks are fire and forget.
311317 if ! exactlyOnceDelivery {
312- go func () {
313- it .sendModAck (ackIDs , deadline , false )
314- }()
318+ // When exactly once delivery is not enabled, modacks are fire and forget.
319+ // Add pending receipt modacks to queue to batch with other modacks.
320+ it .mu .Lock ()
321+ for id := range ackIDs {
322+ // Use a SuccessAckResult (dummy) since we don't propagate modacks back to the user.
323+ it .pendingReceipts [id ] = newSuccessAckResult ()
324+ }
325+ it .mu .Unlock ()
315326 return msgs , nil
316327 }
317328
@@ -402,6 +413,7 @@ func (it *messageIterator) sender() {
402413 defer it .ackTicker .Stop ()
403414 defer it .nackTicker .Stop ()
404415 defer it .pingTicker .Stop ()
416+ defer it .receiptTicker .Stop ()
405417 defer func () {
406418 if it .ps != nil {
407419 it .ps .CloseSend ()
@@ -414,6 +426,7 @@ func (it *messageIterator) sender() {
414426 sendNacks := false
415427 sendModAcks := false
416428 sendPing := false
429+ sendReceipt := false
417430
418431 dl := it .ackDeadline ()
419432
@@ -456,9 +469,12 @@ func (it *messageIterator) sender() {
456469 it .mu .Lock ()
457470 // Ping only if we are processing messages via streaming.
458471 sendPing = ! it .po .synchronous
472+ case <- it .receiptTicker .C :
473+ it .mu .Lock ()
474+ sendReceipt = (len (it .pendingReceipts ) > 0 )
459475 }
460476 // Lock is held here.
461- var acks , nacks , modAcks map [string ]* AckResult
477+ var acks , nacks , modAcks , receipts map [string ]* AckResult
462478 if sendAcks {
463479 acks = it .pendingAcks
464480 it .pendingAcks = map [string ]* AckResult {}
@@ -471,6 +487,10 @@ func (it *messageIterator) sender() {
471487 modAcks = it .pendingModAcks
472488 it .pendingModAcks = map [string ]* AckResult {}
473489 }
490+ if sendReceipt {
491+ receipts = it .pendingReceipts
492+ it .pendingReceipts = map [string ]* AckResult {}
493+ }
474494 it .mu .Unlock ()
475495 // Make Ack and ModAck RPCs.
476496 if sendAcks {
@@ -486,6 +506,9 @@ func (it *messageIterator) sender() {
486506 if sendPing {
487507 it .pingStream ()
488508 }
509+ if sendReceipt {
510+ it .sendModAck (receipts , dl , true )
511+ }
489512 }
490513}
491514
0 commit comments