Support SortMergeJoin spilling #11218
Conversation
All existing spilling tests pass; I will add 3 more tests to exercise the spilling.

Multi-batch spill tests still fail.

All initial tests passed; I'm planning to add more tests related to result correctness in a separate PR.
```diff
-            "Spill file {:?} does not exist",
-            spill.path()
-        )));
+        return internal_err!("Spill file {:?} does not exist", spill.path());
```
I will review this in the next few days.
```rust
TestCase::new()
    .with_query(
        "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
    )
    .with_memory_limit(1_000)
    .with_config(config)
    .with_disk_manager_config(DiskManagerConfig::NewOs)
    .run()
    .await
```
I wonder how we know whether it triggers spilling or not?
Yeah, that is a great idea. I was overthinking how to check that the file spilled to disk, but metrics are much easier; I'm adding it.
@viirya I added metrics tests in sort_merge_join.rs, e.g. https://github.com/apache/datafusion/pull/11218/files#diff-825342e035aec56595dce761afb00dd54e3ae663a2e24ebf3a597123e636f9e2R3140
For this exact test, which runs at the SQL level, I'm wondering if I can somehow access metrics.
It doesn't seem possible to access any metrics in this case. We can rely on the fact that the test with spilling disabled fails with a memory error, while the same test with spilling enabled passes. Hope that is enough.
I plan to review this PR later today -- sorry for the delay.
```rust
}

#[tokio::test]
async fn sort_merge_join_spill() {
```
This test case can only make sure the query runs; it may or may not be spilling.
We should have some way to verify that the spilling actually happened.
Unfortunately, for exactly this test case we cannot access any spilling metrics, but there is another test above, sort_merge_join_no_spill, which is exactly the same except it has spilling explicitly disabled and expectedly fails with a memory error. This test passes without issues with spilling enabled, so we can conclude the spilling happened.
```rust
self.join_type,
on,
self.filter
    .as_ref()
    .map(|f| format!(", filter={}", f.expression()))
    .unwrap_or("".to_string())
```
Inlined the filter display and changed map_or_else to map with a default.
```rust
if buffered_batch.spill_file.is_none() && buffered_batch.batch.is_some() {
    self.reservation
        .try_shrink(buffered_batch.size_estimation)?;
}
```
We should also handle the other cases, i.e., the spill file is Some and the batch is also Some, both are None, etc.
I think those cases are not possible but the current code doesn't make that clear
Here is a proposal that I think makes it clearer what states are possible: comphead#297
```rust
// If the batch was spilled to disk, less likely
(Some(spill_file), None) => {
    let mut buffered_cols: Vec<ArrayRef> =
        Vec::with_capacity(buffered_indices.len());
```
buffered_indices.len() is the length of arrays. I think the capacity should be the number of columns of the batch.
They should work the same, right? The take kernel will check the bounds.
```rust
/// Spill the `RecordBatch` to disk as smaller batches
/// split by `batch_size_rows`
/// Return `total_rows` what is spilled
pub fn spill_record_batch_by_size(
```
Where is this function used other than in tests? I can't find it.
I think it may be left over from an earlier version of this PR
Yes, I'm planning to keep it and reuse it in row_hash in a following PR; basically the sub-batch slicing comes from row_hash.rs.
Thanks @viirya for your review, I'll address the comments today/tomorrow.
alamb left a comment
Thank you @comphead and @viirya
I think this code is now correct, though I also think it could be improved (both with the comments from @viirya, my suggestion in comphead#297, and more testing).
Specifically, for testing: given the subtlety of the code involved, I am not 100% sure it works for all corner cases. I suggest (as a follow-on) we invest in fuzz testing, both for SMJ in general and for spilling SMJ.
In particular, we should make sure the random inputs have varying numbers of repeated values (the code in this PR is only going to be exercised when there are many identical join keys, I think).
```diff
 struct BufferedBatch {
     /// The buffered record batch
-    pub batch: RecordBatch,
+    pub batch: Option<RecordBatch>,
```
While reviewing this PR, I found having to reason about the valid batch/spill_file combinations confusing (for example, I think there is an invariant that they can't both be Some).
Rather than using two fields, I tried making an enum that encodes the state, and I found it easier to reason about. Here is a proposal: comphead#297
I think it's a great idea. I'll include this in a follow-up to simplify the double Option check in favor of an enum.
Filed #11541
* Support SortMerge spilling
Which issue does this PR close?
Closes #9359.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?