Partition Writer Support Part 1: add partition splitter#1040
Partition Writer Support Part 1: add partition splitter#1040liurenjie1024 merged 2 commits intoapache:mainfrom
Conversation
|
This PR may conflict with #1014. But I'm not sure whether #1014 can be reviewed and merge recently. I'm ok to merge any one first and I will fix the conflict later. cc @liurenjie1024 @jonathanc-n @Fokko @Xuanwo @sdd |
4ac8823 to
7bc64bb
Compare
|
@ZENOTME I should be able to take a look at this tomorrow |
jonathanc-n
left a comment
There was a problem hiding this comment.
small nits, I don't know if worth to even change if this is just temporary code.
| // # TODO | ||
| // Remove this after partition writer supported. | ||
| #[allow(dead_code)] | ||
| pub struct RecordBatchPartitionsplitter { |
There was a problem hiding this comment.
| pub struct RecordBatchPartitionsplitter { | |
| pub struct RecordBatchPartitionSplitter { |
| partition_batches.push(( | ||
| row, | ||
| filter_record_batch(batch, &filter_array) | ||
| .expect("We should guarantee the filter array is valid"), |
There was a problem hiding this comment.
prefer to propogate error instead of expect
7bc64bb to
f60e82d
Compare
|
This PR is ready to review. cc @liurenjie1024 @Fokko @Xuanwo @sdd |
|
Hi @jonathanc-n & @ZENOTME |
|
@ranjanankur314 I will try my best to give it another review today. At the end of the day we will need a committer's review |
e91af9e to
a482ff3
Compare
|
Hi, I think this PR is ready to review. cc @liurenjie1024 @Xuanwo @Fokko @kevinjqliu @sdd |
liurenjie1024
left a comment
There was a problem hiding this comment.
Thanks @ZENOTME for this pr!
| } | ||
|
|
||
| /// Split the record batch into multiple record batches according to provided partition columns. | ||
| pub fn split_by_partition( |
There was a problem hiding this comment.
Why this have to be pub?
| &self, | ||
| batch: &RecordBatch, | ||
| partition_columns: &[ArrayRef], | ||
| ) -> Result<Vec<(OwnedRow, RecordBatch)>> { |
There was a problem hiding this comment.
Should we use Struct to replace OwnedRow? I assume the partitioned writer needs Struct?
There was a problem hiding this comment.
Good catch! I noticed that out Struct support Hash, so we don't need OwnedRow. I have remove them and use Struct directlty.
| } | ||
|
|
||
| /// Convert row back to iceberg value. | ||
| pub fn convert_row(&self, rows: Vec<OwnedRow>) -> Result<Vec<Struct>> { |
There was a problem hiding this comment.
Do we really need this function? Why not just convert partition value columnar batch into Vec<Struct>
a482ff3 to
c0d5a12
Compare
liurenjie1024
left a comment
There was a problem hiding this comment.
Thanks @ZENOTME for this pr! Generally LGTM, just one nit.
| let partition_type = self.partition_spec.partition_type(&self.schema)?; | ||
| let partition_arrow_type = type_to_arrow_type(&Type::Struct(partition_type.clone()))?; |
There was a problem hiding this comment.
Do we need to do this conversion for each batch?
eabc8d0 to
8a62fe2
Compare
liurenjie1024
left a comment
There was a problem hiding this comment.
Thanks @ZENOTME for this pr!
Which issue does this PR close?
This PR is part 1 to close #342.
What changes are included in this PR?
The partition writer support will be separate into three PR:
This PR is the first part.
Are these changes tested?