Conversation

@hengfeiyang
Contributor

This PR fixes an issue where metrics data may be duplicated during compaction.

The problem is that the default data retention for metrics is daily, but the compactor generates merge jobs by the hour, and we support running multiple jobs in parallel. As a result, the same metrics stream could have multiple jobs running over the same data.

The solution is to add a local lock that allows only one job to run at a time for any stream whose data retention is daily.
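
A minimal sketch of the idea, assuming a DashSet-backed lock module like the `src/service/db/compact/stream.rs` described in the review below (the names `is_running`/`set_running`/`clear_running` are taken from that review, not from the diff itself):

```rust
use dashmap::DashSet;
use once_cell::sync::Lazy;

// Set of stream keys that currently have a merge job running.
static RUNNING_STREAMS: Lazy<DashSet<String>> = Lazy::new(DashSet::new);

pub fn is_running(stream_key: &str) -> bool {
    RUNNING_STREAMS.contains(stream_key)
}

pub fn set_running(stream_key: &str) {
    RUNNING_STREAMS.insert(stream_key.to_string());
}

pub fn clear_running(stream_key: &str) {
    RUNNING_STREAMS.remove(stream_key);
}
```

In this scheme the dispatcher checks `is_running()` and calls `set_running()` before handing a daily-retention job to a worker, and the worker calls `clear_running()` once the merge finishes.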

@github-actions github-actions bot added the ☢️ Bug Something isn't working label Oct 27, 2025
@github-actions
Contributor

Failed to generate code suggestions for PR

Contributor

@greptile-apps greptile-apps bot left a comment

Greptile Overview

Greptile Summary

This PR addresses metrics data duplication during compaction by implementing a local lock mechanism to ensure only one compaction job runs per stream with daily retention. Additional changes include cleanup of the unused file_list module, improved logging in PromQL, and an optimization for trigger stream queries.

Key Changes:

  • Introduced stream-level locking using DashSet to prevent parallel compaction jobs for daily-partitioned streams
  • Added lock acquisition in run_merge() before dispatching jobs to workers
  • Added lock release in worker after job completion
  • Removed unused file_list.rs module and related cache status endpoint
  • Added conditional check to avoid querying non-existent trigger streams in organization summary

Critical Issues Found:

  • Race condition in lock acquisition: is_running() and set_running() are separate, non-atomic operations that can allow multiple jobs through (see the sketch after this list)
  • Potential deadlock: Lock cleanup won't execute if worker task panics or is cancelled during shutdown
  • Debug println! statement left in organization.rs production code
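
One way to close the check-then-act window, sketched here as a suggestion rather than the actual fix: rely on `DashSet::insert` directly, since it returns `false` when the value is already present, which makes the check and the set a single atomic operation:

```rust
use dashmap::DashSet;
use once_cell::sync::Lazy;

static RUNNING_STREAMS: Lazy<DashSet<String>> = Lazy::new(DashSet::new);

/// Atomically try to claim a stream for compaction. Returns true if this
/// caller won the lock, false if another job already holds it. Because
/// DashSet::insert reports whether the value was newly inserted, no
/// separate is_running() check is needed and there is no race window.
pub fn try_set_running(stream_key: &str) -> bool {
    RUNNING_STREAMS.insert(stream_key.to_string())
}
```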

Confidence Score: 1/5

  • This PR has critical concurrency bugs that could cause the exact data duplication it aims to prevent, plus potential deadlocks
  • The locking mechanism has a classic check-then-act race condition that defeats its purpose - two threads can both check is_running() simultaneously and both proceed. Additionally, the lock cleanup only happens on normal execution path, not on panics or cancellation, which can cause permanent deadlocks. These are fundamental concurrency bugs that make the fix unreliable.
  • Critical attention needed: src/service/compact/mod.rs (race condition), src/service/compact/worker.rs (missing cleanup on panic), src/service/db/compact/stream.rs (needs atomic check-and-set API)

Important Files Changed

File Analysis

| Filename | Score | Overview |
| --- | --- | --- |
| src/service/compact/mod.rs | 1/5 | Added stream locking to prevent parallel jobs, but has a critical race condition in the check-then-set logic |
| src/service/compact/worker.rs | 1/5 | Added lock cleanup after merge completion, but cleanup won't execute if the task panics or is cancelled |
| src/service/db/compact/stream.rs | 3/5 | New file implementing stream locking with DashSet; thread-safe but missing an atomic check-and-set operation |
| src/service/organization.rs | 3/5 | Added check for trigger stream existence before querying, but contains a debug println statement |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Compactor as Compactor Thread
    participant JobQueue as Job Queue
    participant StreamLock as Stream Lock (DashSet)
    participant Worker as Worker Thread
    participant Merger as merge_by_stream

    Compactor->>JobQueue: get_pending_jobs()
    JobQueue-->>Compactor: jobs list
    
    loop For each job
        Compactor->>Compactor: Check partition_time_level == Daily
        alt Daily partition
            Compactor->>Compactor: Check node assignment
            Compactor->>StreamLock: is_running(stream)?
            StreamLock-->>Compactor: false
            Note over Compactor,StreamLock: RACE CONDITION: Another thread<br/>could check here too!
            Compactor->>StreamLock: set_running(stream)
            Compactor->>Worker: send(MergeJob)
        else Not daily
            Compactor->>Worker: send(MergeJob) directly
        end
    end
    
    Worker->>Worker: Receive MergeJob
    Worker->>Merger: merge_by_stream()
    
    alt Success or Error
        Merger-->>Worker: Result
        Worker->>StreamLock: clear_running(stream)
    else Panic or Task Cancelled
        Note over Worker,StreamLock: DEADLOCK: clear_running()<br/>never called, stream locked forever
    end
```

Additional Comments (1)

  1. src/service/compact/worker.rs, lines 114-139

    logic: Lock not released if task panics or is cancelled. If merge_by_stream() panics or the task is aborted (e.g., on shutdown at line 81), clear_running() at line 139 won't execute, leaving the stream permanently locked.

    Use a RAII guard pattern or wrap in a defer-like mechanism to ensure cleanup always happens.
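
A minimal sketch of such a guard, assuming the hypothetical `clear_running` helper from the earlier sketch; because the cleanup lives in `Drop`, it runs on normal return, on panic unwind, and when the worker's future is dropped during cancellation:

```rust
use dashmap::DashSet;
use once_cell::sync::Lazy;

static RUNNING_STREAMS: Lazy<DashSet<String>> = Lazy::new(DashSet::new);

fn clear_running(stream_key: &str) {
    RUNNING_STREAMS.remove(stream_key);
}

/// RAII guard: releases the stream lock when dropped.
struct StreamLockGuard {
    stream_key: String,
}

impl Drop for StreamLockGuard {
    fn drop(&mut self) {
        clear_running(&self.stream_key);
    }
}

// Hypothetical worker entry point illustrating the pattern.
async fn handle_merge_job(stream_key: String) {
    let _guard = StreamLockGuard { stream_key: stream_key.clone() };
    // merge_by_stream(...).await would run here; whether it returns,
    // panics, or the task is cancelled at an await point, _guard is
    // dropped and the lock is released.
}
```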

9 files reviewed, 3 comments

@testdino-playwright-reporter

⚠️ Test Run Unstable


Author: hengfeiyang | Branch: fix/compactor-metrics | Commit: c6f9d46

Testdino Test Results

| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
| --- | --- | --- | --- | --- | --- | --- | --- |
| All tests passed | 366 | 346 | 0 | 19 | 1 | 95% | 4m 39s |

View Detailed Results

@hengfeiyang hengfeiyang merged commit d360151 into main Oct 29, 2025
31 of 32 checks passed
@hengfeiyang hengfeiyang deleted the fix/compactor-metrics branch October 29, 2025 05:37