Skip to content

Conversation

@DDtKey
Copy link
Contributor

@DDtKey DDtKey commented Feb 22, 2024

Which issue does this PR close?

Closes #9317
Closes #6513

Disallows tokio::spawn & spawn_blocking, exceptions only in some tests

Rationale for this change

We need to provide cancel-safe interface, and preferably deny tokio::spawn at all

What changes are included in this PR?

Switch to JoinSet and removing of AbortOnDropSingle and AbortOnDropMany

Are these changes tested?

Not sure if there are any ideas how to test this, but existing tests works.

Are there any user-facing changes?

No

@github-actions github-actions bot added the core Core DataFusion crate label Feb 22, 2024
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Feb 22, 2024
@DDtKey DDtKey marked this pull request as ready for review February 22, 2024 23:56
Copy link
Contributor

@devinjdangelo devinjdangelo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me, thanks again for cleaning up these instances of tokio::spawn!

My one concern is that usage of JoinSet for single tasks could be confusing for a newcomer to the code without the background/context of this PR and associated issues. It also adds boilerplate handling the possibility of joining multiple tasks, when we know there will only ever be one to join. These readability concerns are a secondary concern to cancellation safety, so imo we can merge this as-is.

We could consider as follow-on work creating some abstractions to improve readability while maintaining cancellation safety. Something like the following skeleton:

/// Light wrapper around a tokio [JoinSet], which only allows 1 task to be spawned,
/// and provides semantics similar to tokio::spawn for managing a single task. This
/// provides cancelation safety for one-off async tasks.
struct OneShotJoinSet<T> {
    inner: JoinSet<T>,
    locked: bool,
}

impl<T: 'static> OneShotJoinSet<T> {
    pub fn new() -> OneShotJoinSet<T> {
        OneShotJoinSet {
            inner: JoinSet::new(),
            locked: false,
        }
    }

    pub fn spawn<F>(&mut self, task: F) -> Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        if self.locked {
            return internal_err!("OneShotJoinSet only allows spawning one task, but attempted to spawn multiple!");
        }
        self.locked = true;
        Ok(self.inner.spawn(task))
    }

    pub async fn join(&mut self) -> Option<Result<T, JoinError>> {
        self.inner.join_next().await
    }
}

We could also have a similar OrderedJoinSet which wraps a Vec<OneShotJoinSet> for cancellation safety when join order matters.

@DDtKey
Copy link
Contributor Author

DDtKey commented Feb 23, 2024

Yes, I completely agree and also thought about something like this approach to have a wrapper for these cases. But didn't want to mix this up with the fix.

Just one thought, I think such API is more intuitive:

struct SpawnedTask<T> {
  inner: JoinSet<T>,
}

impl<T: 'static> SpawnedTask<T> {
   // it's constructor, without `self`
   pub fn spawn<F>(task: F) -> Self
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        let mut inner = JoinSet::new();
        inner.spawn(task);
        Self(inner)
    }   
    // and the same for spawn_blocking actually
    
    pub async fn join(mut self) -> Result<T, JoinError> {
        self.inner.join_next().await.expect("instance always have 1 task")
    }
}
  • There is no way to get runtime exception for attempt to spawn one more task
  • join with owned self disallows the instance to be called several times (i.e it guarantees to have only 1 task in its lifecycle) => guaranteed on compile time, because SpawnedTask can be crated only with public methods

I'll prepare changes

@DDtKey
Copy link
Contributor Author

DDtKey commented Feb 23, 2024

Ok, I implemented a wrapper for spawned tasks. Seems reasonable to provide this right away (diff also smaller now)
It simplified the code and now we provides a good interface for newcomers

Generally, there is no need in OrderedSpawnedTasks, this would be just an alias for Vec<SpawnedTask<T>>. I.e:

  • single task: use SpawnedTask
  • many unordered: use JoinSet
  • many ordered: use Vec<SpawnedTask>

See fcf70f1

@DDtKey
Copy link
Contributor Author

DDtKey commented Feb 23, 2024

cc @alamb @tustvold

@devinjdangelo
Copy link
Contributor

The SpawnedTask abstraction looks great! Agreed that your API is more intuitive and Vec<SpawnedTask> is sufficient without an additional wrapper. Thanks again for knocking this out!

@tustvold
Copy link
Contributor

tustvold commented Feb 24, 2024

I might be missing something, but what is the issue with the AbortOnDrop interfaces? They seem like less boilerplate than the proposed solution in this PR? The SpawnedTask abstraction seems to do the same thing as AbortOnDrop, so I wonder if we can avoid this being a breaking change?

