@@ -16,18 +16,18 @@ pub struct MmapBitSliceBufferedUpdateWrapper {
1616 len : usize ,
1717 pending_updates : Arc < Mutex < AHashMap < usize , bool > > > ,
1818 /// Lock to prevent concurrent flush and drop
19- flushing_lock : Arc < Mutex < ( ) > > ,
19+ is_alive_flush_lock : Arc < Mutex < bool > > ,
2020}
2121
2222impl MmapBitSliceBufferedUpdateWrapper {
2323 pub fn new ( bitslice : MmapBitSlice ) -> Self {
2424 let len = bitslice. len ( ) ;
25- let flushing_lock = Arc :: new ( Mutex :: new ( ( ) ) ) ;
25+ let is_alive_flush_lock = Arc :: new ( Mutex :: new ( true ) ) ;
2626 Self {
2727 bitslice : Arc :: new ( RwLock :: new ( bitslice) ) ,
2828 len,
2929 pending_updates : Arc :: new ( Mutex :: new ( AHashMap :: new ( ) ) ) ,
30- flushing_lock ,
30+ is_alive_flush_lock ,
3131 }
3232 }
3333
@@ -72,23 +72,27 @@ impl MmapBitSliceBufferedUpdateWrapper {
7272
7373 pub fn flusher ( & self ) -> Flusher {
7474 let pending_updates = self . pending_updates . lock ( ) . clone ( ) ;
75- // Weak references to detect if the instance has been already dropped
7675 let bitslice = Arc :: downgrade ( & self . bitslice ) ;
7776 let pending_updates_arc = Arc :: downgrade ( & self . pending_updates ) ;
78- let flushing_lock = Arc :: downgrade ( & self . flushing_lock ) ;
77+ let is_alive_flush_lock = self . is_alive_flush_lock . clone ( ) ;
7978
8079 Box :: new ( move || {
81- let ( Some ( bitslice) , Some ( pending_updates_arc) , Some ( flushing_lock) ) = (
82- bitslice. upgrade ( ) ,
83- pending_updates_arc. upgrade ( ) ,
84- flushing_lock. upgrade ( ) ,
85- ) else {
86- log:: debug!( "Aborted flushing on a dropped instance" ) ;
80+ // Keep the guard till the end of the flush to prevent concurrent drop/flushes
81+ let is_alive_flush_guard = is_alive_flush_lock. lock ( ) ;
82+
83+ if !* is_alive_flush_guard {
84+ // Already dropped, skip flush
8785 return Ok ( ( ) ) ;
88- } ;
86+ }
8987
90- // Keep the guard till the end of the flush to prevent concurrent drop/flushes
91- let _flushing_lock_guard = flushing_lock. lock ( ) ;
88+ let ( Some ( bitslice) , Some ( pending_updates_arc) ) =
89+ ( bitslice. upgrade ( ) , pending_updates_arc. upgrade ( ) )
90+ else {
91+ log:: debug!(
92+ "Aborted flushing on a dropped MmapBitSliceBufferedUpdateWrapper instance"
93+ ) ;
94+ return Ok ( ( ) ) ;
95+ } ;
9296
9397 let mut mmap_slice_write = bitslice. write ( ) ;
9498 for ( index, value) in pending_updates. iter ( ) {
@@ -104,6 +108,6 @@ impl MmapBitSliceBufferedUpdateWrapper {
104108impl Drop for MmapBitSliceBufferedUpdateWrapper {
105109 fn drop ( & mut self ) {
106110 // Wait for all background flush operations to finish, and cancel future flushes
107- _ = self . flushing_lock . lock ( ) ;
111+ * self . is_alive_flush_lock . lock ( ) = false ;
108112 }
109113}
0 commit comments