@@ -133,6 +133,7 @@ impl RecoverableWal {
133133 let mut operations = other
134134 . wal
135135 . lock ( )
136+ . await
136137 . read ( append_from)
137138 . map ( |( _, op) | op)
138139 . collect :: < Vec < _ > > ( ) ;
@@ -261,7 +262,6 @@ mod tests {
261262 use std:: ops:: Range ;
262263 use std:: sync:: Arc ;
263264
264- use parking_lot:: Mutex as ParkingMutex ;
265265 use rand:: prelude:: SliceRandom ;
266266 use rand:: rngs:: StdRng ;
267267 use rand:: seq:: IndexedRandom ;
@@ -289,7 +289,7 @@ mod tests {
289289 let wal = SerdeWal :: new ( dir. path ( ) . to_str ( ) . unwrap ( ) , options) . unwrap ( ) ;
290290 (
291291 RecoverableWal :: new (
292- Arc :: new ( ParkingMutex :: new ( wal) ) ,
292+ Arc :: new ( Mutex :: new ( wal) ) ,
293293 Arc :: new ( Mutex :: new ( ClockMap :: default ( ) ) ) ,
294294 Arc :: new ( Mutex :: new ( ClockMap :: default ( ) ) ) ,
295295 ) ,
@@ -375,7 +375,7 @@ mod tests {
375375 assert_eq ! ( delta_from, 1 ) ;
376376
377377 // Diff should have 1 operation, as C missed just one
378- assert_eq ! ( b_wal. wal. lock( ) . read( delta_from) . count( ) , 1 ) ;
378+ assert_eq ! ( b_wal. wal. lock( ) . await . read( delta_from) . count( ) , 1 ) ;
379379
380380 // Recover WAL on node C by writing delta from node B to it
381381 c_wal. append_from ( & b_wal, delta_from) . await . unwrap ( ) ;
@@ -384,9 +384,10 @@ mod tests {
384384 a_wal
385385 . wal
386386 . lock ( )
387+ . await
387388 . read ( 0 )
388- . zip ( b_wal. wal . lock ( ) . read ( 0 ) )
389- . zip ( c_wal. wal . lock ( ) . read ( 0 ) )
389+ . zip ( b_wal. wal . lock ( ) . await . read ( 0 ) )
390+ . zip ( c_wal. wal . lock ( ) . await . read ( 0 ) )
390391 . for_each ( |( ( a, b) , c) | {
391392 assert_eq ! ( a, b) ;
392393 assert_eq ! ( b, c) ;
@@ -483,7 +484,7 @@ mod tests {
483484 assert_eq ! ( delta_from, N as u64 ) ;
484485
485486 // Diff should have N operation, as C missed just N of them
486- assert_eq ! ( b_wal. wal. lock( ) . read( delta_from) . count( ) , N ) ;
487+ assert_eq ! ( b_wal. wal. lock( ) . await . read( delta_from) . count( ) , N ) ;
487488
488489 // Recover WAL on node C by writing delta from node B to it
489490 c_wal. append_from ( & b_wal, delta_from) . await . unwrap ( ) ;
@@ -492,9 +493,10 @@ mod tests {
492493 a_wal
493494 . wal
494495 . lock ( )
496+ . await
495497 . read ( 0 )
496- . zip ( b_wal. wal . lock ( ) . read ( 0 ) )
497- . zip ( c_wal. wal . lock ( ) . read ( 0 ) )
498+ . zip ( b_wal. wal . lock ( ) . await . read ( 0 ) )
499+ . zip ( c_wal. wal . lock ( ) . await . read ( 0 ) )
498500 . for_each ( |( ( a, b) , c) | {
499501 assert_eq ! ( a, b) ;
500502 assert_eq ! ( b, c) ;
@@ -578,7 +580,7 @@ mod tests {
578580 assert_eq ! ( delta_from, N as u64 ) ;
579581
580582 // Diff should have M operations, as node C missed M operations
581- assert_eq ! ( b_wal. wal. lock( ) . read( delta_from) . count( ) , M ) ;
583+ assert_eq ! ( b_wal. wal. lock( ) . await . read( delta_from) . count( ) , M ) ;
582584
583585 // Recover WAL on node C by writing delta from node B to it
584586 c_wal. append_from ( & b_wal, delta_from) . await . unwrap ( ) ;
@@ -587,9 +589,10 @@ mod tests {
587589 a_wal
588590 . wal
589591 . lock ( )
592+ . await
590593 . read ( 0 )
591- . zip ( b_wal. wal . lock ( ) . read ( 0 ) )
592- . zip ( c_wal. wal . lock ( ) . read ( 0 ) )
594+ . zip ( b_wal. wal . lock ( ) . await . read ( 0 ) )
595+ . zip ( c_wal. wal . lock ( ) . await . read ( 0 ) )
593596 . for_each ( |( ( a, b) , c) | {
594597 assert_eq ! ( a, b) ;
595598 assert_eq ! ( b, c) ;
@@ -683,7 +686,7 @@ mod tests {
683686 assert_eq ! ( delta_from, N as u64 ) ;
684687
685688 // Diff should have M operations, as node C missed M operations
686- assert_eq ! ( b_wal. wal. lock( ) . read( delta_from) . count( ) , M ) ;
689+ assert_eq ! ( b_wal. wal. lock( ) . await . read( delta_from) . count( ) , M ) ;
687690
688691 // Recover WAL on node C by writing delta from node B to it
689692 c_wal. append_from ( & b_wal, delta_from) . await . unwrap ( ) ;
@@ -692,9 +695,10 @@ mod tests {
692695 a_wal
693696 . wal
694697 . lock ( )
698+ . await
695699 . read ( 0 )
696- . zip ( b_wal. wal . lock ( ) . read ( 0 ) )
697- . zip ( c_wal. wal . lock ( ) . read ( 0 ) )
700+ . zip ( b_wal. wal . lock ( ) . await . read ( 0 ) )
701+ . zip ( c_wal. wal . lock ( ) . await . read ( 0 ) )
698702 . for_each ( |( ( a, b) , c) | {
699703 assert_eq ! ( a, b) ;
700704 assert_eq ! ( b, c) ;
@@ -787,8 +791,8 @@ mod tests {
787791 assert_eq ! ( delta_from, 1 ) ;
788792
789793 // Diff should have 2 operations on both nodes
790- assert_eq ! ( a_wal. wal. lock( ) . read( delta_from) . count( ) , 2 ) ;
791- assert_eq ! ( b_wal. wal. lock( ) . read( delta_from) . count( ) , 2 ) ;
794+ assert_eq ! ( a_wal. wal. lock( ) . await . read( delta_from) . count( ) , 2 ) ;
795+ assert_eq ! ( b_wal. wal. lock( ) . await . read( delta_from) . count( ) , 2 ) ;
792796
793797 // Recover WAL on node C by writing delta from node B to it
794798 c_wal. append_from ( & b_wal, delta_from) . await . unwrap ( ) ;
@@ -798,23 +802,25 @@ mod tests {
798802 !a_wal
799803 . wal
800804 . lock( )
805+ . await
801806 . read( 0 )
802- . zip( c_wal. wal. lock( ) . read( 0 ) )
807+ . zip( c_wal. wal. lock( ) . await . read( 0 ) )
803808 . all( |( a, c) | a == c) ,
804809 ) ;
805810 assert ! (
806811 b_wal
807812 . wal
808813 . lock( )
814+ . await
809815 . read( 0 )
810- . zip( c_wal. wal. lock( ) . read( 0 ) )
816+ . zip( c_wal. wal. lock( ) . await . read( 0 ) )
811817 . all( |( b, c) | b == c) ,
812818 ) ;
813819
814820 // All WALs should have 3 operations
815- assert_eq ! ( a_wal. wal. lock( ) . read( 0 ) . count( ) , 3 ) ;
816- assert_eq ! ( b_wal. wal. lock( ) . read( 0 ) . count( ) , 3 ) ;
817- assert_eq ! ( c_wal. wal. lock( ) . read( 0 ) . count( ) , 3 ) ;
821+ assert_eq ! ( a_wal. wal. lock( ) . await . read( 0 ) . count( ) , 3 ) ;
822+ assert_eq ! ( b_wal. wal. lock( ) . await . read( 0 ) . count( ) , 3 ) ;
823+ assert_eq ! ( c_wal. wal. lock( ) . await . read( 0 ) . count( ) , 3 ) ;
818824
819825 // All WALs must have operations for point 1, 2 and 3
820826 let get_point = |op| match op {
@@ -830,18 +836,21 @@ mod tests {
830836 let a_wal_point_ids = a_wal
831837 . wal
832838 . lock ( )
839+ . await
833840 . read ( 0 )
834841 . map ( |( _, op) | get_point ( op) . id )
835842 . collect :: < HashSet < _ > > ( ) ;
836843 let b_wal_point_ids = b_wal
837844 . wal
838845 . lock ( )
846+ . await
839847 . read ( 0 )
840848 . map ( |( _, op) | get_point ( op) . id )
841849 . collect :: < HashSet < _ > > ( ) ;
842850 let c_wal_point_ids = c_wal
843851 . wal
844852 . lock ( )
853+ . await
845854 . read ( 0 )
846855 . map ( |( _, op) | get_point ( op) . id )
847856 . collect :: < HashSet < _ > > ( ) ;
@@ -1202,7 +1211,7 @@ mod tests {
12021211 . unwrap ( ) ;
12031212
12041213 // Diff expected
1205- assert_eq ! ( b_wal. wal. lock( ) . read( delta_from) . count( ) , 1 ) ;
1214+ assert_eq ! ( b_wal. wal. lock( ) . await . read( delta_from) . count( ) , 1 ) ;
12061215
12071216 assert_wal_ordering_property ( & a_wal, false ) . await ;
12081217 assert_wal_ordering_property ( & b_wal, false ) . await ;
@@ -1387,16 +1396,16 @@ mod tests {
13871396 }
13881397
13891398 // All WALs must be equal, having exactly the same entries
1390- wals . iter ( )
1391- . map ( |wal| wal . 0 . wal . lock ( ) )
1392- . collect :: < Vec < _ > > ( )
1393- . windows ( 2 )
1394- . for_each ( |wals| {
1395- assert ! (
1396- wals[ 0 ] . read( 0 ) . eq( wals[ 1 ] . read( 0 ) ) ,
1397- "all WALs must have the same entries" ,
1398- ) ;
1399- } ) ;
1399+ let mut opened_wals = Vec :: new ( ) ;
1400+ for wal in & wals {
1401+ opened_wals . push ( wal . 0 . wal . lock ( ) . await ) ;
1402+ }
1403+ opened_wals . windows ( 2 ) . for_each ( |wals| {
1404+ assert ! (
1405+ wals[ 0 ] . read( 0 ) . eq( wals[ 1 ] . read( 0 ) ) ,
1406+ "all WALs must have the same entries" ,
1407+ ) ;
1408+ } ) ;
14001409
14011410 // Release some kept clocks
14021411 kept_clocks. retain ( |( keep_for, _) | * keep_for > 1 ) ;
@@ -1478,7 +1487,7 @@ mod tests {
14781487
14791488 let resolve_result = resolve_wal_delta (
14801489 wal. wal
1481- . lock ( )
1490+ . blocking_lock ( )
14821491 . read_all ( true )
14831492 . map ( |( op_num, op) | ( op_num, op. clock_tag ) ) ,
14841493 recovery_point,
@@ -1505,7 +1514,7 @@ mod tests {
15051514
15061515 let resolve_result = resolve_wal_delta (
15071516 wal. wal
1508- . lock ( )
1517+ . blocking_lock ( )
15091518 . read_all ( true )
15101519 . map ( |( op_num, op) | ( op_num, op. clock_tag ) ) ,
15111520 recovery_point,
@@ -1531,7 +1540,7 @@ mod tests {
15311540
15321541 let resolve_result = resolve_wal_delta (
15331542 wal. wal
1534- . lock ( )
1543+ . blocking_lock ( )
15351544 . read_all ( true )
15361545 . map ( |( op_num, op) | ( op_num, op. clock_tag ) ) ,
15371546 recovery_point,
@@ -1562,7 +1571,7 @@ mod tests {
15621571
15631572 let resolve_result = resolve_wal_delta (
15641573 wal. wal
1565- . lock ( )
1574+ . blocking_lock ( )
15661575 . read_all ( true )
15671576 . map ( |( op_num, op) | ( op_num, op. clock_tag ) ) ,
15681577 recovery_point,
@@ -1588,7 +1597,7 @@ mod tests {
15881597
15891598 let resolve_result = resolve_wal_delta (
15901599 wal. wal
1591- . lock ( )
1600+ . blocking_lock ( )
15921601 . read_all ( true )
15931602 . map ( |( op_num, op) | ( op_num, op. clock_tag ) ) ,
15941603 recovery_point,
@@ -1605,6 +1614,7 @@ mod tests {
16051614 let cutoff = wal. oldest_clocks . lock ( ) . await ;
16061615 wal. wal
16071616 . lock ( )
1617+ . await
16081618 . read ( 0 )
16091619 // Only take records with clock tags
16101620 . filter_map ( |( _, operation) | operation. clock_tag )
0 commit comments