Skip to content

Conversation

@hengfeiyang
Copy link
Contributor

@hengfeiyang hengfeiyang commented Oct 21, 2025

PR Type

Bug fix, Enhancement


Description

  • Fix compactor pending jobs metric aggregation

  • Adjust compaction interval default to 10 seconds

  • Tune default compaction batch size by mode


Diagram Walkthrough

flowchart LR
  CFG["Config defaults"]
  METRIC["Pending jobs aggregation"]
  STORAGE_MY["MySQL impl"]
  STORAGE_PG["Postgres impl"]
  STORAGE_SQL["SQLite impl"]

  CFG -- "interval=10, batch size tuning" --> METRIC
  METRIC -- "sum counts per org/type" --> STORAGE_MY
  METRIC -- "sum counts per org/type" --> STORAGE_PG
  METRIC -- "sum counts per org/type" --> STORAGE_SQL
Loading

File Walkthrough

Relevant files
Enhancement
config.rs
Tune compaction defaults: interval and batch size               

src/config/src/config.rs

  • Lower default compact.interval from 60 to 10.
  • Set compact.batch_size to 100 in local mode.
  • Set compact.batch_size to cpu_num * 4 otherwise.
+6/-2     
Bug fix
mysql.rs
Fix MySQL pending job count aggregation                                   

src/infra/src/file_list/mysql.rs

  • Aggregate pending job counts by summing (+=) per org/type.
+1/-1     
postgres.rs
Fix Postgres pending job count aggregation                             

src/infra/src/file_list/postgres.rs

  • Aggregate pending job counts by summing (+=) per org/type.
+1/-1     
sqlite.rs
Fix SQLite pending job count aggregation                                 

src/infra/src/file_list/sqlite.rs

  • Aggregate pending job counts by summing (+=) per org/type.
+1/-1     

@github-actions github-actions bot added ☢️ Bug Something isn't working Review effort 2/5 labels Oct 21, 2025
@github-actions
Copy link
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Behavior Change

The default compact interval is reduced from 60 to 10 seconds, which increases scheduling frequency and system load. Validate impact on resource usage and whether documentation/flags reflect this new default.

if cfg.compact.interval < 1 {
    cfg.compact.interval = 10;
}
Batch Size Logic

Batch size now depends on mode: 100 for local and cpu_num*4 otherwise. Confirm cpu_num is always >=1 and that these defaults are appropriate across environments; consider upper bounds to avoid oversized batches on high-core machines.

if cfg.compact.batch_size < 1 {
    if cfg.common.local_mode {
        cfg.compact.batch_size = 100;
    } else {
        cfg.compact.batch_size = cfg.limit.cpu_num as i64 * 4;
    }
}
Aggregation Semantics

Pending jobs are now summed across entries for the same org/type. Ensure upstream query does not already aggregate, to avoid double counting; verify consistency across MySQL/Postgres/SQLite implementations and metrics consumers.

job_status
    .entry(org)
    .or_default()
    .entry(stream_type)
    .and_modify(|e| {
        *e += counts;
    })
    .or_insert(counts);

@github-actions
Copy link
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent integer overflow on sum

Adding counts repeatedly can overflow if values are large or untrusted. Use checked
addition with a saturation or error path to prevent wraparound and corrupted
metrics.

src/infra/src/file_list/mysql.rs [1639-1641]

 .entry(stream_type)
-    .and_modify(|e| *e += counts)
+    .and_modify(|e| {
+        if let Some(sum) = e.checked_add(counts) {
+            *e = sum;
+        } else {
+            // Saturate on overflow to avoid wraparound
+            *e = i64::MAX;
+        }
+    })
     .or_insert(counts);
Suggestion importance[1-10]: 8

__

Why: Correctly identifies potential overflow when accumulating counts and proposes a safe, localized fix using checked addition with saturation, enhancing correctness under edge conditions.

Medium
General
Clamp batch size to safe range

Multiplying CPU by 4 for batch size can oversubscribe resources on large machines
and cause memory pressure. Cap the value to a sane maximum and ensure a minimum of 1
to avoid zero or negative edge cases.

src/config/src/config.rs [2806-2812]

 if cfg.compact.batch_size < 1 {
-    if cfg.common.local_mode {
-        cfg.compact.batch_size = 100;
-    } else {
-        cfg.compact.batch_size = cfg.limit.cpu_num as i64 * 4;
-    }
+    let base = if cfg.common.local_mode { 100 } else { (cfg.limit.cpu_num as i64).saturating_mul(4) };
+    // Clamp to a reasonable range to avoid overloads on large hosts
+    cfg.compact.batch_size = base.clamp(1, 10_000);
 }
Suggestion importance[1-10]: 7

__

Why: Adds protective clamping to prevent runaway batch sizes on large hosts, improving robustness without changing semantics for typical cases. It’s a reasonable safeguard though not strictly required by the diff.

