Skip to content

Commit 7712aa3

Browse files
authored
[fix][client] Fix failover/exclusive consumer with batch cumulate ack issue. (#18454)
1 parent 034e6a4 commit 7712aa3

File tree

4 files changed

+105
-8
lines changed

4 files changed

+105
-8
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,23 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th
116116
}
117117
producer.flush();
118118

119-
for (int i = 0; i < 30; i++) {
120-
Message<String> msg = consumer.receive();
121-
assertEquals(msg.getValue(), "new-message-" + i);
122-
consumer.acknowledge(msg);
119+
if (batchingEnabled) {
120+
for (int i = 0; i < 30; i++) {
121+
Message<String> msg = consumer.receive();
122+
assertEquals(msg.getValue(), "hello-" + i);
123+
consumer.acknowledge(msg);
124+
}
125+
for (int i = 0; i < 30; i++) {
126+
Message<String> msg = consumer.receive();
127+
assertEquals(msg.getValue(), "new-message-" + i);
128+
consumer.acknowledge(msg);
129+
}
130+
} else {
131+
for (int i = 0; i < 30; i++) {
132+
Message<String> msg = consumer.receive();
133+
assertEquals(msg.getValue(), "new-message-" + i);
134+
consumer.acknowledge(msg);
135+
}
123136
}
124137
}
125138

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.util.HashSet;
2626
import java.util.Set;
27+
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.TimeUnit;
2829
import lombok.Cleanup;
2930
import lombok.extern.slf4j.Slf4j;
@@ -35,6 +36,7 @@
3536
import org.apache.pulsar.client.api.ProducerConsumerBase;
3637
import org.apache.pulsar.client.api.Schema;
3738
import org.apache.pulsar.client.api.SubscriptionType;
39+
import org.testng.Assert;
3840
import org.testng.annotations.AfterClass;
3941
import org.testng.annotations.BeforeClass;
4042
import org.testng.annotations.DataProvider;
@@ -362,4 +364,79 @@ public void testNegativeAcksWithBatchAckEnabled() throws Exception {
362364
// There should be no more messages
363365
assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
364366
}
367+
368+
@Test
369+
public void testFailoverConsumerBatchCumulateAck() throws Exception {
370+
final String topic = BrokerTestUtil.newUniqueName("my-topic");
371+
admin.topics().createPartitionedTopic(topic, 2);
372+
373+
@Cleanup
374+
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
375+
.topic(topic)
376+
.subscriptionName("sub")
377+
.subscriptionType(SubscriptionType.Failover)
378+
.enableBatchIndexAcknowledgment(true)
379+
.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
380+
.receiverQueueSize(10)
381+
.subscribe();
382+
383+
@Cleanup
384+
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
385+
.topic(topic)
386+
.batchingMaxMessages(10)
387+
.batchingMaxPublishDelay(3, TimeUnit.SECONDS)
388+
.blockIfQueueFull(true)
389+
.create();
390+
391+
int count = 0;
392+
Set<Integer> datas = new HashSet<>();
393+
CountDownLatch producerLatch = new CountDownLatch(10);
394+
while (count < 10) {
395+
datas.add(count);
396+
producer.sendAsync(count).whenComplete((m, e) -> {
397+
producerLatch.countDown();
398+
});
399+
count++;
400+
}
401+
producerLatch.await();
402+
CountDownLatch consumerLatch = new CountDownLatch(1);
403+
new Thread(new Runnable() {
404+
@Override
405+
public void run() {
406+
consumer.receiveAsync()
407+
.thenCompose(m -> {
408+
log.info("received one msg : {}", m.getMessageId());
409+
datas.remove(m.getValue());
410+
return consumer.acknowledgeCumulativeAsync(m);
411+
})
412+
.thenAccept(ignore -> {
413+
try {
414+
Thread.sleep(500);
415+
consumer.redeliverUnacknowledgedMessages();
416+
} catch (Exception e) {
417+
throw new RuntimeException(e);
418+
}
419+
})
420+
.whenComplete((r, e) -> {
421+
consumerLatch.countDown();
422+
});
423+
}
424+
}).start();
425+
consumerLatch.await();
426+
Thread.sleep(500);
427+
count = 0;
428+
while(true) {
429+
Message<Integer> msg = consumer.receive(5, TimeUnit.SECONDS);
430+
if (msg == null) {
431+
break;
432+
}
433+
consumer.acknowledgeCumulative(msg);
434+
Thread.sleep(200);
435+
datas.remove(msg.getValue());
436+
log.info("received msg : {}", msg.getMessageId());
437+
count++;
438+
}
439+
Assert.assertEquals(count, 9);
440+
Assert.assertEquals(0, datas.size());
441+
}
365442
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,9 +1275,9 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
12751275
final int numMessages = msgMetadata.getNumMessagesInBatch();
12761276
final int numChunks = msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0;
12771277
final boolean isChunkedMessage = numChunks > 1;
1278-
12791278
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
1280-
if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
1279+
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()
1280+
&& acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
12811281
if (log.isDebugEnabled()) {
12821282
log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
12831283
topic, subscription, consumerName, msgId);
@@ -1541,7 +1541,10 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
15411541
skippedMessages++;
15421542
continue;
15431543
}
1544-
1544+
}
1545+
if (acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) {
1546+
skippedMessages++;
1547+
continue;
15451548
}
15461549
executeNotifyCallback(message);
15471550
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,11 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
261261
return;
262262
}
263263
// Process the message, add to the queue and trigger listener or async callback
264-
messages.forEach(msg -> messageReceived(consumer, msg));
264+
messages.forEach(msg -> {
265+
if (isValidConsumerEpoch((MessageImpl<T>) msg)) {
266+
messageReceived(consumer, msg);
267+
}
268+
});
265269

266270
int size = incomingMessages.size();
267271
int maxReceiverQueueSize = getCurrentReceiverQueueSize();

0 commit comments

Comments
 (0)