Skip to content

Commit 461ffd8

Browse files
[fix][broker] Ensure LoadSheddingTask is scheduled after metadata service is available again (apache#24838)
Motivation After PR: apache#23040, if the metadata service is unavailable and then becomes available again, the LoadSheddingTask will not run again. Modifications Ensure LoadSheddingTask is scheduled after metadata service is available again by moving the isMetadataServiceAvailable check to the try block.
1 parent 2233fa8 commit 461ffd8

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ public void run() {
5959
if (isCancel) {
6060
return;
6161
}
62-
if (factory instanceof ManagedLedgerFactoryImpl
63-
&& !((ManagedLedgerFactoryImpl) factory).isMetadataServiceAvailable()) {
64-
return;
65-
}
6662
try {
63+
if (factory instanceof ManagedLedgerFactoryImpl
64+
&& !((ManagedLedgerFactoryImpl) factory).isMetadataServiceAvailable()) {
65+
return;
66+
}
6767
loadManager.get().doLoadShedding();
6868
} catch (Exception e) {
6969
LOG.warn("Error during the load shedding", e);

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.mockito.Mockito.atLeastOnce;
2323
import static org.mockito.Mockito.doReturn;
2424
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.spy;
2526
import static org.mockito.Mockito.times;
2627
import static org.mockito.Mockito.verify;
2728
import static org.testng.Assert.assertEquals;
@@ -521,12 +522,14 @@ public void testMetadataServiceNotAvailable() {
521522
AtomicReference<LoadManager> atomicLoadManager = new AtomicReference<>(loadManager);
522523
ManagedLedgerFactoryImpl factory = mock(ManagedLedgerFactoryImpl.class);
523524
doReturn(false).when(factory).isMetadataServiceAvailable();
524-
LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, null, factory);
525+
LoadSheddingTask task2 = spy(new LoadSheddingTask(atomicLoadManager, null, null, factory));
525526
task2.run();
526527
verify(loadManager, times(0)).doLoadShedding();
528+
verify(task2, times(1)).start();
527529
doReturn(true).when(factory).isMetadataServiceAvailable();
528530
task2.run();
529531
verify(loadManager, times(1)).doLoadShedding();
532+
verify(task2, times(2)).start();
530533
}
531534

532535
@Test

0 commit comments

Comments
 (0)