Fixed parquet path partitioning when only selecting partitioned columns #2000
alamb merged 14 commits into apache:master from pjmore:fix-parquet-hive-partitioning
Conversation
cc @rdettai

I think this PR is waiting on responses to @rdettai
…rmine number of rows to emit
```rust
for partitioned_file in partition {
    let object_reader =
        object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
    let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
    let mut file_rows: usize = file_reader
        .metadata()
        .file_metadata()
        .num_rows()
        .try_into()
        .expect("Row count should always be greater than or equal to 0");
    let remaining_rows = limit.unwrap_or(usize::MAX);
    if file_rows >= remaining_rows {
        file_rows = remaining_rows;
        limit = Some(0);
    } else if let Some(remaining_limit) = &mut limit {
        *remaining_limit -= file_rows;
    }

    // Emit full batches, then the residual (which is smaller than batch_size).
    while file_rows > batch_size {
        send_result(
            &response_tx,
            partition_column_projector
                .project_empty(batch_size, &partitioned_file.partition_values),
        )?;
        file_rows -= batch_size;
    }
    if file_rows != 0 {
        send_result(
            &response_tx,
            partition_column_projector
                .project_empty(file_rows, &partitioned_file.partition_values),
        )?;
    }

    if limit == Some(0) {
        break;
    }
}
Ok(())
```
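The limit bookkeeping in the loop above can be exercised in isolation. Below is a minimal sketch of just that arithmetic; the helper name `apply_limit` is hypothetical and not part of the PR, but it mirrors how the loop caps a file's row count by the remaining `Option<usize>` limit and updates it in place:

```rust
// Cap a file's row count by the remaining limit (None means unlimited),
// decrementing the limit by the number of rows actually emitted.
// Hypothetical helper mirroring the loop logic quoted above.
fn apply_limit(file_rows: usize, limit: &mut Option<usize>) -> usize {
    let remaining = limit.unwrap_or(usize::MAX);
    if file_rows >= remaining {
        // This file satisfies the rest of the limit; emit only `remaining` rows.
        *limit = Some(0);
        remaining
    } else {
        // The whole file fits under the limit; subtract its rows.
        if let Some(l) = limit.as_mut() {
            *l -= file_rows;
        }
        file_rows
    }
}

fn main() {
    let mut limit = Some(7);
    assert_eq!(apply_limit(5, &mut limit), 5); // first file fits under the limit
    assert_eq!(limit, Some(2));
    assert_eq!(apply_limit(5, &mut limit), 2); // second file is capped
    assert_eq!(limit, Some(0));
    let mut no_limit = None;
    assert_eq!(apply_limit(100, &mut no_limit), 100); // no limit: emit all rows
    println!("ok");
}
```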
I still feel this could be simplified and made more readable by using more iterators:

- iterate over files
- map them to their size
- map each size to an iterator that repeats the batch size `file_rows / batch_size` times + residual
- flat map the whole thing
- apply limit with `take(limit)`
- `for_each(send)`
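The suggested pipeline could be sketched as follows, ignoring the error handling that reading real file metadata requires (the `batch_sizes` function and its inputs are hypothetical, standing in for row counts already read from file metadata):

```rust
// Simplified sketch of the iterator pipeline suggested above: for each
// file's row count, emit full batches plus a residual, then cap the
// stream of batch sizes by the remaining limit.
fn batch_sizes(file_rows: &[usize], batch_size: usize, limit: usize) -> Vec<usize> {
    file_rows
        .iter()
        // Each file expands to `rows / batch_size` full batches plus a residual.
        .flat_map(|&rows| {
            let full = rows / batch_size;
            let residual = rows % batch_size;
            std::iter::repeat(batch_size)
                .take(full)
                .chain(std::iter::once(residual).filter(|r| *r != 0))
        })
        // Apply the limit: truncate (and shrink the last batch) once it is met.
        .scan(limit, |remaining, b| {
            if *remaining == 0 {
                return None;
            }
            let emit = b.min(*remaining);
            *remaining -= emit;
            Some(emit)
        })
        .collect()
}

fn main() {
    // Two files of 10 rows each, batch size 4, limit 13.
    assert_eq!(batch_sizes(&[10, 10], 4, 13), vec![4, 4, 2, 3]);
    println!("ok");
}
```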
I couldn't find a good way to implement what you suggested. The error handling when opening the file was the main issue I ran into: I couldn't figure out another way to short-circuit both when the limit was met and on any errors that occurred. If you're okay with scanning all of the partition files even on an error, I'm okay with it; I just figured that might be a bad idea for remote object stores.
```rust
let mut res: Result<()> = Ok(());
let mut batch_size_partition_iter = partition
    .iter()
    .map(|partitioned_file| {
        let mut num_rows: usize = match object_store
            .file_reader(partitioned_file.file_meta.sized_file.clone())
        {
            Ok(object_reader) => {
                match SerializedFileReader::new(ChunkObjectReader(object_reader)) {
                    Ok(file_reader) => file_reader
                        .metadata()
                        .file_metadata()
                        .num_rows()
                        .try_into()
                        .expect("Row count should always be greater than or equal to 0 and less than usize::MAX"),
                    Err(e) => {
                        res = Err(e.into());
                        0
                    }
                }
            }
            Err(e) => {
                res = Err(e);
                0
            }
        };
        num_rows = limit.min(num_rows);
        limit -= num_rows;
        (num_rows, partitioned_file.partition_values.as_slice())
    })
    .take_while(|(num_rows, _)| *num_rows != 0)
    .flat_map(|(num_rows, partition_values)| {
        BatchSizeIter::new(num_rows, batch_size).zip(std::iter::repeat(partition_values))
    });
Iterator::try_for_each(&mut batch_size_partition_iter, |(batch_size, partition_values)| {
    send_result(
        &response_tx,
        partition_column_projector.project_empty(batch_size, partition_values),
    )
})?;
res?;
Ok(())
```
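The snippet above relies on a `BatchSizeIter` helper whose definition is not shown in the comment. One plausible implementation (hypothetical, reconstructed from how it is used: yield the batch size repeatedly, then the residual, summing to `num_rows`) would be:

```rust
// Possible shape of the `BatchSizeIter` referenced above: yields
// `batch_size` until fewer than `batch_size` rows remain, then the
// residual, so emitted sizes sum to `num_rows`.
struct BatchSizeIter {
    remaining: usize,
    batch_size: usize,
}

impl BatchSizeIter {
    fn new(num_rows: usize, batch_size: usize) -> Self {
        Self { remaining: num_rows, batch_size }
    }
}

impl Iterator for BatchSizeIter {
    type Item = usize;
    fn next(&mut self) -> Option<usize> {
        if self.remaining == 0 {
            None
        } else {
            let emit = self.remaining.min(self.batch_size);
            self.remaining -= emit;
            Some(emit)
        }
    }
}

fn main() {
    assert_eq!(BatchSizeIter::new(10, 4).collect::<Vec<_>>(), vec![4, 4, 2]);
    assert_eq!(BatchSizeIter::new(8, 4).collect::<Vec<_>>(), vec![4, 4]);
    println!("ok");
}
```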
Right, error management in iterators can quickly become annoying! Then I think the version with the loop is fine for now.
…olumns and reuse partition record batch
Looks like it just needs some updating to resolve conflicts. @pjmore I am happy to do so, let me know if you would like me to.

@alamb I had some extra test cases to add for the limit logic, so I just fixed the conflicts then. Should be good to go now!

Thanks @pjmore -- epic work 👍

I've created apache/arrow-rs#1537 to track pushing this functionality upstream, as I think it is generally useful. I will try to bash it out if I have some spare cycles.
Which issue does this PR close?
Partially #1999.
Rationale for this change
Fixes the behaviour when querying only partitioning columns for the Parquet file format.
What changes are included in this PR?
Use row-group-level metadata to emit the correct number of rows when only partition columns are selected.
Are there any user-facing changes?
No