Skip to content

Commit baf155f

Browse files
authored
[ClientAPI]Fix hasMessageAvailable() (#6362)
Fixes #6333 Previously, `hasMoreMessages` is test against: ``` return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0 && incomingMessages.size() > 0; ``` However, the `incomingMessages` could be 0 when the consumer/reader has just started and hasn't received any messages yet. In this PR, the last entry is retrieved and decoded to get message metadata. for the batchIndex field population.
1 parent 333888a commit baf155f

File tree

7 files changed

+233
-41
lines changed

7 files changed

+233
-41
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@
4646
import javax.naming.AuthenticationException;
4747
import javax.net.ssl.SSLSession;
4848

49+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
50+
import org.apache.bookkeeper.mledger.Entry;
51+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
4952
import org.apache.bookkeeper.mledger.Position;
53+
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
5054
import org.apache.bookkeeper.mledger.impl.PositionImpl;
5155
import org.apache.bookkeeper.mledger.util.SafeRun;
5256
import org.apache.commons.lang3.StringUtils;
@@ -59,6 +63,7 @@
5963
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
6064
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
6165
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
66+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
6267
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
6368
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
6469
import org.apache.pulsar.broker.web.RestException;
@@ -1396,22 +1401,83 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
13961401
Topic topic = consumer.getSubscription().getTopic();
13971402
Position position = topic.getLastMessageId();
13981403
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
1399-
if (log.isDebugEnabled()) {
1400-
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
1401-
topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
1402-
}
1403-
MessageIdData messageId = MessageIdData.newBuilder()
1404-
.setLedgerId(((PositionImpl)position).getLedgerId())
1405-
.setEntryId(((PositionImpl)position).getEntryId())
1406-
.setPartition(partitionIndex)
1407-
.build();
14081404

1409-
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
1405+
getLargestBatchIndexWhenPossible(
1406+
topic,
1407+
(PositionImpl) position,
1408+
partitionIndex,
1409+
requestId,
1410+
consumer.getSubscription().getName());
1411+
14101412
} else {
14111413
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
14121414
}
14131415
}
14141416

1417+
private void getLargestBatchIndexWhenPossible(
1418+
Topic topic,
1419+
PositionImpl position,
1420+
int partitionIndex,
1421+
long requestId,
1422+
String subscriptionName) {
1423+
1424+
PersistentTopic persistentTopic = (PersistentTopic) topic;
1425+
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
1426+
1427+
// If it's not pointing to a valid entry, respond messageId of the current position.
1428+
if (position.getEntryId() == -1) {
1429+
MessageIdData messageId = MessageIdData.newBuilder()
1430+
.setLedgerId(position.getLedgerId())
1431+
.setEntryId(position.getEntryId())
1432+
.setPartition(partitionIndex).build();
1433+
1434+
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
1435+
}
1436+
1437+
// For a valid position, we read the entry out and parse the batch size from its metadata.
1438+
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
1439+
ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
1440+
@Override
1441+
public void readEntryComplete(Entry entry, Object ctx) {
1442+
entryFuture.complete(entry);
1443+
}
1444+
1445+
@Override
1446+
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
1447+
entryFuture.completeExceptionally(exception);
1448+
}
1449+
}, null);
1450+
1451+
CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
1452+
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
1453+
int batchSize = metadata.getNumMessagesInBatch();
1454+
entry.release();
1455+
return batchSize;
1456+
});
1457+
1458+
batchSizeFuture.whenComplete((batchSize, e) -> {
1459+
if (e != null) {
1460+
ctx.writeAndFlush(Commands.newError(
1461+
requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
1462+
} else {
1463+
int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1;
1464+
1465+
if (log.isDebugEnabled()) {
1466+
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
1467+
topic.getName(), subscriptionName, position, partitionIndex);
1468+
}
1469+
1470+
MessageIdData messageId = MessageIdData.newBuilder()
1471+
.setLedgerId(position.getLedgerId())
1472+
.setEntryId(position.getEntryId())
1473+
.setPartition(partitionIndex)
1474+
.setBatchIndex(largestBatchIndex).build();
1475+
1476+
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
1477+
}
1478+
});
1479+
}
1480+
14151481
@Override
14161482
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
14171483
final long requestId = commandGetTopicsOfNamespace.getRequestId();

pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.pulsar.client.api.PulsarClient;
4444
import org.apache.pulsar.client.api.RawMessage;
4545
import org.apache.pulsar.client.api.RawReader;
46+
import org.apache.pulsar.client.impl.MessageIdImpl;
4647
import org.apache.pulsar.client.impl.RawBatchConverter;
4748
import org.apache.pulsar.common.protocol.Commands;
4849
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -95,7 +96,10 @@ private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
9596
} else {
9697
log.info("Commencing phase one of compaction for {}, reading to {}",
9798
reader.getTopic(), lastMessageId);
98-
phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey,
99+
// Each entry is processed as a whole, discard the batchIndex part deliberately.
100+
MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
101+
MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex());
102+
phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey,
99103
loopPromise);
100104
}
101105
});

pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,17 @@ public static Object[][] variationsForResetOnLatestMsg() {
9191
};
9292
}
9393

94+
@DataProvider
95+
public static Object[][] variationsForHasMessageAvailable() {
96+
return new Object[][] {
97+
// batching / start-inclusive
98+
{true, true},
99+
{true, false},
100+
{false, true},
101+
{false, false},
102+
};
103+
}
104+
94105
@Test
95106
public void testSimpleReader() throws Exception {
96107
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
@@ -531,6 +542,68 @@ public void testMessageAvailableAfterRestart() throws Exception {
531542

532543
}
533544

545+
@Test(dataProvider = "variationsForHasMessageAvailable")
546+
public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception {
547+
final String topicName = "persistent://my-property/my-ns/HasMessageAvailable";
548+
final int numOfMessage = 100;
549+
550+
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
551+
.topic(topicName);
552+
553+
if (enableBatch) {
554+
producerBuilder
555+
.enableBatching(true)
556+
.batchingMaxMessages(10);
557+
} else {
558+
producerBuilder
559+
.enableBatching(false);
560+
}
561+
562+
Producer<byte[]> producer = producerBuilder.create();
563+
564+
CountDownLatch latch = new CountDownLatch(numOfMessage);
565+
566+
List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());
567+
568+
for (int i = 0; i < numOfMessage; i++) {
569+
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
570+
if (e != null) {
571+
Assert.fail();
572+
} else {
573+
allIds.add(mid);
574+
}
575+
latch.countDown();
576+
});
577+
}
578+
579+
latch.await();
580+
581+
allIds.sort(null); // make sure the largest mid appears at last.
582+
583+
for (MessageId id : allIds) {
584+
Reader<byte[]> reader;
585+
586+
if (startInclusive) {
587+
reader = pulsarClient.newReader().topic(topicName)
588+
.startMessageId(id).startMessageIdInclusive().create();
589+
} else {
590+
reader = pulsarClient.newReader().topic(topicName)
591+
.startMessageId(id).create();
592+
}
593+
594+
if (startInclusive) {
595+
assertTrue(reader.hasMessageAvailable());
596+
} else if (id != allIds.get(allIds.size() - 1)) {
597+
assertTrue(reader.hasMessageAvailable());
598+
} else {
599+
assertFalse(reader.hasMessageAvailable());
600+
}
601+
reader.close();
602+
}
603+
604+
producer.close();
605+
}
606+
534607
@Test
535608
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
536609
final int numOfMessage = 10;
@@ -794,7 +867,7 @@ public void testReaderStartInMiddleOfBatch() throws Exception {
794867
.batchingMaxMessages(10)
795868
.create();
796869

797-
CountDownLatch latch = new CountDownLatch(100);
870+
CountDownLatch latch = new CountDownLatch(numOfMessage);
798871

799872
List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());
800873

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
145145
while (reader.hasMessageAvailable()) {
146146
Assert.assertTrue(keys.remove(reader.readNext().getKey()));
147147
}
148-
Assert.assertTrue(keys.isEmpty());
148+
// start from latest with start message inclusive should only read the last message in batch
149+
Assert.assertTrue(keys.size() == 9);
150+
Assert.assertFalse(keys.contains("key9"));
149151
Assert.assertFalse(reader.hasMessageAvailable());
150152
}
151153

0 commit comments

Comments
 (0)