Fixed parquet path partitioning when only selecting partitioned columns #2000

Merged
alamb merged 14 commits into apache:master from pjmore:fix-parquet-hive-partitioning
Apr 4, 2022

Conversation

@pjmore (Contributor) commented Mar 12, 2022

Which issue does this PR close?

Partially closes #1999.

Rationale for this change

Fixes the behaviour when querying only partition columns for the Parquet file format. Such a query reads no columns from the Parquet files themselves, so the number of rows to produce for the partition columns must come from file metadata.

What changes are included in this PR?

Use row-group-level metadata to return the correct number of rows for the partition columns.
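
For context, a minimal sketch of how a Parquet footer's total row count can be read with the parquet crate. This is not the PR's code: the function name and the local-file path handling are illustrative only (the PR reads through an ObjectStore).

    use std::fs::File;
    use parquet::file::reader::{FileReader, SerializedFileReader};

    // Hypothetical helper: the footer metadata alone carries the total
    // row count, so no column data needs to be decoded.
    fn parquet_row_count(path: &str) -> parquet::errors::Result<i64> {
        let reader = SerializedFileReader::new(File::open(path)?)?;
        Ok(reader.metadata().file_metadata().num_rows())
    }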

Are there any user-facing changes?

No

@xudong963 added the bug (Something isn't working) label on Mar 17, 2022
@yjshen (Member) left a comment

LGTM.

@alamb (Contributor) commented Mar 20, 2022

cc @rdettai

@alamb (Contributor) commented Mar 23, 2022

I think this PR is waiting on responses to @rdettai's review comments.

Comment on lines 460 to 499
for partitioned_file in partition {
    let object_reader =
        object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
    let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
    let mut file_rows: usize = file_reader
        .metadata()
        .file_metadata()
        .num_rows()
        .try_into()
        .expect("Row count should always be greater than or equal to 0");
    // Clamp this file's row count to whatever the limit still allows.
    let remaining_rows = limit.unwrap_or(usize::MAX);
    if file_rows >= remaining_rows {
        file_rows = remaining_rows;
        limit = Some(0);
    } else if let Some(remaining_limit) = &mut limit {
        *remaining_limit -= file_rows;
    }

    // Emit batches that carry only the projected partition columns.
    while file_rows > batch_size {
        send_result(
            &response_tx,
            partition_column_projector
                .project_empty(batch_size, &partitioned_file.partition_values),
        )?;
        file_rows -= batch_size;
    }
    // Residual rows smaller than a full batch.
    if file_rows != 0 {
        send_result(
            &response_tx,
            partition_column_projector
                .project_empty(file_rows, &partitioned_file.partition_values),
        )?;
    }

    if limit == Some(0) {
        break;
    }
}
Ok(())
}

Contributor

I still feel this could be simplified and made more readable by using more iterators:

  • iterate over the files
  • map them to their size
  • map each size to an iterator that repeats the batch size file_rows/batch_size times + residual
  • flat map the whole thing
  • apply limit with take(limit)
  • for_each(send)
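
A rough sketch of the suggested pipeline, using hypothetical plain row counts in place of file readers and leaving out the error handling that becomes the sticking point below. The function name batch_sizes is made up, and since the limit counts rows rather than batches, a scan stands in for the plain take(limit):

    // Hypothetical sketch: file row counts are flat-mapped into
    // per-batch sizes, then capped at `limit` total rows.
    fn batch_sizes(file_rows: &[usize], batch_size: usize, limit: usize) -> Vec<usize> {
        file_rows
            .iter()
            .flat_map(|&rows| {
                let full = std::iter::repeat(batch_size).take(rows / batch_size);
                let residual = std::iter::once(rows % batch_size).filter(|r| *r != 0);
                full.chain(residual)
            })
            .scan(limit, |remaining, n| {
                let take = n.min(*remaining);
                *remaining -= take;
                (take != 0).then_some(take)
            })
            .collect()
    }

For example, batch_sizes(&[10, 5], 4, 12) yields [4, 4, 2, 2]: three batches from the first file, then the second file truncated by the limit.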

@pjmore (Contributor, PR author)

I couldn't find a good way to implement what you suggested. The error handling when opening the file was the main issue I ran into: I couldn't figure out another way to short-circuit both when the limit was met and when any error occurred. If you're okay with scanning all of the partition files even on an error, I'm okay with it; I just figured that for remote object stores that might be a bad idea.

    let mut res: Result<()> = Ok(());
    let mut batch_size_partition_iter = partition
        .iter()
        .map(|partitioned_file| {
            let mut num_rows: usize = match object_store
                .file_reader(partitioned_file.file_meta.sized_file.clone())
            {
                Ok(object_reader) => {
                    match SerializedFileReader::new(ChunkObjectReader(object_reader)) {
                        Ok(file_reader) => file_reader
                            .metadata()
                            .file_metadata()
                            .num_rows()
                            .try_into()
                            .expect("Row count should always be greater than or equal to 0 and less than usize::MAX"),
                        Err(e) => {
                            res = Err(e.into());
                            0
                        }
                    }
                }
                Err(e) => {
                    res = Err(e);
                    0
                }
            };
            num_rows = limit.min(num_rows);
            limit -= num_rows;
            (num_rows, partitioned_file.partition_values.as_slice())
        })
        // A zero row count means either an error or an exhausted limit,
        // so stop scanning further files in both cases.
        .take_while(|(num_rows, _)| *num_rows != 0)
        .flat_map(|(num_rows, partition_values)| {
            BatchSizeIter::new(num_rows, batch_size).zip(std::iter::repeat(partition_values))
        });
    Iterator::try_for_each(&mut batch_size_partition_iter, |(batch_size, partition_values)| {
        send_result(
            &response_tx,
            partition_column_projector.project_empty(batch_size, partition_values),
        )
    })?;
    res?;
    Ok(())
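
The BatchSizeIter helper referenced above does not appear in the thread; a minimal hypothetical version, yielding full batches and then the residual, might look like:

    // Hypothetical helper: yields `batch_size` repeatedly, then the
    // residual, until `num_rows` rows have been handed out.
    struct BatchSizeIter {
        remaining: usize,
        batch_size: usize,
    }

    impl BatchSizeIter {
        fn new(num_rows: usize, batch_size: usize) -> Self {
            Self { remaining: num_rows, batch_size }
        }
    }

    impl Iterator for BatchSizeIter {
        type Item = usize;

        fn next(&mut self) -> Option<usize> {
            if self.remaining == 0 {
                return None;
            }
            let n = self.remaining.min(self.batch_size);
            self.remaining -= n;
            Some(n)
        }
    }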

Contributor

Right, error management in iterators can quickly become annoying! Then I think the version with loop is fine for now.

@alamb (Contributor) left a comment

@rdettai / @tustvold would you like to review this PR again?

@rdettai (Contributor) left a comment

I think the amount of repetition in read_partition_no_file_columns has now reached a very satisfying level. @alamb do you agree?

@alamb (Contributor) left a comment

Looks good to me. Thank you @pjmore and @rdettai

@alamb (Contributor) commented Apr 3, 2022

Looks like it just needs some updating to resolve conflicts. @pjmore, I am happy to do so; let me know if you would like me to.

@pjmore (Contributor, PR author) commented Apr 3, 2022

@alamb I had some extra test cases to add for the limit logic, so I fixed the conflicts at the same time. Should be good to go now!

@alamb (Contributor) commented Apr 4, 2022

Thanks @pjmore -- epic work 👍

@tustvold (Contributor)

I've created apache/arrow-rs#1537 to track pushing this functionality upstream, as I think it is generally useful. I will try to bash it out if I have some spare cycles.
