Skip to content

Commit 3bc834f

Browse files
lhotari and claude authored
[fix][client] Fix stale Healthy state in SameAuthParamsLookupAutoClusterFailover causing flaky test (#25388)
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
1 parent ce3429c commit 3bc834f

2 files changed

Lines changed: 258 additions & 0 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -143,6 +143,11 @@ private int findFailoverTo() {
143143
for (int i = currentPulsarServiceIndex + 1; i < pulsarServiceUrlArray.length; i++) {
144144
if (probeAvailable(i)) {
145145
return i;
146+
} else {
147+
// Mark the service as Failed to prevent a spurious recovery to it
148+
// after we failover to a higher-indexed service.
149+
pulsarServiceStateArray[i] = PulsarServiceState.Failed;
150+
checkCounterArray[i].setValue(0);
146151
}
147152
}
148153
return -1;
Lines changed: 253 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,253 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl;
20+
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyString;
23+
import static org.mockito.Mockito.doNothing;
24+
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.when;
26+
import static org.testng.Assert.assertEquals;
27+
import io.netty.channel.EventLoopGroup;
28+
import java.lang.reflect.Method;
29+
import java.net.InetSocketAddress;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.TimeUnit;
32+
import org.apache.commons.lang3.mutable.MutableInt;
33+
import org.apache.commons.lang3.reflect.FieldUtils;
34+
import org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState;
35+
import org.apache.pulsar.client.util.ExecutorProvider;
36+
import org.apache.pulsar.common.util.netty.EventLoopUtil;
37+
import org.awaitility.Awaitility;
38+
import org.testng.annotations.AfterMethod;
39+
import org.testng.annotations.BeforeMethod;
40+
import org.testng.annotations.Test;
41+
42+
@Test(groups = "broker-impl")
43+
public class SameAuthParamsLookupAutoClusterFailoverTest {
44+
45+
private static final String URL0 = "pulsar://broker0:6650";
46+
private static final String URL1 = "pulsar://broker1:6650";
47+
private static final String URL2 = "pulsar://broker2:6650";
48+
49+
private EventLoopGroup executor;
50+
private PulsarClientImpl mockClient;
51+
private SameAuthParamsLookupAutoClusterFailover failover;
52+
private PulsarServiceState[] stateArray;
53+
private MutableInt[] counterArray;
54+
55+
@BeforeMethod
56+
public void setup() throws Exception {
57+
executor = EventLoopUtil.newEventLoopGroup(1, false,
58+
new ExecutorProvider.ExtendedThreadFactory("test-failover"));
59+
60+
String[] urlArray = new String[]{URL0, URL1, URL2};
61+
failover = SameAuthParamsLookupAutoClusterFailover.builder()
62+
.pulsarServiceUrlArray(urlArray)
63+
.failoverThreshold(1)
64+
.recoverThreshold(2)
65+
.checkHealthyIntervalMs(100)
66+
.testTopic("a/b/c")
67+
.markTopicNotFoundAsAvailable(true)
68+
.build();
69+
70+
mockClient = mock(PulsarClientImpl.class);
71+
doNothing().when(mockClient).updateServiceUrl(anyString());
72+
doNothing().when(mockClient).reloadLookUp();
73+
74+
FieldUtils.writeField(failover, "pulsarClient", mockClient, true);
75+
FieldUtils.writeField(failover, "executor", executor, true);
76+
77+
stateArray = (PulsarServiceState[]) FieldUtils.readField(failover, "pulsarServiceStateArray", true);
78+
counterArray = (MutableInt[]) FieldUtils.readField(failover, "checkCounterArray", true);
79+
}
80+
81+
@AfterMethod(alwaysRun = true)
82+
public void cleanup() {
83+
if (executor != null) {
84+
executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
85+
}
86+
}
87+
88+
private void setLookupResult(String url, boolean available) {
89+
LookupService lookup = mock(LookupService.class);
90+
if (available) {
91+
InetSocketAddress addr = InetSocketAddress.createUnresolved("broker", 6650);
92+
when(lookup.getBroker(any()))
93+
.thenReturn(CompletableFuture.completedFuture(
94+
new LookupTopicResult(addr, addr, false)));
95+
} else {
96+
when(lookup.getBroker(any()))
97+
.thenReturn(CompletableFuture.failedFuture(
98+
new RuntimeException("connection refused")));
99+
}
100+
when(mockClient.getLookup(url)).thenReturn(lookup);
101+
}
102+
103+
/**
104+
* Reproduces the race condition where findFailoverTo() skips over an unavailable service
105+
* without marking it as Failed. This leaves a stale Healthy state that causes a spurious
106+
* recovery bounce (0 -> 2 -> 1 -> 2) instead of a clean failover (0 -> 2).
107+
*
108+
* <p>Before the fix, after failover from index 0 to 2, state[1] remained Healthy (stale).
109+
* On the next check cycle, firstHealthyPulsarService() would see state[1]=Healthy and
110+
* immediately "recover" to index 1 — which is actually a broken service. This caused
111+
* unnecessary bouncing and, combined with 3-second probe timeouts on dead services,
112+
* could push the total failover time past the test's awaitility timeout.
113+
*/
114+
@Test(timeOut = 30000)
115+
public void testFindFailoverToMarksSkippedServicesAsFailed() throws Exception {
116+
// url0 is down, url1 is down, url2 is healthy.
117+
setLookupResult(URL0, false);
118+
setLookupResult(URL1, false);
119+
setLookupResult(URL2, true);
120+
121+
// Pre-set state[0] to Failed (as if checkPulsarServices already detected it),
122+
// then run one check cycle. All on the executor to ensure thread safety.
123+
runOnExecutor(() -> {
124+
stateArray[0] = PulsarServiceState.Failed;
125+
counterArray[0].setValue(0);
126+
});
127+
runCheckCycle();
128+
129+
// After the fix, findFailoverTo marks url1 as Failed when it fails probing.
130+
// Verify on the executor thread where state is owned.
131+
runOnExecutor(() -> {
132+
assertEquals(failover.getCurrentPulsarServiceIndex(), 2,
133+
"Should have failed over to index 2");
134+
assertEquals(stateArray[1], PulsarServiceState.Failed,
135+
"Service 1 should be marked Failed by findFailoverTo, not remain stale Healthy");
136+
assertEquals(stateArray[2], PulsarServiceState.Healthy,
137+
"Service 2 should remain Healthy");
138+
});
139+
}
140+
141+
/**
142+
* Verifies no spurious recovery bounce occurs after failover. Without the fix,
143+
* the first check cycle after failover to index 2 would see stale Healthy state[1]
144+
* and immediately switch to index 1.
145+
*/
146+
@Test(timeOut = 30000)
147+
public void testNoSpuriousRecoveryBounceAfterFailover() throws Exception {
148+
// url0 is down, url1 is down, url2 is healthy.
149+
setLookupResult(URL0, false);
150+
setLookupResult(URL1, false);
151+
setLookupResult(URL2, true);
152+
153+
// Pre-set state[0] to Failed.
154+
runOnExecutor(() -> {
155+
stateArray[0] = PulsarServiceState.Failed;
156+
counterArray[0].setValue(0);
157+
});
158+
159+
// Failover: 0 -> 2.
160+
runCheckCycle();
161+
runOnExecutor(() -> assertEquals(failover.getCurrentPulsarServiceIndex(), 2));
162+
163+
// Run another check cycle. Without the fix, state[1] would be stale Healthy,
164+
// and firstHealthyPulsarService would return 1, causing a spurious switch.
165+
runCheckCycle();
166+
runOnExecutor(() -> assertEquals(failover.getCurrentPulsarServiceIndex(), 2,
167+
"Should stay at index 2, not bounce to index 1"));
168+
}
169+
170+
/**
171+
* Verifies that recovery still works correctly for a service that was marked Failed
172+
* by findFailoverTo, once that service becomes available again.
173+
*/
174+
@Test(timeOut = 30000)
175+
public void testRecoveryAfterFindFailoverToMarksServiceFailed() throws Exception {
176+
// url0 is down, url1 is down, url2 is healthy.
177+
setLookupResult(URL0, false);
178+
setLookupResult(URL1, false);
179+
setLookupResult(URL2, true);
180+
181+
// Pre-set state[0] to Failed and trigger failover 0 -> 2.
182+
runOnExecutor(() -> {
183+
stateArray[0] = PulsarServiceState.Failed;
184+
counterArray[0].setValue(0);
185+
});
186+
runCheckCycle();
187+
runOnExecutor(() -> {
188+
assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
189+
assertEquals(stateArray[1], PulsarServiceState.Failed);
190+
});
191+
192+
// Now make url1 healthy (simulating recovery of that service).
193+
setLookupResult(URL1, true);
194+
195+
// Run check cycles until service 1 recovers.
196+
// Failed -> PreRecover (1 check) -> Healthy (recoverThreshold=2, so 1 more check).
197+
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
198+
runCheckCycle();
199+
runOnExecutor(() -> {
200+
assertEquals(failover.getCurrentPulsarServiceIndex(), 1,
201+
"Should recover to index 1 after it becomes healthy");
202+
assertEquals(stateArray[1], PulsarServiceState.Healthy);
203+
});
204+
});
205+
}
206+
207+
private void runOnExecutor(Runnable task) throws Exception {
208+
try {
209+
executor.submit(task).get(5, TimeUnit.SECONDS);
210+
} catch (java.util.concurrent.ExecutionException e) {
211+
// Unwrap so that AssertionErrors propagate directly to Awaitility.
212+
if (e.getCause() instanceof AssertionError) {
213+
throw (AssertionError) e.getCause();
214+
}
215+
throw e;
216+
}
217+
}
218+
219+
private void runCheckCycle() throws Exception {
220+
runOnExecutor(() -> {
221+
try {
222+
Method checkMethod = SameAuthParamsLookupAutoClusterFailover.class
223+
.getDeclaredMethod("checkPulsarServices");
224+
checkMethod.setAccessible(true);
225+
Method firstHealthyMethod = SameAuthParamsLookupAutoClusterFailover.class
226+
.getDeclaredMethod("firstHealthyPulsarService");
227+
firstHealthyMethod.setAccessible(true);
228+
Method findFailoverMethod = SameAuthParamsLookupAutoClusterFailover.class
229+
.getDeclaredMethod("findFailoverTo");
230+
findFailoverMethod.setAccessible(true);
231+
Method updateMethod = SameAuthParamsLookupAutoClusterFailover.class
232+
.getDeclaredMethod("updateServiceUrl", int.class);
233+
updateMethod.setAccessible(true);
234+
235+
checkMethod.invoke(failover);
236+
int firstHealthy = (int) firstHealthyMethod.invoke(failover);
237+
int currentIndex = failover.getCurrentPulsarServiceIndex();
238+
if (firstHealthy != currentIndex) {
239+
if (firstHealthy < 0) {
240+
int failoverTo = (int) findFailoverMethod.invoke(failover);
241+
if (failoverTo >= 0) {
242+
updateMethod.invoke(failover, failoverTo);
243+
}
244+
} else {
245+
updateMethod.invoke(failover, firstHealthy);
246+
}
247+
}
248+
} catch (Exception e) {
249+
throw new RuntimeException(e);
250+
}
251+
});
252+
}
253+
}

0 commit comments

Comments
 (0)