Skip to content

Commit 57512b0

Browse files
authored
Unrolled build for #127567
Rollup merge of #127567 - joboet:once_wait, r=Amanieu std: implement the `once_wait` feature Tracking issue: #127527 This additionally adds a `wait_force` method to `Once` that doesn't panic on poison. I also took the opportunity and cleaned up up the code of the queue-based implementation a bit.
2 parents 28a58f2 + 1d49aad commit 57512b0

File tree

6 files changed

+298
-94
lines changed

6 files changed

+298
-94
lines changed

library/std/src/sync/once.rs

+41
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,47 @@ impl Once {
264264
self.inner.is_completed()
265265
}
266266

267+
/// Blocks the current thread until initialization has completed.
268+
///
269+
/// # Example
270+
///
271+
/// ```rust
272+
/// #![feature(once_wait)]
273+
///
274+
/// use std::sync::Once;
275+
/// use std::thread;
276+
///
277+
/// static READY: Once = Once::new();
278+
///
279+
/// let thread = thread::spawn(|| {
280+
/// READY.wait();
281+
/// println!("everything is ready");
282+
/// });
283+
///
284+
/// READY.call_once(|| println!("performing setup"));
285+
/// ```
286+
///
287+
/// # Panics
288+
///
289+
/// If this [`Once`] has been poisoned because an initialization closure has
290+
/// panicked, this method will also panic. Use [`wait_force`](Self::wait_force)
291+
/// if this behaviour is not desired.
292+
#[unstable(feature = "once_wait", issue = "127527")]
293+
pub fn wait(&self) {
294+
if !self.inner.is_completed() {
295+
self.inner.wait(false);
296+
}
297+
}
298+
299+
/// Blocks the current thread until initialization has completed, ignoring
300+
/// poisoning.
301+
#[unstable(feature = "once_wait", issue = "127527")]
302+
pub fn wait_force(&self) {
303+
if !self.inner.is_completed() {
304+
self.inner.wait(true);
305+
}
306+
}
307+
267308
/// Returns the current state of the `Once` instance.
268309
///
269310
/// Since this takes a mutable reference, no initialization can currently

library/std/src/sync/once/tests.rs

+47
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use super::Once;
2+
use crate::sync::atomic::AtomicBool;
3+
use crate::sync::atomic::Ordering::Relaxed;
24
use crate::sync::mpsc::channel;
5+
use crate::time::Duration;
36
use crate::{panic, thread};
47

58
#[test]
@@ -113,3 +116,47 @@ fn wait_for_force_to_finish() {
113116
assert!(t1.join().is_ok());
114117
assert!(t2.join().is_ok());
115118
}
119+
120+
#[test]
121+
fn wait() {
122+
for _ in 0..50 {
123+
let val = AtomicBool::new(false);
124+
let once = Once::new();
125+
126+
thread::scope(|s| {
127+
for _ in 0..4 {
128+
s.spawn(|| {
129+
once.wait();
130+
assert!(val.load(Relaxed));
131+
});
132+
}
133+
134+
once.call_once(|| val.store(true, Relaxed));
135+
});
136+
}
137+
}
138+
139+
#[test]
140+
fn wait_on_poisoned() {
141+
let once = Once::new();
142+
143+
panic::catch_unwind(|| once.call_once(|| panic!())).unwrap_err();
144+
panic::catch_unwind(|| once.wait()).unwrap_err();
145+
}
146+
147+
#[test]
148+
fn wait_force_on_poisoned() {
149+
let once = Once::new();
150+
151+
thread::scope(|s| {
152+
panic::catch_unwind(|| once.call_once(|| panic!())).unwrap_err();
153+
154+
s.spawn(|| {
155+
thread::sleep(Duration::from_millis(100));
156+
157+
once.call_once_force(|_| {});
158+
});
159+
160+
once.wait_force();
161+
})
162+
}

library/std/src/sync/once_lock.rs

+28
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,34 @@ impl<T> OnceLock<T> {
167167
}
168168
}
169169

170+
/// Blocks the current thread until the cell is initialized.
171+
///
172+
/// # Example
173+
///
174+
/// Waiting for a computation on another thread to finish:
175+
/// ```rust
176+
/// #![feature(once_wait)]
177+
///
178+
/// use std::thread;
179+
/// use std::sync::OnceLock;
180+
///
181+
/// let value = OnceLock::new();
182+
///
183+
/// thread::scope(|s| {
184+
/// s.spawn(|| value.set(1 + 1));
185+
///
186+
/// let result = value.wait();
187+
/// assert_eq!(result, &2);
188+
/// })
189+
/// ```
190+
#[inline]
191+
#[unstable(feature = "once_wait", issue = "127527")]
192+
pub fn wait(&self) -> &T {
193+
self.once.wait_force();
194+
195+
unsafe { self.get_unchecked() }
196+
}
197+
170198
/// Sets the contents of this cell to `value`.
171199
///
172200
/// May block if another thread is currently attempting to initialize the cell. The cell is

library/std/src/sys/sync/once/futex.rs

+88-36
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::sync::once::ExclusiveState;
66
use crate::sys::futex::{futex_wait, futex_wake_all};
77

88
// On some platforms, the OS is very nice and handles the waiter queue for us.
9-
// This means we only need one atomic value with 5 states:
9+
// This means we only need one atomic value with 4 states:
1010

1111
/// No initialization has run yet, and no thread is currently using the Once.
1212
const INCOMPLETE: u32 = 0;
@@ -17,16 +17,20 @@ const POISONED: u32 = 1;
1717
/// Some thread is currently attempting to run initialization. It may succeed,
1818
/// so all future threads need to wait for it to finish.
1919
const RUNNING: u32 = 2;
20-
/// Some thread is currently attempting to run initialization and there are threads
21-
/// waiting for it to finish.
22-
const QUEUED: u32 = 3;
2320
/// Initialization has completed and all future calls should finish immediately.
24-
const COMPLETE: u32 = 4;
21+
const COMPLETE: u32 = 3;
2522

26-
// Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
23+
// An additional bit indicates whether there are waiting threads:
24+
25+
/// May only be set if the state is not COMPLETE.
26+
const QUEUED: u32 = 4;
27+
28+
// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
2729
// variable. When the running thread finishes, it will wake all waiting threads using
2830
// `futex_wake_all`.
2931

32+
const STATE_MASK: u32 = 0b11;
33+
3034
pub struct OnceState {
3135
poisoned: bool,
3236
set_state_to: Cell<u32>,
@@ -45,7 +49,7 @@ impl OnceState {
4549
}
4650

4751
struct CompletionGuard<'a> {
48-
state: &'a AtomicU32,
52+
state_and_queued: &'a AtomicU32,
4953
set_state_on_drop_to: u32,
5054
}
5155

@@ -54,64 +58,106 @@ impl<'a> Drop for CompletionGuard<'a> {
5458
// Use release ordering to propagate changes to all threads checking
5559
// up on the Once. `futex_wake_all` does its own synchronization, hence
5660
// we do not need `AcqRel`.
57-
if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED {
58-
futex_wake_all(self.state);
61+
if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {
62+
futex_wake_all(self.state_and_queued);
5963
}
6064
}
6165
}
6266

6367
pub struct Once {
64-
state: AtomicU32,
68+
state_and_queued: AtomicU32,
6569
}
6670

