@@ -6,7 +6,7 @@ use crate::sync::once::ExclusiveState;
6
6
use crate :: sys:: futex:: { futex_wait, futex_wake_all} ;
7
7
8
8
// On some platforms, the OS is very nice and handles the waiter queue for us.
9
- // This means we only need one atomic value with 5 states:
9
+ // This means we only need one atomic value with 4 states:
10
10
11
11
/// No initialization has run yet, and no thread is currently using the Once.
12
12
const INCOMPLETE : u32 = 0 ;
@@ -17,16 +17,20 @@ const POISONED: u32 = 1;
17
17
/// Some thread is currently attempting to run initialization. It may succeed,
18
18
/// so all future threads need to wait for it to finish.
19
19
const RUNNING : u32 = 2 ;
20
- /// Some thread is currently attempting to run initialization and there are threads
21
- /// waiting for it to finish.
22
- const QUEUED : u32 = 3 ;
23
20
/// Initialization has completed and all future calls should finish immediately.
24
- const COMPLETE : u32 = 4 ;
21
+ const COMPLETE : u32 = 3 ;
25
22
26
- // Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
23
+ // An additional bit indicates whether there are waiting threads:
24
+
25
+ /// May only be set if the state is not COMPLETE.
26
+ const QUEUED : u32 = 4 ;
27
+
28
+ // Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
27
29
// variable. When the running thread finishes, it will wake all waiting threads using
28
30
// `futex_wake_all`.
29
31
32
+ const STATE_MASK : u32 = 0b11 ;
33
+
30
34
pub struct OnceState {
31
35
poisoned : bool ,
32
36
set_state_to : Cell < u32 > ,
@@ -45,7 +49,7 @@ impl OnceState {
45
49
}
46
50
47
51
struct CompletionGuard < ' a > {
48
- state : & ' a AtomicU32 ,
52
+ state_and_queued : & ' a AtomicU32 ,
49
53
set_state_on_drop_to : u32 ,
50
54
}
51
55
@@ -54,64 +58,106 @@ impl<'a> Drop for CompletionGuard<'a> {
54
58
// Use release ordering to propagate changes to all threads checking
55
59
// up on the Once. `futex_wake_all` does its own synchronization, hence
56
60
// we do not need `AcqRel`.
57
- if self . state . swap ( self . set_state_on_drop_to , Release ) == QUEUED {
58
- futex_wake_all ( self . state ) ;
61
+ if self . state_and_queued . swap ( self . set_state_on_drop_to , Release ) & QUEUED != 0 {
62
+ futex_wake_all ( self . state_and_queued ) ;
59
63
}
60
64
}
61
65
}
62
66
63
67
pub struct Once {
64
- state : AtomicU32 ,
68
+ state_and_queued : AtomicU32 ,
65
69
}
66
70
67
71
impl Once {
68
72
#[ inline]
69
73
pub const fn new ( ) -> Once {
70
- Once { state : AtomicU32 :: new ( INCOMPLETE ) }
74
+ Once { state_and_queued : AtomicU32 :: new ( INCOMPLETE ) }
71
75
}
72
76
73
77
#[ inline]
74
78
pub fn is_completed ( & self ) -> bool {
75
79
// Use acquire ordering to make all initialization changes visible to the
76
80
// current thread.
77
- self . state . load ( Acquire ) == COMPLETE
81
+ self . state_and_queued . load ( Acquire ) == COMPLETE
78
82
}
79
83
80
84
#[ inline]
81
85
pub ( crate ) fn state ( & mut self ) -> ExclusiveState {
82
- match * self . state . get_mut ( ) {
86
+ match * self . state_and_queued . get_mut ( ) {
83
87
INCOMPLETE => ExclusiveState :: Incomplete ,
84
88
POISONED => ExclusiveState :: Poisoned ,
85
89
COMPLETE => ExclusiveState :: Complete ,
86
90
_ => unreachable ! ( "invalid Once state" ) ,
87
91
}
88
92
}
89
93
90
- // This uses FnMut to match the API of the generic implementation. As this
91
- // implementation is quite light-weight, it is generic over the closure and
92
- // so avoids the cost of dynamic dispatch.
93
94
#[ cold]
94
95
#[ track_caller]
95
- pub fn call ( & self , ignore_poisoning : bool , f : & mut impl FnMut ( & public :: OnceState ) ) {
96
- let mut state = self . state . load ( Acquire ) ;
96
+ pub fn wait ( & self , ignore_poisoning : bool ) {
97
+ let mut state_and_queued = self . state_and_queued . load ( Acquire ) ;
97
98
loop {
99
+ let state = state_and_queued & STATE_MASK ;
100
+ let queued = state_and_queued & QUEUED != 0 ;
98
101
match state {
102
+ COMPLETE => return ,
103
+ POISONED if !ignore_poisoning => {
104
+ // Panic to propagate the poison.
105
+ panic ! ( "Once instance has previously been poisoned" ) ;
106
+ }
107
+ _ => {
108
+ // Set the QUEUED bit if it has not already been set.
109
+ if !queued {
110
+ state_and_queued += QUEUED ;
111
+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
112
+ state,
113
+ state_and_queued,
114
+ Relaxed ,
115
+ Acquire ,
116
+ ) {
117
+ state_and_queued = new;
118
+ continue ;
119
+ }
120
+ }
121
+
122
+ futex_wait ( & self . state_and_queued , state_and_queued, None ) ;
123
+ state_and_queued = self . state_and_queued . load ( Acquire ) ;
124
+ }
125
+ }
126
+ }
127
+ }
128
+
129
+ #[ cold]
130
+ #[ track_caller]
131
+ pub fn call ( & self , ignore_poisoning : bool , f : & mut dyn FnMut ( & public:: OnceState ) ) {
132
+ let mut state_and_queued = self . state_and_queued . load ( Acquire ) ;
133
+ loop {
134
+ let state = state_and_queued & STATE_MASK ;
135
+ let queued = state_and_queued & QUEUED != 0 ;
136
+ match state {
137
+ COMPLETE => return ,
99
138
POISONED if !ignore_poisoning => {
100
139
// Panic to propagate the poison.
101
140
panic ! ( "Once instance has previously been poisoned" ) ;
102
141
}
103
142
INCOMPLETE | POISONED => {
104
143
// Try to register the current thread as the one running.
105
- if let Err ( new) =
106
- self . state . compare_exchange_weak ( state, RUNNING , Acquire , Acquire )
107
- {
108
- state = new;
144
+ let next = RUNNING + if queued { QUEUED } else { 0 } ;
145
+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
146
+ state_and_queued,
147
+ next,
148
+ Acquire ,
149
+ Acquire ,
150
+ ) {
151
+ state_and_queued = new;
109
152
continue ;
110
153
}
154
+
111
155
// `waiter_queue` will manage other waiting threads, and
112
156
// wake them up on drop.
113
- let mut waiter_queue =
114
- CompletionGuard { state : & self . state , set_state_on_drop_to : POISONED } ;
157
+ let mut waiter_queue = CompletionGuard {
158
+ state_and_queued : & self . state_and_queued ,
159
+ set_state_on_drop_to : POISONED ,
160
+ } ;
115
161
// Run the function, letting it know if we're poisoned or not.
116
162
let f_state = public:: OnceState {
117
163
inner : OnceState {
@@ -123,21 +169,27 @@ impl Once {
123
169
waiter_queue. set_state_on_drop_to = f_state. inner . set_state_to . get ( ) ;
124
170
return ;
125
171
}
126
- RUNNING | QUEUED => {
127
- // Set the state to QUEUED if it is not already.
128
- if state == RUNNING
129
- && let Err ( new) =
130
- self . state . compare_exchange_weak ( RUNNING , QUEUED , Relaxed , Acquire )
131
- {
132
- state = new;
133
- continue ;
172
+ _ => {
173
+ // All other values must be RUNNING.
174
+ assert ! ( state == RUNNING ) ;
175
+
176
+ // Set the QUEUED bit if it is not already set.
177
+ if !queued {
178
+ state_and_queued += QUEUED ;
179
+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
180
+ state,
181
+ state_and_queued,
182
+ Relaxed ,
183
+ Acquire ,
184
+ ) {
185
+ state_and_queued = new;
186
+ continue ;
187
+ }
134
188
}
135
189
136
- futex_wait ( & self . state , QUEUED , None ) ;
137
- state = self . state . load ( Acquire ) ;
190
+ futex_wait ( & self . state_and_queued , state_and_queued , None ) ;
191
+ state_and_queued = self . state_and_queued . load ( Acquire ) ;
138
192
}
139
- COMPLETE => return ,
140
- _ => unreachable ! ( "state is never set to invalid values" ) ,
141
193
}
142
194
}
143
195
}
0 commit comments