Conversation

@hengfeiyang
Contributor

@hengfeiyang hengfeiyang commented Aug 29, 2025

User description

The old logic closed the writer first and only then removed it from WRITERS. If the close failed, the writer was never removed from WRITERS, so the TTL check would still find it and try to flush it, eventually producing this error:

[INGESTER:MEM] writer queue rotate error: channel closed
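
For illustration, a minimal before/after sketch of the ordering change in flush_all. The "old" shape is reconstructed from the description above rather than copied from the diff, and the map and metric shapes follow the review excerpts further down, so treat it as a sketch, not the exact code:

// Old ordering (reconstructed, hypothetical): close first, remove afterwards.
// If close() fails, the `?` returns early, the remove never runs, and the entry
// stays in WRITERS for the next TTL check to trip over ("channel closed").
if let Some(r) = w.get(&key) {
    r.close().await?;
}
w.remove(&key);

// New ordering (this PR): take the writer out of WRITERS first, then flush it.
// Even if flush() fails, the stale entry is already gone.
if let Some(r) = w.remove(&key) {
    r.flush().await?;
    metrics::INGEST_MEMTABLE_FILES.with_label_values(&[]).dec();
}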

PR Type

Bug fix, Enhancement


Description

  • Replace writer close with safe flush

  • Remove-before-flush to avoid stale WRITERS

  • Add informative logging around flush lifecycle

  • Keep decrementing the memtable file-count metric when a writer is flushed


Diagram Walkthrough

flowchart LR
  checkTTL["check_ttl()"] -- "iter WRITERS" --> flushAll["flush_all()"]
  flushAll -- "remove key" --> writerFlush["Writer::flush()"]
  writerFlush -- "send Close; await closed" --> walSync["WAL sync"]
  flushAll -- "dec metric" --> metrics["INGEST_MEMTABLE_FILES dec"]

File Walkthrough

Relevant files
Bug fix
writer.rs
Safer writer flush and map removal ordering                           

src/ingester/src/writer.rs

  • Add start log for flushing all writers.
  • Change flush_all: remove writer entry, then flush and dec metric.
  • Rename Writer::close to Writer::flush and update behavior.
  • Log when writer queue fully closed before WAL sync.
+7/-6     

@github-actions
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Concurrency Order

Removing entries from WRITERS before a successful flush changes failure semantics; if flush() fails, the writer is already gone and cannot be retried by TTL, potentially dropping data or leaving WAL unsynced. Consider re-inserting on failure or flushing before removal.

log::info!("[INGESTER:MEM] start flush all writers");
for w in WRITERS.iter() {
    let mut w = w.write().await;
    let keys = w.keys().cloned().collect::<Vec<_>>();
    for key in keys {
        if let Some(r) = w.remove(&key) {
            r.flush().await?; // close writer
            metrics::INGEST_MEMTABLE_FILES.with_label_values(&[]).dec();
        }
    }
}
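
For reference, a hedged sketch of the "re-insert on failure" alternative mentioned above. This is not code from the PR; it assumes the same map and metric shapes as the excerpt, a Debug-printable key type, and that retrying a writer whose queue may already be closed is actually safe:

for key in keys {
    if let Some(r) = w.remove(&key) {
        match r.flush().await {
            Ok(()) => {
                metrics::INGEST_MEMTABLE_FILES.with_label_values(&[]).dec();
            }
            Err(e) => {
                // Keep the writer around so the next TTL pass can retry the flush;
                // note the error is logged here instead of being propagated with `?`.
                log::error!("[INGESTER:MEM] flush writer {key:?} failed, keeping it for retry: {e}");
                w.insert(key, r);
            }
        }
    }
}
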
Error Handling

Errors from flush() are propagated with '?', but metric decrement and removal occur before flush; failures may lead to metric inaccuracies and lost bookkeeping. Ensure metrics and state updates happen only after successful flush or compensate on error.

if let Some(r) = w.remove(&key) {
    r.flush().await?; // close writer
    metrics::INGEST_MEMTABLE_FILES.with_label_values(&[]).dec();
}
Logging Consistency

New logs are added for flush lifecycle, but error log still references "close writer error". Consider aligning terminology (flush vs close) and include key/path context to aid debugging.

pub async fn flush(&self) -> Result<()> {
    // wait for all messages to be processed
    if let Err(e) = self
        .write_queue
        .send((WriterSignal::Close, vec![], true))
        .await
    {
        log::error!("[INGESTER:MEM:{}] close writer error: {}", self.idx, e);
    }
    self.write_queue.closed().await;
    log::info!("[INGESTER:MEM:{}] writer queue closed", self.idx);

    // rotation wal
    let mut wal = self.wal.write().await;
    wal.sync().context(WalSnafu)?;

Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Summary

This PR fixes a critical race condition bug in the ingester's writer flush logic. The core issue was in the flush_all() function where the original implementation would attempt to close writers first and then remove them from the WRITERS collection. If the close operation failed, writers would remain in the WRITERS collection indefinitely, causing subsequent TTL checks to find stale writers and attempt operations on closed channels, resulting in "channel closed" errors.

The fix reverses the order of operations - writers are now removed from the WRITERS collection first using w.remove(&key), and then the returned writer is flushed using r.flush().await?. This ensures that even if the flush operation fails, the writer is already cleaned up from the collection and won't cause future issues. Additionally, the method has been renamed from close() to flush() to better reflect its actual behavior, which involves WAL synchronization and memtable rotation rather than just resource cleanup.

The change also adds improved logging with log::info! statements to provide better observability into the flush process, helping with debugging and monitoring. This fix integrates well with the existing ingester architecture by maintaining the same external interface while improving the internal cleanup reliability.

Confidence score: 4/5

  • This PR addresses a well-defined bug with a logical solution that prevents resource leaks
  • Score reflects solid understanding of the race condition and appropriate fix, though the change touches critical ingester logic
  • Pay close attention to the flush operation error handling and ensure proper testing of failure scenarios

1 file reviewed, no comments


@github-actions
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Flush before removal to avoid loss

Avoid removing a writer from the map before ensuring flush() succeeded; otherwise a
failed flush loses the writer and drops pending data. Flush first, handle errors,
then remove and decrement the metric only on success. Log failures and keep the
writer for retry.

src/ingester/src/writer.rs [205-215]

 pub async fn flush_all() -> Result<()> {
     log::info!("[INGESTER:MEM] start flush all writers");
     for w in WRITERS.iter() {
         let mut w = w.write().await;
         let keys = w.keys().cloned().collect::<Vec<_>>();
         for key in keys {
-            if let Some(r) = w.remove(&key) {
-                r.flush().await?; // close writer
+            // Attempt flush without removing first
+            let flush_res = if let Some(r) = w.get(&key) {
+                // clone Arc or use reference if `flush` takes &self
+                r.flush().await
+            } else {
+                // Entry disappeared concurrently; skip
+                continue;
+            };
+
+            if let Err(e) = flush_res {
+                log::error!("[INGESTER:MEM] flush writer '{}' failed: {e}", key);
+                // keep writer for retry; do not remove or decrement metric
+                continue;
+            }
+
+            // Only remove and decrement on successful flush
+            if let Some(_r) = w.remove(&key) {
                 metrics::INGEST_MEMTABLE_FILES.with_label_values(&[]).dec();
             }
         }
     }
     Ok(())
 }
Suggestion importance[1-10]: 7


Why: Correctly identifies a potential data-loss scenario by removing before ensuring flush() succeeds and proposes a safe sequence; impact is moderate as it's an operational robustness improvement.

Impact: Medium
Log WAL sync failures explicitly

Syncing the WAL after closing the queue should propagate errors with context to
avoid silent data loss. Add an explicit error log on WAL sync failure before
bubbling up the error.

src/ingester/src/writer.rs [499-500]

 let mut wal = self.wal.write().await;
-wal.sync().context(WalSnafu)?;
+if let Err(e) = wal.sync().context(WalSnafu) {
+    log::error!("[INGESTER:MEM:{}] WAL sync failed during flush: {}", self.idx, e);
+    return Err(e);
+}
Suggestion importance[1-10]: 5


Why: Adds helpful error logging around wal.sync() with correct placement; useful for observability but not critical, and the existing ? already propagates errors.

Impact: Low
General
Fix misleading log messages

The error log still says "close writer" after renaming the method to flush, which
can mislead diagnostics. Update log messages to reflect the flush operation and
include context that the queue might already be closed.

src/ingester/src/writer.rs [488-497]

 if let Err(e) = self
     .write_queue
     .send((WriterSignal::Close, vec![], true))
     .await
 {
-    log::error!("[INGESTER:MEM:{}] close writer error: {}", self.idx, e);
+    log::error!(
+        "[INGESTER:MEM:{}] flush writer signal send error (maybe already closed): {}",
+        self.idx, e
+    );
 }
 self.write_queue.closed().await;
-log::info!("[INGESTER:MEM:{}] writer queue closed", self.idx);
+log::info!("[INGESTER:MEM:{}] writer queue closed after flush", self.idx);
Suggestion importance[1-10]: 6


Why: Accurate: the method was renamed to flush but the log still says "close writer"; updating messages improves diagnostics without affecting functionality.

Impact: Low

@hengfeiyang hengfeiyang merged commit 9fcfdad into main Sep 1, 2025
29 checks passed
@hengfeiyang hengfeiyang deleted the ingester/channel-close branch September 1, 2025 07:38