@DDtKey
Copy link
Contributor Author

DDtKey commented Feb 24, 2024

Subjectively, but I find the new interface less boilerplate - you have one wrapper which spawns and wraps the task instead of dealing with JoinHandles + AbortOnDrop + tokio::spawn

But in any case, this is not the main point here. Safety is more important. AbortOnDrop didn't provide the same guarantees, and easily can be misused.

We even may see JoinHandles were sent through channels and only then wrapped into this interface. But we can cancel the execution even before receiver part is awaited/reached/task received.
Some functions returns JoinHandle - which is kinda confusing, they spawn a task and don't care if it's wrapped safely.
Or just a lot of tasks spawned in a loop with await points in between and only then wrapped into AbortOnDropMany, so you probably never will reach the point they are wrapped.

There is no strict rules how to spawn tasks and how to work with them in the current codebase. And I personally encountered cancellation issues several times with datafusion. A mention in documentation of how to work with this just doesn't scale, we still can see sometimes it happens.

So I just believe we need to have a safe way to work with this and intuitive.

Just use SpawnedTask::spawn instead of tokio::spawn and we won't have at least obvious issues. Also compiler + clippy will prevent such code for us.

And as far as I can see it was raised even before, there is a task for that: #6513

@tustvold
Copy link
Contributor

tustvold commented Feb 24, 2024

. AbortOnDrop didn't provide the same guarantees, and easily can be misused.

How about adding an AbortOnDrop::spawn method that handles this, and potentially deprecate the methods that take a JoinHandle down the line? This would avoid making a breaking change, and is also IMO a more descriptive name for such a construction?

I dunno, I don't feel strongly, but if we can avoid overloading people with yet more new abstractions for tokio-nonsense, I think that would be better 😄

@DDtKey
Copy link
Contributor Author

DDtKey commented Feb 24, 2024

deprecate the methods that take a JoinHandle down the line

But a lot of usages were already refactored here #6750 🤔
I thought it's a target to disallow working with JoinHandles directly, it was misused too often.

We have a clippy warning and can specify to use SpawnedTask in case anybody would try to use tokio::spawn (see clippy.toml)
I guess it should improve dev experience

AbortOnDrop::spawn can work only for single task and it's kinda confusing naming to me (I mean "Drop::spawn"). When you need AbortOnDropMany in most of cases - you just need JoinSet (exception is ordering).

but if we can avoid overloading people with yet more new abstractions for tokio-nonsense

We actually reduced them here, it used to be 3 ours + JoinSet + spawn/JoinHandle itself. Now it's 2 ours (stream wrapper and SpawnedTask), plus JoinSet

Every month we have datafusion releases with breaking changes for users of the crate. Is that such important not to change internal structures which affects only devs? It has good outcomes at least for a product

@DDtKey
Copy link
Contributor Author

DDtKey commented Feb 24, 2024

The naming is definitely negotiable, I'm not making any claims to the truth with the current version.

Subjectively, it looks like this: we spawn a task and then we have SpawnedTask.
Since it's only allowed place to spawn, AbortOnDrop seems redundant to me here (there is no way to have task without this semantic). It's more like a documentation of behavior and reason of why we have such wrapper.

Copy link
Contributor

@metesynnada metesynnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Writing a tokio::spawn like API may increase the task handling robustness. Making the usage guarded by Clippy is quite neat. Overall, LGTM.

@alamb alamb requested a review from crepererum February 26, 2024 18:33
Copy link
Contributor

@crepererum crepererum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😍

@alamb
Copy link
Contributor

alamb commented Feb 27, 2024

Thank you @DDtKey and everyone who reviewed this PR. I thought it looks very nice and easy to use. Thank you all 🙏

@alamb alamb merged commit 14264d2 into apache:main Feb 27, 2024
DDtKey added a commit to DDtKey/arrow-datafusion that referenced this pull request Feb 29, 2024
* fix: use `JoinSet` to make spawned tasks cancel-safe

* feat: drop `AbortOnDropSingle` and `AbortOnDropMany`

* style: doc lint

* fix: ordering of the tasks in `RepartitionExec`

* fix: replace spawn_blocking with JoinSet

* style: disallow spawn methods

* fixes: preserve ordering of tasks

* style: allow spawning in tests

* chore: exclude clippy.toml from rat

* chore: typo

* feat: introduce `SpawnedTask`

* revert outdated comment

* switch to SpawnedTask missed outdated part

* doc: improve reason for disallowed-method

(cherry picked from commit 14264d2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Non cancel-safe tokio spawns Replace AbortOnDrop / AbortDropOnMany with tokio JoinSet

6 participants