feat: add max_row_group_bytes option to WriterProperties#9357
feat: add max_row_group_bytes option to WriterProperties#9357alamb merged 15 commits intoapache:mainfrom
Conversation
parquet/src/file/properties.rs
Outdated
| /// Sets maximum size of a row group in bytes, or `None` for unlimited. | ||
| /// | ||
| /// Row groups are flushed when their estimated encoded size exceeds this threshold. | ||
| /// This is similar to parquet-mr's `parquet.block.size` behavior. |
There was a problem hiding this comment.
parquet-mr is just the official java implementation for parquet, you can rewrite the comment to clarify that this match the official parquet Java implementation
There was a problem hiding this comment.
Also, parquet-mr is I think now officially called "parquet-java" https://github.com/apache/parquet-java
618f003 to
0e07315
Compare
| @@ -575,7 +595,34 @@ impl WriterPropertiesBuilder { | |||
| /// If the value is set to 0. | |||
| pub fn set_max_row_group_size(mut self, value: usize) -> Self { | |||
There was a problem hiding this comment.
Should we deprecate this function function?
There was a problem hiding this comment.
Wait wait, no so fast, this is a breaking change, as clippy will fail for users, I was asking, it might be in a different pr, but open for discussion. if you keep it please update the pr description under changes to users
There was a problem hiding this comment.
I do agree that this API should be deprecated. Thanks for pointing it out!
this is a breaking change
This is inherently not a breaking change; the purpose of marking APIs as deprecated is to warn users before making a breaking change, without actually making this change.
This PR already calls for a minor bump due to the new APIs introduced; deprecating the old one does not change the version semantics for this PR.
as clippy will fail for users
It's a rustc warning:
The deprecated attribute marks an item as deprecated. rustc will issue warnings on usage of #[deprecated] items
So unless users add -D warnings, compilation won't break.
it might be in a different pr
I don't mind either way: Leaving it here or opening a new PR for deprecating the old API. LMK what's your preference and I'll do it.
if you keep it please update the pr description under changes to users
Done!
There was a problem hiding this comment.
Wait wait, no so fast, this is a breaking change, as clippy will fail for users, I was asking, it might be in a different pr, but open for discussion. if you keep it please update the pr description under changes to users
Yeah, in general I agree with @yonipeleg33 -- I don't think a clippy failure is a breaking change per-se -- the rust compiler will be happy to compile it. If downstream projects want to take a more strict "clippy must pass" stance I donthink that is technically an API breakage
| } | ||
|
|
||
| /// Helper to create a test batch with the given number of rows. | ||
| /// Each row is approximately 4 bytes (one i32). |
There was a problem hiding this comment.
| /// Each row is approximately 4 bytes (one i32). | |
| /// Each row is 4 bytes (one `i32`). |
| ArrowDataType::Int32, | ||
| false, | ||
| )])); | ||
| let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>()); |
There was a problem hiding this comment.
| let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>()); | |
| let array = Int32Array::from_iter(0..num_rows as i32); |
|
|
||
| #[test] | ||
| fn test_row_group_limit_rows_only() { | ||
| // When only max_row_group_size is set, respect the row limit |
There was a problem hiding this comment.
the comment is not on the correct line
|
|
||
| #[test] | ||
| fn test_row_group_limit_none_writes_single_row_group() { | ||
| // When both limits are None, all data should go into a single row group |
There was a problem hiding this comment.
the comment is not on the correct line
There was a problem hiding this comment.
Done (moved to above the test function)
| false, | ||
| )])); | ||
|
|
||
| // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each) |
There was a problem hiding this comment.
the comment is not on the correct line (it should be on the Some(3500) one)
|
|
||
| #[test] | ||
| fn test_row_group_limit_both_row_wins() { | ||
| // When both limits are set, the row limit triggers first |
There was a problem hiding this comment.
the comment is not on the correct line
parquet/src/file/properties.rs
Outdated
| pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false; | ||
| /// Default value for [`WriterProperties::max_row_group_size`] | ||
| pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; | ||
| /// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as parquet-mr's parquet.block.size) |
There was a problem hiding this comment.
same as my other parquet-mr comment
parquet/src/file/properties.rs
Outdated
| if let Some(v) = value { | ||
| assert!(v > 0, "Cannot have a 0 max row group bytes"); | ||
| } |
There was a problem hiding this comment.
| if let Some(v) = value { | |
| assert!(v > 0, "Cannot have a 0 max row group bytes"); | |
| } | |
| assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes"); |
| } | ||
|
|
||
| #[test] | ||
| fn test_row_group_limit_both_row_wins() { |
There was a problem hiding this comment.
Can you add a similar test when bytes win with the same structure as this, i.e. writing single large batch, but only changing the config (same test with only conf change)
There was a problem hiding this comment.
No can do; The first batch is always written as a whole, because we need some statistics in order to calculate average row size. This is also noted in the PR description:
This means that the first batch will always be written as a whole (unless row count limit is also set).
There was a problem hiding this comment.
you don't need statistics, you can calculate it from the data types you need to encocde
There was a problem hiding this comment.
AFAICT, that beats the purpose of this configuration: Its purpose is to control the IO profile of the writer (i.e. how much and when it writes to disk), and for that, the data needs to at least be already encoded before calculating the row group size.
This is also backed by the Java source code:
it calculates memSize using columnStore.getBufferedSize(), which is documented as follows:
@return approximate size of the buffered encoded binary data
| } | ||
|
|
||
| #[test] | ||
| fn test_row_group_limit_both_bytes_wins() { |
There was a problem hiding this comment.
Can you have a similar test with rows wins that have the same structure but only config change?
There was a problem hiding this comment.
Done - see test_row_group_limit_both_row_wins_multiple_batches vs. test_row_group_limit_both_row_wins_single_batch
| #[test] | ||
| fn test_row_group_limit_both_bytes_wins() { | ||
| // When both limits are set, the byte limit triggers first | ||
| // Write in multiple small batches so byte-based splitting can work |
There was a problem hiding this comment.
According to the comment on the method, the way you write batches should not affect, only the config, that is if the byte based got hit first, it should write that, if the row hit first it should write that.
and also, it should work regardless of how you feed the data
There was a problem hiding this comment.
Unfortunately, the way data is fed does affect the row group splits, because of the first batch issue (noted in the PR description):
This means that the first batch will always be written as a whole (unless row count limit is also set).
And even beyond the first batch, the behaviour is not predictable: Byte-based limit is enforced by calculating the average row size, based on previous batches. This is more dynamic than row-based limit.
I'm not sure what's actionable from this comment. If you think there's still a missing test case, please LMK.
parquet/src/file/properties.rs
Outdated
| /// Sets maximum size of a row group in bytes, or `None` for unlimited. | ||
| /// | ||
| /// Row groups are flushed when their estimated encoded size exceeds this threshold. | ||
| /// This is similar to the official `parquet.block.size` behavior. |
There was a problem hiding this comment.
| /// This is similar to the official `parquet.block.size` behavior. | |
| /// This is similar to the official Java implementation `parquet.block.size` behavior. |
this is not part of the spec so there is no official about it
yonipeleg33
left a comment
There was a problem hiding this comment.
Thanks @rluvaton, PTAL
| } | ||
|
|
||
| /// Helper to create a test batch with the given number of rows. | ||
| /// Each row is approximately 4 bytes (one i32). |
| ArrowDataType::Int32, | ||
| false, | ||
| )])); | ||
| let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>()); |
|
|
||
| #[test] | ||
| fn test_row_group_limit_none_writes_single_row_group() { | ||
| // When both limits are None, all data should go into a single row group |
There was a problem hiding this comment.
Done (moved to above the test function)
|
|
||
| #[test] | ||
| fn test_row_group_limit_rows_only() { | ||
| // When only max_row_group_size is set, respect the row limit |
| false, | ||
| )])); | ||
|
|
||
| // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each) |
parquet/src/file/properties.rs
Outdated
| if let Some(v) = value { | ||
| assert!(v > 0, "Cannot have a 0 max row group bytes"); | ||
| } |
| @@ -575,7 +595,34 @@ impl WriterPropertiesBuilder { | |||
| /// If the value is set to 0. | |||
| pub fn set_max_row_group_size(mut self, value: usize) -> Self { | |||
| } | ||
|
|
||
| #[test] | ||
| fn test_row_group_limit_both_row_wins() { |
There was a problem hiding this comment.
No can do; The first batch is always written as a whole, because we need some statistics in order to calculate average row size. This is also noted in the PR description:
This means that the first batch will always be written as a whole (unless row count limit is also set).
| } | ||
|
|
||
| #[test] | ||
| fn test_row_group_limit_both_bytes_wins() { |
There was a problem hiding this comment.
Done - see test_row_group_limit_both_row_wins_multiple_batches vs. test_row_group_limit_both_row_wins_single_batch
| #[test] | ||
| fn test_row_group_limit_both_bytes_wins() { | ||
| // When both limits are set, the byte limit triggers first | ||
| // Write in multiple small batches so byte-based splitting can work |
There was a problem hiding this comment.
Unfortunately, the way data is fed does affect the row group splits, because of the first batch issue (noted in the PR description):
This means that the first batch will always be written as a whole (unless row count limit is also set).
And even beyond the first batch, the behaviour is not predictable: Byte-based limit is enforced by calculating the average row size, based on previous batches. This is more dynamic than row-based limit.
I'm not sure what's actionable from this comment. If you think there's still a missing test case, please LMK.
etseidl
left a comment
There was a problem hiding this comment.
Thanks @yonipeleg33. Flushing partial review for now, but I think this is looking pretty sound so far.
parquet/src/file/properties.rs
Outdated
| /// # Panics | ||
| /// If the value is `Some(0)`. | ||
| pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self { | ||
| assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes"); |
There was a problem hiding this comment.
| assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes"); | |
| assert_ne!(value, Some(0), "Cannot have a 0 max row group row count"); |
parquet/src/file/properties.rs
Outdated
| /// # Panics | ||
| /// If the value is set to 0. | ||
| #[deprecated( | ||
| since = "57.3.0", |
There was a problem hiding this comment.
| since = "57.3.0", | |
| since = "58.0.0", |
57.3.0 has already been released
parquet/src/file/properties.rs
Outdated
| pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; | ||
| /// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java | ||
| /// implementation for `parquet.block.size`) | ||
| pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024; |
There was a problem hiding this comment.
This constant appears to be unused. I'd vote for less clutter and get rid of it
There was a problem hiding this comment.
Removed. Good catch! (Leftover from previous implementations)
There was a problem hiding this comment.
Or should we set it?
@alamb I think not, as it changes behaviour without users opting-in for that new behaviour. None preserves the existing behaviour by default, which is no byte count limit at all.
parquet/src/file/properties.rs
Outdated
| /// This is similar to the official Java implementation for `parquet.block.size`'s behavior. | ||
| /// | ||
| /// If both `max_row_group_row_count` and `max_row_group_bytes` are set, | ||
| /// the row group with the smallest limit will be applied. |
There was a problem hiding this comment.
| /// the row group with the smallest limit will be applied. | |
| /// the row group with the smaller limit will be produced. |
parquet/src/file/properties.rs
Outdated
| /// Sets maximum number of rows in a row group, or `None` for unlimited. | ||
| /// | ||
| /// If both `max_row_group_row_count` and `max_row_group_bytes` are set, | ||
| /// the row group with the smallest limit will be applied. |
There was a problem hiding this comment.
| /// the row group with the smallest limit will be applied. | |
| /// the row group with the smaller limit will be produced. |
| @@ -314,8 +320,12 @@ impl<W: Write + Send> ArrowWriter<W> { | |||
| /// Encodes the provided [`RecordBatch`] | |||
| /// | |||
| /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`] | |||
There was a problem hiding this comment.
| /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`] | |
| /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`] |
?
- Change deprecation notice to 58.0.0 - Improve wording in comments - Cleanup references to the newly deprecated API
yonipeleg33
left a comment
There was a problem hiding this comment.
Thanks @etseidl, PTAL
parquet/src/file/properties.rs
Outdated
| pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; | ||
| /// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java | ||
| /// implementation for `parquet.block.size`) | ||
| pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024; |
There was a problem hiding this comment.
Removed. Good catch! (Leftover from previous implementations)
parquet/src/file/properties.rs
Outdated
| /// # Panics | ||
| /// If the value is `Some(0)`. | ||
| pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self { | ||
| assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes"); |
parquet/src/file/properties.rs
Outdated
| /// # Panics | ||
| /// If the value is set to 0. | ||
| #[deprecated( | ||
| since = "57.3.0", |
parquet/src/file/properties.rs
Outdated
| /// This is similar to the official Java implementation for `parquet.block.size`'s behavior. | ||
| /// | ||
| /// If both `max_row_group_row_count` and `max_row_group_bytes` are set, | ||
| /// the row group with the smallest limit will be applied. |
parquet/src/file/properties.rs
Outdated
| /// Sets maximum number of rows in a row group, or `None` for unlimited. | ||
| /// | ||
| /// If both `max_row_group_row_count` and `max_row_group_bytes` are set, | ||
| /// the row group with the smallest limit will be applied. |
| @@ -314,8 +320,12 @@ impl<W: Write + Send> ArrowWriter<W> { | |||
| /// Encodes the provided [`RecordBatch`] | |||
| /// | |||
| /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`] | |||
parquet/src/file/properties.rs
Outdated
| pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; | ||
| /// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java | ||
| /// implementation for `parquet.block.size`) | ||
| pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024; |
| /// Returns maximum number of rows in a row group, or `usize::MAX` if unlimited. | ||
| /// | ||
| /// For more details see [`WriterPropertiesBuilder::set_max_row_group_size`] | ||
| pub fn max_row_group_size(&self) -> usize { |
There was a problem hiding this comment.
Given the introduction of max_row_group_count, what would you think about deprecating max_row_group_size and directing people to that new setting?
There was a problem hiding this comment.
That makes sense, as we also deprecate the corresponding setter. Done!
parquet/src/file/properties.rs
Outdated
| data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT, | ||
| write_batch_size: DEFAULT_WRITE_BATCH_SIZE, | ||
| max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, | ||
| max_row_group_row_count: Some(DEFAULT_MAX_ROW_GROUP_SIZE), |
There was a problem hiding this comment.
Could we also please align the constant name to the parameter name (eg. DEFAULT_MAX_ROW_GROUP_COUNT)
|
Thank you @yonipeleg33 -- sorry I forgot to submit my review from the other day when I reviewed this PR |
yonipeleg33
left a comment
There was a problem hiding this comment.
Thank you @yonipeleg33 -- sorry I forgot to submit my review from the other day when I reviewed this PR
Happens to the best of us 😄
Done, thanks for the review! PTAL
parquet/src/file/properties.rs
Outdated
| pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; | ||
| /// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java | ||
| /// implementation for `parquet.block.size`) | ||
| pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024; |
There was a problem hiding this comment.
Or should we set it?
@alamb I think not, as it changes behaviour without users opting-in for that new behaviour. None preserves the existing behaviour by default, which is no byte count limit at all.
| /// Returns maximum number of rows in a row group, or `usize::MAX` if unlimited. | ||
| /// | ||
| /// For more details see [`WriterPropertiesBuilder::set_max_row_group_size`] | ||
| pub fn max_row_group_size(&self) -> usize { |
There was a problem hiding this comment.
That makes sense, as we also deprecate the corresponding setter. Done!
parquet/src/file/properties.rs
Outdated
| data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT, | ||
| write_batch_size: DEFAULT_WRITE_BATCH_SIZE, | ||
| max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, | ||
| max_row_group_row_count: Some(DEFAULT_MAX_ROW_GROUP_SIZE), |
etseidl
left a comment
There was a problem hiding this comment.
Thanks @yonipeleg33, this looks good to me.
|
Thanks so much for the review, guys! |
alamb
left a comment
There was a problem hiding this comment.
Looks good to me too -- thanks @yonipeleg33 @etseidl and @rluvaton
| @@ -575,7 +595,34 @@ impl WriterPropertiesBuilder { | |||
| /// If the value is set to 0. | |||
| pub fn set_max_row_group_size(mut self, value: usize) -> Self { | |||
There was a problem hiding this comment.
Wait wait, no so fast, this is a breaking change, as clippy will fail for users, I was asking, it might be in a different pr, but open for discussion. if you keep it please update the pr description under changes to users
Yeah, in general I agree with @yonipeleg33 -- I don't think a clippy failure is a breaking change per-se -- the rust compiler will be happy to compile it. If downstream projects want to take a more strict "clippy must pass" stance I donthink that is technically an API breakage
| let a = batch.slice(0, to_write); | ||
| let b = batch.slice(to_write, batch.num_rows() - to_write); | ||
| self.write(&a)?; | ||
| return self.write(&b); |
There was a problem hiding this comment.
Since this recurses, this could potentially blow out the stack with pathalogical inputs (e.g. a RecordBatch with 1M rows with a max_row_group_count of 1). I don't think it is necessary to fix now, I just wanted to point it out
There was a problem hiding this comment.
here is a reproducer (I will file a follow on ticket)
#[test]
fn test_row_group_limit_rows_only_pathological_stack_overflow_demo() {
let schema = Arc::new(Schema::new(vec![Field::new(
"int",
ArrowDataType::Int32,
false,
)]));
let array = Int32Array::from((0..1_000_000_i32).collect::<Vec<_>>());
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
let props = WriterProperties::builder()
.set_max_row_group_row_count(Some(1))
.set_max_row_group_bytes(None)
.build();
let file = tempfile::tempfile().unwrap();
let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
// This currently recurses once per row-group split and can overflow the stack.
writer.write(&batch).unwrap();
}There was a problem hiding this comment.
I filed a ticket to track
Given the prior code path for max_row_group_size also uses recursion I don't think this is a new bug introduced by this PR (though the max bytes path is now also susceptible to the same issue
There was a problem hiding this comment.
Thanks for looking into it!
There was a problem hiding this comment.
Thanks for looking into it!
|
I made a small follow on PR to add some additional tests |
…oup_row_count,max_row_group_size} (#9387) # Which issue does this PR close? - Follow on to #9357 # Rationale for this change While reviewing this PR, I found (with codex) some additional code paths that I think would be valuable to test: 1. That you can't set `Some(0)` for the max sizes 2. Certain codepaths # What changes are included in this PR? Add tets # Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 4. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> # Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. If there are any breaking changes to public APIs, please call them out. -->
Which issue does this PR close?
This PR implements another suggestion introduced in #1213:
So not "Closes" anything new.
Rationale for this change
A best effort to match Spark's (or more specifically, Hadoop's)
parquet.block.sizeconfiguration behaviour, as documented in parquet-hadoop's README:Since arrow's parquet writer writes batches, it's inherently different than Hadoop's per-record writer behaviour - so the behaviour of
max_row_group_byteswill be different than Hadoop'sparquet.block.size, but this is the closest I could reasonably get (see details below).What changes are included in this PR?
Configuration changes
max_row_group_bytesconfiguration option inWriterPropertiesmax_row_group_sizeprivate property tomax_row_group_row_countset_max_row_group_size()andmax_row_group_size()still remain with their existing signatures.set_max_row_group_row_count()andmax_row_group_row_count()which expose theOption<usize>type.set_max_row_group_row_count(None)is called,max_row_group_size()will returnusize::MAX.Writer changes
ArrowWriter::writenow supports any combination of these two properties (row count and row bytes):Byte limit is calculated once per batch (as opposed to Hadoop's per-record calculation):
Before writing each batch, compute the average row size in bytes based on previous writes, and flush or split the batch according to that average before hitting the limit.
This means that the first batch will always be written as a whole (unless row count limit is also set).
Are these changes tested?
Yes - added unit tests to check all different combinations of these two properties being set.
Are there any user-facing changes?
Yes:
usize::MAXfrommax_row_group_size()if it was unset by the user).set_max_row_group_sizeandmax_row_group_sizeAPIs.