Commit be1d07e
authored
[fix] Avoid redelivering duplicated messages when batching is enabled (apache#18486)
### Motivation
apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454.
The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.
### Modifications
Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454.
`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.
### TODO
The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
### Matching PR in forked repository
PR in forked repository: BewareMyPower#81 parent 177b96a commit be1d07e
File tree
4 files changed
+121
-21
lines changed- pulsar-broker/src/test/java/org/apache/pulsar/client
- api
- impl
- pulsar-client/src
- main/java/org/apache/pulsar/client/impl
- test/java/org/apache/pulsar/client/impl
4 files changed
+121
-21
lines changedLines changed: 69 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
30 | 30 | | |
31 | 31 | | |
32 | 32 | | |
| 33 | + | |
| 34 | + | |
33 | 35 | | |
34 | 36 | | |
35 | 37 | | |
| 38 | + | |
36 | 39 | | |
37 | 40 | | |
38 | 41 | | |
| |||
65 | 68 | | |
66 | 69 | | |
67 | 70 | | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
68 | 84 | | |
69 | 85 | | |
70 | 86 | | |
| |||
297 | 313 | | |
298 | 314 | | |
299 | 315 | | |
| 316 | + | |
| 317 | + | |
| 318 | + | |
| 319 | + | |
| 320 | + | |
| 321 | + | |
| 322 | + | |
| 323 | + | |
| 324 | + | |
| 325 | + | |
| 326 | + | |
| 327 | + | |
| 328 | + | |
| 329 | + | |
| 330 | + | |
| 331 | + | |
| 332 | + | |
| 333 | + | |
| 334 | + | |
| 335 | + | |
| 336 | + | |
| 337 | + | |
| 338 | + | |
| 339 | + | |
| 340 | + | |
| 341 | + | |
| 342 | + | |
| 343 | + | |
| 344 | + | |
| 345 | + | |
| 346 | + | |
| 347 | + | |
| 348 | + | |
| 349 | + | |
| 350 | + | |
| 351 | + | |
| 352 | + | |
| 353 | + | |
| 354 | + | |
| 355 | + | |
| 356 | + | |
| 357 | + | |
| 358 | + | |
| 359 | + | |
| 360 | + | |
| 361 | + | |
| 362 | + | |
| 363 | + | |
| 364 | + | |
| 365 | + | |
| 366 | + | |
| 367 | + | |
| 368 | + | |
300 | 369 | | |
Lines changed: 4 additions & 17 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
116 | 116 | | |
117 | 117 | | |
118 | 118 | | |
119 | | - | |
120 | | - | |
121 | | - | |
122 | | - | |
123 | | - | |
124 | | - | |
125 | | - | |
126 | | - | |
127 | | - | |
128 | | - | |
129 | | - | |
130 | | - | |
131 | | - | |
132 | | - | |
133 | | - | |
134 | | - | |
135 | | - | |
| 119 | + | |
| 120 | + | |
| 121 | + | |
| 122 | + | |
136 | 123 | | |
137 | 124 | | |
138 | 125 | | |
| |||
Lines changed: 28 additions & 4 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
113 | 113 | | |
114 | 114 | | |
115 | 115 | | |
116 | | - | |
117 | | - | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
118 | 121 | | |
119 | 122 | | |
120 | 123 | | |
121 | | - | |
| 124 | + | |
| 125 | + | |
| 126 | + | |
| 127 | + | |
122 | 128 | | |
123 | 129 | | |
124 | 130 | | |
| |||
622 | 628 | | |
623 | 629 | | |
624 | 630 | | |
625 | | - | |
| 631 | + | |
626 | 632 | | |
627 | 633 | | |
628 | 634 | | |
| |||
656 | 662 | | |
657 | 663 | | |
658 | 664 | | |
| 665 | + | |
| 666 | + | |
| 667 | + | |
| 668 | + | |
| 669 | + | |
| 670 | + | |
| 671 | + | |
| 672 | + | |
| 673 | + | |
| 674 | + | |
| 675 | + | |
| 676 | + | |
| 677 | + | |
| 678 | + | |
| 679 | + | |
| 680 | + | |
| 681 | + | |
| 682 | + | |
659 | 683 | | |
660 | 684 | | |
661 | 685 | | |
| |||
Lines changed: 20 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
83 | 83 | | |
84 | 84 | | |
85 | 85 | | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
| 97 | + | |
| 98 | + | |
| 99 | + | |
| 100 | + | |
| 101 | + | |
| 102 | + | |
| 103 | + | |
| 104 | + | |
| 105 | + | |
86 | 106 | | |
0 commit comments