1010import java .util .concurrent .*;
1111import java .util .concurrent .atomic .AtomicBoolean ;
1212import java .util .concurrent .atomic .AtomicLong ;
13- import java .util .concurrent .atomic .AtomicIntegerArray ;
13+ import java .util .concurrent .atomic .AtomicInteger ;
14+ import java .util .concurrent .atomic .AtomicLongArray ;
1415
1516@ State (Scope .Benchmark )
1617public class ThreadFilterBenchmark extends Configuration {
1718
1819 @ Param ({"true" , "false" }) // Parameterize the filter usage
1920 public boolean useThreadFilters ;
2021
21- private static final int NUM_THREADS = 15 ;
22- private ExecutorService executorService ;
2322 private JavaProfiler profiler ;
24- private AtomicBoolean running ;
25- private CountDownLatch startLatch ;
26- private CountDownLatch stopLatch ;
27- private AtomicLong operationCount ;
28- private long startTime ;
29- private long stopTime ;
30- private PrintWriter logWriter ;
3123 private static final int ARRAY_SIZE = 1024 ; // Larger array to stress memory
32- private static final int [] sharedArray = new int [ARRAY_SIZE ];
33- private static final AtomicIntegerArray atomicArray = new AtomicIntegerArray (ARRAY_SIZE );
24+ private static final long [] sharedArray = new long [ARRAY_SIZE ];
25+ private static final AtomicLongArray atomicArray = new AtomicLongArray (ARRAY_SIZE );
3426 private static final int CACHE_LINE_SIZE = 64 ; // Typical cache line size
3527 private static final int STRIDE = CACHE_LINE_SIZE / Integer .BYTES ; // Elements per cache line
36- private AtomicLong addThreadCount = new AtomicLong (0 );
37- private AtomicLong removeThreadCount = new AtomicLong (0 );
3828
3929 @ Setup (Level .Trial )
4030 public void setup () throws IOException {
41- System .out .println ("Setting up benchmark..." );
42- System .out .println ("Thread filters enabled: " + useThreadFilters );
43- System .out .println ("Creating thread pool with " + NUM_THREADS + " threads" );
44- executorService = Executors .newFixedThreadPool (NUM_THREADS );
45- System .out .println ("Getting profiler instance" );
4631 profiler = JavaProfiler .getInstance ();
47-
48- // Stop the profiler if it's already running
49- try {
50- profiler .stop ();
51- } catch (IllegalStateException e ) {
52- System .out .println ("Profiler was not active at setup." );
53- }
54-
55- String config = "start,wall=10ms,filter=1,file=/tmp/thread_filter_profile.jfr" ;
56- System .out .println ("Starting profiler with " + config );
57- profiler .execute (config );
58- System .out .println ("Started profiler with output file" );
59-
60- running = new AtomicBoolean (true );
61- operationCount = new AtomicLong (0 );
62- startTime = System .currentTimeMillis ();
63- stopTime = startTime + 30000 ; // Run for 30 seconds
64- System .out .println ("Benchmark setup completed at " + startTime );
65-
66- try {
67- String logFile = "/tmp/thread_filter_benchmark.log" ;
68- System .out .println ("Attempting to create log file at: " + logFile );
69- logWriter = new PrintWriter (new FileWriter (logFile ));
70- logWriter .printf ("Benchmark started at %d%n" , startTime );
71- logWriter .flush ();
72- System .out .println ("Successfully created and wrote to log file" );
73- } catch (IOException e ) {
74- System .err .println ("Failed to create log file: " + e .getMessage ());
75- e .printStackTrace ();
76- throw e ;
77- }
78- }
79-
80- @ TearDown (Level .Trial )
81- public void tearDown () {
82- System .out .println ("Tearing down benchmark..." );
83- running .set (false );
84-
85- // Wait for all threads to finish with a timeout
86- try {
87- if (stopLatch != null ) {
88- if (!stopLatch .await (30 , TimeUnit .SECONDS )) {
89- System .err .println ("Warning: Some threads did not finish within timeout" );
90- }
91- }
92- } catch (InterruptedException e ) {
93- Thread .currentThread ().interrupt ();
94- }
95-
96- // Shutdown executor with timeout
97- executorService .shutdown ();
98- try {
99- if (!executorService .awaitTermination (30 , TimeUnit .SECONDS )) {
100- executorService .shutdownNow ();
101- if (!executorService .awaitTermination (30 , TimeUnit .SECONDS )) {
102- System .err .println ("Warning: Executor did not terminate" );
103- }
104- }
105- } catch (InterruptedException e ) {
106- executorService .shutdownNow ();
107- Thread .currentThread ().interrupt ();
108- }
109-
110- long endTime = System .currentTimeMillis ();
111- long totalOps = operationCount .get ();
112- double durationSecs = (endTime - startTime ) / 1000.0 ;
113- double opsPerSec = totalOps / durationSecs ;
114- double addOpsPerSec = addThreadCount .get () / durationSecs ;
115- double removeOpsPerSec = removeThreadCount .get () / durationSecs ;
116-
117- String stats = String .format ("Thread Filter Stats:%n" +
118- "Total operations: %,d%n" +
119- "Duration: %.2f seconds%n" +
120- "Operations/second: %,.0f%n" +
121- "Operations/second/thread: %,.0f%n" +
122- "AddThread operations/second: %,.0f%n" +
123- "RemoveThread operations/second: %,.0f%n" ,
124- totalOps , durationSecs , opsPerSec , opsPerSec / NUM_THREADS , addOpsPerSec , removeOpsPerSec );
125-
126- System .out .print (stats );
127- if (logWriter != null ) {
128- try {
129- logWriter .print (stats );
130- logWriter .flush ();
131- logWriter .close ();
132- System .out .println ("Successfully closed log file" );
133- } catch (Exception e ) {
134- System .err .println ("Error closing log file: " + e .getMessage ());
135- e .printStackTrace ();
136- }
137- }
138- }
139-
140- public void setUseThreadFilters (boolean useThreadFilters ) {
141- this .useThreadFilters = useThreadFilters ;
14232 }
14333
14434 @ Benchmark
14535 @ BenchmarkMode (Mode .Throughput )
14636 @ Fork (value = 1 , warmups = 1 )
14737 @ Warmup (iterations = 1 , time = 1 )
14838 @ Measurement (iterations = 1 , time = 2 )
149- @ Threads (1 )
39+ @ Threads (15 )
15040 @ OutputTimeUnit (TimeUnit .MILLISECONDS )
151- public long threadFilterStress () throws InterruptedException {
152- System .out .println ("Starting benchmark iteration..." );
153- startLatch = new CountDownLatch (NUM_THREADS );
154- stopLatch = new CountDownLatch (NUM_THREADS );
41+ public void threadFilterStress () throws InterruptedException {
42+ long threadId = Thread .currentThread ().getId ();
43+ // Memory-intensive operations that would be sensitive to false sharing
44+ for (int j = 0 ; j < ARRAY_SIZE ; j += STRIDE ) {
45+ if (useThreadFilters ) {
46+ // Register thread at the start of each cache line operation
47+ profiler .addThread ();
48+ }
15549
156- // Start all worker threads[]
157- for (int i = 0 ; i < NUM_THREADS ; i ++) {
158- final int threadId = i ;
159- executorService .submit (() -> {
160- try {
161- startLatch .countDown ();
162- startLatch .await (30 , TimeUnit .SECONDS );
163- String startMsg = String .format ("Thread %d started%n" , threadId );
164- System .out .print (startMsg );
165- if (logWriter != null ) {
166- logWriter .print (startMsg );
167- logWriter .flush ();
168- }
169-
170- while (running .get () && System .currentTimeMillis () < stopTime ) {
171- // Memory-intensive operations that would be sensitive to false sharing
172- for (int j = 0 ; j < ARRAY_SIZE ; j += STRIDE ) {
173- if (useThreadFilters ) {
174- // Register thread at the start of each cache line operation
175- profiler .addThread ();
176- addThreadCount .incrementAndGet ();
177- }
178-
179- // Each thread writes to its own cache line
180- int baseIndex = (threadId * STRIDE ) % ARRAY_SIZE ;
181- for (int k = 0 ; k < STRIDE ; k ++) {
182- int index = (baseIndex + k ) % ARRAY_SIZE ;
183- // Write to shared array
184- sharedArray [index ] = threadId ;
185- // Read and modify
186- int value = sharedArray [index ] + 1 ;
187- // Atomic operation
188- atomicArray .set (index , value );
189- }
190-
191- if (useThreadFilters ) {
192- // Remove thread after cache line operation
193- profiler .removeThread ();
194- removeThreadCount .incrementAndGet ();
195- }
196- operationCount .incrementAndGet ();
197- }
50+ // Each thread writes to its own cache line
51+ long baseIndex = (threadId * STRIDE ) % ARRAY_SIZE ;
52+ for (int k = 0 ; k < STRIDE ; k ++) {
53+ int index = (int )(baseIndex + k ) % ARRAY_SIZE ;
54+ // Write to shared array
55+ sharedArray [index ] = threadId ;
56+ // Read and modify
57+ long value = sharedArray [index ] + 1 ;
58+ // Atomic operation
59+ atomicArray .set (index , value );
60+ }
19861
199- // More memory operations with thread registration
200- for (int j = 0 ; j < ARRAY_SIZE ; j += STRIDE ) {
201- if (useThreadFilters ) {
202- // Register thread at the start of each cache line operation
203- profiler .addThread ();
204- addThreadCount .incrementAndGet ();
205- }
206-
207- int baseIndex = (threadId * STRIDE ) % ARRAY_SIZE ;
208- for (int k = 0 ; k < STRIDE ; k ++) {
209- int index = (baseIndex + k ) % ARRAY_SIZE ;
210- int value = atomicArray .get (index );
211- sharedArray [index ] = value * 2 ;
212- }
213-
214- if (useThreadFilters ) {
215- // Remove thread after cache line operation
216- profiler .removeThread ();
217- removeThreadCount .incrementAndGet ();
218- }
219- operationCount .incrementAndGet ();
220- }
221- }
222- } catch (InterruptedException e ) {
223- Thread .currentThread ().interrupt ();
224- } finally {
225- stopLatch .countDown ();
226- }
227- });
228- }
62+ if (useThreadFilters ) {
63+ // Remove thread after cache line operation
64+ profiler .removeThread ();
65+ }
66+ }
67+
68+ // More memory operations with thread registration
69+ for (int j = 0 ; j < ARRAY_SIZE ; j += STRIDE ) {
70+ if (useThreadFilters ) {
71+ // Register thread at the start of each cache line operation
72+ profiler .addThread ();
73+ }
22974
230- stopLatch .await ();
231- return operationCount .get ();
75+ long baseIndex = (threadId * STRIDE ) % ARRAY_SIZE ;
76+ for (int k = 0 ; k < STRIDE ; k ++) {
77+ int index = (int )(baseIndex + k ) % ARRAY_SIZE ;
78+ long value = atomicArray .get (index );
79+ sharedArray [index ] = value * 2 ;
80+ }
81+
82+ if (useThreadFilters ) {
83+ // Remove thread after cache line operation
84+ profiler .removeThread ();
85+ }
86+ }
23287 }
23388}
0 commit comments