@@ -16,18 +16,18 @@ pub struct MmapBitSliceBufferedUpdateWrapper {
1616 len : usize ,
1717 pending_updates : Arc < Mutex < AHashMap < usize , bool > > > ,
1818 /// Lock to prevent concurrent flush and drop
19- is_alive_flush_lock : Arc < Mutex < bool > > ,
19+ flushing_lock : Arc < Mutex < ( ) > > ,
2020}
2121
2222impl MmapBitSliceBufferedUpdateWrapper {
2323 pub fn new ( bitslice : MmapBitSlice ) -> Self {
2424 let len = bitslice. len ( ) ;
25- let is_alive_flush_lock = Arc :: new ( Mutex :: new ( true ) ) ;
25+ let flushing_lock = Arc :: new ( Mutex :: new ( ( ) ) ) ;
2626 Self {
2727 bitslice : Arc :: new ( RwLock :: new ( bitslice) ) ,
2828 len,
2929 pending_updates : Arc :: new ( Mutex :: new ( AHashMap :: new ( ) ) ) ,
30- is_alive_flush_lock ,
30+ flushing_lock ,
3131 }
3232 }
3333
@@ -72,28 +72,24 @@ impl MmapBitSliceBufferedUpdateWrapper {
7272
7373 pub fn flusher ( & self ) -> Flusher {
7474 let pending_updates = self . pending_updates . lock ( ) . clone ( ) ;
75+ // Weak references to detect if the instance has been already dropped
7576 let bitslice = Arc :: downgrade ( & self . bitslice ) ;
7677 let pending_updates_arc = Arc :: downgrade ( & self . pending_updates ) ;
77- let is_alive_flush_lock = self . is_alive_flush_lock . clone ( ) ;
78+ let flushing_lock = Arc :: downgrade ( & self . flushing_lock ) ;
7879
7980 Box :: new ( move || {
80- // Keep the guard till the end of the flush to prevent concurrent drop/flushes
81- let is_alive_flush_guard = is_alive_flush_lock. lock ( ) ;
82-
83- if !* is_alive_flush_guard {
84- // Already dropped, skip flush
85- return Ok ( ( ) ) ;
86- }
87-
88- let ( Some ( bitslice) , Some ( pending_updates_arc) ) =
89- ( bitslice. upgrade ( ) , pending_updates_arc. upgrade ( ) )
90- else {
91- log:: debug!(
92- "Aborted flushing on a dropped MmapBitSliceBufferedUpdateWrapper instance"
93- ) ;
81+ let ( Some ( bitslice) , Some ( pending_updates_arc) , Some ( flushing_lock) ) = (
82+ bitslice. upgrade ( ) ,
83+ pending_updates_arc. upgrade ( ) ,
84+ flushing_lock. upgrade ( ) ,
85+ ) else {
86+ log:: debug!( "Aborted flushing on a dropped instance" ) ;
9487 return Ok ( ( ) ) ;
9588 } ;
9689
90+ // Keep the guard till the end of the flush to prevent concurrent drop/flushes
91+ let _flushing_lock_guard = flushing_lock. lock ( ) ;
92+
9793 let mut mmap_slice_write = bitslice. write ( ) ;
9894 for ( index, value) in pending_updates. iter ( ) {
9995 mmap_slice_write. set ( * index, * value) ;
@@ -108,6 +104,6 @@ impl MmapBitSliceBufferedUpdateWrapper {
108104impl Drop for MmapBitSliceBufferedUpdateWrapper {
109105 fn drop ( & mut self ) {
110106 // Wait for all background flush operations to finish, and cancel future flushes
111- * self . is_alive_flush_lock . lock ( ) = false ;
107+ _ = self . flushing_lock . lock ( ) ;
112108 }
113109}
0 commit comments