-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Closed
Labels
Description
Describe the bug
Run-end-encoded (REE) arrays whose values are dictionary arrays cannot be round-tripped (encoded and then decoded) through the streaming IPC format.
To Reproduce
/// Reproducer: round-trips a run-end-encoded column whose values are a
/// dictionary array through the streaming IPC writer and decoder.
///
/// The test builds a one-column schema (`test1`) of type `RunEndEncoded`
/// with `Int32` run ends and `Dictionary(Int32, Utf8)` values, writes a
/// single batch with `StreamWriter` (dictionary ids not preserved), then
/// feeds the resulting bytes to `StreamDecoder` until it yields `None`.
/// Every fallible step panics via `expect`, so the test passes only when
/// the whole encode/decode sequence returns without error.
#[test]
fn test_read_ree_dict_record_batches_from_buffer() {
    // Child fields of the run-end-encoded column.
    let run_ends_field = Arc::new(Field::new("run_ends".to_string(), DataType::Int32, false));
    let values_field = Arc::new(Field::new_dict(
        "values".to_string(),
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
        0,
        false,
    ));
    let schema = Schema::new(vec![Field::new(
        "test1",
        DataType::RunEndEncoded(run_ends_field, values_field),
        true,
    )]);

    // Three runs of length one; the values are dictionary-encoded strings
    // with a null in the middle.
    let run_ends = Int32Array::from(vec![1, 2, 3]);
    let dict_values = vec![Some("a"), None, Some("a")]
        .into_iter()
        .collect::<DictionaryArray<Int32Type>>();
    let ree_column =
        RunArray::try_new(&run_ends, &dict_values).expect("Failed to create RunArray");
    let batch = RecordBatch::try_new(schema.clone().into(), vec![Arc::new(ree_column)])
        .expect("Failed to create RecordBatch");

    // Encode the batch into an in-memory IPC stream. The writer is scoped so
    // its mutable borrow of `buffer` ends before the bytes are read back.
    let mut buffer = vec![];
    {
        let write_options = IpcWriteOptions::default().with_preserve_dict_id(false);
        let mut writer = StreamWriter::try_new_with_options(&mut buffer, &schema, write_options)
            .expect("Failed to create StreamWriter");
        writer.write(&batch).expect("Failed to write RecordBatch");
        writer.finish().expect("Failed to finish StreamWriter");
    }

    // Decode the stream, discarding each decoded item; stop once the decoder
    // reports `None`.
    let mut decoder = StreamDecoder::new();
    let mut remaining = Buffer::from(buffer.as_slice());
    loop {
        let decoded = decoder
            .decode(&mut remaining)
            .map_err(|e| {
                ArrowError::ExternalError(format!("Failed to decode record batch: {}", e).into())
            })
            .expect("Failed to decode record batch");
        if decoded.is_none() {
            break;
        }
    }
    decoder.finish().expect("Failed to finish decoder");
}
Expected behavior
The test above passes: the batch is written and then decoded without error.
Additional context
n/a
Reactions are currently unavailable