Medium
Restore safer default interval

Lowering the default interval from 60 to 10 may cause excessive compaction frequency
and load spikes. Consider keeping a safer minimum by clamping to a reasonable floor
or making the value proportional to environment (e.g., local vs. prod).

src/config/src/config.rs [2777-2779]

 if cfg.compact.interval < 1 {
-    cfg.compact.interval = 10;
+    cfg.compact.interval = if cfg.common.local_mode { 10 } else { 60 };
 }
Suggestion importance[1-10]: 6

__

Why: Sensible proposal to avoid overly aggressive compaction by keeping a higher default in non-local mode; aligns with prior behavior. However, it’s a policy choice rather than a correctness bug, so impact is moderate.

Low

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Overview

Summary

Fixed a critical bug in compactor pending jobs metrics where counts were being overwritten instead of accumulated when multiple streams share the same organization and stream type.

Key Changes:

  • Fixed get_pending_jobs_count() across all three database implementations (MySQL, Postgres, SQLite) by changing *e = counts to *e += counts when aggregating job counts by stream type
  • Optimized compactor performance by reducing default interval from 60s to 10s
  • Increased default batch size for better throughput: 4x CPU cores in production mode, 100 in local mode

Technical Details:
The bug occurred because the SQL query returns one row per stream (format: org/stream_type/stream_name), and when aggregating by (org, stream_type) pairs, the code was overwriting previous counts instead of summing them. For example, if org1/logs/stream1 had 10 jobs and org1/logs/stream2 had 5 jobs, the metric would incorrectly report 5 instead of 15.

The fix ensures accurate reporting of pending compaction jobs across all streams.

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk - it fixes a critical metrics bug with a simple, correct logic change
  • The changes are straightforward and fix an obvious bug (overwriting vs accumulating counts). The fix is applied consistently across all three database implementations. The config changes improve performance without introducing risks. SQL queries already use proper parameter binding for safety.
  • No files require special attention

Important Files Changed

File Analysis

Filename Score Overview
src/infra/src/file_list/mysql.rs 5/5 Fixed critical bug: changed assignment to accumulation when counting pending jobs by stream type
src/infra/src/file_list/postgres.rs 5/5 Fixed critical bug: changed assignment to accumulation when counting pending jobs by stream type
src/infra/src/file_list/sqlite.rs 5/5 Fixed critical bug: changed assignment to accumulation when counting pending jobs by stream type
src/config/src/config.rs 5/5 Adjusted compactor config defaults: reduced interval from 60s to 10s and increased batch_size (4x CPU cores in production, 100 in local mode)

Sequence Diagram

sequenceDiagram
    participant Compactor as Compactor Job
    participant FileList as File List Service
    participant DB as Database (MySQL/Postgres/SQLite)
    participant Metrics as Prometheus Metrics
    
    Note over Compactor: Every 300s (pending_jobs_metric_interval)
    Compactor->>FileList: get_pending_jobs_count()
    FileList->>DB: SELECT stream, status, count(*)<br/>WHERE status = Pending<br/>GROUP BY stream, status
    DB-->>FileList: Results (one row per stream)
    
    Note over FileList: Parse stream format:<br/>org/stream_type/stream_name
    
    loop For each result row
        FileList->>FileList: Extract org and stream_type
        Note over FileList: OLD BUG: *e = counts (overwrites)<br/>NEW FIX: *e += counts (accumulates)
        FileList->>FileList: Accumulate counts by<br/>(org, stream_type)
    end
    
    FileList-->>Compactor: HashMap<org, HashMap<stream_type, count>>
    
    loop Reset all org metrics
        Compactor->>Metrics: Set COMPACT_PENDING_JOBS = 0
    end
    
    loop Set new metrics
        Compactor->>Metrics: Set COMPACT_PENDING_JOBS<br/>(org, stream_type) = count
    end
Loading

4 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

@testdino-playwright-reporter
Copy link

⚠️ Test Run Unstable


Author: hengfeiyang | Branch: fix/compactor-metrics | Commit: 1859ffc

Testdino Test Results

Status Total Passed Failed Skipped Flaky Pass Rate Duration
All tests passed 364 342 0 19 3 94% 4m 39s

View Detailed Results

@testdino-playwright-reporter
Copy link

⚠️ Test Run Unstable


Author: hengfeiyang | Branch: fix/compactor-metrics | Commit: 1859ffc

Testdino Test Results

Status Total Passed Failed Skipped Flaky Pass Rate Duration
All tests passed 364 344 0 19 1 95% 4m 39s

View Detailed Results

@hengfeiyang hengfeiyang merged commit cffdf90 into main Oct 21, 2025
32 checks passed
@hengfeiyang hengfeiyang deleted the fix/compactor-metrics branch October 21, 2025 04:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

☢️ Bug Something isn't working Review effort 2/5

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants