Skip to content

Commit a131243

Browse files
committed
completely refactor how we manage blocking and unblocking threads
1 parent 5fa30f7 commit a131243

18 files changed

+832
-895
lines changed

src/tools/miri/src/alloc_addresses/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
169169
size,
170170
align,
171171
memory_kind,
172-
ecx.get_active_thread(),
172+
ecx.active_thread(),
173173
) {
174174
if let Some(clock) = clock {
175175
ecx.acquire_clock(&clock);
@@ -367,7 +367,7 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> {
367367
// `alloc_id_from_addr` any more.
368368
global_state.exposed.remove(&dead_id);
369369
// Also remember this address for future reuse.
370-
let thread = self.threads.get_active_thread_id();
370+
let thread = self.threads.active_thread();
371371
global_state.reuse.add_addr(rng, addr, size, align, kind, thread, || {
372372
if let Some(data_race) = &self.data_race {
373373
data_race.release_clock(&self.threads).clone()

src/tools/miri/src/concurrency/data_race.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
839839
fn acquire_clock(&self, clock: &VClock) {
840840
let this = self.eval_context_ref();
841841
if let Some(data_race) = &this.machine.data_race {
842-
data_race.acquire_clock(clock, this.get_active_thread());
842+
data_race.acquire_clock(clock, &this.machine.threads);
843843
}
844844
}
845845
}
@@ -1662,13 +1662,14 @@ impl GlobalState {
16621662
/// This should be called strictly before any calls to
16631663
/// `thread_joined`.
16641664
#[inline]
1665-
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>, current_span: Span) {
1665+
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>) {
16661666
let current_index = self.active_thread_index(thread_mgr);
16671667

16681668
// Increment the clock to a unique termination timestamp.
16691669
let vector_clocks = self.vector_clocks.get_mut();
16701670
let current_clocks = &mut vector_clocks[current_index];
1671-
current_clocks.increment_clock(current_index, current_span);
1671+
current_clocks
1672+
.increment_clock(current_index, thread_mgr.active_thread_ref().current_span());
16721673

16731674
// Load the current thread id for the executing vector.
16741675
let vector_info = self.vector_info.get_mut();
@@ -1722,11 +1723,12 @@ impl GlobalState {
17221723
format!("thread `{thread_name}`")
17231724
}
17241725

1725-
/// Acquire the given clock into the given thread, establishing synchronization with
1726+
/// Acquire the given clock into the current thread, establishing synchronization with
17261727
/// the moment when that clock snapshot was taken via `release_clock`.
17271728
/// As this is an acquire operation, the thread timestamp is not
17281729
/// incremented.
1729-
pub fn acquire_clock(&self, clock: &VClock, thread: ThreadId) {
1730+
pub fn acquire_clock<'mir, 'tcx>(&self, clock: &VClock, threads: &ThreadManager<'mir, 'tcx>) {
1731+
let thread = threads.active_thread();
17301732
let (_, mut clocks) = self.thread_state_mut(thread);
17311733
clocks.clock.join(clock);
17321734
}
@@ -1738,7 +1740,7 @@ impl GlobalState {
17381740
&self,
17391741
threads: &ThreadManager<'mir, 'tcx>,
17401742
) -> Ref<'_, VClock> {
1741-
let thread = threads.get_active_thread_id();
1743+
let thread = threads.active_thread();
17421744
let span = threads.active_thread_ref().current_span();
17431745
// We increment the clock each time this happens, to ensure no two releases
17441746
// can be confused with each other.
@@ -1782,7 +1784,7 @@ impl GlobalState {
17821784
&self,
17831785
thread_mgr: &ThreadManager<'_, '_>,
17841786
) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
1785-
self.thread_state(thread_mgr.get_active_thread_id())
1787+
self.thread_state(thread_mgr.active_thread())
17861788
}
17871789

17881790
/// Load the current vector clock in use and the current set of thread clocks
@@ -1792,14 +1794,14 @@ impl GlobalState {
17921794
&self,
17931795
thread_mgr: &ThreadManager<'_, '_>,
17941796
) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
1795-
self.thread_state_mut(thread_mgr.get_active_thread_id())
1797+
self.thread_state_mut(thread_mgr.active_thread())
17961798
}
17971799

17981800
/// Return the current thread, should be the same
17991801
/// as the data-race active thread.
18001802
#[inline]
18011803
fn active_thread_index(&self, thread_mgr: &ThreadManager<'_, '_>) -> VectorIdx {
1802-
let active_thread_id = thread_mgr.get_active_thread_id();
1804+
let active_thread_id = thread_mgr.active_thread();
18031805
self.thread_index(active_thread_id)
18041806
}
18051807

src/tools/miri/src/concurrency/init_once.rs

+26-84
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,11 @@ use rustc_index::Idx;
44
use rustc_middle::ty::layout::TyAndLayout;
55

66
use super::sync::EvalContextExtPriv as _;
7-
use super::thread::MachineCallback;
87
use super::vector_clock::VClock;
98
use crate::*;
109

1110
declare_id!(InitOnceId);
1211

13-
/// A thread waiting on an InitOnce object.
14-
struct InitOnceWaiter<'mir, 'tcx> {
15-
/// The thread that is waiting.
16-
thread: ThreadId,
17-
/// The callback that should be executed, after the thread has been woken up.
18-
callback: Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>,
19-
}
20-
21-
impl<'mir, 'tcx> std::fmt::Debug for InitOnceWaiter<'mir, 'tcx> {
22-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23-
f.debug_struct("InitOnce")
24-
.field("thread", &self.thread)
25-
.field("callback", &"dyn MachineCallback")
26-
.finish()
27-
}
28-
}
29-
3012
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
3113
/// The current status of a one time initialization.
3214
pub enum InitOnceStatus {
@@ -38,68 +20,14 @@ pub enum InitOnceStatus {
3820

3921
/// The one time initialization state.
4022
#[derive(Default, Debug)]
41-
pub(super) struct InitOnce<'mir, 'tcx> {
23+
pub(super) struct InitOnce {
4224
status: InitOnceStatus,
43-
waiters: VecDeque<InitOnceWaiter<'mir, 'tcx>>,
25+
waiters: VecDeque<ThreadId>,
4426
clock: VClock,
4527
}
4628

47-
impl<'mir, 'tcx> VisitProvenance for InitOnce<'mir, 'tcx> {
48-
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
49-
for waiter in self.waiters.iter() {
50-
waiter.callback.visit_provenance(visit);
51-
}
52-
}
53-
}
54-
5529
impl<'mir, 'tcx: 'mir> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
5630
trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
57-
/// Synchronize with the previous initialization attempt of an InitOnce.
58-
#[inline]
59-
fn init_once_observe_attempt(&mut self, id: InitOnceId) {
60-
let this = self.eval_context_mut();
61-
let current_thread = this.get_active_thread();
62-
63-
if let Some(data_race) = &this.machine.data_race {
64-
data_race.acquire_clock(&this.machine.sync.init_onces[id].clock, current_thread);
65-
}
66-
}
67-
68-
#[inline]
69-
fn init_once_wake_waiter(
70-
&mut self,
71-
id: InitOnceId,
72-
waiter: InitOnceWaiter<'mir, 'tcx>,
73-
) -> InterpResult<'tcx> {
74-
let this = self.eval_context_mut();
75-
let current_thread = this.get_active_thread();
76-
77-
this.unblock_thread(waiter.thread, BlockReason::InitOnce(id));
78-
79-
// Call callback, with the woken-up thread as `current`.
80-
this.set_active_thread(waiter.thread);
81-
this.init_once_observe_attempt(id);
82-
waiter.callback.call(this)?;
83-
this.set_active_thread(current_thread);
84-
85-
Ok(())
86-
}
87-
}
88-
89-
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
90-
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
91-
fn init_once_get_or_create_id(
92-
&mut self,
93-
lock_op: &OpTy<'tcx, Provenance>,
94-
lock_layout: TyAndLayout<'tcx>,
95-
offset: u64,
96-
) -> InterpResult<'tcx, InitOnceId> {
97-
let this = self.eval_context_mut();
98-
this.init_once_get_or_create(|ecx, next_id| {
99-
ecx.get_or_create_id(next_id, lock_op, lock_layout, offset)
100-
})
101-
}
102-
10331
/// Provides the closure with the next InitOnceId. Creates that InitOnce if the closure returns None,
10432
/// otherwise returns the value from the closure.
10533
#[inline]
@@ -120,6 +48,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
12048
Ok(new_index)
12149
}
12250
}
51+
}
52+
53+
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
54+
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
55+
fn init_once_get_or_create_id(
56+
&mut self,
57+
lock_op: &OpTy<'tcx, Provenance>,
58+
lock_layout: TyAndLayout<'tcx>,
59+
offset: u64,
60+
) -> InterpResult<'tcx, InitOnceId> {
61+
let this = self.eval_context_mut();
62+
this.init_once_get_or_create(|ecx, next_id| {
63+
ecx.get_or_create_id(next_id, lock_op, lock_layout, offset)
64+
})
65+
}
12366