6771
impl Once {
6872
#[inline]
6973
pub const fn new() -> Once {
70-
Once { state: AtomicU32::new(INCOMPLETE) }
74+
Once { state_and_queued: AtomicU32::new(INCOMPLETE) }
7175
}
7276

7377
#[inline]
7478
pub fn is_completed(&self) -> bool {
7579
// Use acquire ordering to make all initialization changes visible to the
7680
// current thread.
77-
self.state.load(Acquire) == COMPLETE
81+
self.state_and_queued.load(Acquire) == COMPLETE
7882
}
7983

8084
#[inline]
8185
pub(crate) fn state(&mut self) -> ExclusiveState {
82-
match *self.state.get_mut() {
86+
match *self.state_and_queued.get_mut() {
8387
INCOMPLETE => ExclusiveState::Incomplete,
8488
POISONED => ExclusiveState::Poisoned,
8589
COMPLETE => ExclusiveState::Complete,
8690
_ => unreachable!("invalid Once state"),
8791
}
8892
}
8993

90-
// This uses FnMut to match the API of the generic implementation. As this
91-
// implementation is quite light-weight, it is generic over the closure and
92-
// so avoids the cost of dynamic dispatch.
9394
#[cold]
9495
#[track_caller]
95-
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
96-
let mut state = self.state.load(Acquire);
96+
pub fn wait(&self, ignore_poisoning: bool) {
97+
let mut state_and_queued = self.state_and_queued.load(Acquire);
9798
loop {
99+
let state = state_and_queued & STATE_MASK;
100+
let queued = state_and_queued & QUEUED != 0;
98101
match state {
102+
COMPLETE => return,
103+
POISONED if !ignore_poisoning => {
104+
// Panic to propagate the poison.
105+
panic!("Once instance has previously been poisoned");
106+
}
107+
_ => {
108+
// Set the QUEUED bit if it has not already been set.
109+
if !queued {
110+
state_and_queued += QUEUED;
111+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
112+
state,
113+
state_and_queued,
114+
Relaxed,
115+
Acquire,
116+
) {
117+
state_and_queued = new;
118+
continue;
119+
}
120+
}
121+
122+
futex_wait(&self.state_and_queued, state_and_queued, None);
123+
state_and_queued = self.state_and_queued.load(Acquire);
124+
}
125+
}
126+
}
127+
}
128+
129+
#[cold]
130+
#[track_caller]
131+
pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
132+
let mut state_and_queued = self.state_and_queued.load(Acquire);
133+
loop {
134+
let state = state_and_queued & STATE_MASK;
135+
let queued = state_and_queued & QUEUED != 0;
136+
match state {
137+
COMPLETE => return,
99138
POISONED if !ignore_poisoning => {
100139
// Panic to propagate the poison.
101140
panic!("Once instance has previously been poisoned");
102141
}
103142
INCOMPLETE | POISONED => {
104143
// Try to register the current thread as the one running.
105-
if let Err(new) =
106-
self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire)
107-
{
108-
state = new;
144+
let next = RUNNING + if queued { QUEUED } else { 0 };
145+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
146+
state_and_queued,
147+
next,
148+
Acquire,
149+
Acquire,
150+
) {
151+
state_and_queued = new;
109152
continue;
110153
}
154+
111155
// `waiter_queue` will manage other waiting threads, and
112156
// wake them up on drop.
113-
let mut waiter_queue =
114-
CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED };
157+
let mut waiter_queue = CompletionGuard {
158+
state_and_queued: &self.state_and_queued,
159+
set_state_on_drop_to: POISONED,
160+
};
115161
// Run the function, letting it know if we're poisoned or not.
116162
let f_state = public::OnceState {
117163
inner: OnceState {
@@ -123,21 +169,27 @@ impl Once {
123169
waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
124170
return;
125171
}
126-
RUNNING | QUEUED => {
127-
// Set the state to QUEUED if it is not already.
128-
if state == RUNNING
129-
&& let Err(new) =
130-
self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire)
131-
{
132-
state = new;
133-
continue;
172+
_ => {
173+
// All other values must be RUNNING.
174+
assert!(state == RUNNING);
175+
176+
// Set the QUEUED bit if it is not already set.
177+
if !queued {
178+
state_and_queued += QUEUED;
179+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
180+
state,
181+
state_and_queued,
182+
Relaxed,
183+
Acquire,
184+
) {
185+
state_and_queued = new;
186+
continue;
187+
}
134188
}
135189

136-
futex_wait(&self.state, QUEUED, None);
137-
state = self.state.load(Acquire);
190+
futex_wait(&self.state_and_queued, state_and_queued, None);
191+
state_and_queued = self.state_and_queued.load(Acquire);
138192
}
139-
COMPLETE => return,
140-
_ => unreachable!("state is never set to invalid values"),
141193
}
142194
}
143195
}

library/std/src/sys/sync/once/no_threads.rs

+6
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ impl Once {
5555
}
5656
}
5757

58+
#[cold]
59+
#[track_caller]
60+
pub fn wait(&self, _ignore_poisoning: bool) {
61+
panic!("not implementable on this target");
62+
}
63+
5864
#[cold]
5965
#[track_caller]
6066
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {

0 commit comments

Comments
 (0)