Fix panic propagation in CoalescePartitions, consolidates panic propagation into RecordBatchReceiverStream
#6507
Conversation
Another try for fixing apache#3104. RepartitionExec might need a similar fix.
Comment on: struct MergeStream {
I basically taught RecordBatchReceiverStream how to propagate panics and then updated CoalescePartitionsExec to use it
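To make the mechanics concrete, here is a rough sketch of what the updated CoalescePartitionsExec::execute could look like when built on top of RecordBatchReceiverStream; the method names (builder, run_input, build) and type paths are paraphrased from this PR's description rather than copied from the final code.

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{
    stream::RecordBatchReceiverStream, ExecutionPlan, SendableRecordBatchStream,
};

// Hypothetical sketch, not the exact DataFusion implementation.
fn execute_sketch(
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    num_input_partitions: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    // channel capacity of one in-flight batch per input partition
    let mut builder = RecordBatchReceiverStream::builder(schema, num_input_partitions);

    // Each input partition is drained on its own task, spawned into the
    // builder's JoinSet so it is aborted when the output stream is dropped.
    for partition in 0..num_input_partitions {
        builder.run_input(Arc::clone(&input), partition, Arc::clone(&context));
    }

    // A single output stream; a panic in any spawned task is surfaced as an
    // error item instead of being silently swallowed.
    Ok(builder.build())
}
```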
Comment on: #[tokio::test]
This is the new test from @nvartolomei
Comment on:
/// Stream wrapper that records `BaselineMetrics` for a particular
/// partition
pub(crate) struct ObservedStream {
Moved from Union so it can be reused
Comment on:
let num_partitions = 2;
let input = PanicingExec::new(schema.clone(), num_partitions)
    .with_partition_panic(0, 10)
    .with_partition_panic(1, 3); // partition 1 should panic first (after 3)
Here is a test showing that when the second partition panics, the panic is properly reported
Comment on: /// Stream wrapper that records `BaselineMetrics` for a particular
moved
Comment on:
// parallelism.
let (sender, receiver) =
    mpsc::channel::<Result<RecordBatch>>(input_partitions);
let mut builder =
I am quite pleased that this is now all encapsulated into RecordBatchReceiverStream
Comment on:
// unwrap Option / only return the error
.filter_map(|item| async move { item });
let inner = ReceiverStream::new(rx).chain(check_stream).boxed();
This change looks much better than mine. Wondering if it can be improved further and if it makes sense at all.
With this implementation, the panics will be propagated only after all input (from other partitions) is consumed. Probably fine, as this shouldn't happen during normal operation and is more of a correctness check. Also, the check future will not make any progress until all the inputs are exhausted. Shouldn't be much work, so it is fine for it to be sequential.
As an alternative, what if we build a "supervisor" task (tokio::spawn) which is launched to do all that work, and then in the check_stream we just check the JoinHandle of the "supervisor" task? This way the supervisor task will be able to make progress concurrently and panic/report errors early.
Thought about this after looking at RepartitionStream which would need something similar (supervisor) to get task failures and then multiplex them to all the output partitions. Then, all "output streams" would only have to ensure that the supervisor didn't die. Currently, in RepartitionStream there are |output partitions| "supervisors" (wait_for_task) which aren't checked for success either. Wondering if it could fail at all though (tokio-rs/tokio#5744).
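For illustration, a minimal sketch of the supervisor idea (all names and error types here are made up for the example): one spawned task drains the JoinSet concurrently with the output stream, so a panic in any input task is observed as soon as it happens, and the check stream would only need to await its JoinHandle.

```rust
use tokio::task::{JoinHandle, JoinSet};

// Hypothetical "supervisor": not the DataFusion code, just the shape of the idea.
fn supervise(mut join_set: JoinSet<Result<(), String>>) -> JoinHandle<Result<(), String>> {
    tokio::spawn(async move {
        while let Some(joined) = join_set.join_next().await {
            match joined {
                Ok(Ok(())) => {}             // input task completed normally
                Ok(Err(e)) => return Err(e), // input task reported an error
                Err(e) => return Err(format!("input task panicked: {e}")), // panic / abort
            }
        }
        Ok(())
    })
}
```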
https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.take_while is possibly a better way to formulate this
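A minimal illustration of the take_while semantics being suggested (toy types, not the DataFusion stream): the output ends as soon as the predicate sees a failure.

```rust
use futures::{future, stream, StreamExt};

#[tokio::main]
async fn main() {
    let items = stream::iter(vec![Ok::<_, String>(1), Ok(2), Err("boom".to_string()), Ok(3)]);

    // take_while ends the stream at the first item for which the predicate is false
    let collected: Vec<_> = items
        .take_while(|item| future::ready(item.is_ok()))
        .collect()
        .await;

    assert_eq!(collected, vec![Ok::<_, String>(1), Ok(2)]);
}
```

Note that take_while also drops the failing item itself, so a real check would still need to forward the error before stopping.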
With this implementation, the panics will be propagated only after all input
This is my worry as well. I think you could move the check future into another task (that holds the join set and is also aborted on drop, like a two-level join set) and that sends the error to tx.
I am working on tests for this behavior
I believe I fixed this in b1a817c
After trying several other approaches, I found https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.merge which did exactly what I wanted 💯
It is tested in record_batch_receiver_stream_propagates_panics_early_shutdown
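To show the shape of that fix, here is a small, self-contained sketch (toy types, not the DataFusion code) of merging the data channel with a check stream so a failure can surface before the data stream is exhausted:

```rust
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    // Data channel standing in for the batches forwarded by the spawned tasks
    let (tx, rx) = mpsc::channel::<Result<i32, String>>(2);
    tx.send(Ok(1)).await.unwrap();
    drop(tx);

    // Stand-in for the check stream: here it reports a failure immediately;
    // in the real code it would yield an error only if a task panicked
    let check = tokio_stream::once(Err::<i32, _>("task panicked".to_string()));

    // merge polls both sides, so the error can surface before the data
    // stream is exhausted (unlike chain, which waits for it to finish)
    let mut merged = ReceiverStream::new(rx).merge(check);
    while let Some(item) = merged.next().await {
        println!("{item:?}");
    }
}
```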
Comment on:
while let Some(item) = stream.next().await {
    // If send fails, plan being torn down,
    // there is no place to send the error.
I think this should also short-circuit if item is an error; currently I think it will drive execution to completion
Fixed in 56a26eb
Tested in record_batch_receiver_stream_error_does_not_drive_completion
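A small sketch of the short-circuit behaviour (illustrative types and names, not the actual DataFusion task body): stop polling the input once it yields an error, after forwarding that error to the channel.

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Sender;

async fn forward<S, T, E>(mut stream: S, tx: Sender<Result<T, E>>)
where
    S: Stream<Item = Result<T, E>> + Unpin,
{
    while let Some(item) = stream.next().await {
        let is_err = item.is_err();
        // If the send fails the plan is being torn down and there is
        // nowhere left to report the error
        if tx.send(item).await.is_err() {
            return;
        }
        if is_err {
            // An error ends this partition: do not drive the input to completion
            return;
        }
    }
}
```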
tustvold left a comment:
I like where this is headed, left some comments to potentially improve it further
crepererum left a comment:
Would be nice to be a bit more eager w/ error reporting, but this is at least better than the status quo.
I plan to work on improving the panic checks to be more eager later today
Co-authored-by: Raphael Taylor-Davies <[email protected]>
…sion into alamb/propagate_error
I believe I have resolved all outstanding comments in this PR. Please take another look if you have time
tustvold left a comment:
Looks good to me, left some minor comments you can take or leave
Comment on:
// Merge the streams together so whichever is ready first
// produces the batch (since futures::stream:StreamExt is
// already in scope, need to call it explicitly)
FWIW https://docs.rs/futures/latest/futures/stream/fn.select.html is the futures crate version of this, not sure if there is a material difference between the two impls
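For comparison, a toy example of futures::stream::select, which interleaves two streams of the same item type much like tokio_stream's merge (plain integers stand in for record batches):

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let data = stream::iter(vec![1, 2, 3]);
    let check = stream::iter(vec![100]);

    // select polls both streams and yields from whichever is ready first,
    // ending only when both are exhausted
    let merged: Vec<_> = stream::select(data, check).collect().await;
    assert_eq!(merged.len(), 4);
}
```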
Changed in fb17af8 -- I didn't see select. TIL!
(it also inspired me to do #6565)
Comment on:
// the JoinSet were aborted, which in turn
// would imply that the receiver has been
// dropped and this code is not running
return Some(Err(DataFusionError::Internal(format!(
If this is unreachable (which I'm fairly certain it is) I'm not sure why we don't just panic here, making this future infallible and therefore an ideal candidate for https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.take_until
I haven't studied the tokio JoinHandle code or under what conditions it currently (or in the future) might return an error (e.g., will it error if the task is canceled in some way?).
Given that the API returns an error I think handling and propagating the error is the most future proof thing to do.
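For context, a minimal sketch of the kind of handling this implies (function name and error type are illustrative): a JoinError can mean the task panicked or was cancelled, and both are turned into a stream error rather than treated as unreachable.

```rust
use tokio::task::JoinSet;

async fn drain(mut join_set: JoinSet<()>) -> Result<(), String> {
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(()) => {} // task completed normally
            Err(e) if e.is_panic() => {
                // propagate the panic to the consumer of the stream as an error
                return Err(format!("task panicked: {e}"));
            }
            Err(e) => {
                // e.g. the task was cancelled; surface it rather than
                // assuming this branch can never run
                return Err(format!("task was cancelled or failed to join: {e}"));
            }
        }
    }
    Ok(())
}
```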
great work @alamb! 👏
Thank you @nvartolomei for starting the process (and providing the tests!)
Which issue does this PR close?
Based on #6449 from @nvartolomei
Closes #3104
Closes #6449
Rationale for this change
I wanted to centralize the logic for propagating panics from tasks.
What changes are included in this PR?
- A new RecordBatchReceiverStreamBuilder, which handles doing the abort-on-drop dance using tokio::task::JoinSet as shown by @nvartolomei
- CoalescePartitionsExec and AnalyzeExec updated to use this builder
Are these changes tested?
Yes
Are there any user-facing changes?
Yes, panics are no longer ignored