Skip to content

Commit 7ebd324

Browse files
committed
simplify locking
1 parent dde7304 commit 7ebd324

3 files changed

Lines changed: 29 additions & 37 deletions

File tree

lib/gridstore/src/gridstore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ impl<V> Gridstore<V> {
573573
/// Create flusher that durably persists all pending changes when invoked
574574
pub fn flusher(&self) -> Flusher {
575575
let pending_updates = self.tracker.read().pending_updates.clone();
576-
576+
// Weak references to detect if the instance has been already dropped
577577
let pages = Arc::downgrade(&self.pages);
578578
let tracker = Arc::downgrade(&self.tracker);
579579
let bitmask = Arc::downgrade(&self.bitmask);

lib/segment/src/common/flags/buffered_dynamic_flags.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@ pub(crate) struct BufferedDynamicFlags {
2121
buffer: Arc<RwLock<AHashMap<PointOffsetType, bool>>>,
2222

2323
/// Lock to prevent concurrent flush and drop
24-
is_alive_flush_lock: Arc<Mutex<bool>>,
24+
flushing_lock: Arc<Mutex<()>>,
2525
}
2626

2727
impl BufferedDynamicFlags {
2828
pub fn new(mmap_flags: DynamicMmapFlags) -> Self {
2929
let buffer = Arc::new(RwLock::new(AHashMap::new()));
30-
let is_alive_flush_lock = Arc::new(Mutex::new(true));
30+
let flushing_lock = Arc::new(Mutex::new(()));
3131
Self {
3232
storage: Arc::new(Mutex::new(mmap_flags)),
3333
buffer,
34-
is_alive_flush_lock,
34+
flushing_lock,
3535
}
3636
}
3737

@@ -60,24 +60,21 @@ impl BufferedDynamicFlags {
6060
(updates, required_len)
6161
};
6262

63-
// Weak reference to detect when the storage has been deleted
63+
// Weak references to detect if the instance has been already dropped
6464
let flags_arc = Arc::downgrade(&self.storage);
65-
let is_alive_flush_lock = self.is_alive_flush_lock.clone();
65+
let flushing_lock = Arc::downgrade(&self.flushing_lock);
6666

6767
Box::new(move || {
68-
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
69-
let is_alive_flush_guard = is_alive_flush_lock.lock();
70-
71-
if !*is_alive_flush_guard {
72-
// Storage is removed, skip flush
73-
return Ok(());
74-
}
75-
76-
let Some(flags_arc) = flags_arc.upgrade() else {
77-
log::debug!("skipping flushing on deleted storage");
68+
let (Some(flags_arc), Some(flushing_lock)) =
69+
(flags_arc.upgrade(), flushing_lock.upgrade())
70+
else {
71+
log::debug!("Aborted flushing on a dropped instance");
7872
return Ok(());
7973
};
8074

75+
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
76+
let _flushing_lock_guard = flushing_lock.lock();
77+
8178
// lock for the entire flushing process
8279
let mut flags_guard = flags_arc.lock();
8380

@@ -91,7 +88,6 @@ impl BufferedDynamicFlags {
9188
}
9289

9390
flags_guard.flusher()()?;
94-
9591
Ok(())
9692
})
9793
}
@@ -100,7 +96,7 @@ impl BufferedDynamicFlags {
10096
impl Drop for BufferedDynamicFlags {
10197
fn drop(&mut self) {
10298
// Wait for all background flush operations to finish, and cancel future flushes
103-
*self.is_alive_flush_lock.lock() = false;
99+
_ = self.flushing_lock.lock();
104100
}
105101
}
106102

lib/segment/src/common/mmap_bitslice_buffered_update_wrapper.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@ pub struct MmapBitSliceBufferedUpdateWrapper {
1616
len: usize,
1717
pending_updates: Arc<Mutex<AHashMap<usize, bool>>>,
1818
/// Lock to prevent concurrent flush and drop
19-
is_alive_flush_lock: Arc<Mutex<bool>>,
19+
flushing_lock: Arc<Mutex<()>>,
2020
}
2121

2222
impl MmapBitSliceBufferedUpdateWrapper {
2323
pub fn new(bitslice: MmapBitSlice) -> Self {
2424
let len = bitslice.len();
25-
let is_alive_flush_lock = Arc::new(Mutex::new(true));
25+
let flushing_lock = Arc::new(Mutex::new(()));
2626
Self {
2727
bitslice: Arc::new(RwLock::new(bitslice)),
2828
len,
2929
pending_updates: Arc::new(Mutex::new(AHashMap::new())),
30-
is_alive_flush_lock,
30+
flushing_lock,
3131
}
3232
}
3333

@@ -72,28 +72,24 @@ impl MmapBitSliceBufferedUpdateWrapper {
7272

7373
pub fn flusher(&self) -> Flusher {
7474
let pending_updates = self.pending_updates.lock().clone();
75+
// Weak references to detect if the instance has been already dropped
7576
let bitslice = Arc::downgrade(&self.bitslice);
7677
let pending_updates_arc = Arc::downgrade(&self.pending_updates);
77-
let is_alive_flush_lock = self.is_alive_flush_lock.clone();
78+
let flushing_lock = Arc::downgrade(&self.flushing_lock);
7879

7980
Box::new(move || {
80-
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
81-
let is_alive_flush_guard = is_alive_flush_lock.lock();
82-
83-
if !*is_alive_flush_guard {
84-
// Already dropped, skip flush
85-
return Ok(());
86-
}
87-
88-
let (Some(bitslice), Some(pending_updates_arc)) =
89-
(bitslice.upgrade(), pending_updates_arc.upgrade())
90-
else {
91-
log::debug!(
92-
"Aborted flushing on a dropped MmapBitSliceBufferedUpdateWrapper instance"
93-
);
81+
let (Some(bitslice), Some(pending_updates_arc), Some(flushing_lock)) = (
82+
bitslice.upgrade(),
83+
pending_updates_arc.upgrade(),
84+
flushing_lock.upgrade(),
85+
) else {
86+
log::debug!("Aborted flushing on a dropped instance");
9487
return Ok(());
9588
};
9689

90+
// Keep the guard till the end of the flush to prevent concurrent drop/flushes
91+
let _flushing_lock_guard = flushing_lock.lock();
92+
9793
let mut mmap_slice_write = bitslice.write();
9894
for (index, value) in pending_updates.iter() {
9995
mmap_slice_write.set(*index, *value);
@@ -108,6 +104,6 @@ impl MmapBitSliceBufferedUpdateWrapper {
108104
impl Drop for MmapBitSliceBufferedUpdateWrapper {
109105
fn drop(&mut self) {
110106
// Wait for all background flush operations to finish, and cancel future flushes
111-
*self.is_alive_flush_lock.lock() = false;
107+
_ = self.flushing_lock.lock();
112108
}
113109
}

0 commit comments

Comments
 (0)