12467
#[inline]
12568
fn init_once_status(&mut self, id: InitOnceId) -> InitOnceStatus {
@@ -132,14 +75,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
13275
fn init_once_enqueue_and_block(
13376
&mut self,
13477
id: InitOnceId,
135-
thread: ThreadId,
136-
callback: Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>,
78+
callback: impl UnblockCallback<'mir, 'tcx> + 'tcx,
13779
) {
13880
let this = self.eval_context_mut();
81+
let thread = this.active_thread();
13982
let init_once = &mut this.machine.sync.init_onces[id];
14083
assert_ne!(init_once.status, InitOnceStatus::Complete, "queueing on complete init once");
141-
init_once.waiters.push_back(InitOnceWaiter { thread, callback });
142-
this.block_thread(thread, BlockReason::InitOnce(id));
84+
init_once.waiters.push_back(thread);
85+
this.block_thread(BlockReason::InitOnce(id), None, callback);
14386
}
14487

14588
/// Begin initializing this InitOnce. Must only be called after checking that it is currently
@@ -177,7 +120,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
177120
// Wake up everyone.
178121
// need to take the queue to avoid having `this` be borrowed multiple times
179122
for waiter in std::mem::take(&mut init_once.waiters) {
180-
this.init_once_wake_waiter(id, waiter)?;
123+
this.unblock_thread(waiter, BlockReason::InitOnce(id))?;
181124
}
182125

183126
Ok(())
@@ -192,6 +135,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
192135
InitOnceStatus::Begun,
193136
"failing already completed or uninit init once"
194137
);
138+
// This is again uninitialized.
139+
init_once.status = InitOnceStatus::Uninitialized;
195140

196141
// Each complete happens-before the end of the wait
197142
if let Some(data_race) = &this.machine.data_race {
@@ -200,10 +145,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
200145

201146
// Wake up one waiting thread, so they can go ahead and try to init this.
202147
if let Some(waiter) = init_once.waiters.pop_front() {
203-
this.init_once_wake_waiter(id, waiter)?;
204-
} else {
205-
// Nobody there to take this, so go back to 'uninit'
206-
init_once.status = InitOnceStatus::Uninitialized;
148+
this.unblock_thread(waiter, BlockReason::InitOnce(id))?;
207149
}
208150

209151
Ok(())
@@ -221,6 +163,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
221163
"observing the completion of incomplete init once"
222164
);
223165

224-
this.init_once_observe_attempt(id);
166+
this.acquire_clock(&this.machine.sync.init_onces[id].clock);
225167
}
226168
}

0 commit comments

Comments
 (0)