Skip to content

Commit 953d092

Browse files
lhotariclaude
andauthored
[fix][test] Fix flaky PersistentStickyKeyDispatcherMultipleConsumersClassicTest.testSkipRedeliverTemporally (#25385)
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
1 parent 765c46e commit 953d092

1 file changed

Lines changed: 16 additions & 1 deletion

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,13 +499,23 @@ public void testSkipRedeliverTemporally() throws InterruptedException {
499499
return Collections.emptySet();
500500
}).when(cursorMock).asyncReplayEntries(anySet(), any(), any(), anyBoolean());
501501

502+
// Simulate real cursor behavior: track read position so entries are only returned once
503+
// by normal reads (subsequent access must go through asyncReplayEntries).
504+
// When no new entries are available, don't call the callback (simulating "OrWait" behavior).
505+
Set<Position> normalReadReturned = new ConcurrentSkipListSet<>();
502506
doAnswer(invocationOnMock -> {
503507
int maxEntries = invocationOnMock.getArgument(0);
504508
AsyncCallbacks.ReadEntriesCallback callback = invocationOnMock.getArgument(2);
505509
List<Entry> entries = allEntries.stream()
506-
.filter(entry -> entry.getLedgerId() != -1 && !alreadySent.contains(entry.getPosition()))
510+
.filter(entry -> entry.getLedgerId() != -1
511+
&& !normalReadReturned.contains(entry.getPosition()))
507512
.limit(maxEntries)
508513
.toList();
514+
if (entries.isEmpty()) {
515+
// No new entries available - simulate "wait" by not calling callback
516+
return null;
517+
}
518+
entries.forEach(e -> normalReadReturned.add(e.getPosition()));
509519
Object ctx = invocationOnMock.getArgument(3);
510520
callback.readEntriesComplete(copyEntries(entries), ctx);
511521
return null;
@@ -551,6 +561,11 @@ public void testSkipRedeliverTemporally() throws InterruptedException {
551561
// set permits to 2
552562
slowConsumerAvailablePermits.set(2);
553563

564+
// Trigger a new read cycle so the dispatcher can do a replay read to deliver
565+
// messages to the slow consumer. In production, this would be triggered by
566+
// consumerFlow when the consumer sends more permits.
567+
persistentDispatcher.readMoreEntriesAsync();
568+
554569
// now wait for slow consumer messages since there are permits
555570
assertTrue(slowConsumerMessagesSent.await(5, TimeUnit.SECONDS));
556571

0 commit comments

Comments
 (0)