Conversation
liurenjie1024
left a comment
There are two kinds of writers in iceberg:
- Plain position delete writer: https://github.com/apache/iceberg/blob/da2ad389fd9ba8222f6fb3f57922209c239a7045/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java#L49
- Sorting position delete writer:
https://github.com/apache/iceberg/blob/da2ad389fd9ba8222f6fb3f57922209c239a7045/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java#L49
It seems that this PR tries to implement 2, but some parts are missing. I would suggest implementing 1 first since it's easier. What do you think?
crates/iceberg/src/arrow/schema.rs
    /// Convert iceberg field to an arrow field.
    pub fn field_to_arrow_field(field: &crate::spec::NestedFieldRef) -> Result<FieldRef> {
        let mut converter = ToArrowSchemaConverter;
The implementation is a little hacky to me. How about just creating a one-field schema, converting it with the arrow schema converter, and then taking the field from the result?
    fn write<'life0, 'async_trait>(
        &'life0 mut self,
        input: PositionDeleteInput<'a>,
    ) -> ::core::pin::Pin<
        Box<dyn ::core::future::Future<Output = Result<()>> + ::core::marker::Send + 'async_trait>,
    >
    where
        'life0: 'async_trait,
        Self: 'async_trait,
Please remove these auto-generated lifetime markers and the `::core::` type prefixes.

Here we use a sync version, so it seems we need to explicitly declare these auto-generated lifetimes.
The reason we need the sync version is that the input takes a reference, like `struct PositionDeleteInput<'a>`; we need to explicitly convert it into a record batch in the sync part of the function and then return an async future that writes this record batch.
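The sync-then-async pattern described above can be sketched as follows. This is a minimal, self-contained illustration with simplified stand-in types, not the real iceberg-rust API: `OwnedBatch` stands in for an Arrow `RecordBatch`, and the error type is simplified to `String`.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical borrowed input, mirroring `PositionDeleteInput<'a>` from the PR.
struct PositionDeleteInput<'a> {
    path: &'a str,
    offsets: Vec<i64>,
}

// Stand-in for an Arrow RecordBatch: fully owned, so the future can be 'static.
#[allow(dead_code)]
struct OwnedBatch {
    paths: Vec<String>,
    offsets: Vec<i64>,
}

fn write(
    input: PositionDeleteInput<'_>,
) -> Pin<Box<dyn Future<Output = Result<usize, String>> + Send>> {
    // Sync part: copy the borrowed data into an owned batch before returning,
    // so the borrow of `input.path` does not escape into the future.
    let batch = OwnedBatch {
        paths: input.offsets.iter().map(|_| input.path.to_string()).collect(),
        offsets: input.offsets.clone(),
    };
    // Async part: the future only captures owned data.
    Box::pin(async move { Ok(batch.offsets.len()) })
}
```

This is why `#[async_trait]` alone does not help here: the conversion must happen before the future is constructed.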
    }

    /// The memory position delete writer.
    pub struct MemoryPositionDeleteWriter<B: FileWriterBuilder> {
Suggested change:

    - pub struct MemoryPositionDeleteWriter<B: FileWriterBuilder> {
    + pub struct PositionDeleteWriter<B: FileWriterBuilder> {

I don't think we should add a `Memory` prefix here, since it makes people think we are storing everything in memory; this applies to all the structs.
I think we can resolve #741 first before this PR.

@ZENOTME Are you still working on this? I'm looking to work on one of the two writers.

Sorry for the late reply; I will work on this later. Would you like to work on the sorting position delete writer after this PR?

Hi @liurenjie1024 @jonathanc-n. I have fixed this PR. It's ready for review.
jonathanc-n
left a comment
Overall LGTM! I can get started on the sorting position delete writer after this is merged.
    async fn write(&mut self, input: Vec<PositionDeleteInput>) -> Result<()> {
        let mut path_column_builder = StringBuilder::new();
        let mut offset_column_builder = PrimitiveBuilder::<Int64Type>::new();
        for input in input.into_iter() {
Change the variable name here so it doesn't shadow the parameter? E.g. `pd_input`.
liurenjie1024
left a comment
Thanks @ZENOTME for this PR. Generally LGTM; left some minor suggestions.
    2147483546,
    "file_path",

    2147483545,
    "pos",

Please make these constants.
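For reference, the two magic numbers quoted above are the reserved field IDs for position delete files in the Iceberg table spec (`file_path` is `i32::MAX - 101`, `pos` is `i32::MAX - 102`). A minimal sketch of naming them as constants; the constant names here are hypothetical, not the ones the PR ended up using:

```rust
/// Reserved field IDs for position delete files, per the Iceberg table spec.
const FIELD_ID_DELETE_FILE_PATH: i32 = 2147483546; // i32::MAX - 101
const FIELD_ID_DELETE_POS: i32 = 2147483545; // i32::MAX - 102
const FIELD_NAME_DELETE_FILE_PATH: &str = "file_path";
const FIELD_NAME_DELETE_POS: &str = "pos";
```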
    /// The offset of the position delete.
    pub offsets: Vec<i64>,

Suggested change:

    - /// The offset of the position delete.
    - pub offsets: Vec<i64>,
    + /// The row number in the data file.
    + pub row: i64,

We should not ask the user to think about the container.
    #[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)]
    pub struct PositionDeleteInput {
        /// The path of the file.
        pub path: String,

Suggested change:

    - pub path: String,
    + pub path: &'a str,
    }

    /// Position delete writer.
    pub struct PositionDeleteWriter<B: FileWriterBuilder> {

We should buffer the input row numbers in memory.
Do you mean that for PositionDeleteInput, we should buffer the inputs and write them as a batch? 🤔 E.g.

    pub struct PositionDeleteWriter {
        // path -> row_num
        buffer: HashMap<String, Vec<i64>>,
    }

I didn't add the buffer here because we will add a SortPositionDeleteWriter later, and it will buffer the input and sort it, so I'm not sure whether we need a buffer here. Or we could make it an optional choice?

Oh, sorry, I forgot this is not the sorting position delete writer. Please add comments like the Java implementation here.
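The buffering idea discussed in this thread can be sketched as follows. `PositionDeleteBuffer` is a hypothetical name for illustration, not the actual writer API: it groups delete offsets by data-file path so they can be flushed as one batch per file instead of row by row.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory buffer for position deletes:
/// data-file path -> row offsets deleted in that file.
#[derive(Default)]
struct PositionDeleteBuffer {
    buffer: HashMap<String, Vec<i64>>,
}

impl PositionDeleteBuffer {
    /// Records one deleted row offset for the given data file.
    fn insert(&mut self, path: &str, offset: i64) {
        self.buffer.entry(path.to_string()).or_default().push(offset);
    }

    /// Drains the buffer, returning (path, offsets) pairs ready to be
    /// converted into an Arrow record batch and written out.
    fn flush(&mut self) -> Vec<(String, Vec<i64>)> {
        self.buffer.drain().collect()
    }
}
```

A SortPositionDeleteWriter would additionally sort the drained pairs before writing, which is why the thread debates whether the plain writer needs this buffer at all.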
    partition_value: Struct,
    }

    impl<'a, B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput<'a>>>
We can't simplify this code using `#[async_trait]` because PositionDeleteInput takes a reference, so here we should convert the inputs into a RecordBatch first in sync code and then return an async future to write them. cc @liurenjie1024
Oh, sorry, I forgot about this. Maybe we could change PositionDeleteInput to something like this:

    struct PositionDeleteInput {
        path: Arc<str>,
        pos: u64,
    }

Does this cause too much overhead for large inputs? We need to call clone for each row. 🤔

Yes, that's a trade-off. But this approach makes the trait difficult to read and maintain.
How about using an interface to hide the container, like the following?

    struct PositionDeleteVec {
        path: String,
        pos: Vec<usize>,
    }

    impl PositionDeleteVec {
        pub fn new(path: String, pos: impl IntoIterator<Item = usize>) -> Self {}
    }
Sounds reasonable to me; the user is free to choose how to buffer outside.
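A compilable sketch of the container interface suggested in this thread; the type and field names are from the discussion, while the constructor body is an assumption about the intent:

```rust
/// Hypothetical container from the discussion: the path is stored once and the
/// offsets are collected from any iterator, so callers choose their own
/// buffering strategy outside the writer.
struct PositionDeleteVec {
    path: String,
    pos: Vec<usize>,
}

impl PositionDeleteVec {
    fn new(path: String, pos: impl IntoIterator<Item = usize>) -> Self {
        Self { path, pos: pos.into_iter().collect() }
    }
}
```

Accepting `impl IntoIterator<Item = usize>` means a `Vec`, a slice iterator, or a range all work as input, without the trait itself exposing any particular container.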
Force-pushed 797379e to 391a061.
#704 fails in the msrv check, and I found that's because `cargo update faststr` updates munge to `0.4.2` instead of `0.4.1`. The simple fix is to pin the precise version of munge, but I'm not sure whether that's good practice here. Do you have any suggestions for this? cc @Xuanwo @xxchan

Co-authored-by: ZENOTME <[email protected]>
liurenjie1024
left a comment
Thanks @ZENOTME for this PR, left some comments!
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
    use crate::{Error, ErrorKind, Result};

    const POS_DELETE_FIELD1_NAME: &str = "file_path";

Could we organize the code like this?

    static DELETE_FILE_PATH: Lazy<NestedFieldRef> = ...
    static DELETE_FILE_POS: Lazy<NestedFieldRef> = ...
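A sketch of this organization using the standard library's `LazyLock` (stable since Rust 1.80; `once_cell::sync::Lazy` works the same way on older toolchains). `NestedField` here is a simplified stand-in; the real code would construct `crate::spec::NestedField` values instead:

```rust
use std::sync::{Arc, LazyLock};

// Simplified stand-in for iceberg's NestedField / NestedFieldRef.
struct NestedField {
    id: i32,
    name: &'static str,
}
type NestedFieldRef = Arc<NestedField>;

// Lazily-initialized shared field definitions, built once on first access.
// Note these must be `static`, not `const`: a `const Lazy` would be
// re-evaluated at every use site, defeating the caching.
static DELETE_FILE_PATH: LazyLock<NestedFieldRef> =
    LazyLock::new(|| Arc::new(NestedField { id: 2147483546, name: "file_path" }));
static DELETE_FILE_POS: LazyLock<NestedFieldRef> =
    LazyLock::new(|| Arc::new(NestedField { id: 2147483545, name: "pos" }));
```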
Hi @liurenjie1024, are there other improvements for this PR? I think it's ready to go.

@liurenjie1024 @Xuanwo Any more comments? Shall we merge this PR?

It has been a long time; I need to take another review pass.

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
Complete #340