Simplify and speed up MemoryExec insert #6236
Merged
alamb merged 1 commit intoapache:mainfrom May 5, 2023
Merged
Conversation
alamb
commented
May 4, 2023
|
|
||
| let stream = futures::stream::unfold(state, |mut state| async move { | ||
| loop { | ||
| match state.data.next().await { |
Contributor
Author
There was a problem hiding this comment.
This version of the loop does not hold the lock at all until the end.
Contributor
Author
|
@metesynnada -- I wonder if you have some time to consider this change? |
metesynnada
reviewed
May 5, 2023
| Some(Err(e)) => return Some((Err(e), state)), | ||
| None => { | ||
| // stream is done, transfer all data to target PartitionData | ||
| state.batch.write().await.append(&mut state.buffer); |
Contributor
There was a problem hiding this comment.
I think this is logical, every stream will append once no matter what the partition counts are.
Contributor
|
LGTM, @alamb. I truly appreciate your help and effort – thank you. |
Contributor
Author
Thank you ! I am very excited to see this functionality get in. I think we are very close to being able to use datafusion to easily write out files (like parquet) as well efficiently, which will open up a large number of new usecases I think |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Draft until #6154 is mergedWhich issue does this PR close?
Follow on to #6154
Rationale for this change
We had substantial back and forth on #6154 related to locking or a single batch on each insert or for the entire operation. This required two similar, but not quite the same codepaths
Upon reflection, since everything is being buffered anyways, we can avoid a second mostly copy of the loop and instead just insert the batches once at the end of the stream.
What changes are included in this PR?
structwith namesVecand then insert them all at once when the input stream is doneAre these changes tested?
covered by existing tests
Are there any user-facing changes?