Skip to content

Commit c48ab86

Browse files
committed
Enhance compareTo
1 parent fc4de9c commit c48ab86

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -663,13 +663,18 @@ public synchronized void reset() {
663663
}
664664

665665
public synchronized int compareTo(MessageId messageId) {
666-
// The position of a message in the batch (BatchMessageIdImpl) must precede the batch itself (MessageIdImpl)
667666
if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) {
668-
int result = ((BatchMessageIdImpl) this.messageId).toMessageIdImpl().compareTo(messageId);
669-
return (result == 0) ? -1 : result;
667+
final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
668+
final MessageIdImpl rhs = (MessageIdImpl) messageId;
669+
return MessageIdImpl.messageIdCompare(
670+
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(),
671+
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE);
670672
} else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){
671-
int result = this.messageId.compareTo(((BatchMessageIdImpl) messageId).toMessageIdImpl());
672-
return (result == 0) ? 1 : result;
673+
final MessageIdImpl lhs = this.messageId;
674+
final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
675+
return MessageIdImpl.messageIdCompare(
676+
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE,
677+
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex());
673678
} else {
674679
return this.messageId.compareTo(messageId);
675680
}

0 commit comments

Comments
 (0)