Skip to content

Commit 9e00b26

Browse files
authored
sync: add Notify::notify_last (#6520)
1 parent 6c42d28 commit 9e00b26

File tree

3 files changed

+160
-17
lines changed

3 files changed

+160
-17
lines changed

tokio/src/sync/notify.rs

+65-17
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,9 @@ struct Waiter {
223223
/// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
224224
waker: UnsafeCell<Option<Waker>>,
225225

226-
/// Notification for this waiter.
226+
/// Notification for this waiter. Uses 2 bits to store if and how was
227+
/// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and
228+
/// the rest of it is unused.
227229
/// * if it's `None`, then `waker` is protected by the `waiters` lock.
228230
/// * if it's `Some`, then `waker` is exclusively owned by the
229231
/// enclosing `Waiter` and can be accessed without locking.
@@ -253,13 +255,16 @@ generate_addr_of_methods! {
253255
}
254256

255257
// No notification.
256-
const NOTIFICATION_NONE: usize = 0;
258+
const NOTIFICATION_NONE: usize = 0b000;
257259

258260
// Notification type used by `notify_one`.
259-
const NOTIFICATION_ONE: usize = 1;
261+
const NOTIFICATION_ONE: usize = 0b001;
262+
263+
// Notification type used by `notify_last`.
264+
const NOTIFICATION_LAST: usize = 0b101;
260265

261266
// Notification type used by `notify_waiters`.
262-
const NOTIFICATION_ALL: usize = 2;
267+
const NOTIFICATION_ALL: usize = 0b010;
263268

264269
/// Notification for a `Waiter`.
265270
/// This struct is equivalent to `Option<Notification>`, but uses
@@ -275,13 +280,20 @@ impl AtomicNotification {
275280
/// Store-release a notification.
276281
/// This method should be called exactly once.
277282
fn store_release(&self, notification: Notification) {
278-
self.0.store(notification as usize, Release);
283+
let data: usize = match notification {
284+
Notification::All => NOTIFICATION_ALL,
285+
Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
286+
Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
287+
};
288+
self.0.store(data, Release);
279289
}
280290

281291
fn load(&self, ordering: Ordering) -> Option<Notification> {
282-
match self.0.load(ordering) {
292+
let data = self.0.load(ordering);
293+
match data {
283294
NOTIFICATION_NONE => None,
284-
NOTIFICATION_ONE => Some(Notification::One),
295+
NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
296+
NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
285297
NOTIFICATION_ALL => Some(Notification::All),
286298
_ => unreachable!(),
287299
}
@@ -296,11 +308,18 @@ impl AtomicNotification {
296308
}
297309
}
298310

311+
#[derive(Debug, PartialEq, Eq)]
312+
#[repr(usize)]
313+
enum NotifyOneStrategy {
314+
Fifo,
315+
Lifo,
316+
}
317+
299318
#[derive(Debug, PartialEq, Eq)]
300319
#[repr(usize)]
301320
enum Notification {
302-
One = NOTIFICATION_ONE,
303-
All = NOTIFICATION_ALL,
321+
One(NotifyOneStrategy),
322+
All,
304323
}
305324

306325
/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
@@ -521,7 +540,7 @@ impl Notify {
521540
}
522541
}
523542

524-
/// Notifies a waiting task.
543+
/// Notifies the first waiting task.
525544
///
526545
/// If a task is currently waiting, that task is notified. Otherwise, a
527546
/// permit is stored in this `Notify` value and the **next** call to
@@ -558,6 +577,23 @@ impl Notify {
558577
// Alias for old name in 0.x
559578
#[cfg_attr(docsrs, doc(alias = "notify"))]
560579
pub fn notify_one(&self) {
580+
self.notify_with_strategy(NotifyOneStrategy::Fifo);
581+
}
582+
583+
/// Notifies the last waiting task.
584+
///
585+
/// This function behaves similar to `notify_one`. The only difference is that it wakes
586+
/// the most recently added waiter instead of the oldest waiter.
587+
///
588+
/// Check the [`notify_one()`] documentation for more info and
589+
/// examples.
590+
///
591+
/// [`notify_one()`]: Notify::notify_one
592+
pub fn notify_last(&self) {
593+
self.notify_with_strategy(NotifyOneStrategy::Lifo);
594+
}
595+
596+
fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
561597
// Load the current state
562598
let mut curr = self.state.load(SeqCst);
563599

@@ -585,7 +621,7 @@ impl Notify {
585621
// transition out of WAITING while the lock is held.
586622
curr = self.state.load(SeqCst);
587623

588-
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
624+
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
589625
drop(waiters);
590626
waker.wake();
591627
}
@@ -708,7 +744,12 @@ impl Default for Notify {
708744
impl UnwindSafe for Notify {}
709745
impl RefUnwindSafe for Notify {}
710746

711-
fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
747+
fn notify_locked(
748+
waiters: &mut WaitList,
749+
state: &AtomicUsize,
750+
curr: usize,
751+
strategy: NotifyOneStrategy,
752+
) -> Option<Waker> {
712753
match get_state(curr) {
713754
EMPTY | NOTIFIED => {
714755
let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
@@ -728,8 +769,11 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
728769
// concurrently change as holding the lock is required to
729770
// transition **out** of `WAITING`.
730771
//
731-
// Get a pending waiter
732-
let waiter = waiters.pop_back().unwrap();
772+
// Get a pending waiter using one of the available dequeue strategies.
773+
let waiter = match strategy {
774+
NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
775+
NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
776+
};
733777

734778
// Safety: we never make mutable references to waiters.
735779
let waiter = unsafe { waiter.as_ref() };
@@ -738,7 +782,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
738782
let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
739783

740784
// This waiter is unlinked and will not be shared ever again, release it.
741-
waiter.notification.store_release(Notification::One);
785+
waiter
786+
.notification
787+
.store_release(Notification::One(strategy));
742788

743789
if waiters.is_empty() {
744790
// As this the **final** waiter in the list, the state
@@ -1137,8 +1183,10 @@ impl Drop for Notified<'_> {
11371183
// See if the node was notified but not received. In this case, if
11381184
// the notification was triggered via `notify_one`, it must be sent
11391185
// to the next waiter.
1140-
if notification == Some(Notification::One) {
1141-
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
1186+
if let Some(Notification::One(strategy)) = notification {
1187+
if let Some(waker) =
1188+
notify_locked(&mut waiters, &notify.state, notify_state, strategy)
1189+
{
11421190
drop(waiters);
11431191
waker.wake();
11441192
}

tokio/src/util/linked_list.rs

+20
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,26 @@ impl<L: Link> LinkedList<L, L::Target> {
137137
}
138138
}
139139

140+
/// Removes the first element from a list and returns it, or None if it is
141+
/// empty.
142+
pub(crate) fn pop_front(&mut self) -> Option<L::Handle> {
143+
unsafe {
144+
let head = self.head?;
145+
self.head = L::pointers(head).as_ref().get_next();
146+
147+
if let Some(new_head) = L::pointers(head).as_ref().get_next() {
148+
L::pointers(new_head).as_mut().set_prev(None);
149+
} else {
150+
self.tail = None;
151+
}
152+
153+
L::pointers(head).as_mut().set_prev(None);
154+
L::pointers(head).as_mut().set_next(None);
155+
156+
Some(L::from_raw(head))
157+
}
158+
}
159+
140160
/// Removes the last element from a list and returns it, or None if it is
141161
/// empty.
142162
pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {

tokio/tests/sync_notify.rs

+75
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,38 @@ fn notify_notified_one() {
2121
assert_ready!(notified.poll());
2222
}
2323

24+
#[test]
25+
fn notify_multi_notified_one() {
26+
let notify = Notify::new();
27+
let mut notified1 = spawn(async { notify.notified().await });
28+
let mut notified2 = spawn(async { notify.notified().await });
29+
30+
// add two waiters into the queue
31+
assert_pending!(notified1.poll());
32+
assert_pending!(notified2.poll());
33+
34+
// should wakeup the first one
35+
notify.notify_one();
36+
assert_ready!(notified1.poll());
37+
assert_pending!(notified2.poll());
38+
}
39+
40+
#[test]
41+
fn notify_multi_notified_last() {
42+
let notify = Notify::new();
43+
let mut notified1 = spawn(async { notify.notified().await });
44+
let mut notified2 = spawn(async { notify.notified().await });
45+
46+
// add two waiters into the queue
47+
assert_pending!(notified1.poll());
48+
assert_pending!(notified2.poll());
49+
50+
// should wakeup the last one
51+
notify.notify_last();
52+
assert_pending!(notified1.poll());
53+
assert_ready!(notified2.poll());
54+
}
55+
2456
#[test]
2557
fn notified_one_notify() {
2658
let notify = Notify::new();
@@ -105,6 +137,49 @@ fn notified_multi_notify_drop_one() {
105137
assert_ready!(notified2.poll());
106138
}
107139

140+
#[test]
141+
fn notified_multi_notify_one_drop() {
142+
let notify = Notify::new();
143+
let mut notified1 = spawn(async { notify.notified().await });
144+
let mut notified2 = spawn(async { notify.notified().await });
145+
let mut notified3 = spawn(async { notify.notified().await });
146+
147+
// add waiters by order of poll execution
148+
assert_pending!(notified1.poll());
149+
assert_pending!(notified2.poll());
150+
assert_pending!(notified3.poll());
151+
152+
// by default fifo
153+
notify.notify_one();
154+
155+
drop(notified1);
156+
157+
// next waiter should be the one to be to woken up
158+
assert_ready!(notified2.poll());
159+
assert_pending!(notified3.poll());
160+
}
161+
162+
#[test]
163+
fn notified_multi_notify_last_drop() {
164+
let notify = Notify::new();
165+
let mut notified1 = spawn(async { notify.notified().await });
166+
let mut notified2 = spawn(async { notify.notified().await });
167+
let mut notified3 = spawn(async { notify.notified().await });
168+
169+
// add waiters by order of poll execution
170+
assert_pending!(notified1.poll());
171+
assert_pending!(notified2.poll());
172+
assert_pending!(notified3.poll());
173+
174+
notify.notify_last();
175+
176+
drop(notified3);
177+
178+
// latest waiter added should be the one to woken up
179+
assert_ready!(notified2.poll());
180+
assert_pending!(notified1.poll());
181+
}
182+
108183
#[test]
109184
fn notify_in_drop_after_wake() {
110185
use futures::task::ArcWake;

0 commit comments

Comments
 (0)