Skip to content

Commit 292144f

Browse files
authored
JVM hangs during shutdown (#287)
1 parent e1f3dfa commit 292144f

3 files changed

Lines changed: 61 additions & 207 deletions

File tree

ddprof-lib/src/main/cpp/flightRecorder.cpp

Lines changed: 12 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@
4040
#include <vector>
4141
#include <unistd.h>
4242

43-
static SpinLock _rec_lock(1);
43+
static SpinLock _rec_lock(0);
4444

4545
static const char *const SETTING_RING[] = {NULL, "kernel", "user", "any"};
4646
static const char *const SETTING_CSTACK[] = {NULL, "no", "fp", "dwarf", "lbr"};
@@ -1460,6 +1460,7 @@ void Recording::addThread(int lock_index, int tid) {
14601460
}
14611461

14621462
Error FlightRecorder::start(Arguments &args, bool reset) {
1463+
ExclusiveLockGuard locker(&_rec_lock);
14631464
const char *file = args.file();
14641465
if (file == NULL || file[0] == 0) {
14651466
_filename = "";
@@ -1473,7 +1474,6 @@ Error FlightRecorder::start(Arguments &args, bool reset) {
14731474
}
14741475

14751476
Error ret = newRecording(reset);
1476-
_rec_lock.unlock();
14771477
return ret;
14781478
}
14791479

@@ -1484,34 +1484,35 @@ Error FlightRecorder::newRecording(bool reset) {
14841484
return Error("Could not open Flight Recorder output file");
14851485
}
14861486

1487-
_rec = new Recording(fd, _args);
1487+
// Given that some reads are not protected by _rec_lock,
1488+
// we want to publish _rec with a full fence, so that the read
1489+
// side cannot see a partially initialized recording.
1490+
Recording* tmp = new Recording(fd, _args);
1491+
__atomic_store_n(&_rec, tmp, __ATOMIC_SEQ_CST);
14881492
return Error::OK;
14891493
}
14901494

14911495
void FlightRecorder::stop() {
1496+
ExclusiveLockGuard locker(&_rec_lock);
14921497
if (_rec != NULL) {
1493-
_rec_lock.lock();
1494-
1495-
Recording *tmp = _rec;
1498+
volatile Recording *tmp = _rec;
14961499
// NULL first, deallocate later
1497-
_rec = NULL;
1500+
__atomic_store_n(&_rec, nullptr, __ATOMIC_RELAXED);
14981501
delete tmp;
14991502
}
15001503
}
15011504

15021505
Error FlightRecorder::dump(const char *filename, const int length) {
1506+
ExclusiveLockGuard locker(&_rec_lock);
15031507
if (_rec != NULL) {
1504-
_rec_lock.lock();
15051508
if (_filename.length() != length ||
15061509
strncmp(filename, _filename.c_str(), length) != 0) {
15071510
// if the filename to dump the recording to is specified, move the current
15081511
// working file there
15091512
int copy_fd = open(filename, O_CREAT | O_RDWR | O_TRUNC, 0644);
15101513
_rec->switchChunk(copy_fd);
15111514
close(copy_fd);
1512-
_rec_lock.unlock();
15131515
} else {
1514-
_rec_lock.unlock();
15151516
return Error(
15161517
"Can not dump recording to itself. Provide a different file name!");
15171518
}
@@ -1523,9 +1524,8 @@ Error FlightRecorder::dump(const char *filename, const int length) {
15231524
}
15241525

15251526
void FlightRecorder::flush() {
1527+
ExclusiveLockGuard locker(&_rec_lock);
15261528
if (_rec != NULL) {
1527-
_rec_lock.lock();
1528-
15291529
jvmtiEnv *jvmti = VM::jvmti();
15301530
JNIEnv *env = VM::jni();
15311531

@@ -1542,7 +1542,6 @@ void FlightRecorder::flush() {
15421542
jvmti->Deallocate((unsigned char *)classes[i]);
15431543
}
15441544
}
1545-
_rec_lock.unlock();
15461545
}
15471546
}
15481547

ddprof-lib/src/main/cpp/flightRecorder.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -291,7 +291,7 @@ class FlightRecorder {
291291
private:
292292
std::string _filename;
293293
Arguments _args;
294-
Recording *_rec;
294+
Recording* volatile _rec;
295295

296296
Error newRecording(bool reset);
297297

ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java

Lines changed: 48 additions & 193 deletions
Original file line number | Diff line number | Diff line change
@@ -10,224 +10,79 @@
1010
import java.util.concurrent.*;
1111
import java.util.concurrent.atomic.AtomicBoolean;
1212
import java.util.concurrent.atomic.AtomicLong;
13-
import java.util.concurrent.atomic.AtomicIntegerArray;
13+
import java.util.concurrent.atomic.AtomicInteger;
14+
import java.util.concurrent.atomic.AtomicLongArray;
1415

1516
@State(Scope.Benchmark)
1617
public class ThreadFilterBenchmark extends Configuration {
1718

1819
@Param({"true", "false"}) // Parameterize the filter usage
1920
public boolean useThreadFilters;
2021

21-
private static final int NUM_THREADS = 15;
22-
private ExecutorService executorService;
2322
private JavaProfiler profiler;
24-
private AtomicBoolean running;
25-
private CountDownLatch startLatch;
26-
private CountDownLatch stopLatch;
27-
private AtomicLong operationCount;
28-
private long startTime;
29-
private long stopTime;
30-
private PrintWriter logWriter;
3123
private static final int ARRAY_SIZE = 1024; // Larger array to stress memory
32-
private static final int[] sharedArray = new int[ARRAY_SIZE];
33-
private static final AtomicIntegerArray atomicArray = new AtomicIntegerArray(ARRAY_SIZE);
24+
private static final long[] sharedArray = new long[ARRAY_SIZE];
25+
private static final AtomicLongArray atomicArray = new AtomicLongArray(ARRAY_SIZE);
3426
private static final int CACHE_LINE_SIZE = 64; // Typical cache line size
3527
private static final int STRIDE = CACHE_LINE_SIZE / Integer.BYTES; // Elements per cache line
36-
private AtomicLong addThreadCount = new AtomicLong(0);
37-
private AtomicLong removeThreadCount = new AtomicLong(0);
3828

3929
@Setup(Level.Trial)
4030
public void setup() throws IOException {
41-
System.out.println("Setting up benchmark...");
42-
System.out.println("Thread filters enabled: " + useThreadFilters);
43-
System.out.println("Creating thread pool with " + NUM_THREADS + " threads");
44-
executorService = Executors.newFixedThreadPool(NUM_THREADS);
45-
System.out.println("Getting profiler instance");
4631
profiler = JavaProfiler.getInstance();
47-
48-
// Stop the profiler if it's already running
49-
try {
50-
profiler.stop();
51-
} catch (IllegalStateException e) {
52-
System.out.println("Profiler was not active at setup.");
53-
}
54-
55-
String config = "start,wall=10ms,filter=1,file=/tmp/thread_filter_profile.jfr";
56-
System.out.println("Starting profiler with " + config);
57-
profiler.execute(config);
58-
System.out.println("Started profiler with output file");
59-
60-
running = new AtomicBoolean(true);
61-
operationCount = new AtomicLong(0);
62-
startTime = System.currentTimeMillis();
63-
stopTime = startTime + 30000; // Run for 30 seconds
64-
System.out.println("Benchmark setup completed at " + startTime);
65-
66-
try {
67-
String logFile = "/tmp/thread_filter_benchmark.log";
68-
System.out.println("Attempting to create log file at: " + logFile);
69-
logWriter = new PrintWriter(new FileWriter(logFile));
70-
logWriter.printf("Benchmark started at %d%n", startTime);
71-
logWriter.flush();
72-
System.out.println("Successfully created and wrote to log file");
73-
} catch (IOException e) {
74-
System.err.println("Failed to create log file: " + e.getMessage());
75-
e.printStackTrace();
76-
throw e;
77-
}
78-
}
79-
80-
@TearDown(Level.Trial)
81-
public void tearDown() {
82-
System.out.println("Tearing down benchmark...");
83-
running.set(false);
84-
85-
// Wait for all threads to finish with a timeout
86-
try {
87-
if (stopLatch != null) {
88-
if (!stopLatch.await(30, TimeUnit.SECONDS)) {
89-
System.err.println("Warning: Some threads did not finish within timeout");
90-
}
91-
}
92-
} catch (InterruptedException e) {
93-
Thread.currentThread().interrupt();
94-
}
95-
96-
// Shutdown executor with timeout
97-
executorService.shutdown();
98-
try {
99-
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
100-
executorService.shutdownNow();
101-
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
102-
System.err.println("Warning: Executor did not terminate");
103-
}
104-
}
105-
} catch (InterruptedException e) {
106-
executorService.shutdownNow();
107-
Thread.currentThread().interrupt();
108-
}
109-
110-
long endTime = System.currentTimeMillis();
111-
long totalOps = operationCount.get();
112-
double durationSecs = (endTime - startTime) / 1000.0;
113-
double opsPerSec = totalOps / durationSecs;
114-
double addOpsPerSec = addThreadCount.get() / durationSecs;
115-
double removeOpsPerSec = removeThreadCount.get() / durationSecs;
116-
117-
String stats = String.format("Thread Filter Stats:%n" +
118-
"Total operations: %,d%n" +
119-
"Duration: %.2f seconds%n" +
120-
"Operations/second: %,.0f%n" +
121-
"Operations/second/thread: %,.0f%n" +
122-
"AddThread operations/second: %,.0f%n" +
123-
"RemoveThread operations/second: %,.0f%n",
124-
totalOps, durationSecs, opsPerSec, opsPerSec / NUM_THREADS, addOpsPerSec, removeOpsPerSec);
125-
126-
System.out.print(stats);
127-
if (logWriter != null) {
128-
try {
129-
logWriter.print(stats);
130-
logWriter.flush();
131-
logWriter.close();
132-
System.out.println("Successfully closed log file");
133-
} catch (Exception e) {
134-
System.err.println("Error closing log file: " + e.getMessage());
135-
e.printStackTrace();
136-
}
137-
}
138-
}
139-
140-
public void setUseThreadFilters(boolean useThreadFilters) {
141-
this.useThreadFilters = useThreadFilters;
14232
}
14333

14434
@Benchmark
14535
@BenchmarkMode(Mode.Throughput)
14636
@Fork(value = 1, warmups = 1)
14737
@Warmup(iterations = 1, time = 1)
14838
@Measurement(iterations = 1, time = 2)
149-
@Threads(1)
39+
@Threads(15)
15040
@OutputTimeUnit(TimeUnit.MILLISECONDS)
151-
public long threadFilterStress() throws InterruptedException {
152-
System.out.println("Starting benchmark iteration...");
153-
startLatch = new CountDownLatch(NUM_THREADS);
154-
stopLatch = new CountDownLatch(NUM_THREADS);
41+
public void threadFilterStress() throws InterruptedException {
42+
long threadId = Thread.currentThread().getId();
43+
// Memory-intensive operations that would be sensitive to false sharing
44+
for (int j = 0; j < ARRAY_SIZE; j += STRIDE) {
45+
if (useThreadFilters) {
46+
// Register thread at the start of each cache line operation
47+
profiler.addThread();
48+
}
15549

156-
// Start all worker threads[]
157-
for (int i = 0; i < NUM_THREADS; i++) {
158-
final int threadId = i;
159-
executorService.submit(() -> {
160-
try {
161-
startLatch.countDown();
162-
startLatch.await(30, TimeUnit.SECONDS);
163-
String startMsg = String.format("Thread %d started%n", threadId);
164-
System.out.print(startMsg);
165-
if (logWriter != null) {
166-
logWriter.print(startMsg);
167-
logWriter.flush();
168-
}
169-
170-
while (running.get() && System.currentTimeMillis() < stopTime) {
171-
// Memory-intensive operations that would be sensitive to false sharing
172-
for (int j = 0; j < ARRAY_SIZE; j += STRIDE) {
173-
if (useThreadFilters) {
174-
// Register thread at the start of each cache line operation
175-
profiler.addThread();
176-
addThreadCount.incrementAndGet();
177-
}
178-
179-
// Each thread writes to its own cache line
180-
int baseIndex = (threadId * STRIDE) % ARRAY_SIZE;
181-
for (int k = 0; k < STRIDE; k++) {
182-
int index = (baseIndex + k) % ARRAY_SIZE;
183-
// Write to shared array
184-
sharedArray[index] = threadId;
185-
// Read and modify
186-
int value = sharedArray[index] + 1;
187-
// Atomic operation
188-
atomicArray.set(index, value);
189-
}
190-
191-
if (useThreadFilters) {
192-
// Remove thread after cache line operation
193-
profiler.removeThread();
194-
removeThreadCount.incrementAndGet();
195-
}
196-
operationCount.incrementAndGet();
197-
}
50+
// Each thread writes to its own cache line
51+
long baseIndex = (threadId * STRIDE) % ARRAY_SIZE;
52+
for (int k = 0; k < STRIDE; k++) {
53+
int index = (int)(baseIndex + k) % ARRAY_SIZE;
54+
// Write to shared array
55+
sharedArray[index] = threadId;
56+
// Read and modify
57+
long value = sharedArray[index] + 1;
58+
// Atomic operation
59+
atomicArray.set(index, value);
60+
}
19861

199-
// More memory operations with thread registration
200-
for (int j = 0; j < ARRAY_SIZE; j += STRIDE) {
201-
if (useThreadFilters) {
202-
// Register thread at the start of each cache line operation
203-
profiler.addThread();
204-
addThreadCount.incrementAndGet();
205-
}
206-
207-
int baseIndex = (threadId * STRIDE) % ARRAY_SIZE;
208-
for (int k = 0; k < STRIDE; k++) {
209-
int index = (baseIndex + k) % ARRAY_SIZE;
210-
int value = atomicArray.get(index);
211-
sharedArray[index] = value * 2;
212-
}
213-
214-
if (useThreadFilters) {
215-
// Remove thread after cache line operation
216-
profiler.removeThread();
217-
removeThreadCount.incrementAndGet();
218-
}
219-
operationCount.incrementAndGet();
220-
}
221-
}
222-
} catch (InterruptedException e) {
223-
Thread.currentThread().interrupt();
224-
} finally {
225-
stopLatch.countDown();
226-
}
227-
});
228-
}
62+
if (useThreadFilters) {
63+
// Remove thread after cache line operation
64+
profiler.removeThread();
65+
}
66+
}
67+
68+
// More memory operations with thread registration
69+
for (int j = 0; j < ARRAY_SIZE; j += STRIDE) {
70+
if (useThreadFilters) {
71+
// Register thread at the start of each cache line operation
72+
profiler.addThread();
73+
}
22974

230-
stopLatch.await();
231-
return operationCount.get();
75+
long baseIndex = (threadId * STRIDE) % ARRAY_SIZE;
76+
for (int k = 0; k < STRIDE; k++) {
77+
int index = (int)(baseIndex + k) % ARRAY_SIZE;
78+
long value = atomicArray.get(index);
79+
sharedArray[index] = value * 2;
80+
}
81+
82+
if (useThreadFilters) {
83+
// Remove thread after cache line operation
84+
profiler.removeThread();
85+
}
86+
}
23287
}
23388
}

0 commit comments

Comments
 (0)