Simplify and speed up MemoryExec insert #6236

Merged
alamb merged 1 commit into apache:main from alamb:alamb/simplify_exec
May 5, 2023

@alamb alamb commented May 4, 2023

Draft until #6154 is merged

Which issue does this PR close?

Follow on to #6154

Rationale for this change

We had substantial back and forth on #6154 about whether to take the lock for a single batch on each insert or once for the entire operation. This required two similar, but not quite identical, code paths.

Upon reflection, since everything is being buffered anyway, we can avoid a second, mostly duplicated copy of the loop and instead insert the batches once at the end of the stream.

What changes are included in this PR?

  1. Pull state into a struct with named fields
  2. Buffer batches in a separate Vec and then insert them all at once when the input stream is done
  3. Remove the second, mostly duplicated copy of the loop
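
For readers skimming the list above, here is a minimal sketch of the buffer-then-insert-once idea. It is not the code from this PR: it uses a synchronous iterator and `std::sync::RwLock` in place of the async stream and async lock, and `Batch`, `InsertState`, and `insert_all` are hypothetical names chosen for illustration.

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for an Arrow `RecordBatch`; hypothetical, for illustration only.
type Batch = Vec<i32>;

/// Named state carried through the insert loop (field names are assumptions).
struct InsertState<I: Iterator<Item = Result<Batch, String>>> {
    /// Input batches; a synchronous stand-in for the async input stream.
    input: I,
    /// Batches collected so far; owned by this loop, so no lock is needed.
    buffer: Vec<Batch>,
    /// Shared destination partition.
    target: Arc<RwLock<Vec<Batch>>>,
}

/// Drains the input, buffering locally, and takes the write lock exactly once
/// at the end. Returns the number of rows inserted.
fn insert_all<I>(mut state: InsertState<I>) -> Result<usize, String>
where
    I: Iterator<Item = Result<Batch, String>>,
{
    let mut rows = 0;
    for batch in state.input.by_ref() {
        // In this sketch, an error returns before the target is touched.
        let batch = batch?;
        rows += batch.len();
        state.buffer.push(batch);
    }
    // Input is done: move every buffered batch into the target in one step.
    state
        .target
        .write()
        .map_err(|e| e.to_string())?
        .append(&mut state.buffer);
    Ok(rows)
}
```

Because the buffer is private to the loop, the shared target is locked only for the final `append`, which is what lets one code path replace the two earlier ones in this sketch.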

Are these changes tested?

Covered by existing tests.

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label May 4, 2023
@alamb alamb force-pushed the alamb/simplify_exec branch from ebc7c1a to 7feef4e Compare May 4, 2023 20:26

let stream = futures::stream::unfold(state, |mut state| async move {
    loop {
        match state.data.next().await {
Contributor Author


This version of the loop does not hold the lock at all until the end.

@alamb alamb marked this pull request as ready for review May 4, 2023 20:27
@alamb alamb mentioned this pull request May 4, 2023

alamb commented May 5, 2023

@metesynnada -- I wonder if you have some time to consider this change?

            Some(Err(e)) => return Some((Err(e), state)),
            None => {
                // stream is done, transfer all data to target PartitionData
                state.batch.write().await.append(&mut state.buffer);

@metesynnada metesynnada May 5, 2023


I think this is logical, every stream will append once no matter what the partition counts are.
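
A small sketch of that property, under stated assumptions: plain threads and `std::sync::RwLock` stand in for the async input streams and the async lock, and `Partition`, `lock_acquisitions`, and `run_writers` are hypothetical names that are not part of DataFusion. Each writer buffers privately and takes the write lock once, so the number of acquisitions equals the number of writers regardless of how many batches each one carries.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical target that also counts how often the write lock is taken.
#[derive(Default)]
struct Partition {
    batches: Vec<Vec<i32>>,
    lock_acquisitions: usize,
}

/// Runs one writer thread per input; each writer buffers privately and
/// appends to the shared target exactly once.
fn run_writers(inputs: Vec<Vec<Vec<i32>>>) -> Partition {
    let target = Arc::new(RwLock::new(Partition::default()));
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|input| {
            let target = Arc::clone(&target);
            thread::spawn(move || {
                // Buffering touches only thread-local data: no lock held here.
                let mut buffer: Vec<Vec<i32>> = input.into_iter().collect();
                // One write-lock acquisition per writer, however many batches.
                let mut guard = target.write().unwrap();
                guard.lock_acquisitions += 1;
                guard.batches.append(&mut buffer);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // All writers have been joined, so this is the only remaining reference.
    Arc::try_unwrap(target)
        .ok()
        .expect("all writer threads have been joined")
        .into_inner()
        .unwrap()
}
```

The order in which writers append is not deterministic here, but the batches from any one writer stay contiguous because they are moved in under a single lock acquisition.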

@metesynnada

LGTM, @alamb. I truly appreciate your help and effort – thank you.


alamb commented May 5, 2023

> LGTM, @alamb. I truly appreciate your help and effort – thank you.

Thank you! I am very excited to see this functionality get in. I think we are very close to being able to use DataFusion to write out files (like Parquet) easily and efficiently, which will open up a large number of new use cases.

@alamb alamb merged commit 63026b3 into apache:main May 5, 2023

Labels

core Core DataFusion crate


2 participants