Skip to content

Commit be1d07e

Browse files
[fix] Avoid redelivering duplicated messages when batching is enabled (apache#18486)
### Motivation apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to potential message duplication; see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicate` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchMessageIdImpl`, while the `MessageId`s in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` instances. ### Modifications Validate the class type in `isDuplicate` and convert a `BatchMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled, because even after the ACK tracker is flushed, if any part of a batched message is not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> ### Matching PR in forked repository PR in forked repository: BewareMyPower#8
1 parent 177b96a commit be1d07e

File tree

4 files changed

+121
-21
lines changed

4 files changed

+121
-21
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.ExecutionException;
3232
import java.util.concurrent.TimeUnit;
33+
import java.util.stream.Collectors;
34+
import java.util.stream.IntStream;
3335
import lombok.Cleanup;
3436
import org.apache.pulsar.client.impl.ConsumerImpl;
3537
import org.apache.pulsar.client.impl.MessageIdImpl;
38+
import org.apache.pulsar.common.api.proto.CommandAck;
3639
import org.apache.pulsar.common.util.FutureUtil;
3740
import org.slf4j.Logger;
3841
import org.slf4j.LoggerFactory;
@@ -65,6 +68,19 @@ public Object[][] ackReceiptEnabled() {
6568
return new Object[][] { { true }, { false } };
6669
}
6770

71+
@DataProvider(name = "batchedMessageAck")
72+
public Object[][] batchedMessageAck() {
73+
// When batch index ACK is disabled (the default), the pending ACK is added into the ACK tracker only after
74+
// all single messages of the batch have been acknowledged.
75+
return new Object[][] {
76+
// numAcked, batchSize, ack type
77+
{ 3, 5, CommandAck.AckType.Individual },
78+
{ 5, 5, CommandAck.AckType.Individual },
79+
{ 3, 5, CommandAck.AckType.Cumulative },
80+
{ 5, 5, CommandAck.AckType.Cumulative }
81+
};
82+
}
83+
6884
/**
6985
* It verifies that redelivered messages are sorted based on the ledger-ids.
7086
* <pre>
@@ -297,4 +313,57 @@ public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exce
297313
consumer.close();
298314
producer.close();
299315
}
316+
317+
@Test(timeOut = 30000, dataProvider = "batchedMessageAck")
318+
public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackType) throws Exception {
319+
String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
320+
+ numAcked + "-" + batchSize + "-" + ackType.getValue();
321+
@Cleanup Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
322+
.topic(topic)
323+
.subscriptionName("sub")
324+
.enableBatchIndexAcknowledgment(false)
325+
.acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be sent
326+
.subscribe();
327+
@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
328+
.topic(topic)
329+
.enableBatching(true)
330+
.batchingMaxMessages(batchSize)
331+
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
332+
.create();
333+
for (int i = 0; i < batchSize; i++) {
334+
String value = "msg-" + i;
335+
producer.sendAsync(value).thenAccept(id -> log.info("{} was sent to {}", value, id));
336+
}
337+
List<Message<String>> messages = new ArrayList<>();
338+
for (int i = 0; i < batchSize; i++) {
339+
messages.add(consumer.receive());
340+
}
341+
if (ackType == CommandAck.AckType.Individual) {
342+
for (int i = 0; i < numAcked; i++) {
343+
consumer.acknowledge(messages.get(i));
344+
}
345+
} else {
346+
consumer.acknowledgeCumulative(messages.get(numAcked - 1));
347+
}
348+
349+
consumer.redeliverUnacknowledgedMessages();
350+
351+
messages.clear();
352+
for (int i = 0; i < batchSize; i++) {
353+
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
354+
if (msg == null) {
355+
break;
356+
}
357+
log.info("Received {} from {}", msg.getValue(), msg.getMessageId());
358+
messages.add(msg);
359+
}
360+
List<String> values = messages.stream().map(Message::getValue).collect(Collectors.toList());
361+
// When only part of the batch is acknowledged, all messages are redelivered, because the message ID is added
362+
// into the ACK tracker only if the whole batch is acknowledged.
363+
if (numAcked < batchSize) {
364+
assertEquals(values, IntStream.range(0, batchSize).mapToObj(i -> "msg-" + i).collect(Collectors.toList()));
365+
} else {
366+
assertTrue(values.isEmpty());
367+
}
368+
}
300369
}

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -116,23 +116,10 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th
116116
}
117117
producer.flush();
118118

119-
if (batchingEnabled) {
120-
for (int i = 0; i < 30; i++) {
121-
Message<String> msg = consumer.receive();
122-
assertEquals(msg.getValue(), "hello-" + i);
123-
consumer.acknowledge(msg);
124-
}
125-
for (int i = 0; i < 30; i++) {
126-
Message<String> msg = consumer.receive();
127-
assertEquals(msg.getValue(), "new-message-" + i);
128-
consumer.acknowledge(msg);
129-
}
130-
} else {
131-
for (int i = 0; i < 30; i++) {
132-
Message<String> msg = consumer.receive();
133-
assertEquals(msg.getValue(), "new-message-" + i);
134-
consumer.acknowledge(msg);
135-
}
119+
for (int i = 0; i < 30; i++) {
120+
Message<String> msg = consumer.receive();
121+
assertEquals(msg.getValue(), "new-message-" + i);
122+
consumer.acknowledge(msg);
136123
}
137124
}
138125

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,18 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
113113
*/
114114
@Override
115115
public boolean isDuplicate(MessageId messageId) {
116-
final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
117-
if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
116+
if (!(messageId instanceof MessageIdImpl)) {
117+
throw new IllegalArgumentException("isDuplicated cannot accept "
118+
+ messageId.getClass().getName() + ": " + messageId);
119+
}
120+
if (lastCumulativeAck.compareTo(messageId) >= 0) {
118121
// Already included in a cumulative ack
119122
return true;
120123
} else {
121-
return pendingIndividualAcks.contains((MessageIdImpl) messageId);
124+
final MessageIdImpl messageIdImpl = (messageId instanceof BatchMessageIdImpl)
125+
? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
126+
: (MessageIdImpl) messageId;
127+
return pendingIndividualAcks.contains(messageIdImpl);
122128
}
123129
}
124130

@@ -622,7 +628,7 @@ protected LastCumulativeAck initialValue() {
622628
private boolean flushRequired = false;
623629

624630
public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
625-
if (messageId.compareTo(this.messageId) > 0) {
631+
if (compareTo(messageId) < 0) {
626632
if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
627633
this.bitSetRecyclable.recycle();
628634
}
@@ -656,6 +662,24 @@ public synchronized void reset() {
656662
flushRequired = false;
657663
}
658664

665+
public synchronized int compareTo(MessageId messageId) {
666+
if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) {
667+
final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
668+
final MessageIdImpl rhs = (MessageIdImpl) messageId;
669+
return MessageIdImpl.messageIdCompare(
670+
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(),
671+
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE);
672+
} else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){
673+
final MessageIdImpl lhs = this.messageId;
674+
final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
675+
return MessageIdImpl.messageIdCompare(
676+
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE,
677+
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex());
678+
} else {
679+
return this.messageId.compareTo(messageId);
680+
}
681+
}
682+
659683
private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
660684
this.messageId = messageId;
661685
this.bitSetRecyclable = bitSetRecyclable;

pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,24 @@ public void testFlush() {
8383
assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
8484
}
8585

86+
@Test
87+
public void testCompareTo() {
88+
LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
89+
lastCumulativeAck.update(new MessageIdImpl(0L, 1L, -1), null);
90+
91+
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
92+
assertEquals(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)), 0);
93+
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
94+
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
95+
96+
lastCumulativeAck = new LastCumulativeAck();
97+
lastCumulativeAck.update(new BatchMessageIdImpl(0L, 1L, -1, 1), null);
98+
99+
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
100+
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)) < 0);
101+
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
102+
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
103+
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 2)) < 0);
104+
assertEquals(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 1)), 0);
105+
}
86106
}

0 commit comments

Comments
 (0)