fix: IPC decode panic with invalid data (#8931)
Conversation
|
Do you have an example test that shows this failing? Is there a way to exercise this unwrap with the public API? |
arrow-ipc/src/reader.rs
Outdated
| fn next_buffer(&mut self) -> Result<Buffer, ArrowError> { | ||
| read_buffer(self.buffers.next().unwrap(), self.data, self.compression) | ||
| read_buffer( | ||
| self.buffers.next().ok_or_else(|| { |
There was a problem hiding this comment.
I agree with @tustvold we should have a test if possible
Also, can we make the error more specific? It seems like the error is that there are fewer buffers than expected
There was a problem hiding this comment.
It will happen if the metadata claims more buffers than are actually supplied.
For example, in our case, a user constructed a primitive integer array without a validity mask (for some unknown reason). Therefore only one buffer is encoded, while RecordBatchDecoder requires two here, which causes a panic during the second invocation of next_buffer.
arrow-rs/arrow-ipc/src/reader.rs
Line 240 in 1c90efe
It's not convenient to construct such corrupted input for testing — do you have any suggestions?
There was a problem hiding this comment.
@leiysky You can make use of #8931 (comment).
But it is a manually constructed test vector, so I'd expect to use it as a snapshot. If the internal code changes, the snapshot may become outdated and need updating.
I can't find any other way to reliably construct dedicated invalid data.
|
Here is a test that can verify the issue: diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 987188548..078b96297 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -569,7 +569,10 @@ impl<'a> RecordBatchDecoder<'a> {
}
fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
- read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
+ let next_buffer = self.buffers.next().ok_or_else(|| {
+ ArrowError::IpcError("Fewer buffers than expected in IPC RecordBatch".to_string())
+ })?;
+ read_buffer(next_buffer, self.data, self.compression)
}
fn skip_buffer(&mut self) {
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 4a849c116..1224a2c90 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -539,6 +539,7 @@ impl IpcDataGenerator {
arrow_data.extend_from_slice(&PADDING[..pad_len]);
// write data
+ buffers.pop();
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
diff --git a/arrow-ipc/tests/test_invalid_data.rs b/arrow-ipc/tests/test_invalid_data.rs
index e69de29bb..cceb4b278 100644
--- a/arrow-ipc/tests/test_invalid_data.rs
+++ b/arrow-ipc/tests/test_invalid_data.rs
@@ -0,0 +1,21 @@
+use arrow_array::record_batch;
+use arrow_ipc::reader::StreamReader;
+use arrow_ipc::writer::StreamWriter;
+
+#[test]
+fn test_invalid_data() {
+ let mut stream = vec![];
+
+ let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+ let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+ dbg!(&stream);
+
+ let stream = stream.as_slice();
+ let reader = StreamReader::try_new(stream, None).unwrap();
+
+ for batch in reader {
+ batch.unwrap();
+    }
+}

The test vector is the byte sequence shown in the test below. And thus the final test:

use arrow_ipc::reader::StreamReader;
#[test]
fn test_invalid_data() {
let stream = [
255, 255, 255, 255, 120, 0, 0, 0, 16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 10, 0, 9, 0, 4, 0, 10,
0, 0, 0, 16, 0, 0, 0, 0, 1, 4, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0,
0, 20, 0, 0, 0, 16, 0, 20, 0, 16, 0, 14, 0, 15, 0, 4, 0, 0, 0, 8, 0, 16, 0, 0, 0, 24, 0, 0,
0, 32, 0, 0, 0, 0, 0, 1, 2, 28, 0, 0, 0, 8, 0, 12, 0, 4, 0, 11, 0, 8, 0, 0, 0, 32, 0, 0, 0,
0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 97, 0, 0, 0, 255, 255, 255, 255, 120, 0, 0, 0, 16, 0,
0, 0, 12, 0, 26, 0, 24, 0, 23, 0, 4, 0, 8, 0, 12, 0, 0, 0, 32, 0, 0, 0, 128, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 4, 0, 10, 0, 24, 0, 12, 0, 8, 0, 4, 0, 10, 0, 0, 0, 44, 0, 0,
0, 16, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0,
]
.as_slice();
let mut reader = StreamReader::try_new(stream, None).unwrap();
let batch = reader.next().unwrap();
let err = batch.unwrap_err().to_string();
assert!(
err.contains("Fewer buffers than expected in IPC RecordBatch"),
"unexpected error message: {err}",
);
} |
|
This is a valid issue we encountered in the real world, when a user's self-constructed Arrow IPC data was invalid (due to a bug in their software). Since Arrow IPC data can come from any source, we can't assume that it is always valid or constructed with this library. |
| #[test] | ||
| fn test_missing_buffer_metadata_error() { | ||
| use crate::r#gen::Message::*; | ||
| use flatbuffers::FlatBufferBuilder; | ||
|
|
||
| let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)])); | ||
|
|
||
| // create RecordBatch buffer metadata with invalid buffer count | ||
| // Int32Array needs 2 buffers (validity + data) but we provide only 1 | ||
| let mut fbb = FlatBufferBuilder::new(); | ||
| let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]); | ||
| let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]); | ||
| let batch_offset = RecordBatch::create( | ||
| &mut fbb, | ||
| &RecordBatchArgs { | ||
| length: 2, | ||
| nodes: Some(nodes), | ||
| buffers: Some(buffers), | ||
| compression: None, | ||
| variadicBufferCounts: None, | ||
| }, | ||
| ); | ||
| fbb.finish_minimal(batch_offset); | ||
| let batch_bytes = fbb.finished_data().to_vec(); | ||
| let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap(); | ||
|
|
||
| let data_buffer = Buffer::from(vec![0u8; 8]); | ||
| let dictionaries: HashMap<i64, ArrayRef> = HashMap::new(); | ||
| let metadata = MetadataVersion::V5; | ||
|
|
||
| let decoder = RecordBatchDecoder::try_new( | ||
| &data_buffer, | ||
| batch, | ||
| schema.clone(), | ||
| &dictionaries, | ||
| &metadata, | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let result = decoder.read_record_batch(); | ||
|
|
||
| match result { | ||
| Err(ArrowError::IpcError(msg)) => { | ||
| assert_eq!(msg, "Buffer count mismatched with metadata"); | ||
| } | ||
| other => panic!("unexpected error: {other:?}"), | ||
| } | ||
| } | ||
|
|
Will panic if the input buffer is invalid.