Skip to content

Commit b1003fb

Browse files
committed
Revert "simplify locking"
This reverts commit 7ebd324.
1 parent 7ebd324 commit b1003fb

3 files changed

Lines changed: 37 additions & 29 deletions

File tree

lib/gridstore/src/gridstore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ impl<V> Gridstore<V> {
573573
/// Create flusher that durably persists all pending changes when invoked
574574
pub fn flusher(&self) -> Flusher {
575575
let pending_updates = self.tracker.read().pending_updates.clone();
576-
// Weak references to detect if the instance has been already dropped
576+
577577
let pages = Arc::downgrade(&self.pages);
578578
let tracker = Arc::downgrade(&self.tracker);
579579
let bitmask = Arc::downgrade(&self.bitmask);

lib/segment/src/common/flags/buffered_dynamic_flags.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@ pub(crate) struct BufferedDynamicFlags {
2121
buffer: Arc<RwLock<AHashMap<PointOffsetType, bool>>>,
2222

2323
/// Lock to prevent concurrent flush and drop
24-
flushing_lock: Arc<Mutex<()>>,
24+
is_alive_flush_lock: Arc<Mutex<bool>>,
2525
}
2626

2727
impl BufferedDynamicFlags {
2828
pub fn new(mmap_flags: DynamicMmapFlags) -> Self {
2929
let buffer = Arc::new(RwLock::new(AHashMap::new()));
30-
let flushing_lock = Arc::new(Mutex::new(()));
30+
let is_alive_flush_lock = Arc::new(Mutex::new(true));
3131
Self {
3232
storage: Arc::new(Mutex::new(mmap_flags)),
3333
buffer,
34-
flushing_lock,
34+
is_alive_flush_lock,
3535
}
3636
}
3737

@@ -60,20 +60,23 @@ impl BufferedDynamicFlags {
6060
(updates, required_len)
6161
};
6262

63-
// Weak references to detect if the instance has been already dropped
63+
// Weak reference to detect when the storage has been deleted
6464
let flags_arc = Arc::downgrade(&self.storage);
65-
let flushing_lock = Arc::downgrade(&self.flushing_lock);
65+
let is_alive_flush_lock = self.is_alive_flush_lock.clone();
6666

6767
Box::new(move || {
68-
let (Some(flags_arc), Some(flushing_lock)) =
69-
(flags_arc.upgrade(), flushing_lock.upgrade())
70-
else {
71-
log::debug!("Aborted flushing on a dropped instance");
68+
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
69+
let is_alive_flush_guard = is_alive_flush_lock.lock();
70+
71+
if !*is_alive_flush_guard {
72+
// Storage is removed, skip flush
7273
return Ok(());
73-
};
74+
}
7475

75-
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
76-
let _flushing_lock_guard = flushing_lock.lock();
76+
let Some(flags_arc) = flags_arc.upgrade() else {
77+
log::debug!("skipping flushing on deleted storage");
78+
return Ok(());
79+
};
7780

7881
// lock for the entire flushing process
7982
let mut flags_guard = flags_arc.lock();
@@ -88,6 +91,7 @@ impl BufferedDynamicFlags {
8891
}
8992

9093
flags_guard.flusher()()?;
94+
9195
Ok(())
9296
})
9397
}
@@ -96,7 +100,7 @@ impl BufferedDynamicFlags {
96100
impl Drop for BufferedDynamicFlags {
97101
fn drop(&mut self) {
98102
// Wait for all background flush operations to finish, and cancel future flushes
99-
_ = self.flushing_lock.lock();
103+
*self.is_alive_flush_lock.lock() = false;
100104
}
101105
}
102106

lib/segment/src/common/mmap_bitslice_buffered_update_wrapper.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@ pub struct MmapBitSliceBufferedUpdateWrapper {
1616
len: usize,
1717
pending_updates: Arc<Mutex<AHashMap<usize, bool>>>,
1818
/// Lock to prevent concurrent flush and drop
19-
flushing_lock: Arc<Mutex<()>>,
19+
is_alive_flush_lock: Arc<Mutex<bool>>,
2020
}
2121

2222
impl MmapBitSliceBufferedUpdateWrapper {
2323
pub fn new(bitslice: MmapBitSlice) -> Self {
2424
let len = bitslice.len();
25-
let flushing_lock = Arc::new(Mutex::new(()));
25+
let is_alive_flush_lock = Arc::new(Mutex::new(true));
2626
Self {
2727
bitslice: Arc::new(RwLock::new(bitslice)),
2828
len,
2929
pending_updates: Arc::new(Mutex::new(AHashMap::new())),
30-
flushing_lock,
30+
is_alive_flush_lock,
3131
}
3232
}
3333

@@ -72,23 +72,27 @@ impl MmapBitSliceBufferedUpdateWrapper {
7272

7373
pub fn flusher(&self) -> Flusher {
7474
let pending_updates = self.pending_updates.lock().clone();
75-
// Weak references to detect if the instance has been already dropped
7675
let bitslice = Arc::downgrade(&self.bitslice);
7776
let pending_updates_arc = Arc::downgrade(&self.pending_updates);
78-
let flushing_lock = Arc::downgrade(&self.flushing_lock);
77+
let is_alive_flush_lock = self.is_alive_flush_lock.clone();
7978

8079
Box::new(move || {
81-
let (Some(bitslice), Some(pending_updates_arc), Some(flushing_lock)) = (
82-
bitslice.upgrade(),
83-
pending_updates_arc.upgrade(),
84-
flushing_lock.upgrade(),
85-
) else {
86-
log::debug!("Aborted flushing on a dropped instance");
80+
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
81+
let is_alive_flush_guard = is_alive_flush_lock.lock();
82+
83+
if !*is_alive_flush_guard {
84+
// Already dropped, skip flush
8785
return Ok(());
88-
};
86+
}
8987

90-
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
91-
let _flushing_lock_guard = flushing_lock.lock();
88+
let (Some(bitslice), Some(pending_updates_arc)) =
89+
(bitslice.upgrade(), pending_updates_arc.upgrade())
90+
else {
91+
log::debug!(
92+
"Aborted flushing on a dropped MmapBitSliceBufferedUpdateWrapper instance"
93+
);
94+
return Ok(());
95+
};
9296

9397
let mut mmap_slice_write = bitslice.write();
9498
for (index, value) in pending_updates.iter() {
@@ -104,6 +108,6 @@ impl MmapBitSliceBufferedUpdateWrapper {
104108
impl Drop for MmapBitSliceBufferedUpdateWrapper {
105109
fn drop(&mut self) {
106110
// Wait for all background flush operations to finish, and cancel future flushes
107-
_ = self.flushing_lock.lock();
111+
*self.is_alive_flush_lock.lock() = false;
108112
}
109113
}

0 commit comments

Comments
 (0)