Skip to content

Commit d38a22d

Browse files
yyj8lhotari
andcommitted
[improve][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable (#23634)
Co-authored-by: Lari Hotari <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit d833b8b)
1 parent c3f6f59 commit d38a22d

3 files changed

Lines changed: 254 additions & 12 deletions

File tree

pulsar-broker-common/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@
9494
<artifactId>rest-assured</artifactId>
9595
<scope>test</scope>
9696
</dependency>
97+
98+
<dependency>
99+
<groupId>org.glassfish.jersey.core</groupId>
100+
<artifactId>jersey-server</artifactId>
101+
<scope>test</scope>
102+
</dependency>
97103
</dependencies>
98104

99105
<build>

pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,23 @@
1818
*/
1919
package org.apache.pulsar.common.configuration;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import java.io.File;
23+
import java.lang.management.ManagementFactory;
24+
import java.lang.management.ThreadInfo;
25+
import java.lang.management.ThreadMXBean;
26+
import java.time.Clock;
27+
import java.util.Arrays;
2228
import java.util.function.Supplier;
29+
import java.util.stream.Collectors;
2330
import javax.servlet.ServletContext;
2431
import javax.ws.rs.GET;
2532
import javax.ws.rs.Path;
2633
import javax.ws.rs.WebApplicationException;
2734
import javax.ws.rs.core.Context;
2835
import javax.ws.rs.core.Response.Status;
2936
import lombok.extern.slf4j.Slf4j;
37+
import org.apache.pulsar.common.util.ThreadDumpUtil;
3038

3139
/**
3240
* Web resource used by the VIP service to check to availability of the service instance.
@@ -38,25 +46,92 @@ public class VipStatus {
3846
public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
3947
public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
4048

49+
// log a full thread dump when a deadlock is detected in status check once every 10 minutes
50+
// to prevent excessive logging
51+
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
52+
// Rate limit status checks to every 500ms to prevent DoS
53+
private static final long CHECK_STATUS_INTERVAL = 500L;
54+
55+
private static volatile long lastCheckStatusTimestamp;
56+
private static volatile long lastPrintThreadDumpTimestamp;
57+
private static volatile boolean lastCheckStatusResult;
58+
59+
private long printThreadDumpIntervalMs;
60+
private Clock clock;
61+
4162
@Context
4263
protected ServletContext servletContext;
4364

65+
public VipStatus() {
66+
this.clock = Clock.systemUTC();
67+
this.printThreadDumpIntervalMs = LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED;
68+
}
69+
70+
@VisibleForTesting
71+
public VipStatus(ServletContext servletContext, long printThreadDumpIntervalMs) {
72+
this.servletContext = servletContext;
73+
this.printThreadDumpIntervalMs = printThreadDumpIntervalMs;
74+
this.clock = Clock.systemUTC();
75+
}
76+
77+
@VisibleForTesting
78+
static void reset() {
79+
lastCheckStatusTimestamp = 0L;
80+
lastPrintThreadDumpTimestamp = 0L;
81+
lastCheckStatusResult = false;
82+
}
83+
4484
@GET
4585
public String checkStatus() {
46-
String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
47-
@SuppressWarnings("unchecked")
48-
Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
86+
// Locking classes to avoid deadlock detection in multi-thread concurrent requests.
87+
synchronized (VipStatus.class) {
88+
if (clock.millis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) {
89+
if (lastCheckStatusResult) {
90+
return "OK";
91+
} else {
92+
throw new WebApplicationException(Status.SERVICE_UNAVAILABLE);
93+
}
94+
}
95+
lastCheckStatusTimestamp = clock.millis();
4996

50-
boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
97+
String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
98+
@SuppressWarnings("unchecked")
99+
Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);
100+
boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;
51101

52-
if (statusFilePath != null) {
53-
File statusFile = new File(statusFilePath);
54-
if (isReady && statusFile.exists() && statusFile.isFile()) {
55-
return "OK";
102+
if (statusFilePath != null) {
103+
File statusFile = new File(statusFilePath);
104+
if (isReady && statusFile.exists() && statusFile.isFile()) {
105+
// check deadlock
106+
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
107+
long[] threadIds = threadBean.findDeadlockedThreads();
108+
if (threadIds != null && threadIds.length > 0) {
109+
ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false,
110+
false);
111+
String threadNames = Arrays.stream(threadInfos)
112+
.map(threadInfo -> threadInfo.getThreadName()
113+
+ "(tid=" + threadInfo.getThreadId() + ")")
114+
.collect(Collectors.joining(", "));
115+
if (clock.millis() - lastPrintThreadDumpTimestamp > printThreadDumpIntervalMs) {
116+
String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString();
117+
log.error("Deadlocked threads detected. {}. Service may be unavailable, "
118+
+ "thread stack details are as follows:\n{}", threadNames, diagnosticResult);
119+
lastPrintThreadDumpTimestamp = clock.millis();
120+
} else {
121+
log.error("Deadlocked threads detected. {}", threadNames);
122+
}
123+
lastCheckStatusResult = false;
124+
throw new WebApplicationException(Status.SERVICE_UNAVAILABLE);
125+
} else {
126+
lastCheckStatusResult = true;
127+
return "OK";
128+
}
129+
}
56130
}
131+
lastCheckStatusResult = false;
132+
log.warn("Status file '{}' doesn't exist or ready probe value ({}) isn't true. The service is not ready",
133+
statusFilePath, isReady);
134+
throw new WebApplicationException(Status.NOT_FOUND);
57135
}
58-
log.warn("Failed to access \"status.html\". The service is not ready");
59-
throw new WebApplicationException(Status.NOT_FOUND);
60136
}
61-
62-
}
137+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.configuration;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import java.io.Closeable;
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Phaser;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.locks.ReentrantLock;
30+
import java.util.function.Supplier;
31+
import javax.servlet.ServletContext;
32+
import javax.ws.rs.WebApplicationException;
33+
import javax.ws.rs.core.Response;
34+
import lombok.SneakyThrows;
35+
import lombok.extern.slf4j.Slf4j;
36+
import org.assertj.core.util.Files;
37+
import org.mockito.Mockito;
38+
import org.testng.annotations.AfterMethod;
39+
import org.testng.annotations.BeforeMethod;
40+
import org.testng.annotations.Test;
41+
42+
@Slf4j
43+
public class VipStatusTest {
44+
45+
public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath";
46+
public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe";
47+
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 10000L;
48+
// Rate limit status checks to every 500ms to prevent DoS
49+
private static final long CHECK_STATUS_INTERVAL = 500L;
50+
51+
private ServletContext mockServletContext;
52+
private VipStatus vipStatus;
53+
private File file;
54+
55+
@BeforeMethod
56+
public void setup() throws IOException {
57+
file = Files.newTemporaryFile();
58+
Supplier<Boolean> isReadyProbe = () -> true;
59+
mockServletContext = Mockito.mock(ServletContext.class);
60+
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(file.getAbsolutePath());
61+
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE)).thenReturn(isReadyProbe);
62+
vipStatus = new VipStatus(mockServletContext, LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED);
63+
VipStatus.reset();
64+
}
65+
66+
@Test
67+
public void testVipStatusCheckStatus() {
68+
// No deadlocks
69+
testVipStatusCheckStatusWithoutDeadlock();
70+
// There is a deadlock
71+
testVipStatusCheckStatusWithDeadlock();
72+
}
73+
74+
@AfterMethod(alwaysRun = true)
75+
public void release() throws IOException {
76+
if (file != null) {
77+
file.delete();
78+
file = null;
79+
}
80+
}
81+
82+
@Test
83+
public void testVipStatusCheckStatusWithoutDeadlock() {
84+
assertEquals(vipStatus.checkStatus(), "OK");
85+
}
86+
87+
@Test
88+
public void testVipStatusCheckStatusWithDeadlock() {
89+
MockDeadlock mockDeadlock = new MockDeadlock();
90+
boolean asExpected = true;
91+
try {
92+
mockDeadlock.startDeadlock();
93+
vipStatus.checkStatus();
94+
asExpected = false;
95+
System.out.println("Simulated deadlock, no deadlock detected, not as expected.");
96+
} catch (Exception wae) {
97+
System.out.println("Simulated deadlock and detected it, as expected.");
98+
} finally {
99+
mockDeadlock.close();
100+
}
101+
102+
if (!asExpected) {
103+
throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
104+
}
105+
}
106+
107+
static class MockDeadlock implements Closeable {
108+
private ExecutorService executorService = Executors.newCachedThreadPool();
109+
private ReentrantLock lockA = new ReentrantLock();
110+
private ReentrantLock lockB = new ReentrantLock();
111+
private Phaser phaser = new Phaser(2);
112+
113+
@SneakyThrows
114+
public void startDeadlock() {
115+
executorService.execute(new TaskOne());
116+
executorService.execute(new TaskTwo());
117+
Thread.sleep(CHECK_STATUS_INTERVAL);
118+
}
119+
120+
@Override
121+
public void close() {
122+
executorService.shutdownNow();
123+
}
124+
125+
private class TaskOne implements Runnable {
126+
@Override
127+
public void run() {
128+
try {
129+
lockA.lock();
130+
System.out.println("ThreadOne acquired lockA");
131+
phaser.arriveAndAwaitAdvance();
132+
while (!lockB.tryLock(1, TimeUnit.SECONDS)) {
133+
System.out.println("ThreadOne acquired lockB");
134+
}
135+
} catch (InterruptedException e) {
136+
//e.printStackTrace();
137+
} finally {
138+
lockA.unlock();
139+
}
140+
}
141+
}
142+
143+
private class TaskTwo implements Runnable {
144+
@Override
145+
public void run() {
146+
try {
147+
lockB.lock();
148+
System.out.println("ThreadOne acquired lockB");
149+
phaser.arriveAndAwaitAdvance();
150+
while (!lockA.tryLock(1, TimeUnit.SECONDS)) {
151+
System.out.println("ThreadOne acquired lockA");
152+
}
153+
} catch (InterruptedException e) {
154+
//e.printStackTrace();
155+
} finally {
156+
lockB.unlock();
157+
}
158+
}
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)