fix: metrics data duplicate when compact #8888
Conversation
Greptile Summary
This PR addresses metrics data duplication during compaction by implementing a local lock mechanism to ensure only one compaction job runs per stream with daily retention. Additional changes include cleanup of the unused `file_list` module, improved logging in PromQL, and an optimization for trigger stream queries.
Key Changes:
- Introduced stream-level locking using `DashSet` to prevent parallel compaction jobs for daily-partitioned streams (a sketch of such a module follows this list)
- Added lock acquisition in `run_merge()` before dispatching jobs to workers
- Added lock release in the worker after job completion
- Removed the unused `file_list.rs` module and the related cache status endpoint
- Added a conditional check to avoid querying non-existent trigger streams in the organization summary
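For concreteness, here is a minimal sketch of what such a stream-lock module might look like, based only on the function names mentioned in this review (`is_running`, `set_running`, `clear_running`) and the `DashSet` backing store; the signatures and the `once_cell` global are assumptions, not the PR's actual code:

```rust
// Hypothetical sketch of the stream-lock module (src/service/db/compact/stream.rs).
// Signatures and the once_cell global are assumptions; only the function
// names and the DashSet backing store come from this review.
use dashmap::DashSet;
use once_cell::sync::Lazy;

/// Keys of streams whose compaction job is currently running.
static RUNNING: Lazy<DashSet<String>> = Lazy::new(DashSet::new);

pub fn is_running(stream_key: &str) -> bool {
    RUNNING.contains(stream_key)
}

pub fn set_running(stream_key: &str) {
    RUNNING.insert(stream_key.to_string());
}

pub fn clear_running(stream_key: &str) {
    RUNNING.remove(stream_key);
}
```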
Critical Issues Found:
- Race condition in lock acquisition: `is_running()` and `set_running()` are separate non-atomic operations that can allow multiple jobs through
- Potential deadlock: lock cleanup won't execute if the worker task panics or is cancelled during shutdown
- Debug `println!` statement left in `organization.rs` production code
Confidence Score: 1/5
- This PR has critical concurrency bugs that could cause the exact data duplication it aims to prevent, plus potential deadlocks.
- The locking mechanism has a classic check-then-act race condition that defeats its purpose: two threads can both check `is_running()` simultaneously and both proceed. Additionally, the lock cleanup only happens on the normal execution path, not on panics or cancellation, which can cause permanent deadlocks. These are fundamental concurrency bugs that make the fix unreliable. An atomic check-and-set closes the race; a sketch follows this list.
- Critical attention needed: `src/service/compact/mod.rs` (race condition), `src/service/compact/worker.rs` (missing cleanup on panic), `src/service/db/compact/stream.rs` (needs an atomic check-and-set API)
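As the review notes, the separate `is_running()` / `set_running()` calls should collapse into a single atomic operation. `DashSet::insert` already returns `false` when the key is present, so one call can serve as the check-and-set; a sketch (the `try_set_running` name is hypothetical):

```rust
/// Atomically claim the per-stream lock; returns true if this caller
/// now owns it. Hypothetical name; a sketch of the atomic
/// check-and-set API this review recommends.
pub fn try_set_running(stream_key: &str) -> bool {
    // DashSet::insert locks the key's shard and returns false if the
    // key was already present, so check and set cannot interleave.
    RUNNING.insert(stream_key.to_string())
}
```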
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| `src/service/compact/mod.rs` | 1/5 | Added stream locking to prevent parallel jobs, but has a critical race condition in its check-then-set logic |
| `src/service/compact/worker.rs` | 1/5 | Added lock cleanup after merge completion, but the cleanup won't execute if the task panics or is cancelled |
| `src/service/db/compact/stream.rs` | 3/5 | New file implementing stream locking with `DashSet`; thread-safe but missing an atomic check-and-set operation |
| `src/service/organization.rs` | 3/5 | Added a check for trigger stream existence before querying, but contains a debug `println!` statement |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Compactor as Compactor Thread
    participant JobQueue as Job Queue
    participant StreamLock as Stream Lock (DashSet)
    participant Worker as Worker Thread
    participant Merger as merge_by_stream
    Compactor->>JobQueue: get_pending_jobs()
    JobQueue-->>Compactor: jobs list
    loop For each job
        Compactor->>Compactor: Check partition_time_level == Daily
        alt Daily partition
            Compactor->>Compactor: Check node assignment
            Compactor->>StreamLock: is_running(stream)?
            StreamLock-->>Compactor: false
            Note over Compactor,StreamLock: RACE CONDITION: Another thread<br/>could check here too!
            Compactor->>StreamLock: set_running(stream)
            Compactor->>Worker: send(MergeJob)
        else Not daily
            Compactor->>Worker: send(MergeJob) directly
        end
    end
    Worker->>Worker: Receive MergeJob
    Worker->>Merger: merge_by_stream()
    alt Success or Error
        Merger-->>Worker: Result
        Worker->>StreamLock: clear_running(stream)
    else Panic or Task Cancelled
        Note over Worker,StreamLock: DEADLOCK: clear_running()<br/>never called, stream locked forever
    end
```
Additional Comments (1)
- `src/service/compact/worker.rs`, lines 114-139, logic: Lock not released if the task panics or is cancelled. If `merge_by_stream()` panics or the task is aborted (e.g., on shutdown at line 81), `clear_running()` at line 139 won't execute, leaving the stream permanently locked. Use a RAII guard pattern or wrap the call in a defer-like mechanism so cleanup always happens; a sketch of such a guard follows.
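One way to guarantee cleanup is a Drop-based guard, a sketch of the RAII pattern the comment suggests (the `RunningGuard` type is hypothetical):

```rust
/// Releases the stream lock when dropped. Because Drop runs during both
/// panic unwinding and future cancellation, the lock is released even if
/// merge_by_stream() panics or the worker task is aborted on shutdown.
/// Hypothetical type; a sketch of the suggested RAII pattern.
struct RunningGuard {
    stream_key: String,
}

impl Drop for RunningGuard {
    fn drop(&mut self) {
        clear_running(&self.stream_key);
    }
}

// Usage in the worker: hold the guard across the merge.
// let _guard = RunningGuard { stream_key: job.stream_key.clone() };
// merge_by_stream(...).await?;
```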
9 files reviewed, 3 comments
| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
|---|---|---|---|---|---|---|---|
| All tests passed | 366 | 346 | 0 | 19 | 1 | 95% | 4m 39s |
This PR fixes an issue where metrics data may be duplicated during compaction.
The problem is that the default data retention for metrics is daily, but the compactor generates merge jobs by the hour, and multiple jobs can run in parallel. As a result, the same metrics stream may have multiple jobs running over the same data.
The solution is to add a local lock that allows only one running job per stream whose data retention is daily. A rough sketch of how the dispatch loop could apply such a lock follows.
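Under that design, the dispatch loop in `run_merge()` might look roughly like this; the job field names and the `try_set_running` helper (the atomic variant sketched earlier) are assumptions, not the PR's actual code:

```rust
// Sketch of the dispatch loop in run_merge(); field names and the
// try_set_running helper are assumptions, not the PR's actual code.
for job in jobs {
    if job.partition_time_level == PartitionTimeLevel::Daily {
        // Only one merge job may run at a time for a daily-retention
        // stream; skip the job if another one already holds the lock.
        if !try_set_running(&job.stream_key) {
            continue;
        }
    }
    // The worker calls clear_running() (or drops a guard) when done.
    worker_tx.send(job).await?;
}
```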