fix: use JoinSet to make spawned tasks cancel-safe
#9318
devinjdangelo left a comment
This looks good to me, thanks again for cleaning up these instances of tokio::spawn!
My one concern is that using JoinSet for single tasks could confuse a newcomer to the code who lacks the background of this PR and its associated issues. It also adds boilerplate for the possibility of joining multiple tasks when we know there will only ever be one to join. These readability concerns are secondary to cancellation safety, so imo we can merge this as-is.
We could consider as follow-on work creating some abstractions to improve readability while maintaining cancellation safety. Something like the following skeleton:
/// Light wrapper around a tokio [JoinSet], which only allows 1 task to be spawned,
/// and provides semantics similar to tokio::spawn for managing a single task. This
/// provides cancelation safety for one-off async tasks.
struct OneShotJoinSet<T> {
    inner: JoinSet<T>,
    locked: bool,
}

impl<T: 'static> OneShotJoinSet<T> {
    pub fn new() -> OneShotJoinSet<T> {
        OneShotJoinSet {
            inner: JoinSet::new(),
            locked: false,
        }
    }

    pub fn spawn<F>(&mut self, task: F) -> Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        if self.locked {
            return internal_err!("OneShotJoinSet only allows spawning one task, but attempted to spawn multiple!");
        }
        self.locked = true;
        Ok(self.inner.spawn(task))
    }

    pub async fn join(&mut self) -> Option<Result<T, JoinError>> {
        self.inner.join_next().await
    }
}

We could also have a similar OrderedJoinSet which wraps a Vec<OneShotJoinSet> for cancellation safety when join order matters.
Yes, I completely agree. I also thought about a wrapper like this for these cases, but didn't want to mix it up with the fix. Just one thought: I think an API like this is more intuitive:
struct SpawnedTask<T> {
    inner: JoinSet<T>,
}

impl<T: 'static> SpawnedTask<T> {
    // a constructor: it takes no `self`
    pub fn spawn<F>(task: F) -> Self
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        let mut inner = JoinSet::new();
        inner.spawn(task);
        Self { inner }
    }

    // and the same for spawn_blocking, actually

    pub async fn join(mut self) -> Result<T, JoinError> {
        self.inner.join_next().await.expect("instance always has exactly 1 task")
    }
}
I'll prepare the changes.
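The ownership pattern behind both proposals (the handle owns the work, and dropping the handle stops it) can be tried out without an async runtime. What follows is a minimal std-only analogy, not the PR's implementation: `SpawnedThread` and `worker_stops_when_handle_dropped` are hypothetical names, and an OS thread with a cooperative cancel flag stands in for a tokio task, which a `JoinSet` aborts when the set is dropped. Only the API shape (a `spawn` constructor and a consuming `join`) is borrowed from the `SpawnedTask` sketch above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical std-only analogue of a cancel-on-drop task handle.
pub struct SpawnedThread<T> {
    cancel: Arc<AtomicBool>,
    handle: Option<JoinHandle<T>>,
}

impl<T: Send + 'static> SpawnedThread<T> {
    /// Constructor: starts `work` on a new thread and hands it a flag that
    /// it is expected to poll in order to notice cancellation.
    pub fn spawn<F>(work: F) -> Self
    where
        F: FnOnce(&AtomicBool) -> T + Send + 'static,
    {
        let cancel = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&cancel);
        let handle = thread::spawn(move || work(&flag));
        Self { cancel, handle: Some(handle) }
    }

    /// Consumes the handle and waits for the worker's result.
    pub fn join(mut self) -> thread::Result<T> {
        let handle = self.handle.take().expect("handle is only taken in join or drop");
        handle.join()
    }
}

impl<T> Drop for SpawnedThread<T> {
    /// Dropping the handle requests cancellation and waits for the worker,
    /// so the work cannot outlive its owner.
    fn drop(&mut self) {
        self.cancel.store(true, Ordering::SeqCst);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

/// Spawns a worker that loops until cancelled, drops the handle, and
/// reports whether the worker observed the cancellation.
pub fn worker_stops_when_handle_dropped() -> bool {
    let stopped = Arc::new(AtomicBool::new(false));
    let seen = Arc::clone(&stopped);
    let task = SpawnedThread::spawn(move |cancel: &AtomicBool| {
        while !cancel.load(Ordering::SeqCst) {
            thread::sleep(Duration::from_millis(1));
        }
        seen.store(true, Ordering::SeqCst);
    });
    drop(task); // sets the flag, then blocks until the worker exits
    stopped.load(Ordering::SeqCst)
}
```

Unlike tokio's abort, the cancellation here is cooperative and `drop` blocks until the worker returns. The sketch only illustrates why tying a task's lifetime to a single owning value removes the need to remember a separate abort call.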
OK, I implemented a wrapper for spawned tasks. It seems reasonable to provide this right away (the diff is also smaller now). Generally, there is no need in
See fcf70f1
I might be missing something, but what is the issue with the AbortOnDrop interfaces? They seem like less boilerplate than the solution proposed in this PR. The SpawnedTask abstraction seems to do the same thing as AbortOnDrop, so I wonder if we can avoid making this a breaking change?
Subjectively, but I find the new interface less boilerplate - you have one wrapper which spawns and wraps the task instead of dealing with But in any case, this is not the main point here. Safety is more important. We even may see There are no strict rules on how to spawn tasks or how to work with them in the current codebase, and I have personally encountered cancellation issues with DataFusion several times. A mention in the documentation of how to handle this just doesn't scale; we still see it happen from time to time. So I believe we need a way to work with this that is both safe and intuitive. Just use And as far as I can see, this was raised before; there is an issue for it: #6513
How about adding an I dunno, I don't feel strongly, but if we can avoid overloading people with yet more new abstractions for tokio-nonsense, I think that would be better 😄
But a lot of usages were already refactored here #6750 🤔 We have a clippy warning and can specify to use
We actually reduced them here, it used to be 3 ours + Every month we have
The naming is definitely negotiable; I'm not claiming that the current version is the right one. Subjectively, it looks like this: we
Writing a tokio::spawn-like API may make task handling more robust. Guarding its usage with Clippy is quite neat. Overall, LGTM.
crepererum left a comment
😍
Thank you @DDtKey and everyone who reviewed this PR. I think it looks very nice and easy to use. Thank you all 🙏
* fix: use `JoinSet` to make spawned tasks cancel-safe
* feat: drop `AbortOnDropSingle` and `AbortOnDropMany`
* style: doc lint
* fix: ordering of the tasks in `RepartitionExec`
* fix: replace spawn_blocking with JoinSet
* style: disallow spawn methods
* fixes: preserve ordering of tasks
* style: allow spawning in tests
* chore: exclude clippy.toml from rat
* chore: typo
* feat: introduce `SpawnedTask`
* revert outdated comment
* switch to SpawnedTask missed outdated part
* doc: improve reason for disallowed-method
(cherry picked from commit 14264d2)
Which issue does this PR close?
Closes #9317
Closes #6513
Disallows tokio::spawn & spawn_blocking, with exceptions only in some tests
Rationale for this change
We need to provide a cancel-safe interface, and preferably deny tokio::spawn altogether
What changes are included in this PR?
Switch to JoinSet and remove AbortOnDropSingle and AbortOnDropMany
Are these changes tested?
Not sure if there are any ideas for how to test this, but the existing tests pass.
Are there any user-facing changes?
No
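The ban on tokio::spawn and spawn_blocking described above is the kind of rule that Clippy's `disallowed-methods` configuration can enforce. A `clippy.toml` entry for it might look roughly like the fragment below. This is a sketch under assumptions: the exact paths and reason strings are illustrative and are not copied from the PR.

```toml
# Hypothetical clippy.toml fragment; the paths and reasons are assumptions.
disallowed-methods = [
    { path = "tokio::task::spawn", reason = "Not cancel-safe on its own; use the SpawnedTask wrapper instead" },
    { path = "tokio::task::spawn_blocking", reason = "Not cancel-safe on its own; use the SpawnedTask wrapper instead" },
]
```

Code that genuinely needs a raw spawn, such as the tests mentioned above, can opt out locally with `#[allow(clippy::disallowed_methods)]`.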