Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions lib/segment/src/common/flags/buffered_dynamic_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ pub(crate) struct BufferedDynamicFlags {

/// Pending changes to the storage flags.
buffer: Arc<RwLock<AHashMap<PointOffsetType, bool>>>,

/// Lock to prevent concurrent flush and drop
pub is_alive_flush_lock: Arc<Mutex<bool>>,
Comment on lines +23 to +24
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some consideration, I am not sure this will actually prevent trying to interact with an inexistent file/directory. In PayloadIndex::drop_index we actually remove the directory BEFORE dropping the index.

This lock only makes the dropping wait for the end of the flush, but the files may already be deleted externally.

Please correct me if I missed something 🤔

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. That is a problem!

We must only delete files once we unload the payload index structure.

I am quite sure it's important to only delete after also releasing all files handles. Because Windows doesn't allow us to delete files which we still have open. Though then I'm also a bit puzzled on why we haven't seen issues about this. Maybe it needs a specific setup, like explicitly deleting a payload index which may not be very common.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought:

image

I do remember quite a few reports where Windows users complained about 'permission denied' errors. It may very well be caused by this.

Copy link
Copy Markdown
Member Author

@agourlay agourlay Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks I did push a fix to order the drop properly 34d031d

This lock only makes the dropping wait for the end of the flush, but the files may already be deleted externally.

My guess it that the original patch worked for me locally because the flush would happen after the drop anyway.

There is not a lot of time to squeeze the flusher between the FS remove and the implicit drop.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI started an audit here #7626

}

impl BufferedDynamicFlags {
pub fn new(mmap_flags: DynamicMmapFlags) -> Self {
let buffer = Arc::new(RwLock::new(AHashMap::new()));

let is_alive_flush_lock = Arc::new(Mutex::new(true));
Self {
storage: Arc::new(Mutex::new(mmap_flags)),
buffer,
is_alive_flush_lock,
}
}

Expand Down Expand Up @@ -56,8 +60,24 @@ impl BufferedDynamicFlags {
(updates, required_len)
};

let flags_arc = self.storage.clone();
// Weak reference to detect when the storage has been deleted
let flags_arc = Arc::downgrade(&self.storage);
Comment on lines +63 to +64
Copy link
Copy Markdown
Contributor

@coszio coszio Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did similarly for #7624, this is an improvement so that we don't start flushing a dropped component 👍

let is_alive_flush_lock = self.is_alive_flush_lock.clone();

Box::new(move || {
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
let is_alive_flush_guard = is_alive_flush_lock.lock();

if !*is_alive_flush_guard {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to avoid this check by also using a weak reference to the lock. Then it would also simplify the mutex to just hold a unit struct ()

// Storage is removed, skip flush
return Ok(());
}

let Some(flags_arc) = flags_arc.upgrade() else {
log::debug!("skipping flushing on deleted storage");
return Ok(());
};

// lock for the entire flushing process
let mut flags_guard = flags_arc.lock();

Expand All @@ -77,6 +97,13 @@ impl BufferedDynamicFlags {
}
}

impl Drop for BufferedDynamicFlags {
fn drop(&mut self) {
// Wait for all background flush operations to finish
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Wait for all background flush operations to finish
// Wait for all background flush operations to finish, and cancel future flushes

*self.is_alive_flush_lock.lock() = false;
}
}

#[cfg(test)]
mod tests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,11 @@ impl PayloadFieldIndex for MutableNullIndex {
}

fn cleanup(self) -> OperationResult<()> {
if self.base_dir.is_dir() {
fs::remove_dir_all(&self.base_dir)?;
let base_dir = self.base_dir.clone();
// drop mmap handles before deleting files
drop(self);
if base_dir.is_dir() {
fs::remove_dir_all(&base_dir)?;
}
Ok(())
}
Expand Down