Skip to content

Commit 22088ea

Browse files
authored
feat(sql): support array column type in parquet partitions (#5925)
1 parent 5935cce commit 22088ea

62 files changed

Lines changed: 3201 additions & 595 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

compat/src/test/java/io/questdb/compat/ParquetTest.java

Lines changed: 400 additions & 94 deletions
Large diffs are not rendered by default.

core/rust/qdb-core/src/col_driver/array.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,16 @@ const OFFSET_MAX: u64 = (1u64 << 48) - 1;
5353

5454
#[repr(transparent)]
5555
#[derive(Clone, Copy)]
56-
struct ArrayAuxEntry {
56+
pub struct ArrayAuxEntry {
5757
packed: u128,
5858
}
5959

6060
impl ArrayAuxEntry {
61-
fn size(&self) -> u32 {
61+
pub fn size(&self) -> u32 {
6262
(self.packed >> 64) as u32
6363
}
6464

65-
fn offset(&self) -> u64 {
65+
pub fn offset(&self) -> u64 {
6666
(self.packed as u64) & OFFSET_MAX
6767
}
6868
}

core/rust/qdb-core/src/col_type.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use serde::{Deserialize, Deserializer, Serialize};
2626
use std::fmt::{Debug, Display, Formatter};
2727
use std::num::NonZeroI32;
2828

29+
// Don't forget to update VALUES when modifying this list.
2930
#[repr(u8)]
3031
#[derive(Debug, Copy, Clone, PartialEq)]
3132
pub enum ColumnTypeTag {
@@ -55,6 +56,33 @@ pub enum ColumnTypeTag {
5556
}
5657

5758
impl ColumnTypeTag {
59+
#[cfg(test)]
60+
const VALUES: [Self; 23] = [
61+
Self::Boolean,
62+
Self::Byte,
63+
Self::Short,
64+
Self::Char,
65+
Self::Int,
66+
Self::Long,
67+
Self::Date,
68+
Self::Timestamp,
69+
Self::Float,
70+
Self::Double,
71+
Self::String,
72+
Self::Symbol,
73+
Self::Long256,
74+
Self::GeoByte,
75+
Self::GeoShort,
76+
Self::GeoInt,
77+
Self::GeoLong,
78+
Self::Binary,
79+
Self::Uuid,
80+
Self::Long128,
81+
Self::IPv4,
82+
Self::Varchar,
83+
Self::Array,
84+
];
85+
5886
/// If true, the column is encoded with both data and aux vectors.
5987
pub const fn is_var_size(self) -> bool {
6088
self.fixed_size().is_none()
@@ -172,6 +200,11 @@ fn tag_of(col_type: i32) -> u8 {
172200
}
173201

174202
const TYPE_FLAG_DESIGNATED_TIMESTAMP: i32 = 1i32 << 17;
203+
const ARRAY_ELEMTYPE_FIELD_MASK: i32 = 0x3F;
204+
const ARRAY_ELEMTYPE_FIELD_POS: i32 = 8;
205+
const ARRAY_NDIMS_LIMIT: i32 = 32; // inclusive
206+
const ARRAY_NDIMS_FIELD_MASK: i32 = ARRAY_NDIMS_LIMIT - 1;
207+
const ARRAY_NDIMS_FIELD_POS: i32 = 14;
175208

176209
#[repr(transparent)]
177210
#[derive(Copy, Clone, PartialEq, Serialize, Ord, PartialOrd, Eq)]
@@ -217,6 +250,31 @@ impl ColumnType {
217250
.try_into()
218251
.expect("invalid column type tag, should already be validated")
219252
}
253+
254+
pub fn array_dimensionality(&self) -> CoreResult<i32> {
255+
if self.tag() != ColumnTypeTag::Array {
256+
return Err(fmt_err!(
257+
InvalidType,
258+
"invalid column type {}, only array columns have dimensionality",
259+
self
260+
));
261+
}
262+
let dim = ((self.code() >> ARRAY_NDIMS_FIELD_POS) & ARRAY_NDIMS_FIELD_MASK) + 1;
263+
Ok(dim)
264+
}
265+
266+
pub fn array_element_type(&self) -> CoreResult<ColumnTypeTag> {
267+
if self.tag() != ColumnTypeTag::Array {
268+
return Err(fmt_err!(
269+
InvalidType,
270+
"invalid column type {}, only array columns have element type",
271+
self
272+
));
273+
}
274+
let tag = (self.code() >> ARRAY_ELEMTYPE_FIELD_POS) & ARRAY_ELEMTYPE_FIELD_MASK;
275+
let tag = ColumnTypeTag::try_from(tag as u8)?;
276+
Ok(tag)
277+
}
220278
}
221279

222280
impl Display for ColumnType {
@@ -262,6 +320,23 @@ impl<'de> Deserialize<'de> for ColumnType {
262320
}
263321
}
264322

323+
pub fn encode_array_type(elem_type: ColumnTypeTag, dim: i32) -> CoreResult<ColumnType> {
324+
if !(1..=ARRAY_NDIMS_LIMIT).contains(&dim) {
325+
return Err(fmt_err!(InvalidType, "invalid array dimensionality {dim}",));
326+
}
327+
if elem_type != ColumnTypeTag::Double {
328+
return Err(fmt_err!(
329+
InvalidType,
330+
"unsupported array element type {}",
331+
elem_type.name()
332+
));
333+
}
334+
let extra = ((dim - 1) & ARRAY_NDIMS_FIELD_MASK)
335+
<< (ARRAY_NDIMS_FIELD_POS - ARRAY_ELEMTYPE_FIELD_POS)
336+
| ((elem_type as i32) & ARRAY_ELEMTYPE_FIELD_MASK);
337+
Ok(ColumnType::new(ColumnTypeTag::Array, extra))
338+
}
339+
265340
#[cfg(test)]
266341
mod tests {
267342
use super::*;
@@ -305,6 +380,7 @@ mod tests {
305380
assert!(ColumnTypeTag::Binary.is_var_size());
306381
assert!(ColumnTypeTag::String.is_var_size());
307382
assert!(ColumnTypeTag::Varchar.is_var_size());
383+
assert!(ColumnTypeTag::Array.is_var_size());
308384
}
309385

310386
#[test]
@@ -318,5 +394,51 @@ mod tests {
318394
assert_eq!(ColumnTypeTag::Binary.fixed_size(), None);
319395
assert_eq!(ColumnTypeTag::String.fixed_size(), None);
320396
assert_eq!(ColumnTypeTag::Varchar.fixed_size(), None);
397+
assert_eq!(ColumnTypeTag::Array.fixed_size(), None);
398+
}
399+
400+
#[test]
401+
fn test_array_dimensionality() {
402+
for tag in ColumnTypeTag::VALUES {
403+
if tag != ColumnTypeTag::Array {
404+
assert!(ColumnType::new(tag, 0).array_dimensionality().is_err());
405+
}
406+
}
407+
408+
let typ = encode_array_type(ColumnTypeTag::Double, 3);
409+
assert!(typ.is_ok());
410+
let dim = typ.unwrap().array_dimensionality();
411+
assert!(dim.is_ok());
412+
assert_eq!(dim.unwrap(), 3);
413+
}
414+
415+
#[test]
416+
fn test_array_element_type() {
417+
for tag in ColumnTypeTag::VALUES {
418+
if tag != ColumnTypeTag::Array {
419+
assert!(ColumnType::new(tag, 0).array_element_type().is_err());
420+
}
421+
}
422+
423+
let typ = encode_array_type(ColumnTypeTag::Double, 3);
424+
assert!(typ.is_ok());
425+
let dim = typ.unwrap().array_element_type();
426+
assert!(dim.is_ok());
427+
assert_eq!(dim.unwrap(), ColumnTypeTag::Double);
428+
}
429+
430+
#[test]
431+
fn test_encode_array_type() {
432+
let typ = encode_array_type(ColumnTypeTag::Double, 11);
433+
assert!(typ.is_ok());
434+
let typ = typ.unwrap();
435+
436+
let elem_typ = typ.array_element_type();
437+
assert!(elem_typ.is_ok());
438+
assert_eq!(elem_typ.unwrap(), ColumnTypeTag::Double);
439+
440+
let dim = typ.array_dimensionality();
441+
assert!(dim.is_ok());
442+
assert_eq!(dim.unwrap(), 11);
321443
}
322444
}

core/rust/qdbr/parquet2/src/deserialize/native.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use super::utils;
1212
pub type Casted<'a, T> = std::iter::Map<std::slice::ChunksExact<'a, u8>, fn(&'a [u8]) -> T>;
1313

1414
/// Views the values of the data page as [`Casted`] to [`NativeType`].
15-
pub fn native_cast<T: NativeType>(page: &DataPage) -> Result<Casted<T>, Error> {
15+
pub fn native_cast<T: NativeType>(page: &DataPage) -> Result<Casted<'_, T>, Error> {
1616
let (_, _, values) = split_buffer(page)?;
1717
if values.len() % std::mem::size_of::<T>() != 0 {
1818
return Err(Error::oos(

core/rust/qdbr/parquet2/src/deserialize/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010

1111
use super::hybrid_rle::{HybridDecoderBitmapIter, HybridRleIter};
1212

13-
pub(super) fn dict_indices_decoder(page: &DataPage) -> Result<hybrid_rle::HybridRleDecoder, Error> {
13+
pub(super) fn dict_indices_decoder(page: &DataPage) -> Result<hybrid_rle::HybridRleDecoder<'_>, Error> {
1414
let (_, _, indices_buffer) = split_buffer(page)?;
1515

1616
// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),

core/rust/qdbr/parquet2/src/encoding/hybrid_rle/bitmap.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,8 @@ impl<'a> Iterator for BitmapIter<'a> {
7171
pub fn encode_bool<W: Write, I: Iterator<Item = bool>>(
7272
writer: &mut W,
7373
mut iterator: I,
74+
length: usize,
7475
) -> std::io::Result<()> {
75-
// the length of the iterator.
76-
let length = iterator.size_hint().1.unwrap();
77-
7876
let chunks = length / 8;
7977
let reminder = length % 8;
8078

core/rust/qdbr/parquet2/src/encoding/hybrid_rle/encoder.rs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,19 @@ use super::bitpacked_encode;
99
pub fn encode_u32<W: Write, I: Iterator<Item = u32>>(
1010
writer: &mut W,
1111
iterator: I,
12+
length: usize,
1213
num_bits: u32,
1314
) -> std::io::Result<()> {
1415
let num_bits = num_bits as u8;
15-
encode_header(writer, &iterator)?;
16-
bitpacked_encode_u32(writer, iterator, num_bits as usize)?;
16+
encode_header(writer, length)?;
17+
bitpacked_encode_u32(writer, iterator, length, num_bits as usize)?;
1718
Ok(())
1819
}
1920

20-
fn encode_header<W: Write, T, I: Iterator<Item = T>>(
21+
fn encode_header<W: Write>(
2122
writer: &mut W,
22-
iterator: &I,
23+
length: usize,
2324
) -> std::io::Result<()> {
24-
// the length of the iterator.
25-
let length = iterator.size_hint().1.unwrap();
26-
2725
// write the length + indicator
2826
let mut header = ceil8(length) as u64;
2927
header <<= 1;
@@ -39,11 +37,9 @@ const U32_BLOCK_LEN: usize = 32;
3937
fn bitpacked_encode_u32<W: Write, I: Iterator<Item = u32>>(
4038
writer: &mut W,
4139
mut iterator: I,
40+
length: usize,
4241
num_bits: usize,
4342
) -> std::io::Result<()> {
44-
// the length of the iterator.
45-
let length = iterator.size_hint().1.unwrap();
46-
4743
let chunks = length / U32_BLOCK_LEN;
4844
let remainder = length - chunks * U32_BLOCK_LEN;
4945
let mut buffer = [0u32; U32_BLOCK_LEN];
@@ -81,11 +77,11 @@ fn bitpacked_encode_u32<W: Write, I: Iterator<Item = u32>>(
8177
pub fn encode_bool<W: Write, I: Iterator<Item = bool>>(
8278
writer: &mut W,
8379
iterator: I,
80+
length: usize,
8481
) -> std::io::Result<()> {
85-
encode_header(writer, &iterator)?;
86-
82+
encode_header(writer, length)?;
8783
// encode the iterator
88-
bitpacked_encode(writer, iterator)
84+
bitpacked_encode(writer, iterator, length)
8985
}
9086

9187
#[cfg(test)]
@@ -99,7 +95,8 @@ mod tests {
9995

10096
let mut vec = vec![];
10197

102-
encode_bool(&mut vec, iter)?;
98+
let len = iter.size_hint().1.unwrap();
99+
encode_bool(&mut vec, iter, len)?;
103100

104101
assert_eq!(vec, vec![(2 << 1 | 1), 0b10011101u8, 0b00011101]);
105102

@@ -110,9 +107,12 @@ mod tests {
110107
fn bool_from_iter() -> std::io::Result<()> {
111108
let mut vec = vec![];
112109

110+
let values = vec![true, true, true, true, true, true, true, true];
111+
let iter = values.into_iter();
113112
encode_bool(
114113
&mut vec,
115-
vec![true, true, true, true, true, true, true, true].into_iter(),
114+
iter,
115+
values.len(),
116116
)?;
117117

118118
assert_eq!(vec, vec![(1 << 1 | 1), 0b11111111]);
@@ -123,7 +123,8 @@ mod tests {
123123
fn test_encode_u32() -> std::io::Result<()> {
124124
let mut vec = vec![];
125125

126-
encode_u32(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?;
126+
let values = vec![0, 1, 2, 1, 2, 1, 1, 0, 3];
127+
encode_u32(&mut vec, values.into_iter(), values.len(), 2)?;
127128

128129
assert_eq!(
129130
vec,
@@ -136,9 +137,9 @@ mod tests {
136137
fn test_encode_u32_large() -> std::io::Result<()> {
137138
let mut vec = vec![];
138139

139-
let values = (0..128).map(|x| x % 4);
140+
let values: Vec<_> = (0..128).map(|x| x % 4).collect();
140141

141-
encode_u32(&mut vec, values, 2)?;
142+
encode_u32(&mut vec, values.into_iter(), values.len(), 2)?;
142143

143144
let length = 128;
144145
let expected = 0b11_10_01_00u8;
@@ -152,10 +153,10 @@ mod tests {
152153

153154
#[test]
154155
fn test_u32_other() -> std::io::Result<()> {
155-
let values = vec![3, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3].into_iter();
156+
let values = vec![3, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3];
156157

157158
let mut vec = vec![];
158-
encode_u32(&mut vec, values, 2)?;
159+
encode_u32(&mut vec, values.into_iter(), values.len(), 2)?;
159160

160161
let expected = vec![5, 207, 254, 247, 51];
161162
assert_eq!(expected, vec);

core/rust/qdbr/parquet2/tests/it/write/binary.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ fn unzip_option(array: &[Option<Vec<u8>>]) -> Result<(Vec<u8>, Vec<u8>)> {
2626
false
2727
}
2828
});
29-
encode_bool(&mut validity, iter)?;
29+
let len = iter.size_hint().1.unwrap();
30+
encode_bool(&mut validity, iter, len)?;
3031

3132
// write the length, now that it is known
3233
let mut validity = validity.into_inner();

core/rust/qdbr/parquet2/tests/it/write/primitive.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use parquet2::{
99
};
1010

1111
fn unzip_option<T: NativeType>(array: &[Option<T>]) -> Result<(Vec<u8>, Vec<u8>)> {
12-
// leave the first 4 bytes anouncing the length of the def level
12+
// leave the first 4 bytes announcing the length of the def level
1313
// this will be overwritten at the end, once the length is known.
1414
// This is unknown at this point because of the uleb128 encoding,
1515
// whose length is variable.
@@ -25,7 +25,8 @@ fn unzip_option<T: NativeType>(array: &[Option<T>]) -> Result<(Vec<u8>, Vec<u8>)
2525
false
2626
}
2727
});
28-
encode_bool(&mut validity, iter)?;
28+
let length = array.len();
29+
encode_bool(&mut validity, iter, length)?;
2930

3031
// write the length, now that it is known
3132
let mut validity = validity.into_inner();

core/rust/qdbr/rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
[toolchain]
2-
channel = "nightly-2025-01-07"
2+
channel = "nightly-2025-02-07"

0 commit comments

Comments
 (0)