Conversation
e79ecf8 to
7dbd4d6
Compare
7dbd4d6 to
ede5115
Compare
arrow/src/ipc/writer.rs
Outdated
| }; | ||
| let len = data.len() as i64; | ||
| // TODO: don't need to pad each buffer, and just need to pad the tail of the message body | ||
| // let pad_len = pad_to_8(len as u32); |
There was a problem hiding this comment.
There was a problem hiding this comment.
If this make sense to you, I will drop these commented code.
Codecov Report
@@ Coverage Diff @@
## master #1855 +/- ##
==========================================
+ Coverage 83.42% 83.47% +0.04%
==========================================
Files 214 215 +1
Lines 57015 57286 +271
==========================================
+ Hits 47567 47819 +252
- Misses 9448 9467 +19
Continue to review full report at Codecov.
|
|
hi @andygrove @alamb , I have implemented the IPC compression, but the ci on AMD 32 fails. |
|
After this pr merged, I will enable the IT test for 2.0.0-compression |
|
The integration test was failed from https://github.com/apache/arrow-rs/runs/6952125461?check_suite_focus=true#step:6:15070 I'm confused about why we need to make 8 bytes aligned for each buffer |
@martin-g Could you please take a look this ci problem? |
I think the issue has been resolved |
|
Thanks @liukun4515 -- I will try and find time tomorrow to review this PR |
arrow/Cargo.toml
Outdated
|
|
||
| [features] | ||
| default = ["csv", "ipc", "test_utils"] | ||
| default = ["csv", "ipc", "test_utils", "zstd", "lz4"] |
There was a problem hiding this comment.
So by default we will include zstd, lz4 dependencies? If we don't use ipc or don't use compressed ipc, seems the dependencies are not necessary?
There was a problem hiding this comment.
I agree -- I suggest something like
| default = ["csv", "ipc", "test_utils", "zstd", "lz4"] | |
| default = ["csv", "ipc", "test_utils"] |
There was a problem hiding this comment.
Yes, include the dependency by default.
buffer compression protocol is common protocol for all languages in the arrow ecosystem, if we use the compression/decompression as optional, we can read the file or stream from compressed side.
2.0.0 compression has been implemented in the Java, C++ and other languages, If your rust server receiver a message compressed by the protocol, we can read them by default.
There was a problem hiding this comment.
I'd be in favour of supporting adding the deps toipc but not creating ipc_compression separately. The inconvenience of a larger binary is a lesser problem than the runtime error of not supporting compression by omission.
The latter would need a rebuild.
There was a problem hiding this comment.
I have encountered an issue on cross-compiling lz4 crate on some platform. We don't use ipc so we can choose not to include ipc feature at all. I'm wondering if ipc compression can't be excluded, there might be some issues like that. Although this sounds like corner case.
There was a problem hiding this comment.
I have add the feature ipc_compression = "lz4,zstd" as an option, if you want to compile with the ipc compression feature.
arrow/src/ipc/reader.rs
Outdated
| // TODO consider the error result | ||
| #[cfg(any(feature = "zstd,lz4", test))] |
There was a problem hiding this comment.
Hmm, if these features are not used, what will it happened? I think an explicit error is necessary.
There was a problem hiding this comment.
agree with you, @viirya . I have no idea and how to control this.
Could you please give me some suggestion or advice?
There was a problem hiding this comment.
I follow the #[cfg(any(feature usage like other model to resolve the error from the CI
cargo build --features=csv,ipc,simd,lz4,zstd --target wasm32-unknown-unknown
If I don't use this method, I can't pass the ci for target wasm32-unknown-unknown
I search the reason and find the issue #180 and #180 which have the same problem.
@alamb
| }) | ||
| } | ||
| } | ||
| z => panic!("Unsupported ipc::MetadataVersion {:?}", z), |
There was a problem hiding this comment.
ipc::MetadataVersion::V4 seems also hitting this error? But actually there is CompressionCodecType::NoCompression indicating no compression is used.
There was a problem hiding this comment.
Also given that this function returns a Result it seems like we could return a proper error here rather than panicing
There was a problem hiding this comment.
I just copy some code from try_new.
try_new just use the CompressionCodecType::NoCompression and it is same with older version.
try_new_with_compression want to open the compression, but I think I made a mistake.
I should use the CompressionType instead of CompressionCodecType and make sure the compression is enable
arrow/src/ipc/writer.rs
Outdated
| #[cfg(any(feature = "zstd,lz4", test))] | ||
| compression_codec | ||
| .compress(buffer.as_slice(), &mut _compression_buffer) | ||
| .unwrap(); |
There was a problem hiding this comment.
I think we can prevent using compression option in try_new_with_compression if these features are not included.
There was a problem hiding this comment.
@viirya comments like above reply
Maybe I can resolve this option or compile issue in the next issue or pull request.
In this pull request, we can focus on the protocal of IPC compression
| write_options, | ||
| schema: schema.clone(), | ||
| block_offsets: meta + data + 8, | ||
| block_offsets: meta + data + header_size, |
There was a problem hiding this comment.
try_new_with_options is used to write schema to the file or stream.
In the arrow format of IPC format, the layout is from the doc https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
First, we will write
<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
the size of above part is 8bytes and is the length of header_size
alamb
left a comment
There was a problem hiding this comment.
Thanks @liukun4515 -- I think this is looking pretty good. The biggest thing I would like to sort out the feature flags and keeping the compression code a bit more isolated. Otherwise this is looking close
arrow/Cargo.toml
Outdated
|
|
||
| [features] | ||
| default = ["csv", "ipc", "test_utils"] | ||
| default = ["csv", "ipc", "test_utils", "zstd", "lz4"] |
There was a problem hiding this comment.
I agree -- I suggest something like
| default = ["csv", "ipc", "test_utils", "zstd", "lz4"] | |
| default = ["csv", "ipc", "test_utils"] |
| match self { | ||
| CompressionCodecType::Lz4Frame => { | ||
| let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap(); | ||
| encoder.write_all(input).unwrap(); |
There was a problem hiding this comment.
I suggest passing the errors back out of here directly (as compress returns a Result) rather than unwrap() which will panic on error
| } | ||
| } | ||
|
|
||
| #[cfg(any(feature = "zstd,lz4", test))] |
There was a problem hiding this comment.
I wonder if you can put this guard on the entire mod ipc_compression statement so that the entire module (including the test) is not compiled unless that feature is active
arrow/src/ipc/writer.rs
Outdated
| _ => { | ||
| if metadata_version != ipc::MetadataVersion::V5 { | ||
| return Err(ArrowError::InvalidArgumentError( | ||
| "Compress buffer just support from metadata v5".to_string(), |
There was a problem hiding this comment.
| "Compress buffer just support from metadata v5".to_string(), | |
| "Compression only supported in metadata v5 and above".to_string(), |
arrow/src/ipc/writer.rs
Outdated
| match batch_compression_type { | ||
| CompressionCodecType::NoCompression => {} | ||
| _ => { | ||
| if metadata_version != ipc::MetadataVersion::V5 { |
There was a problem hiding this comment.
should this check be < instead of != to cover future versions?
| if metadata_version != ipc::MetadataVersion::V5 { | |
| if metadata_version < ipc::MetadataVersion::V5 { |
| }) | ||
| } | ||
| } | ||
| z => panic!("Unsupported ipc::MetadataVersion {:?}", z), |
There was a problem hiding this comment.
Also given that this function returns a Result it seems like we could return a proper error here rather than panicing
| } | ||
|
|
||
| impl IpcWriteOptions { | ||
| pub fn try_new_with_compression( |
There was a problem hiding this comment.
I wonder if you could avoid the duplication here with more of a Builder style:
impl IpcWriteOptions {
pub fn with_compression(mut self, batch_compression_type: CompressionCodecType) -> Result<Self> {
.. // do checks here
self.batch_compresson_type = batch_compression_type;
Ok(self)
}
...
}Then one could use it like:
let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::v5)?
.with_compression(CompressionCodecType::LZ4)?;
...There was a problem hiding this comment.
The constructor may be refactor by follow up PR
arrow/src/ipc/writer.rs
Outdated
| (buffer.as_slice(), origin_buffer_len as i64) | ||
| } | ||
| CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => { | ||
| if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA { |
There was a problem hiding this comment.
It seems like this code should throw an "Not supported" error (or panic if it is encountered without support compiled in)
|
What is the status of this PR? Is it ready to go? Do we need to mess with the feature flags more? |
|
I think there are some comments are not addressed yet? |
| } | ||
| } | ||
| CompressionCodecType::NoCompression => Buffer::from(buf_data), | ||
| _ => { |
There was a problem hiding this comment.
How to handle this?
If the rust service compiled without the ipc_compression and receive a message with the ipc compression feature.
There was a problem hiding this comment.
I think returning an error is the correct way but as you have identifed above you can't do that without changing the signature to Result<Buffer> -- but since decompression can fail we probably need to make the change
There was a problem hiding this comment.
@alamb
I file a new issue to track this, and will submit a sub pr for this.
arrow/src/ipc/reader.rs
Outdated
| } | ||
| match compression_codec { | ||
| CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd | ||
| if cfg!(feature = "ipc_compression") || cfg!(test) => |
There was a problem hiding this comment.
need to check the compile options, if receive a message or ipc message with the compression feature.
| Ok(()) | ||
| } | ||
| CompressionCodecType::Zstd => { | ||
| let mut encoder = zstd::Encoder::new(output, 0).unwrap(); |
There was a problem hiding this comment.
fix, and pass the error out.
| Err(e) => Err(e.into()), | ||
| } | ||
| } | ||
| _ => Ok(input.len()), |
There was a problem hiding this comment.
Is this for CompressionCodecType::NoCompression? If so, do we need to copy form input to output?
There was a problem hiding this comment.
remove the NoCompression in the CompressionCodecType.
There was a problem hiding this comment.
just handle the LZ4 and ZSTD branch.
arrow/src/ipc/reader.rs
Outdated
| Buffer::from(data) | ||
| } else { | ||
| // decompress data using the codec | ||
| let mut _uncompressed_buffer = Vec::new(); |
There was a problem hiding this comment.
We know decompressed_length at this point. Should we allocate the vector with enough capacity?
There was a problem hiding this comment.
good point, we can init the vec with the capacity.
| /// -1: indicate that the data that follows is not compressed | ||
| /// 0: indicate that there is no data | ||
| /// positive number: indicate the uncompressed length for the following data | ||
| fn read_uncompressed_size(buffer: &[u8]) -> i64 { |
arrow/src/ipc/reader.rs
Outdated
| Some(compression) => match compression.codec() { | ||
| CompressionType::ZSTD => CompressionCodecType::Zstd, | ||
| CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame, | ||
| _ => CompressionCodecType::NoCompression, |
There was a problem hiding this comment.
No, in current version, just support the LZ4 and zstd.
| /// uncompressed length may be set to -1 to indicate that the data that | ||
| /// follows is not compressed, which can be useful for cases where | ||
| /// compression does not yield appreciable savings. | ||
| fn read_buffer( |
There was a problem hiding this comment.
TODO:
we should change the output arg to Result<Buffer> and return error message if the buffer can't be read.
There was a problem hiding this comment.
do you still intend to make this change? Or is it planned for a subsequent PR?
arrow/src/ipc/writer.rs
Outdated
| } | ||
| CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => { | ||
| if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA { | ||
| (buffer, 0) |
There was a problem hiding this comment.
| (buffer, 0) | |
| (buffer, LENGTH_EMPTY_COMPRESSED_DATA) |
| // padding and make offset 8 bytes aligned | ||
| let pad_len = pad_to_8(len as u32) as i64; |
There was a problem hiding this comment.
Is it padding to data len? Or total len?
There was a problem hiding this comment.
In each buffer, we have two struct, one is the metadata which store offset and actual len of data, the other is the data.
The actual len is the total_len.
The pad_len is just used to align the buffer.
| lz4 = { version = "1.23", default-features = false } | ||
| zstd = { version = "0.11", default-features = false } |
There was a problem hiding this comment.
I don't think these need to be in dev dependencies do they? If they are already in the dependencies of the crate?
There was a problem hiding this comment.
we can’t remove the lz4 and zstd from dev-dependency.
The ipc_compression is not in the default feature, we can‘t run cargo test with the lz4 and zstd lib.
But we need to run the ipc compiression UT in CI
There was a problem hiding this comment.
I try to remove these from the dev-dependency, can run cargo test.
I got the compile error.
error[E0433]: failed to resolve: use of undeclared crate or module `lz4`
--> arrow/src/ipc/compression/ipc_compression.rs:57:39
|
57 | let mut encoder = lz4::EncoderBuilder::new().build(output)?;
| ^^^ use of undeclared crate or module `lz4`
error[E0433]: failed to resolve: use of undeclared crate or module `zstd`
--> arrow/src/ipc/compression/ipc_compression.rs:65:39
|
65 | let mut encoder = zstd::Encoder::new(output, 0)?;
| ^^^^ use of undeclared crate or module `zstd`
|
help: there is a crate or module with a similar name
|
65 | let mut encoder = std::Encoder::new(output, 0)?;
| ~~~
| /// uncompressed length may be set to -1 to indicate that the data that | ||
| /// follows is not compressed, which can be useful for cases where | ||
| /// compression does not yield appreciable savings. | ||
| fn read_buffer( |
There was a problem hiding this comment.
do you still intend to make this change? Or is it planned for a subsequent PR?
| } | ||
| } | ||
| CompressionCodecType::NoCompression => Buffer::from(buf_data), | ||
| _ => { |
There was a problem hiding this comment.
I think returning an error is the correct way but as you have identifed above you can't do that without changing the signature to Result<Buffer> -- but since decompression can fail we probably need to make the change
| // decompress data using the codec | ||
| let mut _uncompressed_buffer = | ||
| Vec::with_capacity(decompressed_length as usize); | ||
| let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..]; |
There was a problem hiding this comment.
Something still isn't quite right with this code -- instead of gating the code on the test feature, I think the more typical pattern is to gate the entire test on the ipc_compression feature
So something like
#[cfg(ipc_compression)]
#[test]
fn read_generated_streams_200() {
let testdata = crate::util::test_util::arrow_test_data();
let version = "2.0.0-compression";
...
}There was a problem hiding this comment.
#[cfg(any(feature = "ipc_compression", test))] this is not the feature of test, just feature ipc_compression or test.
It only can be compiled with the compile flag feature="ipc_compression" or test.
There are some usage like in parquet.
arrow-rs/parquet/src/compression.rs
Line 76 in 30c94db
|
@liukun4515 -- perhaps I can find some time to try and help with this PR. I will try to do so tomorrow |
Thank you, you can also ping me through the slack. |
Thanks @liukun4515 -- I think I have gotten to the point where I can't offer any more specific suggestions on structure without trying it myself. I hope to try and rearrange the feature-flags and make a proposed PR to your branch. I won't have a chance to do it until tomorrow however. |
|
@liukun4515 -- I made good progress on sorting out the feature flags. A draft PR is here liukun4515#1 in case you want to see the direction I am headed. I sadly ran out of time today, but I will try and finish it up tomorrow. |
thanks for your help!!! |
Thank you -- I hope to push up an update shortly. It turned into a larger change than I was expecting |
|
Here is my proposal (the PR to liukun4515#1 is now rendered terribly): #2369 |
|
Thanks for @alamb cooperation. |
Which issue does this PR close?
Closes #1709
Closes #70
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?