 #include "os.h"
 #include "arch_dd.h"
 #include "common.h"
+#include "primeProbing.h"
 #include <string.h>
 #include <signal.h>
 #include <pthread.h>
@@ -70,8 +71,6 @@ class LongHashTable {
 
   CallTraceSample *values() { return (CallTraceSample *)(keys() + _capacity); }
 
-  u32 nextSlot(u32 slot) const { return (slot + 1) & (_capacity - 1); }
-
   void clear() {
     memset(keys(), 0, (sizeof(u64) + sizeof(CallTraceSample)) * _capacity);
     _size = 0;
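
The probing helper itself comes from the new `primeProbing.h`, which this diff does not show. For orientation, here is a minimal sketch of the interface the rest of the patch relies on; the stride choice and the `updateCapacity()` semantics are assumptions, not the actual header:

```cpp
#include <cstdint>

typedef uint64_t u64; // matching the u64/u32 typedefs from the arch headers
typedef uint32_t u32;

// Minimal sketch of the HashProbe interface used by this patch. The real
// primeProbing.h is not shown here, so every detail below is an assumption.
class HashProbe {
 public:
  HashProbe(u64 hash, u32 capacity)
      : _mask(capacity - 1),
        // Any odd stride is coprime to a power-of-two capacity, so the
        // sequence visits every slot exactly once before repeating.
        _step((u32)(hash >> 32) | 1),
        _slot((u32)hash & _mask),
        _remaining(capacity) {}

  // Slot the probe currently points at.
  u32 slot() const { return _slot; }

  // True while unvisited slots remain in the sequence.
  bool hasNext() const { return _remaining > 1; }

  // Advance to the next slot in the sequence.
  u32 next() {
    _remaining--;
    _slot = (_slot + _step) & _mask;
    return _slot;
  }

  // Assumed semantics: refresh the probe's bound after the table's size
  // bookkeeping changes (the patch calls this with incSize()'s result).
  void updateCapacity(u32 n) { _remaining = n; }

 private:
  u32 _mask;
  u32 _step;
  u32 _slot;
  u32 _remaining;
};
```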
@@ -178,21 +177,23 @@ CallTrace *CallTraceHashTable::storeCallTrace(int num_frames,
 
 CallTrace *CallTraceHashTable::findCallTrace(LongHashTable *table, u64 hash) {
   u64 *keys = table->keys();
-  u32 capacity = table->capacity();
-  u32 slot = hash & (capacity - 1);
-  u32 step = 0;
+  HashProbe probe(hash, table->capacity());
 
-  while (keys[slot] != hash) {
+  u32 slot = probe.slot();
+  while (true) {
+    if (keys[slot] == hash) {
+      return table->values()[slot].trace;
+    }
     if (keys[slot] == 0) {
       return nullptr;
     }
-    if (++step >= capacity) {
-      return nullptr;
+    if (!probe.hasNext()) {
+      break;
     }
-    slot = (slot + step) & (capacity - 1);
-  }
+    slot = probe.next();
+  }
 
-  return table->values()[slot].trace;
+  return nullptr;
 }
 
 u64 CallTraceHashTable::put(int num_frames, ASGCT_CallFrame *frames,
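
The rewritten lookup only works because a find replays exactly the probe sequence the insert walked. A toy roundtrip using the `HashProbe` sketch above makes that concrete (the table and function names are illustrative, not from the real code):

```cpp
#include <cstdio>

static const u32 CAP = 8;        // toy power-of-two capacity
static u64 toy_keys[CAP] = {0};  // 0 means "empty slot"

void toyPut(u64 hash) {
  HashProbe probe(hash, CAP);
  u32 slot = probe.slot();
  while (toy_keys[slot] != 0 && toy_keys[slot] != hash && probe.hasNext()) {
    slot = probe.next(); // same sequence a later lookup will follow
  }
  toy_keys[slot] = hash;
}

bool toyFind(u64 hash) {
  HashProbe probe(hash, CAP);
  u32 slot = probe.slot();
  while (true) {
    if (toy_keys[slot] == hash) return true;                    // hit
    if (toy_keys[slot] == 0 || !probe.hasNext()) return false;  // miss
    slot = probe.next();
  }
}

int main() {
  toyPut(0x9e3779b97f4a7c15ULL);
  printf("%d %d\n", toyFind(0x9e3779b97f4a7c15ULL), toyFind(0x1234ULL)); // 1 0
}
```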
@@ -207,10 +208,9 @@ u64 CallTraceHashTable::put(int num_frames, ASGCT_CallFrame *frames,
   }
 
   u64 *keys = table->keys();
-  u32 capacity = table->capacity();
-  u32 slot = hash & (capacity - 1);
-  u32 step = 0;
-
+  HashProbe probe(hash, table->capacity());
+
+  u32 slot = probe.slot();
   while (true) {
     u64 key_value = __atomic_load_n(&keys[slot], __ATOMIC_RELAXED);
     if (key_value == hash) {
@@ -278,6 +278,8 @@ u64 CallTraceHashTable::put(int num_frames, ASGCT_CallFrame *frames,
     u32 new_size = table->incSize();
     u32 capacity = table->capacity();
 
+    probe.updateCapacity(new_size);
+
     // EXPANSION LOGIC: Check if 75% capacity reached after incrementing size
     if (new_size == capacity * 3 / 4) {
       // Allocate new table with double capacity using LinearAllocator
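
One subtlety in the 75% check: it tests exact equality rather than `>=`. Assuming `incSize()` behaves like an atomic fetch-add (the `LongHashTable` internals are not shown in this hunk), every caller observes a distinct `new_size`, so exactly one thread crosses the threshold and performs the expansion. A toy model:

```cpp
#include <atomic>
#include <cstdio>

std::atomic<unsigned> toy_size{0};

// Mirrors `new_size == capacity * 3 / 4`: fetch-add hands out unique
// values, so the predicate is true for exactly one caller.
bool shouldExpand(unsigned capacity) {
  unsigned new_size = toy_size.fetch_add(1) + 1;
  return new_size == capacity * 3 / 4;
}

int main() {
  const unsigned capacity = 16; // threshold reached at size 12
  int expansions = 0;
  for (int i = 0; i < 16; i++) {
    if (shouldExpand(capacity)) expansions++;
  }
  printf("expansions = %d\n", expansions); // prints: expansions = 1
}
```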
@@ -314,18 +316,18 @@ u64 CallTraceHashTable::put(int num_frames, ASGCT_CallFrame *frames,
       return trace->trace_id;
     }
 
-    if (++step >= capacity) {
+    if (!probe.hasNext()) {
       // Table overflow - very unlikely with expansion logic
       atomicIncRelaxed(_overflow);
       return OVERFLOW_TRACE_ID;
     }
-    // Linear probing with step increment
-    slot = (slot + step) & (capacity - 1);
+    // Prime probing for better distribution
+    slot = probe.next();
   }
 }
 
-void CallTraceHashTable::collect(std::unordered_set<CallTrace *> &traces) {
-  // Lock-free collection for read-only tables (after atomic swap)
+void CallTraceHashTable::collect(std::unordered_set<CallTrace *> &traces, std::function<void(CallTrace *)> trace_hook) {
+  // Lock-free collection for read-only tables
   // No new put() operations can occur, so no synchronization needed
 
   // Collect from all tables in the chain (current and previous tables)
@@ -337,6 +339,9 @@ void CallTraceHashTable::collect(std::unordered_set<CallTrace *> &traces) {
       if (keys[slot] != 0) {
         CallTrace *trace = values[slot].acquireTrace();
         if (trace != nullptr && trace != CallTraceSample::PREPARING) {
+          if (trace_hook) {
+            trace_hook(trace); // Call hook first if provided
+          }
           traces.insert(trace);
         }
       }
@@ -345,10 +350,14 @@ void CallTraceHashTable::collect(std::unordered_set<CallTrace *> &traces) {
 
   // Handle overflow trace
   if (_overflow > 0) {
+    if (trace_hook) {
+      trace_hook(&_overflow_trace); // Call hook for overflow trace too
+    }
     traces.insert(&_overflow_trace);
   }
 }
 
+
 void CallTraceHashTable::putWithExistingId(CallTrace* source_trace, u64 weight) {
   // Trace preservation for standby tables (no contention with new puts)
   // This is safe because new put() operations go to the new active table
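
A caller-side sketch of the extended `collect()` signature may help: the optional hook runs once per live trace, and once for the overflow trace, before each is inserted into the set. The `drain()` wrapper and the hook body here are hypothetical:

```cpp
#include <functional>
#include <unordered_set>

// Hypothetical caller; CallTrace and CallTraceHashTable come from the
// headers included above.
void drain(CallTraceHashTable &storage) {
  std::unordered_set<CallTrace *> traces;
  storage.collect(traces, [](CallTrace *trace) {
    // Runs for each trace before it is inserted into `traces`;
    // e.g. pin, copy, or serialize it while collection is quiescent.
    (void)trace;
  });
  // Passing a default-constructed (empty) std::function skips the hook.
}
```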
@@ -371,22 +380,25 @@ void CallTraceHashTable::putWithExistingId(CallTrace* source_trace, u64 weight)
 
   u64 *keys = table->keys();
   u32 capacity = table->capacity();
-  u32 slot = hash & (capacity - 1);
-
+
+  HashProbe probe(hash, capacity);
+  u32 slot = probe.slot();
+
   // Look for existing entry or empty slot - no locking needed
   while (true) {
     u64 key_value = __atomic_load_n(&keys[slot], __ATOMIC_RELAXED);
-    if (key_value == hash) {
-      // Found existing entry - just use it (trace already preserved)
-      break;
-    }
     if (key_value == 0) {
       // Found empty slot - claim it atomically
       u64 expected = 0;
       if (!__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
         // Another thread claimed it, try next slot
-        slot = table->nextSlot(slot);
-        continue;
+        if (probe.hasNext()) {
+          slot = probe.next();
+          continue;
+        }
+        // Probe sequence exhausted: bail out rather than fall through and
+        // overwrite a slot that now belongs to a different key
+        break;
       }
 
       // Create a copy of the source trace preserving its exact ID
@@ -403,14 +412,15 @@ void CallTraceHashTable::putWithExistingId(CallTrace* source_trace, u64 weight)
         Counters::increment(CALLTRACE_STORAGE_TRACES);
 
         // Increment table size
-        table->incSize();
+        u32 new_size = table->incSize();
+        probe.updateCapacity(new_size);
       } else {
         // Allocation failure - clear the key we claimed
         __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
       }
       break;
     }
 
-    slot = table->nextSlot(slot);
+    slot = probe.next();
   }
 }
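
The atomics in `putWithExistingId()` follow the same claim-then-publish pattern as `put()`: claim a slot's key with an acq_rel CAS, write the payload, and fall back to the probe sequence when the claim fails. A stripped-down model of that pattern, with illustrative types and names:

```cpp
#include <atomic>
#include <cstdint>

struct ToySlot {
  std::atomic<uint64_t> key{0};
  std::atomic<void *> value{nullptr};
};

// Claim the slot's key with a CAS (acq_rel mirrors __ATOMIC_ACQ_REL in the
// real code), then publish the payload with release so a reader that
// acquires the value sees it fully constructed.
bool tryPublish(ToySlot &slot, uint64_t hash, void *payload) {
  uint64_t expected = 0;
  if (!slot.key.compare_exchange_strong(expected, hash,
                                        std::memory_order_acq_rel,
                                        std::memory_order_relaxed)) {
    return false; // another thread owns this slot; caller probes onward
  }
  slot.value.store(payload, std::memory_order_release);
  return true;
}
```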